#
tokens: 47391/50000 25/111 files (page 2/4)
lines: off (toggle) GitHub
raw markdown copy
This is page 2 of 4. Use http://codebase.md/crowdstrike/falcon-mcp?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .env.dev.example
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug.yaml
│   │   ├── config.yml
│   │   ├── feature-request.yaml
│   │   └── question.yaml
│   └── workflows
│       ├── docker-build-push.yml
│       ├── docker-build-test.yml
│       ├── markdown-lint.yml
│       ├── python-lint.yml
│       ├── python-test-e2e.yml
│       ├── python-test.yml
│       └── release.yml
├── .gitignore
├── .markdownlint.json
├── CHANGELOG.md
├── Dockerfile
├── docs
│   ├── CODE_OF_CONDUCT.md
│   ├── CONTRIBUTING.md
│   ├── deployment
│   │   ├── amazon_bedrock_agentcore.md
│   │   └── google_cloud.md
│   ├── e2e_testing.md
│   ├── module_development.md
│   ├── resource_development.md
│   └── SECURITY.md
├── examples
│   ├── adk
│   │   ├── adk_agent_operations.sh
│   │   ├── falcon_agent
│   │   │   ├── __init__.py
│   │   │   ├── agent.py
│   │   │   ├── env.properties
│   │   │   └── requirements.txt
│   │   └── README.md
│   ├── basic_usage.py
│   ├── mcp_config.json
│   ├── sse_usage.py
│   └── streamable_http_usage.py
├── falcon_mcp
│   ├── __init__.py
│   ├── client.py
│   ├── common
│   │   ├── __init__.py
│   │   ├── api_scopes.py
│   │   ├── errors.py
│   │   ├── logging.py
│   │   └── utils.py
│   ├── modules
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── cloud.py
│   │   ├── detections.py
│   │   ├── discover.py
│   │   ├── hosts.py
│   │   ├── idp.py
│   │   ├── incidents.py
│   │   ├── intel.py
│   │   ├── sensor_usage.py
│   │   ├── serverless.py
│   │   └── spotlight.py
│   ├── registry.py
│   ├── resources
│   │   ├── __init__.py
│   │   ├── cloud.py
│   │   ├── detections.py
│   │   ├── discover.py
│   │   ├── hosts.py
│   │   ├── incidents.py
│   │   ├── intel.py
│   │   ├── sensor_usage.py
│   │   ├── serverless.py
│   │   └── spotlight.py
│   └── server.py
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── generate_e2e_report.py
│   └── test_results_viewer.html
├── SUPPORT.md
├── tests
│   ├── __init__.py
│   ├── common
│   │   ├── __init__.py
│   │   ├── test_api_scopes.py
│   │   ├── test_errors.py
│   │   ├── test_logging.py
│   │   └── test_utils.py
│   ├── conftest.py
│   ├── e2e
│   │   ├── __init__.py
│   │   ├── modules
│   │   │   ├── __init__.py
│   │   │   ├── test_cloud.py
│   │   │   ├── test_detections.py
│   │   │   ├── test_discover.py
│   │   │   ├── test_hosts.py
│   │   │   ├── test_idp.py
│   │   │   ├── test_incidents.py
│   │   │   ├── test_intel.py
│   │   │   ├── test_sensor_usage.py
│   │   │   ├── test_serverless.py
│   │   │   └── test_spotlight.py
│   │   └── utils
│   │       ├── __init__.py
│   │       └── base_e2e_test.py
│   ├── modules
│   │   ├── __init__.py
│   │   ├── test_base.py
│   │   ├── test_cloud.py
│   │   ├── test_detections.py
│   │   ├── test_discover.py
│   │   ├── test_hosts.py
│   │   ├── test_idp.py
│   │   ├── test_incidents.py
│   │   ├── test_intel.py
│   │   ├── test_sensor_usage.py
│   │   ├── test_serverless.py
│   │   ├── test_spotlight.py
│   │   └── utils
│   │       └── test_modules.py
│   ├── test_client.py
│   ├── test_registry.py
│   ├── test_server.py
│   └── test_streamable_http_transport.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/falcon_mcp/modules/hosts.py:
--------------------------------------------------------------------------------

```python
"""
Hosts module for Falcon MCP Server

This module provides tools for accessing and managing CrowdStrike Falcon hosts/devices.
"""

from textwrap import dedent
from typing import Any, Dict, List

from mcp.server import FastMCP
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl, Field

from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.logging import get_logger
from falcon_mcp.common.utils import prepare_api_parameters
from falcon_mcp.modules.base import BaseModule
from falcon_mcp.resources.hosts import SEARCH_HOSTS_FQL_DOCUMENTATION

logger = get_logger(__name__)


class HostsModule(BaseModule):
    """Module for accessing and managing CrowdStrike Falcon hosts/devices."""

    def register_tools(self, server: FastMCP) -> None:
        """Register tools with the MCP server.

        Args:
            server: MCP server instance
        """
        # Register tools
        self._add_tool(
            server=server,
            method=self.search_hosts,
            name="search_hosts",
        )

        self._add_tool(
            server=server,
            method=self.get_host_details,
            name="get_host_details",
        )

    def register_resources(self, server: FastMCP) -> None:
        """Register resources with the MCP server.

        Args:
            server: MCP server instance
        """
        search_hosts_fql_resource = TextResource(
            uri=AnyUrl("falcon://hosts/search/fql-guide"),
            name="falcon_search_hosts_fql_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_hosts` tool.",
            text=SEARCH_HOSTS_FQL_DOCUMENTATION,
        )

        self._add_resource(
            server,
            search_hosts_fql_resource,
        )

    def search_hosts(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://hosts/search/fql-guide` resource when building this filter parameter.",
            examples={"platform_name:'Windows'", "hostname:'PC*'"},
        ),
        limit: int = Field(
            default=10,
            ge=1,
            le=5000,
            description="The maximum records to return. [1-5000]",
        ),
        offset: int | None = Field(
            default=None,
            description="The offset to start retrieving records from.",
        ),
        sort: str | None = Field(
            default=None,
            description=dedent("""
                Sort hosts using these options:

                hostname: Host name/computer name
                last_seen: Timestamp when the host was last seen
                first_seen: Timestamp when the host was first seen
                modified_timestamp: When the host record was last modified
                platform_name: Operating system platform
                agent_version: CrowdStrike agent version
                os_version: Operating system version
                external_ip: External IP address

                Sort either asc (ascending) or desc (descending).
                Both formats are supported: 'hostname.desc' or 'hostname|desc'

                Examples: 'hostname.asc', 'last_seen.desc', 'platform_name.asc'
            """).strip(),
            examples={"hostname.asc", "last_seen.desc"},
        ),
    ) -> List[Dict[str, Any]]:
        """Search for hosts in your CrowdStrike environment.

        IMPORTANT: You must use the `falcon://hosts/search/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_hosts` tool.
        """
        # Prepare parameters for QueryDevicesByFilter
        params = prepare_api_parameters(
            {
                "filter": filter,
                "limit": limit,
                "offset": offset,
                "sort": sort,
            }
        )

        # Define the operation name
        operation = "QueryDevicesByFilter"

        logger.debug("Searching hosts with params: %s", params)

        # Make the API request to get device IDs
        response = self.client.command(operation, parameters=params)

        # Use handle_api_response to get device IDs
        device_ids = handle_api_response(
            response,
            operation=operation,
            error_message="Failed to search hosts",
            default_result=[],
        )

        # If handle_api_response returns an error dict instead of a list,
        # it means there was an error, so we return it wrapped in a list
        if self._is_error(device_ids):
            return [device_ids]

        # If we have device IDs, get the details for each one
        if device_ids:
            # Use the base method to get device details
            details = self._base_get_by_ids(
                operation="PostDeviceDetailsV2",
                ids=device_ids,
                id_key="ids",
            )

            # If handle_api_response returns an error dict instead of a list,
            # it means there was an error, so we return it wrapped in a list
            if self._is_error(details):
                return [details]

            return details

        return []

    def get_host_details(
        self,
        ids: List[str] = Field(
            description="Host device IDs to retrieve details for. You can get device IDs from the search_hosts operation, the Falcon console, or the Streaming API. Maximum: 5000 IDs per request."
        ),
    ) -> List[Dict[str, Any]] | Dict[str, Any]:
        """Retrieve detailed information for specified host device IDs.

        This tool returns comprehensive host details for one or more device IDs.
        Use this when you already have specific device IDs and need their full details.
        For searching/discovering hosts, use the `falcon_search_hosts` tool instead.
        """
        logger.debug("Getting host details for IDs: %s", ids)

        # Handle empty list case - return empty list without making API call
        if not ids:
            return []

        # Use the base method to get device details
        return self._base_get_by_ids(
            operation="PostDeviceDetailsV2",
            ids=ids,
            id_key="ids",
        )

```

--------------------------------------------------------------------------------
/falcon_mcp/client.py:
--------------------------------------------------------------------------------

```python
"""
Falcon API Client for MCP Server

This module provides the Falcon API client and authentication utilities for the Falcon MCP server.
"""

import os
import platform
import sys
from importlib.metadata import PackageNotFoundError, version
from typing import Any, Dict, Optional

# Import the APIHarnessV2 from FalconPy
from falconpy import APIHarnessV2

from falcon_mcp.common.logging import get_logger

logger = get_logger(__name__)


class FalconClient:
    """Client for interacting with the CrowdStrike Falcon API."""

    def __init__(
        self,
        base_url: Optional[str] = None,
        debug: bool = False,
        user_agent_comment: Optional[str] = None,
    ):
        """Initialize the Falcon client.

        Args:
            base_url: Falcon API base URL (defaults to FALCON_BASE_URL env var)
            debug: Enable debug logging
            user_agent_comment: Additional information to include in the User-Agent comment section
        """
        # Get credentials from environment variables
        self.client_id = os.environ.get("FALCON_CLIENT_ID")
        self.client_secret = os.environ.get("FALCON_CLIENT_SECRET")
        self.base_url = base_url or os.environ.get(
            "FALCON_BASE_URL", "https://api.crowdstrike.com"
        )
        self.debug = debug
        self.user_agent_comment = user_agent_comment or os.environ.get(
            "FALCON_MCP_USER_AGENT_COMMENT"
        )

        if not self.client_id or not self.client_secret:
            raise ValueError(
                "Falcon API credentials not provided. Set FALCON_CLIENT_ID and "
                "FALCON_CLIENT_SECRET environment variables."
            )

        # Initialize the Falcon API client using APIHarnessV2
        self.client = APIHarnessV2(
            client_id=self.client_id,
            client_secret=self.client_secret,
            base_url=self.base_url,
            debug=debug,
            user_agent=self.get_user_agent(),
        )

        logger.debug("Initialized Falcon client with base URL: %s", self.base_url)

    def authenticate(self) -> bool:
        """Authenticate with the Falcon API.

        Returns:
            bool: True if authentication was successful
        """
        return self.client.login()

    def is_authenticated(self) -> bool:
        """Check if the client is authenticated.

        Returns:
            bool: True if the client is authenticated
        """
        return self.client.token_valid

    def command(self, operation: str, **kwargs) -> Dict[str, Any]:
        """Execute a Falcon API command.

        Args:
            operation: The API operation to execute
            **kwargs: Additional arguments to pass to the API

        Returns:
            Dict[str, Any]: The API response
        """
        return self.client.command(operation, **kwargs)

    def get_user_agent(self) -> str:
        """Get RFC-compliant user agent string for API requests.

        Returns:
            str: User agent string in RFC format "falcon-mcp/VERSION (comment; falconpy/VERSION; Python/VERSION; Platform/VERSION)"
        """
        # Get falcon-mcp version
        falcon_mcp_version = get_version()

        # Get Python version
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"

        # Get platform information
        platform_info = f"{platform.system()}/{platform.release()}"

        # Get FalconPy version
        try:
            falconpy_version = version("crowdstrike-falconpy")
        except PackageNotFoundError:
            falconpy_version = "unknown"
            logger.debug("crowdstrike-falconpy package version not found")

        # Build comment section components (RFC-compliant format)
        comment_parts = []
        if self.user_agent_comment:
            comment_parts.append(self.user_agent_comment.strip())
        comment_parts.extend(
            [f"falconpy/{falconpy_version}", f"Python/{python_version}", platform_info]
        )

        return f"falcon-mcp/{falcon_mcp_version} ({'; '.join(comment_parts)})"

    def get_headers(self) -> Dict[str, str]:
        """Get authentication headers for API requests.

        This method returns the authentication headers from the underlying Falcon API client,
        which can be used for custom HTTP requests or advanced integration scenarios.

        Returns:
            Dict[str, str]: Authentication headers including the bearer token
        """
        return self.client.auth_headers


def get_version() -> str:
    """Get falcon-mcp version with multiple fallback methods.

    This function tries multiple methods to determine the version:
    1. importlib.metadata (works when package is properly installed)
    2. pyproject.toml (works in development/Docker environments)
    3. Hardcoded fallback

    Returns:
        str: The version string
    """
    # Try importlib.metadata first (works when properly installed)
    try:
        return version("falcon-mcp")
    except PackageNotFoundError:
        logger.debug(
            "falcon-mcp package not found via importlib.metadata, trying pyproject.toml"
        )

    # Try reading from pyproject.toml (works in development/Docker)
    try:
        import pathlib
        import tomllib  # Python 3.11+

        # Look for pyproject.toml in current directory and parent directories
        current_path = pathlib.Path(__file__).parent
        for _ in range(3):  # Check up to 3 levels up
            pyproject_path = current_path / "pyproject.toml"
            if pyproject_path.exists():
                with open(pyproject_path, "rb") as f:
                    data = tomllib.load(f)
                    version_str = data["project"]["version"]
                    logger.debug(
                        "Found version %s in pyproject.toml at %s",
                        version_str,
                        pyproject_path,
                    )
                    return version_str
            current_path = current_path.parent

        logger.debug("pyproject.toml not found in current or parent directories")
    except (KeyError, ImportError, OSError, TypeError) as e:
        logger.debug("Failed to read version from pyproject.toml: %s", e)

    # Final fallback
    fallback_version = "0.1.0"
    logger.debug("Using fallback version: %s", fallback_version)
    return fallback_version

```

--------------------------------------------------------------------------------
/tests/e2e/modules/test_spotlight.py:
--------------------------------------------------------------------------------

```python
"""
E2E tests for the Spotlight module.
"""

import json
import unittest

import pytest

from tests.e2e.utils.base_e2e_test import BaseE2ETest


@pytest.mark.e2e
class TestSpotlightModuleE2E(BaseE2ETest):
    """
    End-to-end test suite for the Falcon MCP Server Spotlight Module.
    """

    def test_search_high_severity_vulnerabilities(self):
        """Verify the agent can search for high severity vulnerabilities."""

        async def test_logic():
            fixtures = [
                {
                    "operation": "combinedQueryVulnerabilities",
                    "validator": lambda kwargs: "high"
                    in kwargs.get("parameters", {}).get("filter", "").lower(),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "vuln-001",
                                    "cve": {
                                        "id": "CVE-2024-1234",
                                        "base_score": 8.5,
                                        "severity": "HIGH",
                                        "exprt_rating": "HIGH",
                                        "exploit_status": 60,
                                        "is_cisa_kev": True,
                                        "description": "Critical buffer overflow vulnerability in network service",
                                    },
                                    "status": "open",
                                    "created_timestamp": "2024-01-15T10:30:00Z",
                                    "updated_timestamp": "2024-01-20T14:15:00Z",
                                    "host_info": {
                                        "hostname": "web-server-01",
                                        "platform_name": "Linux",
                                        "asset_criticality": "Critical",
                                        "internet_exposure": "Yes",
                                        "managed_by": "Falcon sensor",
                                    },
                                    "apps": {
                                        "application_name": "Apache HTTP Server",
                                        "application_version": "2.4.41",
                                    },
                                },
                                {
                                    "id": "vuln-002",
                                    "cve": {
                                        "id": "CVE-2024-5678",
                                        "base_score": 7.8,
                                        "severity": "HIGH",
                                        "exprt_rating": "MEDIUM",
                                        "exploit_status": 30,
                                        "is_cisa_kev": False,
                                        "description": "Privilege escalation vulnerability in system service",
                                    },
                                    "status": "open",
                                    "created_timestamp": "2024-01-18T08:45:00Z",
                                    "updated_timestamp": "2024-01-19T16:20:00Z",
                                    "host_info": {
                                        "hostname": "db-server-02",
                                        "platform_name": "Windows",
                                        "asset_criticality": "High",
                                        "internet_exposure": "No",
                                        "managed_by": "Falcon sensor",
                                    },
                                    "apps": {
                                        "application_name": "Microsoft SQL Server",
                                        "application_version": "2019",
                                    },
                                },
                            ]
                        },
                    },
                }
            ]

            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )

            prompt = "Find all high severity vulnerabilities in our environment and show me their CVE details and affected hosts"
            return await self._run_agent_stream(prompt)

        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[len(tools) - 1]
            self.assertEqual(
                used_tool["input"]["tool_name"], "falcon_search_vulnerabilities"
            )

            # Check for high severity filtering
            tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
            self.assertTrue(
                "high" in tool_input_str,
                f"Expected high severity filtering in tool input: {tool_input_str}",
            )

            # Verify both vulnerabilities are in the output
            self.assertIn("CVE-2024-1234", used_tool["output"])
            self.assertIn("CVE-2024-5678", used_tool["output"])
            self.assertIn("web-server-01", used_tool["output"])
            self.assertIn("db-server-02", used_tool["output"])

            # Verify API call was made correctly
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 1, "Expected 1 API call"
            )

            # Check API call (combinedQueryVulnerabilities)
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            filter_str = api_call_params.get("filter", "").lower()
            self.assertTrue(
                "high" in filter_str,
                f"Expected high severity filtering in API call: {filter_str}",
            )

            # Verify result contains expected information
            self.assertIn("CVE-2024-1234", result)
            self.assertIn("CVE-2024-5678", result)
            self.assertIn("web-server-01", result)
            self.assertIn("db-server-02", result)
            self.assertIn("8.5", result)  # Should contain CVSS scores
            self.assertIn("7.8", result)

        self.run_test_with_retries(
            "test_search_high_severity_vulnerabilities", test_logic, assertions
        )


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/examples/adk/falcon_agent/agent.py:
--------------------------------------------------------------------------------

```python
import logging
import os
import sys
from typing import List, Optional, TextIO, Union

from google.adk.agents import LlmAgent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.models import LlmRequest, LlmResponse
from google.adk.tools.base_tool import BaseTool
from google.adk.tools.base_toolset import ToolPredicate
from google.adk.tools.mcp_tool import MCPTool
from google.adk.tools.mcp_tool.mcp_session_manager import (
  SseConnectionParams,
  StdioConnectionParams,
  StreamableHTTPConnectionParams,
  retry_on_closed_resource,
)
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset
from mcp import StdioServerParameters
from mcp.types import ListToolsResult

tools_cache={}

def make_tools_compatible(tools):
  """
  This function makes the schema compatible with Gemini/Vertex AI API
  It is only needed when API used is Gemini and model is other than 2.5 models
  It is however needed for ALL models when API used is VertexAI
  """
  for tool in tools:
    for key in tool._mcp_tool.inputSchema.keys():
      if key == "properties":
          for prop_name in tool._mcp_tool.inputSchema["properties"].keys():
            if "anyOf" in tool._mcp_tool.inputSchema["properties"][prop_name].keys():
              if (tool._mcp_tool.inputSchema["properties"][prop_name]["anyOf"][0]["type"] == "array"):
                tool._mcp_tool.inputSchema["properties"][prop_name]["type"] = tool._mcp_tool.inputSchema["properties"][prop_name]["anyOf"][0]["items"]["type"]
              else:
                 tool._mcp_tool.inputSchema["properties"][prop_name]["type"] = tool._mcp_tool.inputSchema["properties"][prop_name]["anyOf"][0]["type"]
              tool._mcp_tool.inputSchema["properties"][prop_name].pop("anyOf")

  return tools


class MCPToolSetWithSchemaAccess(MCPToolset):
  """
    Added to make the MCP tools schema compatible with Vertext AI API and also older Gemini models.
    Also introduced a small performance improvement with tools caching.
  """

  def __init__(
      self,
      *,
      tool_set_name: str, # <-- new parameter
      connection_params: Union[
          StdioServerParameters,
          StdioConnectionParams,
          SseConnectionParams,
          StreamableHTTPConnectionParams,
      ],
      tool_filter: Optional[Union[ToolPredicate, List[str]]] = None,
      errlog: TextIO = sys.stderr,
  ):
    super().__init__(
        connection_params=connection_params,
        tool_filter=tool_filter,
        errlog=errlog
    )
    self.tool_set_name = tool_set_name
    logging.info(f"MCPToolSetWithSchemaAccess initialized with tool_set_name: '{self.tool_set_name}'")
    self._session = None

  @retry_on_closed_resource
  async def get_tools(
      self,
      readonly_context: Optional[ReadonlyContext] = None,
  ) -> List[BaseTool]:
    """Return all tools in the toolset based on the provided context.

    Args:
        readonly_context: Context used to filter tools available to the agent.
            If None, all tools in the toolset are returned.

    Returns:
        List[BaseTool]: A list of tools available under the specified context.
    """
    # Get session from session manager
    session = await self._mcp_session_manager.create_session()

    if self.tool_set_name in tools_cache.keys():
      logging.info(f"Tools found in cache for toolset {self.tool_set_name}, returning them")
      return tools_cache[self.tool_set_name]
    else:
      logging.info(f"No tools found in cache for toolset {self.tool_set_name}, loading")

    # Fetch available tools from the MCP server
    tools_response: ListToolsResult = await session.list_tools()

    # Apply filtering based on context and tool_filter
    tools = []
    for tool in tools_response.tools:
      mcp_tool = MCPTool(
          mcp_tool=tool,
          mcp_session_manager=self._mcp_session_manager,
          auth_scheme=self._auth_scheme,
          auth_credential=self._auth_credential,
      )

      if self._is_tool_selected(mcp_tool, readonly_context):
        tools.append(mcp_tool)

    model_version = os.environ.get("GOOGLE_MODEL").split("-")[1]
    if float(model_version) < 2.5 or os.environ.get("GOOGLE_GENAI_USE_VERTEXAI").upper() == "TRUE":
      logging.error(f"Model - {os.environ.get('GOOGLE_MODEL')} needs Gemini compatible tools, updating schema ...")
      tools = make_tools_compatible(tools)
    else:
      logging.info(f"Model - {os.environ.get('GOOGLE_MODEL')} does not need updating schema")

    tools_cache[self.tool_set_name] = tools

    return tools

# Controlling context size to improve Model response time and for cost optimization
# https://github.com/google/adk-python/issues/752#issuecomment-2948152979
def bmc_trim_llm_request(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:

    max_prev_user_interactions = int(os.environ.get("MAX_PREV_USER_INTERACTIONS","-1"))

    logging.info(f"Number of contents going to LLM - {len(llm_request.contents)}, MAX_PREV_USER_INTERACTIONS = {max_prev_user_interactions}")

    temp_processed_list = []

    if max_prev_user_interactions == -1:
        return None
    else:
        user_message_count = 0
        for i in range(len(llm_request.contents) - 1, -1, -1):
            item = llm_request.contents[i]

            if item.role == "user" and item.parts[0] and item.parts[0].text and item.parts[0].text != "For context:":
                logging.info(f"Encountered a user message => {item.parts[0].text}")
                user_message_count += 1

            if user_message_count > max_prev_user_interactions:
                logging.info(f"Breaking at user_message_count => {user_message_count}")
                temp_processed_list.append(item)
                break

            temp_processed_list.append(item)

        final_list = temp_processed_list[::-1]

        if user_message_count < max_prev_user_interactions:
            logging.info("User message count did not reach the allowed limit. List remains unchanged.")
        else:
            logging.info(f"User message count reached {max_prev_user_interactions}. List truncated.")
            llm_request.contents = final_list

    return None


root_agent = LlmAgent(
    model=os.environ.get("GOOGLE_MODEL"),
    name='falcon_agent',
    instruction=os.environ.get("FALCON_AGENT_PROMPT"),
    tools=[
        MCPToolSetWithSchemaAccess(
          tool_set_name="falcon-tools",
            connection_params=StdioConnectionParams(
                    server_params=StdioServerParameters(
                      command='falcon-mcp',
                      env={
                      "FALCON_CLIENT_ID":os.environ.get("FALCON_CLIENT_ID"),
                      "FALCON_CLIENT_SECRET":os.environ.get("FALCON_CLIENT_SECRET"),
                      "FALCON_BASE_URL":os.environ.get("FALCON_BASE_URL"),
                      }
                    )
                    )
            ),
    ],
)

```

--------------------------------------------------------------------------------
/falcon_mcp/resources/serverless.py:
--------------------------------------------------------------------------------

```python
"""
Contains Serverless Vulnerabilities resources.
"""

from falcon_mcp.common.utils import generate_md_table

# List of tuples containing filter options data: (name, type, operators, description)
SERVERLESS_VULNERABILITIES_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Operators",
        "Description"
    ),
    (
        "application_name",
        "String",
        "Yes",
        """
        Name of the application associated with the serverless function.

        Ex: application_name:'my-lambda-app'
        """
    ),
    (
        "application_name_version",
        "String",
        "Yes",
        """
        Version of the application associated with the serverless function.

        Ex: application_name_version:'1.0.0'
        """
    ),
    (
        "cid",
        "String",
        "No",
        """
        Unique system-generated customer identifier (CID) of the account.

        Ex: cid:'0123456789ABCDEFGHIJKLMNOPQRSTUV'
        """
    ),
    (
        "cloud_account_id",
        "String",
        "Yes",
        """
        Unique identifier of the cloud account where the serverless function is deployed.

        Ex: cloud_account_id:'123456789012'
        """
    ),
    (
        "cloud_account_name",
        "String",
        "Yes",
        """
        Name of the cloud account where the serverless function is deployed.

        Ex: cloud_account_name:'production-account'
        """
    ),
    (
        "cloud_provider",
        "String",
        "Yes",
        """
        Name of the cloud service provider hosting the serverless function.
        Values: aws, azure, gcp

        Ex: cloud_provider:'aws'
        """
    ),
    (
        "cve_id",
        "String",
        "Yes",
        """
        Unique identifier for a vulnerability as cataloged in the National Vulnerability Database (NVD).
        Supports multiple values and negation.

        Ex: cve_id:['CVE-2022-1234']
        Ex: cve_id:['CVE-2022-1234','CVE-2023-5678']
        """
    ),
    (
        "cvss_base_score",
        "Number",
        "Yes",
        """
        Common Vulnerability Scoring System (CVSS) base score of the vulnerability.

        Ex: cvss_base_score:>7.0
        Ex: cvss_base_score:<5.0
        """
    ),
    (
        "exprt_rating",
        "String",
        "Yes",
        """
        ExPRT rating assigned by CrowdStrike's predictive AI rating system.
        Values: UNKNOWN, LOW, MEDIUM, HIGH, CRITICAL

        Ex: exprt_rating:'HIGH'
        Ex: exprt_rating:['HIGH','CRITICAL']
        """
    ),
    (
        "first_seen_timestamp",
        "Timestamp",
        "Yes",
        """
        Date and time when this vulnerability was first detected in the serverless function.

        Ex: first_seen_timestamp:>'2023-01-01'
        Ex: first_seen_timestamp:<'2023-12-31'
        """
    ),
    (
        "function_name",
        "String",
        "Yes",
        """
        Name of the serverless function where the vulnerability was detected.

        Ex: function_name:'process-payment'
        """
    ),
    (
        "function_resource_id",
        "String",
        "Yes",
        """
        Unique resource identifier of the serverless function.

        Ex: function_resource_id:'arn:aws:lambda:us-east-1:123456789012:function:my-function'
        """
    ),
    (
        "is_supported",
        "Boolean",
        "No",
        """
        Indicates if the serverless function is supported for vulnerability scanning.

        Ex: is_supported:true
        """
    ),
    (
        "is_valid_asset_id",
        "Boolean",
        "No",
        """
        Indicates if the asset ID associated with the serverless function is valid.

        Ex: is_valid_asset_id:true
        """
    ),
    (
        "layer",
        "String",
        "Yes",
        """
        Layer in the serverless function where the vulnerability was detected.

        Ex: layer:'runtime'
        Ex: layer:'dependency'
        """
    ),
    (
        "region",
        "String",
        "Yes",
        """
        Cloud region where the serverless function is deployed.

        Ex: region:'us-east-1'
        Ex: region:['us-east-1','us-west-2']
        """
    ),
    (
        "runtime",
        "String",
        "Yes",
        """
        Runtime environment of the serverless function.
        Values: nodejs, python, java, ruby, go, dotnet

        Ex: runtime:'nodejs'
        Ex: runtime:['python','nodejs']
        """
    ),
    (
        "severity",
        "String",
        "Yes",
        """
        Severity level of the vulnerability.
        Values: UNKNOWN, LOW, MEDIUM, HIGH, CRITICAL

        Ex: severity:'HIGH'
        Ex: severity:['HIGH','CRITICAL']
        """
    ),
    (
        "timestamp",
        "Timestamp",
        "Yes",
        """
        Date and time when the vulnerability was last updated.

        Ex: timestamp:>'2023-06-01'
        Ex: timestamp:<'2023-12-31'
        """
    ),
    (
        "type",
        "String",
        "Yes",
        """
        Type of the vulnerability.
        Values: Vulnerability, Misconfiguration, Unsupported software

        Ex: type:'Vulnerability'
        Ex: type:!'Misconfiguration'
        """
    ),
]

SERVERLESS_VULNERABILITIES_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Serverless Vulnerabilities Guide

=== BASIC SYNTAX ===
property_name:[operator]'value'

=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal to
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = does not text match

=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)

=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions

=== falcon_search_serverless_vulnerabilities FQL filter options ===

""" + generate_md_table(SERVERLESS_VULNERABILITIES_FQL_FILTERS) + """

=== IMPORTANT NOTES ===
• Use single quotes around string values: 'value'
• Use square brackets for exact matches and multiple values: ['value1','value2']
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
• For case-insensitive filtering, add .insensitive to field names
• Boolean values: true or false (no quotes)
• Wildcards (*) are unsupported in this API
• Some fields require specific capitalization (check individual field descriptions)

=== COMMON FILTER EXAMPLES ===
• Filter by cloud provider: cloud_provider:'aws'
• High severity vulnerabilities: severity:'HIGH'
• Recent vulnerabilities: first_seen_timestamp:>'2023-01-01'
• Filter by specific runtime: runtime:'nodejs'
• Filter by region: region:'us-east-1'
• Critical vulnerabilities in a specific account: severity:'CRITICAL'+cloud_account_id:'123456789012'
• Filter by function name: function_name:'payment-processor'
• High CVSS score vulnerabilities: cvss_base_score:>7.0
"""

```

--------------------------------------------------------------------------------
/falcon_mcp/modules/detections.py:
--------------------------------------------------------------------------------

```python
"""
Detections module for Falcon MCP Server

This module provides tools for accessing and analyzing CrowdStrike Falcon detections.
"""

from textwrap import dedent
from typing import Any, Dict, List

from mcp.server import FastMCP
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl, Field

from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.logging import get_logger
from falcon_mcp.common.utils import prepare_api_parameters
from falcon_mcp.modules.base import BaseModule
from falcon_mcp.resources.detections import SEARCH_DETECTIONS_FQL_DOCUMENTATION

logger = get_logger(__name__)


class DetectionsModule(BaseModule):
    """Module for accessing and analyzing CrowdStrike Falcon detections."""

    def register_tools(self, server: FastMCP) -> None:
        """Register tools with the MCP server.

        Args:
            server: MCP server instance
        """
        # Register tools
        self._add_tool(
            server=server,
            method=self.search_detections,
            name="search_detections",
        )

        self._add_tool(
            server=server,
            method=self.get_detection_details,
            name="get_detection_details",
        )

    def register_resources(self, server: FastMCP) -> None:
        """Register resources with the MCP server.

        Args:
            server: MCP server instance
        """
        search_detections_fql_resource = TextResource(
            uri=AnyUrl("falcon://detections/search/fql-guide"),
            name="falcon_search_detections_fql_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_detections` tool.",
            text=SEARCH_DETECTIONS_FQL_DOCUMENTATION,
        )

        self._add_resource(
            server,
            search_detections_fql_resource,
        )

    def search_detections(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://detections/search/fql-guide` resource when building this filter parameter.",
            examples={"agent_id:'77d11725xxxxxxxxxxxxxxxxxxxxc48ca19'", "status:'new'"},
        ),
        limit: int = Field(
            default=10,
            ge=1,
            le=9999,
            description="The maximum number of detections to return in this response (default: 10; max: 9999). Use with the offset parameter to manage pagination of results.",
        ),
        offset: int | None = Field(
            default=None,
            description="The first detection to return, where 0 is the latest detection. Use with the offset parameter to manage pagination of results.",
        ),
        q: str | None = Field(
            default=None,
            description="Search all detection metadata for the provided string",
        ),
        sort: str | None = Field(
            default=None,
            description=dedent("""
                Sort detections using these options:

                timestamp: Timestamp when the detection occurred
                created_timestamp: When the detection was created
                updated_timestamp: When the detection was last modified
                severity: Severity level of the detection (1-100, recommended when filtering by severity)
                confidence: Confidence level of the detection (1-100)
                agent_id: Agent ID associated with the detection

                Sort either asc (ascending) or desc (descending).
                Both formats are supported: 'severity.desc' or 'severity|desc'

                When searching for high severity detections, use 'severity.desc' to get the highest severity detections first.
                For chronological ordering, use 'timestamp.desc' for most recent detections first.

                Examples: 'severity.desc', 'timestamp.desc'
            """).strip(),
            examples={"severity.desc", "timestamp.desc"},
        ),
        include_hidden: bool = Field(default=True),
    ) -> List[Dict[str, Any]]:
        """Find and analyze detections to understand malicious activity in your environment.

        IMPORTANT: You must use the `falcon://detections/search/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_detections` tool.
        """
        # Prepare parameters
        params = prepare_api_parameters(
            {
                "filter": filter,
                "limit": limit,
                "offset": offset,
                "q": q,
                "sort": sort,
            }
        )

        # Define the operation name
        operation = "GetQueriesAlertsV2"

        logger.debug("Searching detections with params: %s", params)

        # Make the API request
        response = self.client.command(operation, parameters=params)

        # Use handle_api_response to get detection IDs (now composite_ids)
        detection_ids = handle_api_response(
            response,
            operation=operation,
            error_message="Failed to search detections",
            default_result=[],
        )

        # If handle_api_response returns an error dict instead of a list,
        # it means there was an error, so we return it wrapped in a list
        if self._is_error(detection_ids):
            return [detection_ids]

        # If we have detection IDs, get the details for each one
        if detection_ids:
            # Use the enhanced base method with composite_ids and include_hidden
            details = self._base_get_by_ids(
                operation="PostEntitiesAlertsV2",
                ids=detection_ids,
                id_key="composite_ids",
                include_hidden=include_hidden,
            )

            # If handle_api_response returns an error dict instead of a list,
            # it means there was an error, so we return it wrapped in a list
            if self._is_error(details):
                return [details]

            return details

        return []

    def get_detection_details(
        self,
        ids: List[str] = Field(
            description="Composite ID(s) to retrieve detection details for.",
        ),
        include_hidden: bool = Field(
            default=True,
            description="Whether to include hidden detections (default: True). When True, shows all detections including previously hidden ones for comprehensive visibility.",
        ),
    ) -> List[Dict[str, Any]] | Dict[str, Any]:
        """Get detection details for specific detection IDs to understand security threats.

        Use this when you already have specific detection IDs and need their full details.
        For searching/discovering detections, use the `falcon_search_detections` tool instead.
        """
        logger.debug("Getting detection details for ID(s): %s", ids)

        # Use the enhanced base method - composite_ids parameter matches ids for backward compatibility
        return self._base_get_by_ids(
            operation="PostEntitiesAlertsV2",
            ids=ids,
            id_key="composite_ids",
            include_hidden=include_hidden,
        )

```

--------------------------------------------------------------------------------
/docs/deployment/amazon_bedrock_agentcore.md:
--------------------------------------------------------------------------------

```markdown
# Deploying to Amazon Bedrock AgentCore

This guide walks you through deploying the Falcon MCP Server to Amazon Bedrock AgentCore. You'll configure the necessary AWS resources, set up IAM permissions, and prepare your environment.

## Prerequisites

Before deploying to Amazon Bedrock AgentCore, ensure you have your CrowdStrike API credentials and AWS environment properly configured.

### CrowdStrike API Credentials

You'll need to create API credentials in the CrowdStrike platform with the appropriate scopes for your intended use case.

1. **Create API Key**: Generate an API key in the CrowdStrike platform with the necessary scopes as outlined in [Available Modules, Tools & Resources](https://github.com/CrowdStrike/falcon-mcp/tree/main?tab=readme-ov-file#available-modules-tools--resources)

2. **Prepare Environment Variables**: You'll configure these values during agent deployment:
   - `FALCON_CLIENT_ID` - Your CrowdStrike API client ID
   - `FALCON_CLIENT_SECRET` - Your CrowdStrike API client secret
   - `FALCON_BASE_URL` - Your CrowdStrike API base URL (region-specific)

### AWS VPC Requirements

The MCP Server requires internet connectivity to communicate with CrowdStrike's APIs. We recommend deploying in an existing VPC used for your agentic tools.

**Required VPC Configuration:**

- **Internet Gateway or NAT Gateway** - Enables outbound internet connectivity
- **Outbound HTTPS Access** - Allow communication to `api.crowdstrike.com` on port 443
- **Security Groups** - Configure appropriate security group rules for your network requirements

## IAM Configuration

The MCP server requires specific IAM permissions to function within the Amazon Bedrock AgentCore environment. You'll create an execution role with the necessary policies and trust relationships.

> [!IMPORTANT]
> Replace all placeholder values with your specific environment details:
>
> - `{{region}}` - Your AWS region (e.g., `us-east-1`)
> - `{{accountId}}` - Your AWS account ID
> - `{{agentName}}` - Your agent name with no spaces or special characters (e.g., `falcon`). You'll need to decide the agent name **before** creating the role and AgentCore Runtime.

### Step 1: Create the IAM Execution Role

Create an IAM role with the following policy that grants the necessary permissions for container access, logging, monitoring, and Bedrock operations:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ECRImageAccess",
      "Effect": "Allow",
      "Action": [
        "ecr:BatchGetImage",
        "ecr:GetDownloadUrlForLayer"
      ],
      "Resource": [
        "arn:aws:ecr:us-east-1:709825985650:repository/crowdstrike/falcon-mcp"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogStreams",
        "logs:CreateLogGroup"
      ],
      "Resource": [
        "arn:aws:logs:{{region}}:{{accountId}}:log-group:/aws/bedrock-agentcore/runtimes/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:{{region}}:{{accountId}}:log-group:*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:{{region}}:{{accountId}}:log-group:/aws/bedrock-agentcore/runtimes/*:log-stream:*"
      ]
    },
    {
      "Sid": "ECRTokenAccess",
      "Effect": "Allow",
      "Action": [
        "ecr:GetAuthorizationToken"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "xray:PutTraceSegments",
        "xray:PutTelemetryRecords",
        "xray:GetSamplingRules",
        "xray:GetSamplingTargets"
      ],
      "Resource": [
        "*"
      ]
    },
    {
      "Effect": "Allow",
      "Resource": "*",
      "Action": "cloudwatch:PutMetricData",
      "Condition": {
        "StringEquals": {
          "cloudwatch:namespace": "bedrock-agentcore"
        }
      }
    },
    {
      "Sid": "GetAgentAccessToken",
      "Effect": "Allow",
      "Action": [
        "bedrock-agentcore:GetWorkloadAccessToken",
        "bedrock-agentcore:GetWorkloadAccessTokenForJWT",
        "bedrock-agentcore:GetWorkloadAccessTokenForUserId"
      ],
      "Resource": [
        "arn:aws:bedrock-agentcore:{{region}}:{{accountId}}:workload-identity-directory/default",
        "arn:aws:bedrock-agentcore:{{region}}:{{accountId}}:workload-identity-directory/default/workload-identity/{{agentName}}-*"
      ]
    },
    {
      "Sid": "BedrockModelInvocation",
      "Effect": "Allow",
      "Action": [
        "bedrock:InvokeModel",
        "bedrock:InvokeModelWithResponseStream"
      ],
      "Resource": [
        "arn:aws:bedrock:*::foundation-model/*",
        "arn:aws:bedrock:{{region}}:{{accountId}}:*"
      ]
    }
  ]
}
```

> [!NOTE]
> Save the ARN of the IAM role - you'll need it for the deployment of the Amazon Bedrock AgentCore agent.

### Step 2: Create the IAM Trust Policy

Create a trust policy that allows the Bedrock AgentCore service to assume the execution role:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AssumeRolePolicy",
      "Effect": "Allow",
      "Principal": {
        "Service": "bedrock-agentcore.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "{{accountId}}"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:bedrock-agentcore:{{region}}:{{accountId}}:*"
        }
      }
    }
  ]
}
```

### Step 3: Associate the Trust Policy

Attach the trust policy to the IAM execution role you created in Step 1. This completes the IAM configuration required for the MCP server to operate within Amazon Bedrock AgentCore.

## Next Steps

### Important Variables

To host this agent in Amazon Bedrock AgentCore, the following variables will need to be known:

| Variable | Description |
| :--- | :--- |
| `FALCON_CLIENT_ID` | The Client ID for your Falcon API credentials |
| `FALCON_CLIENT_SECRET` | The Client Secret for your Falcon API credentials |
| `FALCON_BASE_URL` | The base URL for your Falcon API environment |
| `AGENT_NAME` | The name of the agent (_ex: falconmcp_) |
| `AGENT_DESCRIPTION` | A description of the agent |
| `AGENT_ROLE_ARN` | The ARN of the IAM execution role created in Step 1 |

With your IAM configuration complete and variables prepared, you can now return to the **AWS Marketplace listing** to complete the deployment of your Falcon MCP Server agent in Amazon Bedrock AgentCore.

#### Example Deployment

```bash
aws bedrock-agentcore-control create-agent-runtime \
  --region us-east-1 \
  --agent-runtime-name "falconmcp" \
  --description "Falcon MCP Server Agent" \
  --agent-runtime-artifact '{
    "containerConfiguration": {
      "containerUri": "709825985650.dkr.ecr.us-east-1.amazonaws.com/crowdstrike/falcon-mcp:0.1.1"
    }
  }' \
  --role-arn "arn:aws:iam::example:role/bedrock-core-falcon-role" \
  --network-configuration '{
    "networkMode": "PUBLIC"
  }' \
  --protocol-configuration '{
    "serverProtocol": "MCP"
  }' \
  --environment-variables '{
    "FALCON_CLIENT_ID": "FALCON_CLIENT_ID_VALUE",
    "FALCON_CLIENT_SECRET": "FALCON_CLIENT_SECRET_VALUE",
    "FALCON_BASE_URL": "https://api.crowdstrike.com"
  }'
```

```

--------------------------------------------------------------------------------
/tests/e2e/modules/test_intel.py:
--------------------------------------------------------------------------------

```python
"""
E2E tests for the Intel module.
"""

import unittest

import pytest

from tests.e2e.utils.base_e2e_test import BaseE2ETest, ensure_dict


@pytest.mark.e2e
class TestIntelModuleE2E(BaseE2ETest):
    """
    End-to-end test suite for the Falcon MCP Server Intel Module.
    """

    def test_search_actors_with_filter(self):
        """Verify the agent can search for actors with a filter."""

        async def test_logic():
            fixtures = [
                {
                    "operation": "QueryIntelActorEntities",
                    "validator": lambda kwargs: "animal_classifier:'BEAR'"
                    in kwargs.get("parameters", {}).get("filter", ""),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "actor-1",
                                    "animal_classifier": "BEAR",
                                    "short_description": "Actor ELDERLY BEAR",
                                },
                                {
                                    "id": "actor-2",
                                    "animal_classifier": "BEAR",
                                    "short_description": "Actor CONSTANT BEAR",
                                },
                            ]
                        },
                    },
                }
            ]

            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )

            prompt = "Find all threat actors with animal_classifier BEAR"
            return await self._run_agent_stream(prompt)

        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[len(tools) - 1]
            self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_actors")

            # Verify the tool input contains the filter
            tool_input = ensure_dict(used_tool["input"]["tool_input"])
            self.assertIn("animal_classifier", tool_input.get("filter", ""))

            # Verify API call parameters
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count,
                1,
                "Expected at least 1 API call",
            )
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            self.assertIn("animal_classifier:'BEAR'", api_call_params.get("filter", ""))

            # Verify result contains actor information
            self.assertIn("BEAR", result)
            self.assertIn("ELDERLY BEAR", result)
            self.assertIn("Actor CONSTANT BEAR", result)

        self.run_test_with_retries(
            "test_search_actors_with_filter", test_logic, assertions
        )

    def test_search_indicators_with_filter(self):
        """Verify the agent can search for indicators with a filter."""

        async def test_logic():
            fixtures = [
                {
                    "operation": "QueryIntelIndicatorEntities",
                    "validator": lambda kwargs: "type:'hash_sha256'"
                    in kwargs.get("parameters", {}).get("filter", ""),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {"id": "indicator-1", "type": "hash_sha256"},
                                {"id": "indicator-2", "type": "hash_sha256"},
                            ]
                        },
                    },
                }
            ]

            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )

            prompt = "Find all indicators of type hash_sha256"
            return await self._run_agent_stream(prompt)

        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[len(tools) - 1]
            self.assertEqual(
                used_tool["input"]["tool_name"], "falcon_search_indicators"
            )

            # Verify the tool input contains the filter
            tool_input = ensure_dict(used_tool["input"]["tool_input"])
            self.assertIn("hash_sha256", tool_input.get("filter", ""))

            # Verify API call parameters
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count,
                1,
                "Expected at least 1 API call",
            )
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            self.assertIn("type:'hash_sha256'", api_call_params.get("filter", ""))

            # Verify result contains indicator information
            self.assertIn("indicator-1", result)
            self.assertIn("indicator-2", result)
            self.assertIn("hash_sha256", result)

        self.run_test_with_retries(
            "test_search_indicators_with_filter", test_logic, assertions
        )

    def test_search_reports_with_filter(self):
        """Verify the agent can search for reports with a filter."""

        async def test_logic():
            fixtures = [
                {
                    "operation": "QueryIntelReportEntities",
                    "validator": lambda kwargs: "slug:'malware-analysis-report-1'"
                    in kwargs.get("parameters", {}).get("filter", ""),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "report-1",
                                    "name": "Malware Analysis Report 1",
                                    "slug": "malware-analysis-report-1",
                                },
                            ]
                        },
                    },
                }
            ]

            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )

            prompt = "Find report with slug malware-analysis-report-1"
            return await self._run_agent_stream(prompt)

        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[len(tools) - 1]
            self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_reports")

            # Verify the tool input contains the filter
            tool_input = ensure_dict(used_tool["input"]["tool_input"])
            self.assertIn("slug", tool_input.get("filter", ""))

            # Verify API call parameters
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count,
                1,
                "Expected at least 1 API call",
            )
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            self.assertIn(
                "slug:'malware-analysis-report-1'", api_call_params.get("filter", "")
            )

            # Verify result contains report information
            self.assertIn("Malware Analysis Report 1", result)

        self.run_test_with_retries(
            "test_search_reports_with_filter", test_logic, assertions
        )


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------

```python
"""
Tests for the Falcon MCP server.
"""

import unittest
from unittest.mock import MagicMock, patch

from falcon_mcp import registry
from falcon_mcp.server import FalconMCPServer


class TestFalconMCPServer(unittest.TestCase):
    """Test cases for the Falcon MCP server."""

    def setUp(self):
        """Set up test fixtures before each test method."""
        # Ensure modules are discovered before each test
        registry.discover_modules()

    @patch("falcon_mcp.server.FalconClient")
    @patch("falcon_mcp.server.FastMCP")
    def test_server_initialization(self, mock_fastmcp, mock_client):
        """Test server initialization with default settings."""
        # Setup mocks
        mock_client_instance = MagicMock()
        mock_client_instance.authenticate.return_value = True
        mock_client.return_value = mock_client_instance

        mock_server_instance = MagicMock()
        mock_fastmcp.return_value = mock_server_instance

        # Create server
        server = FalconMCPServer(
            base_url="https://api.test.crowdstrike.com",
            debug=True,
        )

        # Verify client initialization with direct parameters
        mock_client.assert_called_once()
        # Extract the arguments
        call_args = mock_client.call_args[1]
        self.assertEqual(call_args["base_url"], "https://api.test.crowdstrike.com")
        self.assertTrue(call_args["debug"])

        # Verify authentication
        mock_client_instance.authenticate.assert_called_once()

        # Verify server initialization
        mock_fastmcp.assert_called_once_with(
            name="Falcon MCP Server",
            instructions="This server provides access to CrowdStrike Falcon capabilities.",
            debug=True,
            log_level="DEBUG",
        )

        # Verify modules initialization
        available_module_names = registry.get_module_names()
        self.assertEqual(len(server.modules), len(available_module_names))
        for module_name in available_module_names:
            self.assertIn(module_name, server.modules)

    @patch("falcon_mcp.server.FalconClient")
    @patch("falcon_mcp.server.FastMCP")
    def test_server_with_specific_modules(self, mock_fastmcp, mock_client):
        """Test server initialization with specific modules."""
        # Setup mocks
        mock_client_instance = MagicMock()
        mock_client_instance.authenticate.return_value = True
        mock_client.return_value = mock_client_instance

        mock_server_instance = MagicMock()
        mock_fastmcp.return_value = mock_server_instance

        # Create server with only the detections module
        server = FalconMCPServer(enabled_modules={"detections"})

        # Verify modules initialization
        self.assertEqual(len(server.modules), 1)
        self.assertIn("detections", server.modules)

    @patch("falcon_mcp.server.FalconClient")
    def test_authentication_failure(self, mock_client):
        """Test server initialization with authentication failure."""
        # Setup mock
        mock_client_instance = MagicMock()
        mock_client_instance.authenticate.return_value = False
        mock_client.return_value = mock_client_instance

        # Verify authentication failure raises RuntimeError
        with self.assertRaises(RuntimeError):
            FalconMCPServer()

    @patch("falcon_mcp.server.FalconClient")
    def test_falcon_check_connectivity(self, mock_client):
        """Test checking Falcon API connectivity."""
        # Setup mock
        mock_client_instance = MagicMock()
        mock_client_instance.is_authenticated.return_value = True
        mock_client.return_value = mock_client_instance
        mock_client_instance.authenticate.return_value = True

        # Create server with mock client
        server = FalconMCPServer()

        # Call falcon_check_connectivity
        result = server.falcon_check_connectivity()

        # Verify client method was called
        mock_client_instance.is_authenticated.assert_called_once()

        # Verify result
        expected_result = {"connected": True}
        self.assertEqual(result, expected_result)

    @patch("falcon_mcp.server.FalconClient")
    def test_list_enabled_modules(self, mock_client):
        """Test listing enabled modules."""
        # Setup mock
        mock_client_instance = MagicMock()
        mock_client_instance.authenticate.return_value = True
        mock_client.return_value = mock_client_instance

        # Create server
        server = FalconMCPServer()

        # Call list_enabled_modules
        result = server.list_enabled_modules()

        # Get the actual module names from the registry
        expected_modules = registry.get_module_names()

        # Verify result matches registry (since all modules are enabled by default)
        self.assertEqual(set(result["modules"]), set(expected_modules))

    @patch("falcon_mcp.server.FalconClient")
    def test_list_enabled_modules_with_limited_modules(self, mock_client):
        """Test listing enabled modules with limited module set."""
        # Setup mock
        mock_client_instance = MagicMock()
        mock_client_instance.authenticate.return_value = True
        mock_client.return_value = mock_client_instance

        # Create server with only specific modules
        server = FalconMCPServer(enabled_modules={"detections", "cloud"})

        # Call list_enabled_modules
        result = server.list_enabled_modules()

        # Should only return enabled modules
        self.assertEqual(set(result["modules"]), {"detections", "cloud"})

        # Verify return type is correct
        self.assertIsInstance(result["modules"], list)

        # Verify each module name is a string
        for module_name in result["modules"]:
            self.assertIsInstance(module_name, str)

    @patch("falcon_mcp.server.FalconClient")
    def test_list_modules(self, mock_client):
        """Test listing all available modules."""
        # Setup mock
        mock_client_instance = MagicMock()
        mock_client_instance.authenticate.return_value = True
        mock_client.return_value = mock_client_instance

        # Create server with limited modules
        server = FalconMCPServer(enabled_modules={"detections", "cloud"})

        # Call list_modules
        result = server.list_modules()

        # Should return ALL modules from registry regardless of what's enabled
        expected_modules = registry.get_module_names()
        self.assertEqual(set(result["modules"]), set(expected_modules))

        # Verify return type is correct
        self.assertIsInstance(result["modules"], list)

        # Verify each module name is a string
        for module_name in result["modules"]:
            self.assertIsInstance(module_name, str)

    @patch("falcon_mcp.server.FalconClient")
    def test_list_modules_consistency(self, mock_client):
        """Test that list_modules always returns the same result."""
        # Setup mock
        mock_client_instance = MagicMock()
        mock_client_instance.authenticate.return_value = True
        mock_client.return_value = mock_client_instance

        # Create two servers with different enabled modules
        server1 = FalconMCPServer(enabled_modules={"detections"})
        server2 = FalconMCPServer(enabled_modules={"cloud", "intel"})

        # Both should return the same available modules
        result1 = server1.list_modules()
        result2 = server2.list_modules()

        self.assertEqual(set(result1["modules"]), set(result2["modules"]))

        # And both should match the registry
        expected_modules = registry.get_module_names()
        self.assertEqual(set(result1["modules"]), set(expected_modules))


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/tests/modules/test_detections.py:
--------------------------------------------------------------------------------

```python
"""
Tests for the Detections module.
"""

import unittest

from falcon_mcp.modules.detections import DetectionsModule
from tests.modules.utils.test_modules import TestModules


class TestDetectionsModule(TestModules):
    """Test cases for the Detections module."""

    def setUp(self):
        """Set up test fixtures."""
        self.setup_module(DetectionsModule)

    def test_register_tools(self):
        """Test registering tools with the server."""
        expected_tools = [
            "falcon_search_detections",
            "falcon_get_detection_details",
        ]
        self.assert_tools_registered(expected_tools)

    def test_register_resources(self):
        """Test registering resources with the server."""
        expected_resources = [
            "falcon_search_detections_fql_guide",
        ]
        self.assert_resources_registered(expected_resources)

    def test_search_detections(self):
        """Test searching for detections."""
        # Setup mock responses for both API calls
        query_response = {
            "status_code": 200,
            "body": {"resources": ["detection1", "detection2"]},
        }
        details_response = {
            "status_code": 200,
            "body": {"resources": []},  # Empty resources for PostEntitiesAlertsV2
        }
        self.mock_client.command.side_effect = [query_response, details_response]

        # Call search_detections
        result = self.module.search_detections(
            filter="test query", limit=10, include_hidden=True
        )

        # Verify client commands were called correctly
        self.assertEqual(self.mock_client.command.call_count, 2)

        # Check that the first call was to GetQueriesAlertsV2 with the right filter and limit
        first_call = self.mock_client.command.call_args_list[0]
        self.assertEqual(first_call[0][0], "GetQueriesAlertsV2")
        self.assertEqual(first_call[1]["parameters"]["filter"], "test query")
        self.assertEqual(first_call[1]["parameters"]["limit"], 10)
        self.mock_client.command.assert_any_call(
            "PostEntitiesAlertsV2",
            body={
                "composite_ids": ["detection1", "detection2"],
                "include_hidden": True,
            },
        )

        # Verify result
        self.assertEqual(
            result, []
        )  # Empty list because PostEntitiesAlertsV2 returned empty resources

    def test_search_detections_with_details(self):
        """Test searching for detections with details."""
        # Setup mock responses
        query_response = {
            "status_code": 200,
            "body": {"resources": ["detection1", "detection2"]},
        }
        details_response = {
            "status_code": 200,
            "body": {
                "resources": [
                    {"id": "detection1", "name": "Test Detection 1"},
                    {"id": "detection2", "name": "Test Detection 2"},
                ]
            },
        }
        self.mock_client.command.side_effect = [query_response, details_response]

        # Call search_detections
        result = self.module.search_detections(
            filter="test query", limit=10, include_hidden=True
        )

        # Verify client commands were called correctly
        self.assertEqual(self.mock_client.command.call_count, 2)

        # Check that the first call was to GetQueriesAlertsV2 with the right filter and limit
        first_call = self.mock_client.command.call_args_list[0]
        self.assertEqual(first_call[0][0], "GetQueriesAlertsV2")
        self.assertEqual(first_call[1]["parameters"]["filter"], "test query")
        self.assertEqual(first_call[1]["parameters"]["limit"], 10)
        self.mock_client.command.assert_any_call(
            "PostEntitiesAlertsV2",
            body={
                "composite_ids": ["detection1", "detection2"],
                "include_hidden": True,
            },
        )

        # Verify result
        expected_result = [
            {"id": "detection1", "name": "Test Detection 1"},
            {"id": "detection2", "name": "Test Detection 2"},
        ]
        self.assertEqual(result, expected_result)

    def test_search_detections_error(self):
        """Test searching for detections with API error."""
        # Setup mock response with error
        mock_response = {
            "status_code": 400,
            "body": {"errors": [{"message": "Invalid query"}]},
        }
        self.mock_client.command.return_value = mock_response

        # Call search_detections
        result = self.module.search_detections(filter="invalid query")

        # Verify result contains error
        self.assertEqual(len(result), 1)
        self.assertIn("error", result[0])
        self.assertIn("details", result[0])

    def test_get_detection_details(self):
        """Test getting detection details."""
        # Setup mock response
        mock_response = {
            "status_code": 200,
            "body": {"resources": [{"id": "detection1", "name": "Test Detection 1"}]},
        }
        self.mock_client.command.return_value = mock_response

        # Call get_detection_details
        result = self.module.get_detection_details(["detection1"], include_hidden=True)

        # Verify client command was called correctly
        self.mock_client.command.assert_called_once_with(
            "PostEntitiesAlertsV2",
            body={"composite_ids": ["detection1"], "include_hidden": True},
        )

        # Verify result - handle_api_response returns a list of resources
        expected_result = [{"id": "detection1", "name": "Test Detection 1"}]
        self.assertEqual(result, expected_result)

    def test_get_detection_details_not_found(self):
        """Test getting detection details for non-existent detection."""
        # Setup mock response with empty resources
        mock_response = {"status_code": 200, "body": {"resources": []}}
        self.mock_client.command.return_value = mock_response

        # Call get_detection_details
        result = self.module.get_detection_details(["nonexistent"])

        # For empty resources, handle_api_response returns the default_result (empty list)
        # We should check that the result is empty
        self.assertEqual(result, [])

    def test_search_detections_include_hidden_false(self):
        """Test searching for detections with include_hidden=False."""
        # Setup mock responses for both API calls
        query_response = {
            "status_code": 200,
            "body": {"resources": ["detection1", "detection2"]},
        }
        details_response = {
            "status_code": 200,
            "body": {"resources": [{"id": "detection1", "name": "Test Detection 1"}]},
        }
        self.mock_client.command.side_effect = [query_response, details_response]

        # Call search_detections with include_hidden=False
        result = self.module.search_detections(
            filter="test query", include_hidden=False
        )

        # Verify client commands were called correctly
        self.assertEqual(self.mock_client.command.call_count, 2)

        # Check that the second call includes include_hidden=False
        self.mock_client.command.assert_any_call(
            "PostEntitiesAlertsV2",
            body={
                "composite_ids": ["detection1", "detection2"],
                "include_hidden": False,
            },
        )

        # Verify result
        expected_result = [{"id": "detection1", "name": "Test Detection 1"}]
        self.assertEqual(result, expected_result)

    def test_get_detection_details_include_hidden_false(self):
        """Test getting detection details with include_hidden=False."""
        # Setup mock response
        mock_response = {
            "status_code": 200,
            "body": {"resources": [{"id": "detection1", "name": "Test Detection 1"}]},
        }
        self.mock_client.command.return_value = mock_response

        # Call get_detection_details with include_hidden=False
        result = self.module.get_detection_details(["detection1"], include_hidden=False)

        # Verify client command was called correctly with include_hidden=False
        self.mock_client.command.assert_called_once_with(
            "PostEntitiesAlertsV2",
            body={"composite_ids": ["detection1"], "include_hidden": False},
        )

        # Verify result
        expected_result = [{"id": "detection1", "name": "Test Detection 1"}]
        self.assertEqual(result, expected_result)


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/falcon_mcp/modules/discover.py:
--------------------------------------------------------------------------------

```python
"""
Discover module for Falcon MCP Server

This module provides tools for accessing and managing CrowdStrike Falcon Discover applications and unmanaged assets.
"""

from textwrap import dedent
from typing import Any, Dict, List

from mcp.server import FastMCP
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl, Field

from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.logging import get_logger
from falcon_mcp.common.utils import prepare_api_parameters
from falcon_mcp.modules.base import BaseModule
from falcon_mcp.resources.discover import (
    SEARCH_APPLICATIONS_FQL_DOCUMENTATION,
    SEARCH_UNMANAGED_ASSETS_FQL_DOCUMENTATION,
)

logger = get_logger(__name__)


class DiscoverModule(BaseModule):
    """Module for accessing and managing CrowdStrike Falcon Discover applications and unmanaged assets."""

    def register_tools(self, server: FastMCP) -> None:
        """Register tools with the MCP server.

        Args:
            server: MCP server instance
        """
        # Register tools
        self._add_tool(
            server=server,
            method=self.search_applications,
            name="search_applications",
        )

        self._add_tool(
            server=server,
            method=self.search_unmanaged_assets,
            name="search_unmanaged_assets",
        )

    def register_resources(self, server: FastMCP) -> None:
        """Register resources with the MCP server.

        Args:
            server: MCP server instance
        """
        search_applications_fql_resource = TextResource(
            uri=AnyUrl("falcon://discover/applications/fql-guide"),
            name="falcon_search_applications_fql_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_applications` tool.",
            text=SEARCH_APPLICATIONS_FQL_DOCUMENTATION,
        )

        search_unmanaged_assets_fql_resource = TextResource(
            uri=AnyUrl("falcon://discover/hosts/fql-guide"),
            name="falcon_search_unmanaged_assets_fql_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_unmanaged_assets` tool.",
            text=SEARCH_UNMANAGED_ASSETS_FQL_DOCUMENTATION,
        )

        self._add_resource(
            server,
            search_applications_fql_resource,
        )

        self._add_resource(
            server,
            search_unmanaged_assets_fql_resource,
        )

    def search_applications(
        self,
        filter: str = Field(
            description="FQL filter expression used to limit the results. IMPORTANT: use the `falcon://discover/applications/fql-guide` resource when building this filter parameter.",
            examples={"name:'Chrome'", "vendor:'Microsoft Corporation'"},
        ),
        facet: str | None = Field(
            default=None,
            description=dedent("""
                Type of data to be returned for each application entity. The facet filter allows you to limit the response to just the information you want.

                Possible values:
                • browser_extension
                • host_info
                • install_usage

                Note: Requests that do not include the host_info or browser_extension facets still return host.ID, browser_extension.ID, and browser_extension.enabled in the response.
            """).strip(),
            examples={"browser_extension", "host_info", "install_usage"},
        ),
        limit: int = Field(
            default=100,
            ge=1,
            le=1000,
            description="Maximum number of items to return: 1-1000. Default is 100.",
        ),
        sort: str | None = Field(
            default=None,
            description="Property used to sort the results. All properties can be used to sort unless otherwise noted in their property descriptions.",
            examples={"name.asc", "vendor.desc", "last_updated_timestamp.desc"},
        ),
    ) -> List[Dict[str, Any]]:
        """Search for applications in your CrowdStrike environment.

        IMPORTANT: You must use the `falcon://discover/applications/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_applications` tool.
        """
        # Prepare parameters for combined_applications
        params = prepare_api_parameters(
            {
                "filter": filter,
                "facet": facet,
                "limit": limit,
                "sort": sort,
            }
        )

        # Define the operation name
        operation = "combined_applications"

        logger.debug("Searching applications with params: %s", params)

        # Make the API request
        response = self.client.command(operation, parameters=params)

        # Use handle_api_response to get application data
        applications = handle_api_response(
            response,
            operation=operation,
            error_message="Failed to search applications",
            default_result=[],
        )

        # If handle_api_response returns an error dict instead of a list,
        # it means there was an error, so we return it wrapped in a list
        if self._is_error(applications):
            return [applications]

        return applications

    def search_unmanaged_assets(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL filter expression used to limit the results. IMPORTANT: use the `falcon://discover/hosts/fql-guide` resource when building this filter parameter. Note: entity_type:'unmanaged' is automatically applied.",
            examples={"platform_name:'Windows'", "criticality:'Critical'"},
        ),
        limit: int = Field(
            default=100,
            ge=1,
            le=5000,
            description="Maximum number of items to return: 1-5000. Default is 100.",
        ),
        offset: int | None = Field(
            default=None,
            description="Starting index of overall result set from which to return results.",
        ),
        sort: str | None = Field(
            default=None,
            description=dedent("""
                Sort unmanaged assets using these options:

                hostname: Host name/computer name
                last_seen_timestamp: Timestamp when the asset was last seen
                first_seen_timestamp: Timestamp when the asset was first seen
                platform_name: Operating system platform
                os_version: Operating system version
                external_ip: External IP address
                country: Country location
                criticality: Criticality level

                Sort either asc (ascending) or desc (descending).
                Both formats are supported: 'hostname.desc' or 'hostname|desc'

                Examples: 'hostname.asc', 'last_seen_timestamp.desc', 'criticality.desc'
            """).strip(),
            examples={"hostname.asc", "last_seen_timestamp.desc", "criticality.desc"},
        ),
    ) -> List[Dict[str, Any]]:
        """Search for unmanaged assets (hosts) in your CrowdStrike environment.

        These are systems that do not have the Falcon sensor installed but have been
        discovered by systems that do have a Falcon sensor installed.

        IMPORTANT: You must use the `falcon://discover/hosts/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_unmanaged_assets` tool.

        The tool automatically filters for unmanaged assets only by adding entity_type:'unmanaged' to all queries.
        You do not need to (and cannot) specify entity_type in your filter - it is always set to 'unmanaged'.
        """
        # Always enforce entity_type:'unmanaged' filter
        base_filter = "entity_type:'unmanaged'"

        # Combine with user filter if provided
        if filter:
            combined_filter = f"{base_filter}+{filter}"
        else:
            combined_filter = base_filter

        # Prepare parameters for combined_hosts
        params = prepare_api_parameters(
            {
                "filter": combined_filter,
                "limit": limit,
                "offset": offset,
                "sort": sort,
            }
        )

        # Define the operation name
        operation = "combined_hosts"

        logger.debug("Searching unmanaged assets with params: %s", params)

        # Make the API request
        response = self.client.command(operation, parameters=params)

        # Use handle_api_response to get unmanaged asset data
        assets = handle_api_response(
            response,
            operation=operation,
            error_message="Failed to search unmanaged assets",
            default_result=[],
        )

        # If handle_api_response returns an error dict instead of a list,
        # it means there was an error, so we return it wrapped in a list
        if self._is_error(assets):
            return [assets]

        return assets

```

--------------------------------------------------------------------------------
/tests/modules/test_serverless.py:
--------------------------------------------------------------------------------

```python
"""
Tests for the Serverless module.
"""

import unittest
from unittest.mock import MagicMock, patch

from falcon_mcp.modules.serverless import ServerlessModule
from tests.modules.utils.test_modules import TestModules


class TestServerlessModule(TestModules):
    """Test cases for the Serverless module."""

    def setUp(self):
        """Set up test fixtures."""
        self.setup_module(ServerlessModule)

    def test_register_tools(self):
        """Test registering tools with the server."""
        expected_tools = [
            "falcon_search_serverless_vulnerabilities",
        ]
        self.assert_tools_registered(expected_tools)

    def test_register_resources(self):
        """Test registering resources with the server."""
        expected_resources = [
            "falcon_serverless_vulnerabilities_fql_guide",
        ]
        self.assert_resources_registered(expected_resources)

    def test_search_serverless_vulnerabilities_success(self):
        """Test searching serverless vulnerabilities with successful response."""
        # Setup mock response with sample vulnerability data
        mock_response = {
            "status_code": 200,
            "body": {
                "runs": [
                    {
                        "tool": {
                            "driver": {
                                "name": "CrowdStrike",
                                "informationUri": "https://www.crowdstrike.com/",
                                "rules": [
                                    {
                                        "id": "CVE-2023-12345",
                                        "name": "PythonPackageVulnerability",
                                        "shortDescription": {"text": "Test vulnerability description"},
                                        "fullDescription": {"text": "Test vulnerability full description"},
                                        "help": {"text": "Package: test-package\nVulnerability: CVE-2023-12345"},
                                        "properties": {
                                            "severity": "HIGH",
                                            "cvssBaseScore": 8.5,
                                            "remediations": ["Upgrade to version 2.0.0"]
                                        }
                                    }
                                ]
                            }
                        }
                    }
                ]
            },
        }
        self.mock_client.command.return_value = mock_response

        # Mock the prepare_api_parameters function to return a simple dict
        self.module.search_serverless_vulnerabilities = MagicMock(return_value=mock_response["body"]["runs"])

        # Call search_serverless_vulnerabilities with test parameters
        result = self.module.search_serverless_vulnerabilities(
            filter="cloud_provider:'aws'"
        )

        # Verify result contains expected values
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]["tool"]["driver"]["name"], "CrowdStrike")
        self.assertEqual(
            result[0]["tool"]["driver"]["rules"][0]["id"], "CVE-2023-12345"
        )
        self.assertEqual(
            result[0]["tool"]["driver"]["rules"][0]["properties"]["severity"], "HIGH"
        )

    def test_search_serverless_vulnerabilities_no_filter(self):
        """Test searching serverless vulnerabilities with no filter parameter."""
        # In the serverless module, filter is a required parameter with no default
        # So we should test that it's properly required

        # Create a mock implementation that raises TypeError when filter is not provided
        def mock_search(*args, **kwargs):
            if "filter" not in kwargs:
                raise TypeError("filter is a required parameter")
            return []

        # Replace the method with our mock
        self.module.search_serverless_vulnerabilities = mock_search

        # This should raise a TypeError since filter is a required parameter
        with self.assertRaises(TypeError):
            self.module.search_serverless_vulnerabilities()

    def test_search_serverless_vulnerabilities_empty_response(self):
        """Test searching serverless vulnerabilities with empty response."""
        # Mock the method to return empty runs
        self.module.search_serverless_vulnerabilities = MagicMock(return_value=[])

        # Call search_serverless_vulnerabilities
        result = self.module.search_serverless_vulnerabilities(
            filter="cloud_provider:'aws'"
        )

        # Verify result is an empty list
        self.assertEqual(result, [])

    def test_search_serverless_vulnerabilities_error(self):
        """Test searching serverless vulnerabilities with API error."""
        # Setup mock error response
        error_response = {
            "error": "Failed to search serverless vulnerabilities: Request failed with status code 400",
            "details": {"errors": [{"message": "Invalid query"}]},
        }

        # Mock the method to return the error
        self.module.search_serverless_vulnerabilities = MagicMock(return_value=[error_response])

        # Call search_serverless_vulnerabilities
        results = self.module.search_serverless_vulnerabilities(
            filter="invalid query"
        )
        result = results[0]

        # Verify result contains error
        self.assertIn("error", result)
        self.assertIn("details", result)
        # Check that the error message starts with the expected prefix
        self.assertTrue(
            result["error"].startswith("Failed to search serverless vulnerabilities")
        )

    def test_search_serverless_vulnerabilities_with_all_params(self):
        """Test searching serverless vulnerabilities with all parameters."""
        # Setup mock response
        mock_response = [
            {
                "tool": {
                    "driver": {
                        "name": "CrowdStrike",
                        "rules": [
                            {
                                "id": "CVE-2023-12345",
                                "properties": {"severity": "HIGH"}
                            }
                        ]
                    }
                }
            }
        ]

        # Mock the method to return the mock response
        self.module.search_serverless_vulnerabilities = MagicMock(return_value=mock_response)

        # Call search_serverless_vulnerabilities with all parameters
        result = self.module.search_serverless_vulnerabilities(
            filter="cloud_provider:'aws'",
            limit=5,
            offset=10,
            sort="severity",
        )

        # Verify result contains expected values
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]["tool"]["driver"]["name"], "CrowdStrike")

    def test_search_serverless_vulnerabilities_missing_runs(self):
        """Test searching serverless vulnerabilities with missing runs in response."""
        # Setup mock response with missing runs
        error_response = {
            "error": "Failed to search serverless vulnerabilities: Missing 'runs' in response",
            "details": {"body": {}}
        }

        # Mock the method to return the error
        self.module.search_serverless_vulnerabilities = MagicMock(return_value=[error_response])

        # Call search_serverless_vulnerabilities
        result = self.module.search_serverless_vulnerabilities(
            filter="cloud_provider:'aws'"
        )

        # Verify result is a list with one item containing error info
        self.assertEqual(len(result), 1)
        self.assertIn("error", result[0])

    def test_search_serverless_vulnerabilities_none_runs(self):
        """Test searching serverless vulnerabilities when 'runs' key exists but is None."""
        # We need to mock handle_api_response to return a dict with runs=None
        # This simulates the case where the API returns a response with runs=None

        # Create a mock response that handle_api_response will process
        mock_api_response = {
            "status_code": 200,
            "body": {
                # The actual structure doesn't matter as we'll mock handle_api_response
            }
        }
        self.mock_client.command.return_value = mock_api_response

        # Mock handle_api_response to return a dict with runs=None
        mock_processed_response = {"runs": None}  # This is what we want to test

        with patch('falcon_mcp.modules.serverless.handle_api_response', return_value=mock_processed_response):
            # Call search_serverless_vulnerabilities
            result = self.module.search_serverless_vulnerabilities(
                filter="cloud_provider:'aws'"
            )

            # Verify result is an empty list
            self.assertEqual(result, [])

            # Verify the API was called with the correct parameters
            self.mock_client.command.assert_called_once()
            call_args = self.mock_client.command.call_args[1]
            self.assertEqual(call_args["parameters"]["filter"], "cloud_provider:'aws'")


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/tests/e2e/modules/test_detections.py:
--------------------------------------------------------------------------------

```python
"""
E2E tests for the Detections module.
"""

import json
import unittest

import pytest

from tests.e2e.utils.base_e2e_test import BaseE2ETest


@pytest.mark.e2e
class TestDetectionsModuleE2E(BaseE2ETest):
    """
    End-to-end test suite for the Falcon MCP Server Detections Module.
    """

    def test_get_top_3_high_severity_detections(self):
        """Verify the agent can retrieve the top 3 high-severity detections."""

        async def test_logic():
            fixtures = [
                {
                    "operation": "GetQueriesAlertsV2",
                    "validator": lambda kwargs: "severity:"
                    in kwargs.get("parameters", {}).get("filter", "").lower()
                    and kwargs.get("parameters", {}).get("limit", 0) <= 10,
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": ["detection-1", "detection-2", "detection-3"]
                        },
                    },
                },
                {
                    "operation": "PostEntitiesAlertsV2",
                    "validator": lambda kwargs: "detection-1"
                    in kwargs.get("body", {}).get("composite_ids", []),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "detection-1",
                                    "composite_id": "detection-1",
                                    "status": "new",
                                    "severity": 90,
                                    "severity_name": "Critical",
                                    "confidence": 85,
                                    "description": "A critical detection for E2E testing.",
                                    "created_timestamp": "2024-01-20T10:00:00Z",
                                    "agent_id": "test-agent-001",
                                },
                                {
                                    "id": "detection-2",
                                    "composite_id": "detection-2",
                                    "status": "new",
                                    "severity": 70,
                                    "severity_name": "High",
                                    "confidence": 80,
                                    "description": "A high severity detection for E2E testing.",
                                    "created_timestamp": "2024-01-20T09:30:00Z",
                                    "agent_id": "test-agent-002",
                                },
                                {
                                    "id": "detection-3",
                                    "composite_id": "detection-3",
                                    "status": "new",
                                    "severity": 70,
                                    "severity_name": "High",
                                    "confidence": 75,
                                    "description": "Another high severity detection for E2E testing.",
                                    "created_timestamp": "2024-01-20T09:00:00Z",
                                    "agent_id": "test-agent-003",
                                },
                            ]
                        },
                    },
                },
            ]

            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )

            prompt = "Give me the details of the top 3 high severity detections, return only detection id and descriptions"
            return await self._run_agent_stream(prompt)

        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected 1 tool call")
            used_tool = tools[len(tools) - 1]
            self.assertEqual(
                used_tool["input"]["tool_name"], "falcon_search_detections"
            )
            # Check for severity-related filtering (numeric or text-based)
            tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
            self.assertTrue(
                "severity:" in tool_input_str or "high" in tool_input_str,
                f"Expected severity filtering in tool input: {tool_input_str}",
            )
            self.assertIn("detection-1", used_tool["output"])
            self.assertIn("detection-2", used_tool["output"])
            self.assertIn("detection-3", used_tool["output"])

            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 2, "Expected 2 API calls"
            )
            api_call_1_params = self._mock_api_instance.command.call_args_list[0][
                1
            ].get("parameters", {})
            filter_str = api_call_1_params.get("filter", "").lower()
            # Accept either numeric severity filters or text-based filters
            self.assertTrue(
                "severity:" in filter_str or "high" in filter_str,
                f"Expected severity filtering in API call: {filter_str}",
            )
            self.assertEqual(api_call_1_params.get("limit"), 3)
            self.assertIn("severity.desc", api_call_1_params.get("sort", ""))
            api_call_2_body = self._mock_api_instance.command.call_args_list[1][1].get(
                "body", {}
            )
            self.assertEqual(
                api_call_2_body.get("composite_ids"),
                ["detection-1", "detection-2", "detection-3"],
            )

            self.assertIn("detection-1", result)
            self.assertIn("detection-2", result)
            self.assertIn("detection-3", result)

        self.run_test_with_retries(
            "test_get_top_3_high_severity_detections",
            test_logic,
            assertions,
        )

    def test_get_highest_detection_for_ip(self):
        """Verify the agent can find the highest-severity detection for a specific IP."""

        async def test_logic():
            fixtures = [
                {
                    "operation": "GetQueriesAlertsV2",
                    "validator": lambda kwargs: "10.0.0.1"
                    in kwargs.get("parameters", {}).get("filter", ""),
                    "response": {
                        "status_code": 200,
                        "body": {"resources": ["detection-4"]},
                    },
                },
                {
                    "operation": "PostEntitiesAlertsV2",
                    "validator": lambda kwargs: "detection-4"
                    in kwargs.get("body", {}).get("composite_ids", []),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "detection-4",
                                    "composite_id": "detection-4",
                                    "status": "new",
                                    "severity": 90,
                                    "severity_name": "Critical",
                                    "confidence": 95,
                                    "description": "A critical detection on a specific IP.",
                                    "created_timestamp": "2024-01-20T11:00:00Z",
                                    "agent_id": "test-agent-004",
                                    "local_ip": "10.0.0.1",
                                }
                            ]
                        },
                    },
                },
            ]

            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )

            prompt = "What is the highest detection for the device with local_ip 10.0.0.1? Return the detection id as well"
            return await self._run_agent_stream(prompt)

        def assertions(tools, result):
            self.assertGreaterEqual(
                len(tools), 1, f"Expected 1 tool call, but got {len(tools)}"
            )
            used_tool = tools[len(tools) - 1]
            self.assertEqual(
                used_tool["input"]["tool_name"], "falcon_search_detections"
            )
            self.assertIn("10.0.0.1", json.dumps(used_tool["input"]["tool_input"]))
            self.assertIn("detection-4", used_tool["output"])

            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 2, "Expected 2 API calls"
            )
            api_call_1_params = self._mock_api_instance.command.call_args_list[0][
                1
            ].get("parameters", {})
            self.assertIn("10.0.0.1", api_call_1_params.get("filter"))
            api_call_2_body = self._mock_api_instance.command.call_args_list[1][1].get(
                "body", {}
            )
            self.assertEqual(api_call_2_body.get("composite_ids"), ["detection-4"])

            self.assertIn("detection-4", result)
            self.assertNotIn("detection-1", result)

        self.run_test_with_retries(
            "test_get_highest_detection_for_ip", test_logic, assertions
        )


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/docs/resource_development.md:
--------------------------------------------------------------------------------

```markdown
# Falcon MCP Server Resource Development Guide

This guide provides instructions for implementing and registering resources for the Falcon MCP server.

## What are Resources?

Resources in the Falcon MCP server represent data sources that can be accessed by the model. Unlike tools, which perform actions, resources provide context or information that the model can reference. Resources are particularly useful for:

1. Providing documentation or guides that the model can reference
2. Exposing structured data that tools can use
3. Making reference information available without requiring a tool call

## Resource Structure

Each resource should:

1. Be created using an appropriate resource class (e.g., `TextResource`)
2. Have a unique URI that identifies it
3. Include a descriptive name and description
4. Be registered with the MCP server through a module's `register_resources` method

## Step-by-Step Implementation Guide

### 1. Create Resource Content

First, define the content for your resource. This could be:

- Documentation text in a separate file
- Structured data in a Python dictionary
- Reference information in a dedicated module

For text-based resources, it's recommended to store the content in a separate file in the `falcon_mcp/resources` directory:

```python
# falcon_mcp/resources/your_resource.py
YOUR_RESOURCE_CONTENT = """
Detailed documentation or reference information goes here.
This can be multi-line text with formatting.

## Section Headers

- Bullet points
- And other formatting

Code examples:
...

"""

```

### 2. Register Resources in Your Module

In your module class, implement the `register_resources` method:

```python
from mcp.server import FastMCP
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl

from ..resources.your_resource import YOUR_RESOURCE_CONTENT
from .base import BaseModule


class YourModule(BaseModule):
    """Your module description."""

    def register_tools(self, server: FastMCP) -> None:
        """Register tools with the MCP server."""
        # Tool registration code...

    def register_resources(self, server: FastMCP) -> None:
        """Register resources with the MCP server.

        Args:
            server: MCP server instance
        """
        your_resource = TextResource(
            uri=AnyUrl("falcon://your-module/resource-name"),
            name="your_resource_name",
            description="Description of what this resource provides.",
            text=YOUR_RESOURCE_CONTENT,
        )

        self._add_resource(
            server,
            your_resource,
        )
```

### 3. Resource URI Conventions

Resource URIs should follow a consistent pattern:

- Start with `falcon://` as the scheme
- Include the module name as the first path segment
- Use descriptive path segments for the resource
- Use hyphens to separate words in path segments

Examples:

- `falcon://intel/query_actor_entities/fql-guide`
- `falcon://detections/search/fql-guide`
- `falcon://incidents/status-codes`

### 4. Resource Types

The MCP server supports several resource types:

#### TextResource

Used for providing text-based documentation or reference information:

```python
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl

text_resource = TextResource(
    uri=AnyUrl("falcon://module/resource-name"),
    name="resource_name",
    description="Resource description",
    text="Resource content",
)
```

#### Other Resource Types

Additional resource types may be available depending on the MCP server implementation. Consult the MCP server documentation for details on other resource types.

## Best Practices

### Resource Content

1. **Comprehensive Documentation**: Provide detailed information that covers all aspects of the topic
2. **Structured Format**: Use clear section headers, bullet points, and code examples
3. **Examples**: Include practical examples that demonstrate usage
4. **Consistent Style**: Follow a consistent documentation style across all resources

### Resource Registration

1. **Descriptive Names**: Use clear, descriptive names for resources
2. **Detailed Descriptions**: Provide informative descriptions that explain what the resource contains
3. **Logical Organization**: Group related resources together with consistent URI patterns
4. **Reference from Tools**: Reference resources in tool documentation where appropriate

### Resource Usage

1. **Tool Integration**: Design resources to complement tools by providing context or documentation
2. **Self-Contained**: Resources should be self-contained and not require additional context
3. **Versioning**: Consider versioning strategies for resources that may change over time

## Example: Intel Module Resources

The Intel module provides a good example of resource implementation:

```python
from mcp.server import FastMCP
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl

from ..resources.intel import QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION
from .base import BaseModule


class IntelModule(BaseModule):
    """Module for accessing and analyzing CrowdStrike Falcon intelligence data."""

    def register_resources(self, server) -> None:
        """Register resources with the MCP server.

        Args:
            server: MCP server instance
        """

        query_actor_entities_resource = TextResource(
            uri=AnyUrl("falcon://intel/query_actor_entities/fql-guide"),
            name="falcon_query_actor_entities_fql_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_actors` tool.",
            text=QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION,
        )

        self._add_resource(
            server,
            query_actor_entities_resource,
        )
```

In this example:

1. The resource content (`QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION`) is defined in a separate file (`falcon_mcp/resources/intel.py`)
2. The resource is created as a `TextResource` with a clear URI, name, and description
3. The resource is registered with the server using the `_add_resource` method
4. The resource complements the `search_actors` tool by providing documentation for its `filter` parameter

## Integrating Resources with Tools

Resources can be particularly valuable when integrated with tools. Here's how the Intel module references its resource in a tool method:

```python
def query_actor_entities(
    self,
    filter: Optional[str] = Field(
        default=None,
        description="FQL query expression that should be used to limit the results. IMPORTANT: use the 'falcon://query_actor_entities_fql_documentation' resource when building this parameter.",
    ),
    # Other parameters...
) -> List[Dict[str, Any]]:
    """Get info about actors that match provided FQL filters.

    IMPORTANT: You must call the FQL Guide for Intel Query Actor Entities (falcon://intel/query_actor_entities/fql-guide) resource first

    Returns:
        Information about actors that match the provided filters.
    """
    # Method implementation...
```

Note how:

1. The resource URI is referenced in the parameter description
2. The docstring explicitly mentions the resource that should be consulted
3. This creates a clear link between the tool and its supporting documentation

## Contributing Resource Changes

When contributing new resources or changes to existing resources, please follow these guidelines:

### Conventional Commits for Resources

This project uses [Conventional Commits](https://www.conventionalcommits.org/) for automated releases and clear commit history. When contributing resource-related changes, use these commit message patterns:

**Adding New Resources:**

```bash
git commit -m "feat(resources): add FQL guide for [module-name] module"
git commit -m "feat(resources): add documentation for [specific-topic]"
# Examples:
git commit -m "feat(resources): add FQL guide for cloud module"
git commit -m "feat(resources): add hosts search documentation"
```

**Modifying Existing Resources:**

```bash
git commit -m "refactor(resources): reword FQL guide in cloud resource"
git commit -m "fix(resources): correct formatting in intel FQL documentation"
git commit -m "docs(resources): update resource development guide"
# Examples:
git commit -m "refactor(resources): improve clarity in detections FQL guide"
git commit -m "fix(resources): correct syntax examples in incidents resource"
```

**Resource Tests and Infrastructure:**

```bash
git commit -m "test(resources): add validation tests for resource content"
git commit -m "chore(resources): update resource registration patterns"
```

See the main [CONTRIBUTING.md](CONTRIBUTING.md) guide for complete conventional commits guidelines.

## Conclusion

Resources are a powerful way to provide context and documentation to the model. By following the guidelines in this document, you can create effective resources that complement your tools and enhance the overall functionality of the Falcon MCP server.

When developing resources:

1. Focus on providing clear, comprehensive information
2. Follow consistent naming and URI conventions
3. Integrate resources with related tools
4. Test resource registration to ensure proper functionality

Resources, when used effectively alongside tools, create a more powerful and user-friendly experience by providing the necessary context and documentation for complex operations.

```

--------------------------------------------------------------------------------
/tests/modules/test_discover.py:
--------------------------------------------------------------------------------

```python
"""
Unit tests for the Discover module.
"""

import unittest
from unittest.mock import MagicMock, patch

from mcp.server import FastMCP

from falcon_mcp.client import FalconClient
from falcon_mcp.modules.discover import DiscoverModule


class TestDiscoverModule(unittest.TestCase):
    """Test cases for the Discover module."""

    def setUp(self):
        """Set up test fixtures."""
        self.client = MagicMock(spec=FalconClient)
        self.module = DiscoverModule(self.client)
        self.server = MagicMock(spec=FastMCP)

    def test_register_tools(self):
        """Test that tools are registered correctly."""
        self.module.register_tools(self.server)
        self.assertEqual(self.server.add_tool.call_count, 2)
        self.assertEqual(len(self.module.tools), 2)
        self.assertEqual(self.module.tools[0], "falcon_search_applications")
        self.assertEqual(self.module.tools[1], "falcon_search_unmanaged_assets")

    def test_register_resources(self):
        """Test that resources are registered correctly."""
        self.module.register_resources(self.server)
        self.assertEqual(self.server.add_resource.call_count, 2)
        self.assertEqual(len(self.module.resources), 2)
        self.assertEqual(
            str(self.module.resources[0]), "falcon://discover/applications/fql-guide"
        )
        self.assertEqual(
            str(self.module.resources[1]), "falcon://discover/hosts/fql-guide"
        )

    @patch("falcon_mcp.modules.discover.prepare_api_parameters")
    @patch("falcon_mcp.modules.discover.handle_api_response")
    def test_search_applications(self, mock_handle_response, mock_prepare_params):
        """Test search_applications method."""
        # Setup mocks
        mock_prepare_params.return_value = {"filter": "name:'Chrome'"}
        mock_response = MagicMock()
        self.client.command.return_value = mock_response
        mock_handle_response.return_value = [{"id": "app1", "name": "Chrome"}]

        # Call the method
        result = self.module.search_applications(filter="name:'Chrome'")

        # Assertions
        # Don't check the exact arguments, just verify it was called once
        self.assertEqual(mock_prepare_params.call_count, 1)
        self.client.command.assert_called_once_with(
            "combined_applications", parameters={"filter": "name:'Chrome'"}
        )
        mock_handle_response.assert_called_once_with(
            mock_response,
            operation="combined_applications",
            error_message="Failed to search applications",
            default_result=[],
        )
        self.assertEqual(result, [{"id": "app1", "name": "Chrome"}])

    @patch("falcon_mcp.modules.discover.prepare_api_parameters")
    @patch("falcon_mcp.modules.discover.handle_api_response")
    def test_search_applications_with_error(self, mock_handle_response, mock_prepare_params):
        """Test search_applications method when an error occurs."""
        # Setup mocks
        mock_prepare_params.return_value = {"filter": "name:'Chrome'"}
        mock_response = MagicMock()
        self.client.command.return_value = mock_response
        error_response = {"error": "API Error", "message": "Something went wrong"}
        mock_handle_response.return_value = error_response

        # Call the method
        result = self.module.search_applications(filter="name:'Chrome'")

        # Assertions
        self.assertEqual(result, [error_response])

    @patch("falcon_mcp.modules.discover.prepare_api_parameters")
    @patch("falcon_mcp.modules.discover.handle_api_response")
    def test_search_applications_with_all_params(self, mock_handle_response, mock_prepare_params):
        """Test search_applications method with all parameters."""
        # Setup mocks
        mock_prepare_params.return_value = {
            "filter": "name:'Chrome'",
            "facet": "host_info",
            "limit": 50,
            "sort": "name.asc",
        }
        mock_response = MagicMock()
        self.client.command.return_value = mock_response
        mock_handle_response.return_value = [{"id": "app1", "name": "Chrome"}]

        # Call the method
        result = self.module.search_applications(
            filter="name:'Chrome'",
            facet="host_info",
            limit=50,
            sort="name.asc",
        )

        # Assertions
        # Don't check the exact arguments, just verify it was called once
        self.assertEqual(mock_prepare_params.call_count, 1)
        self.client.command.assert_called_once_with(
            "combined_applications",
            parameters={
                "filter": "name:'Chrome'",
                "facet": "host_info",
                "limit": 50,
                "sort": "name.asc",
            },
        )
        self.assertEqual(result, [{"id": "app1", "name": "Chrome"}])

    @patch("falcon_mcp.modules.discover.prepare_api_parameters")
    @patch("falcon_mcp.modules.discover.handle_api_response")
    def test_search_unmanaged_assets(self, mock_handle_response, mock_prepare_params):
        """Test search_unmanaged_assets method."""
        # Setup mocks
        mock_prepare_params.return_value = {"filter": "entity_type:'unmanaged'+platform_name:'Windows'"}
        mock_response = MagicMock()
        self.client.command.return_value = mock_response
        mock_handle_response.return_value = [{"device_id": "host1", "hostname": "PC-001"}]

        # Call the method
        result = self.module.search_unmanaged_assets(filter="platform_name:'Windows'")

        # Assertions
        # Don't check the exact arguments, just verify it was called once
        self.assertEqual(mock_prepare_params.call_count, 1)
        self.client.command.assert_called_once_with(
            "combined_hosts", parameters={"filter": "entity_type:'unmanaged'+platform_name:'Windows'"}
        )
        mock_handle_response.assert_called_once_with(
            mock_response,
            operation="combined_hosts",
            error_message="Failed to search unmanaged assets",
            default_result=[],
        )
        self.assertEqual(result, [{"device_id": "host1", "hostname": "PC-001"}])

    @patch("falcon_mcp.modules.discover.prepare_api_parameters")
    @patch("falcon_mcp.modules.discover.handle_api_response")
    def test_search_unmanaged_assets_without_filter(self, mock_handle_response, mock_prepare_params):
        """Test search_unmanaged_assets method without user filter."""
        # Setup mocks
        mock_prepare_params.return_value = {"filter": "entity_type:'unmanaged'"}
        mock_response = MagicMock()
        self.client.command.return_value = mock_response
        mock_handle_response.return_value = [{"device_id": "host1", "hostname": "PC-001"}]

        # Call the method with no filter
        result = self.module.search_unmanaged_assets()

        # Assertions
        # Don't check the exact arguments, just verify it was called once
        self.assertEqual(mock_prepare_params.call_count, 1)
        self.client.command.assert_called_once_with(
            "combined_hosts", parameters={"filter": "entity_type:'unmanaged'"}
        )
        self.assertEqual(result, [{"device_id": "host1", "hostname": "PC-001"}])

    @patch("falcon_mcp.modules.discover.prepare_api_parameters")
    @patch("falcon_mcp.modules.discover.handle_api_response")
    def test_search_unmanaged_assets_with_error(self, mock_handle_response, mock_prepare_params):
        """Test search_unmanaged_assets method when an error occurs."""
        # Setup mocks
        mock_prepare_params.return_value = {"filter": "entity_type:'unmanaged'+platform_name:'Windows'"}
        mock_response = MagicMock()
        self.client.command.return_value = mock_response
        error_response = {"error": "API Error", "message": "Something went wrong"}
        mock_handle_response.return_value = error_response

        # Call the method
        result = self.module.search_unmanaged_assets(filter="platform_name:'Windows'")

        # Assertions
        self.assertEqual(result, [error_response])

    @patch("falcon_mcp.modules.discover.prepare_api_parameters")
    @patch("falcon_mcp.modules.discover.handle_api_response")
    def test_search_unmanaged_assets_with_all_params(self, mock_handle_response, mock_prepare_params):
        """Test search_unmanaged_assets method with all parameters."""
        # Setup mocks
        mock_prepare_params.return_value = {
            "filter": "entity_type:'unmanaged'+criticality:'Critical'",
            "limit": 50,
            "offset": 10,
            "sort": "hostname.asc",
        }
        mock_response = MagicMock()
        self.client.command.return_value = mock_response
        mock_handle_response.return_value = [{"device_id": "host1", "hostname": "PC-001"}]

        # Call the method
        result = self.module.search_unmanaged_assets(
            filter="criticality:'Critical'",
            limit=50,
            offset=10,
            sort="hostname.asc",
        )

        # Assertions
        # Don't check the exact arguments, just verify it was called once
        self.assertEqual(mock_prepare_params.call_count, 1)
        self.client.command.assert_called_once_with(
            "combined_hosts",
            parameters={
                "filter": "entity_type:'unmanaged'+criticality:'Critical'",
                "limit": 50,
                "offset": 10,
                "sort": "hostname.asc",
            },
        )
        self.assertEqual(result, [{"device_id": "host1", "hostname": "PC-001"}])


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/tests/e2e/modules/test_idp.py:
--------------------------------------------------------------------------------

```python
"""
E2E tests for the Identity Protection (IDP) module.
"""

import unittest

import pytest

from tests.e2e.utils.base_e2e_test import BaseE2ETest


@pytest.mark.e2e
class TestIdpModuleE2E(BaseE2ETest):
    """
    End-to-end test suite for the Falcon MCP Server Identity Protection Module.
    """

    def test_investigate_entity_comprehensive(self):
        """Test comprehensive entity investigation - What can you tell me about the user 'Wallace Muniz'?"""

        async def test_logic():
            fixtures = [
                # First call: Entity name resolution (search for Wallace Muniz)
                {
                    "operation": "api_preempt_proxy_post_graphql",
                    "validator": lambda kwargs: (
                        "primaryDisplayNames" in kwargs.get("body", {}).get("query", "")
                        and "Wallace Muniz" in kwargs.get("body", {}).get("query", "")
                    ),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "data": {
                                "entities": {
                                    "nodes": [
                                        {
                                            "entityId": "wallace-muniz-001",
                                            "primaryDisplayName": "Wallace Muniz",
                                        }
                                    ]
                                }
                            }
                        },
                    },
                },
                # Second call: Comprehensive entity details (default investigation includes entity_details)
                {
                    "operation": "api_preempt_proxy_post_graphql",
                    "validator": lambda kwargs: (
                        "entityIds" in kwargs.get("body", {}).get("query", "")
                        and "wallace-muniz-001"
                        in kwargs.get("body", {}).get("query", "")
                        and "riskFactors" in kwargs.get("body", {}).get("query", "")
                    ),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "data": {
                                "entities": {
                                    "nodes": [
                                        {
                                            "entityId": "wallace-muniz-001",
                                            "primaryDisplayName": "Wallace Muniz",
                                            "secondaryDisplayName": "[email protected]",
                                            "type": "USER",
                                            "riskScore": 85.5,
                                            "riskScoreSeverity": "HIGH",
                                            "riskFactors": [
                                                {
                                                    "type": "EXCESSIVE_PRIVILEGES",
                                                    "severity": "HIGH",
                                                },
                                                {
                                                    "type": "SUSPICIOUS_ACTIVITY",
                                                    "severity": "MEDIUM",
                                                },
                                            ],
                                            "associations": [
                                                {
                                                    "bindingType": "MEMBER_OF",
                                                    "entity": {
                                                        "entityId": "admin-group-001",
                                                        "primaryDisplayName": "Domain Admins",
                                                        "secondaryDisplayName": "CORP.LOCAL\\Domain Admins",
                                                        "type": "ENTITY_CONTAINER",
                                                    },
                                                }
                                            ],
                                            "accounts": [
                                                {
                                                    "domain": "CORP.LOCAL",
                                                    "samAccountName": "wmuniz",
                                                    "passwordAttributes": {
                                                        "lastChange": "2024-01-10T08:30:00Z",
                                                        "strength": "STRONG",
                                                    },
                                                }
                                            ],
                                            "openIncidents": {
                                                "nodes": [
                                                    {
                                                        "type": "SUSPICIOUS_LOGIN",
                                                        "startTime": "2024-01-15T10:30:00Z",
                                                        "compromisedEntities": [
                                                            {
                                                                "entityId": "wallace-muniz-001",
                                                                "primaryDisplayName": "Wallace Muniz",
                                                            }
                                                        ],
                                                    }
                                                ]
                                            },
                                        }
                                    ]
                                }
                            }
                        },
                    },
                },
            ]

            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )

            # Comprehensive question that should trigger entity investigation
            prompt = "What can you tell me about the user Wallace Muniz?"
            return await self._run_agent_stream(prompt)

        def assertions(tools, result):
            # Basic checks - tool was called and we got a result
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")

            # Check that the IDP investigate entity tool was used
            used_tool = tools[-1]  # Get the last tool used
            tool_name = used_tool["input"]["tool_name"]
            self.assertEqual(
                tool_name,
                "falcon_idp_investigate_entity",
                f"Expected idp_investigate_entity tool, got: {tool_name}",
            )

            # Check that the tool was called with Wallace Muniz in entity_names
            tool_input = used_tool["input"]["tool_input"]
            self.assertIn(
                "entity_names",
                tool_input,
                "Tool should be called with entity_names parameter",
            )

            entity_names = tool_input.get("entity_names", [])
            self.assertTrue(
                any("Wallace Muniz" in name for name in entity_names),
                f"Tool should be called with Wallace Muniz in entity_names: {entity_names}",
            )

            # Check that we got comprehensive result mentioning the entity details
            result_lower = result.lower()
            self.assertIn("wallace", result_lower, "Result should mention Wallace")

            # Should mention some key details from the comprehensive response
            self.assertTrue(
                any(
                    keyword in result_lower
                    for keyword in ["risk", "high", "privileges", "admin"]
                ),
                "Result should mention risk-related information",
            )

            # Check that the mock API was called at least twice (entity resolution + details)
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count,
                2,
                "API should be called at least twice for comprehensive investigation",
            )

            # Verify the API calls were made with expected GraphQL queries
            api_calls = self._mock_api_instance.command.call_args_list

            # First call should be entity name search
            first_call_query = api_calls[0][1].get("body", {}).get("query", "")
            self.assertIn(
                "primaryDisplayNames",
                first_call_query,
                "First call should search by primaryDisplayNames",
            )
            self.assertIn(
                "Wallace Muniz",
                first_call_query,
                "First call should search for Wallace Muniz",
            )

            # Second call should be detailed entity lookup
            second_call_query = api_calls[1][1].get("body", {}).get("query", "")
            self.assertIn(
                "entityIds", second_call_query, "Second call should lookup by entityIds"
            )
            self.assertIn(
                "riskFactors",
                second_call_query,
                "Second call should include risk factors",
            )

        self.run_test_with_retries(
            "test_investigate_entity_comprehensive", test_logic, assertions
        )


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/falcon_mcp/modules/cloud.py:
--------------------------------------------------------------------------------

```python
"""
Cloud module for Falcon MCP Server

This module provides tools for accessing and analyzing CrowdStrike Falcon cloud resources like
Kubernetes & Containers Inventory, Images Vulnerabilities, Cloud Assets.
"""

from textwrap import dedent
from typing import Any, Dict, List

from mcp.server import FastMCP
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl, Field

from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.logging import get_logger
from falcon_mcp.common.utils import prepare_api_parameters
from falcon_mcp.modules.base import BaseModule
from falcon_mcp.resources.cloud import (
    IMAGES_VULNERABILITIES_FQL_DOCUMENTATION,
    KUBERNETES_CONTAINERS_FQL_DOCUMENTATION,
)

logger = get_logger(__name__)


class CloudModule(BaseModule):
    """Module for accessing and analyzing CrowdStrike Falcon cloud resources."""

    def register_tools(self, server: FastMCP) -> None:
        """Register tools with the MCP server.

        Args:
            server: MCP server instance
        """
        # Register tools
        self._add_tool(
            server=server,
            method=self.search_kubernetes_containers,
            name="search_kubernetes_containers",
        )

        # fmt: off
        self._add_tool(
            server=server,
            method=self.count_kubernetes_containers,
            name="count_kubernetes_containers",
        )

        self._add_tool(
            server=server,
            method=self.search_images_vulnerabilities,
            name="search_images_vulnerabilities",
        )

    def register_resources(self, server: FastMCP) -> None:
        """Register resources with the MCP server.
        Args:
            server: MCP server instance
        """
        kubernetes_containers_fql_resource = TextResource(
            uri=AnyUrl("falcon://cloud/kubernetes-containers/fql-guide"),
            name="falcon_kubernetes_containers_fql_filter_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_kubernetes_containers` and `falcon_count_kubernetes_containers` tools.",
            text=KUBERNETES_CONTAINERS_FQL_DOCUMENTATION,
        )

        images_vulnerabilities_fql_resource = TextResource(
            uri=AnyUrl("falcon://cloud/images-vulnerabilities/fql-guide"),
            name="falcon_images_vulnerabilities_fql_filter_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_images_vulnerabilities` tool.",
            text=IMAGES_VULNERABILITIES_FQL_DOCUMENTATION,
        )

        self._add_resource(
            server,
            kubernetes_containers_fql_resource,
        )
        self._add_resource(
            server,
            images_vulnerabilities_fql_resource,
        )

    def search_kubernetes_containers(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://cloud/kubernetes-containers/fql-guide` resource when building this filter parameter.",
            examples={"cloud:'AWS'", "cluster_name:'prod'"},
        ),
        limit: int = Field(
            default=10,
            ge=1,
            le=9999,
            description="The maximum number of containers to return in this response (default: 10; max: 9999). Use with the offset parameter to manage pagination of results.",
        ),
        offset: int | None = Field(
            default=None,
            description="Starting index of overall result set from which to return containers.",
        ),
        sort: str | None = Field(
            default=None,
            description=dedent(
                """
                Sort kubernetes containers using these options:

                cloud_name: Cloud provider name
                cloud_region: Cloud region name
                cluster_name: Kubernetes cluster name
                container_name: Kubernetes container name
                namespace: Kubernetes namespace name
                last_seen: Timestamp when the container was last seen
                first_seen: Timestamp when the container was first seen
                running_status: Container running status which is either true or false

                Sort either asc (ascending) or desc (descending).
                Both formats are supported: 'container_name.desc' or 'container_name|desc'

                When searching containers running vulnerable images, use 'image_vulnerability_count.desc' to get container with most images vulnerabilities.

                Examples: 'container_name.desc', 'last_seen.desc'
            """
            ).strip(),
            examples={"container_name.desc", "last_seen.desc"},
        ),
    ) -> List[Dict[str, Any]]:
        """Search for kubernetes containers in your CrowdStrike Kubernetes & Containers Inventory

        IMPORTANT: You must use the `falcon://cloud/kubernetes-containers/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for `falcon_search_kubernetes_containers` tool.
        """

        # Prepare parameters
        params = prepare_api_parameters(
            {
                "filter": filter,
                "limit": limit,
                "offset": offset,
                "sort": sort,
            }
        )

        # Define the operation name
        operation = "ReadContainerCombined"

        # Make the API request
        response = self.client.command(operation, parameters=params)

        # Handle the response
        return handle_api_response(
            response,
            operation=operation,
            error_message="Failed to perform operation",
            default_result=[],
        )

    def count_kubernetes_containers(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://cloud/kubernetes-containers/fql-guide` resource when building this filter parameter.",
            examples={"cloud:'Azure'", "container_name:'service'"},
        ),
    ) -> int:
        """Count kubernetes containers in your CrowdStrike Kubernetes & Containers Inventory

        IMPORTANT: You must use the `falcon://cloud/kubernetes-containers/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for `falcon_count_kubernetes_containers` tool.
        """

        # Prepare parameters
        params = prepare_api_parameters(
            {
                "filter": filter,
            }
        )

        # Define the operation name
        operation = "ReadContainerCount"

        # Make the API request
        response = self.client.command(operation, parameters=params)

        # Handle the response
        return handle_api_response(
            response,
            operation=operation,
            error_message="Failed to perform operation",
            default_result=[],
        )

    def search_images_vulnerabilities(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://cloud/images-vulnerabilities/fql-guide` resource when building this filter parameter.",
            examples={"cve_id:*'*2025*'", "cvss_score:>5"},
        ),
        limit: int = Field(
            default=10,
            ge=1,
            le=9999,
            description="The maximum number of containers to return in this response (default: 10; max: 9999). Use with the offset parameter to manage pagination of results.",
        ),
        offset: int | None = Field(
            default=None,
            description="Starting index of overall result set from which to return containers.",
        ),
        sort: str | None = Field(
            default=None,
            description=dedent(
                """
                Sort images vulnerabilities using these options:

                cps_current_rating: CSP rating of the image vulnerability
                cve_id: CVE ID of the image vulnerability
                cvss_score: CVSS score of the image vulnerability
                images_impacted: Number of images impacted by the vulnerability

                Sort either asc (ascending) or desc (descending).
                Both formats are supported: 'container_name.desc' or 'container_name|desc'

                Examples: 'cvss_score.desc', 'cps_current_rating.asc'
            """
            ).strip(),
            examples={"cvss_score.desc", "cps_current_rating.asc"},
        ),
    ) -> List[Dict[str, Any]]:
        """Search for images vulnerabilities in your CrowdStrike Image Assessments

        IMPORTANT: You must use the `falcon://cloud/images-vulnerabilities/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for `falcon_search_images_vulnerabilities` tool.
        """

        # Prepare parameters
        params = prepare_api_parameters(
            {
                "filter": filter,
                "limit": limit,
                "offset": offset,
                "sort": sort,
            }
        )

        # Define the operation name
        operation = "ReadCombinedVulnerabilities"

        # Make the API request
        response = self.client.command(operation, parameters=params)

        # Handle the response
        return handle_api_response(
            response,
            operation=operation,
            error_message="Failed to perform operation",
            default_result=[],
        )

```

--------------------------------------------------------------------------------
/falcon_mcp/resources/incidents.py:
--------------------------------------------------------------------------------

```python
"""
Contains Incidents resources.
"""

from falcon_mcp.common.utils import generate_md_table

# List of tuples containing filter options data: (name, type, operators, description)
CROWD_SCORE_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Operators",
        "Description"
    ),
    (
        "id",
        "String",
        "Yes",
        """
        Unique identifier for the CrowdScore entity.
        """
    ),
    (
        "cid",
        "String",
        "No",
        """
        Customer ID.
        """
    ),
    (
        "timestamp",
        "Timestamp",
        "Yes",
        """
        Time when the CrowdScore was recorded (UTC).

        Ex: timestamp:>'2023-01-01T00:00:00Z'
        """
    ),
    (
        "score",
        "Number",
        "Yes",
        """
        The CrowdScore value.

        Ex: score:>50
        """
    ),
    (
        "adjusted_score",
        "Number",
        "Yes",
        """
        The adjusted CrowdScore value.
        """
    ),
    (
        "modified_timestamp",
        "Timestamp",
        "Yes",
        """
        Time when the CrowdScore was last modified (UTC).

        Ex: modified_timestamp:>'2023-01-01T00:00:00Z'
        """
    ),
]

CROWD_SCORE_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - CrowdScore Guide

=== BASIC SYNTAX ===
property_name:[operator]'value'

=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal to
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = does not text match
• * = wildcard matching (one or more characters)

=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)
• Wildcards: 'partial*' or '*partial' or '*partial*'

=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions

=== falcon_show_crowd_score FQL filter options ===

""" + generate_md_table(CROWD_SCORE_FQL_FILTERS) + """

=== EXAMPLE USAGE ===

• score:>50
• timestamp:>'2023-01-01T00:00:00Z'
• modified_timestamp:>'2023-01-01T00:00:00Z'+score:>70

=== IMPORTANT NOTES ===
• Use single quotes around string values: 'value'
• Use square brackets for exact matches: ['exact_value']
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
"""

# List of tuples containing filter options data: (name, type, operators, description)
SEARCH_INCIDENTS_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Operators",
        "Description"
    ),
    (
        "host_ids",
        "String",
        "No",
        """
        The device IDs of all the hosts on which the incident occurred.

        Ex: 9a07d39f8c9f430eb3e474d1a0c16ce9
        """
    ),
    (
        "lm_host_ids",
        "String",
        "No",
        """
        If lateral movement has occurred, this field shows the remote
        device IDs of the hosts on which the lateral movement occurred.

        Ex: c4e9e4643999495da6958ea9f21ee597
        """
    ),
    (
        "lm_hosts_capped",
        "Boolean",
        "No",
        """
        Indicates that the list of lateral movement hosts has been
        truncated. The limit is 15 hosts.

        Ex: True
        """
    ),
    (
        "name",
        "String",
        "Yes",
        """
        The name of the incident. Initially the name is assigned by
        CrowdScore, but it can be updated through the API.

        Ex: Incident on DESKTOP-27LTE3R at 2019-12-20T19:56:16Z
        """
    ),
    (
        "description",
        "String",
        "Yes",
        """
        The description of the incident. Initially the description is
        assigned by CrowdScore, but it can be updated through the API.

        Ex: Objectives in this incident: Keep Access.
            Techniques: Masquerading.
            Involved hosts and end users: DESKTOP-27LTE3R.
        """
    ),
    (
        "users",
        "String",
        "Yes",
        """
        The usernames of the accounts associated with the incident.

        Ex: someuser
        """
    ),
    (
        "tags",
        "String",
        "Yes",
        """
        Tags associated with the incident. CrowdScore will assign an
        initial set of tags, but tags can be added or removed through
        the API.

        Ex: Objective/Keep Access
        """
    ),
    (
        "final_score",
        "Number",
        "Yes",
        """
        The incident score. Divide the integer by 10 to match the
        displayed score for the incident.

        Ex: 56
        """
    ),
    (
        "start",
        "Timestamp",
        "Yes",
        """
        The recorded time of the earliest behavior.

        Ex: 2017-01-31T22:36:11Z
        """
    ),
    (
        "end",
        "Timestamp",
        "Yes",
        """
        The recorded time of the latest behavior.

        Ex: 2017-01-31T22:36:11Z
        """
    ),
    (
        "assigned_to_name",
        "String",
        "Yes",
        """
        The name of the user the incident is assigned to.
        """
    ),
    (
        "state",
        "String",
        "No",
        """
        The incident state: "open" or "closed"

        Ex: open
        """
    ),
    (
        "status",
        "Number",
        "No",
        """
        The incident status as a number:
        - 20: New
        - 25: Reopened
        - 30: In Progress
        - 40: Closed

        Ex: 20
        """
    ),
    (
        "modified_timestamp",
        "Timestamp",
        "Yes",
        """
        The most recent time a user has updated the incident.

        Ex: 2021-02-04T05:57:04Z
        """
    ),
]

SEARCH_INCIDENTS_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Search Incidents Guide

=== BASIC SYNTAX ===
property_name:[operator]'value'

=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal to
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = does not text match
• * = wildcard matching (one or more characters)

=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)
• Wildcards: 'partial*' or '*partial' or '*partial*'

=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions

=== falcon_search_incidents FQL filter options ===

""" + generate_md_table(SEARCH_INCIDENTS_FQL_FILTERS) + """

=== EXAMPLE USAGE ===

• state:'open'
• status:'20'
• final_score:>50
• tags:'Objective/Keep Access'
• modified_timestamp:>'2023-01-01T00:00:00Z'
• state:'open'+final_score:>50

=== IMPORTANT NOTES ===
• Use single quotes around string values: 'value'
• Use square brackets for exact matches: ['exact_value']
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
• Status values: 20: New, 25: Reopened, 30: In Progress, 40: Closed
"""

# List of tuples containing filter options data: (name, type, operators, description)
SEARCH_BEHAVIORS_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Operators",
        "Description"
    ),
    (
        "aid",
        "String",
        "No",
        """
        Agent ID of the host where the behavior was detected.

        Ex: 9a07d39f8c9f430eb3e474d1a0c16ce9
        """
    ),
    (
        "behavior_id",
        "String",
        "No",
        """
        Unique identifier for the behavior.
        """
    ),
    (
        "incident_id",
        "String",
        "No",
        """
        Incident ID that this behavior is associated with.
        """
    ),
    (
        "tactic",
        "String",
        "Yes",
        """
        MITRE ATT&CK tactic associated with the behavior.

        Ex: Defense Evasion
        """
    ),
    (
        "technique",
        "String",
        "Yes",
        """
        MITRE ATT&CK technique associated with the behavior.

        Ex: Masquerading
        """
    ),
    (
        "objective",
        "String",
        "Yes",
        """
        Attack objective associated with the behavior.

        Ex: Keep Access
        """
    ),
    (
        "timestamp",
        "Timestamp",
        "Yes",
        """
        When the behavior occurred (UTC).

        Ex: 2023-01-01T00:00:00Z
        """
    ),
    (
        "confidence",
        "Number",
        "Yes",
        """
        Confidence level of the behavior detection.

        Ex: 80
        """
    ),
]

SEARCH_BEHAVIORS_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Search Behaviors Guide

=== BASIC SYNTAX ===
property_name:[operator]'value'

=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal to
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = does not text match
• * = wildcard matching (one or more characters)

=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)
• Wildcards: 'partial*' or '*partial' or '*partial*'

=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions

=== falcon_search_behaviors FQL filter options ===

""" + generate_md_table(SEARCH_BEHAVIORS_FQL_FILTERS) + """

=== EXAMPLE USAGE ===

• tactic:'Defense Evasion'
• technique:'Masquerading'
• timestamp:>'2023-01-01T00:00:00Z'
• tactic:'Persistence'+confidence:>80
• objective:'Keep Access'

=== IMPORTANT NOTES ===
• Use single quotes around string values: 'value'
• Use square brackets for exact matches: ['exact_value']
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
"""

```

--------------------------------------------------------------------------------
/scripts/test_results_viewer.html:
--------------------------------------------------------------------------------

```html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Test Results</title>
    <style>
        body {
            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
            background-color: #f4f7f9;
            color: #333;
            margin: 0;
            padding: 20px;
        }
        h1, h2, h3 {
            color: #2c3e50;
        }
        h1 {
            text-align: center;
            margin-bottom: 30px;
        }
        #summary {
            background-color: #fff;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
            margin-bottom: 30px;
        }
        #results-container {
            display: flex;
            flex-direction: column;
            gap: 20px;
        }
        .test-group {
            background-color: #fff;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .module-group {
            background-color: #fff;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
            margin-bottom: 20px;
        }
        .module-group > h2 {
            color: #1976d2;
            border-bottom: 2px solid #e3f2fd;
            padding-bottom: 10px;
            margin-bottom: 20px;
        }
        .test-group {
            background-color: #f8f9fa;
            padding: 15px;
            border-radius: 6px;
            margin-bottom: 15px;
            border-left: 4px solid #2196f3;
        }
        .test-group > h3 {
            color: #2c3e50;
            margin-top: 0;
            margin-bottom: 15px;
        }
        .model-group {
            margin-top: 15px;
            padding-left: 20px;
            border-left: 3px solid #e0e0e0;
        }
        .model-group > h4 {
            color: #555;
            margin-bottom: 10px;
        }
        .run-grid {
            display: grid;
            grid-template-columns: repeat(auto-fill, minmax(300px, 1fr));
            gap: 15px;
            margin-top: 10px;
        }
        .test-run {
            padding: 15px;
            border-radius: 6px;
            border: 1px solid;
        }
        .test-run.success {
            background-color: #e8f5e9;
            border-color: #4caf50;
        }
        .test-run.failure {
            background-color: #ffebee;
            border-color: #f44336;
        }
        .test-run h4 {
            margin-top: 0;
        }
        .test-run h5 {
            margin-top: 0;
            margin-bottom: 10px;
            color: #333;
            font-size: 14px;
        }
        .failure-reason {
            color: #c62828;
            font-family: monospace;
            white-space: pre-wrap;
            background: #fff0f0;
            padding: 5px;
            border-radius: 4px;
        }
        details {
            margin-top: 10px;
        }
        summary {
            cursor: pointer;
            font-weight: bold;
        }
        .tools-content, .agent-result {
            margin-top: 10px;
            background: #fdfdfd;
            padding: 10px;
            border-radius: 4px;
            border: 1px solid #eee;
        }
        pre {
            white-space: pre-wrap;
            word-wrap: break-word;
            font-size: 13px;
        }
        code {
            font-family: "SFMono-Regular", Consolas, "Liberation Mono", Menlo, Courier, monospace;
        }
    </style>
</head>
<body>
    <h1>MCP E2E Test Results</h1>
    <div id="summary">
        <h2>Summary</h2>
        <p>Total Tests Run: <span id="total-tests"></span></p>
        <p>Success Rate: <span id="success-rate"></span>%</p>
    </div>
    <div id="results-container"></div>

    <script>
        document.addEventListener('DOMContentLoaded', () => {
            fetch('test_results.json')
                .then(response => {
                    if (!response.ok) {
                        throw new Error('Network response was not ok');
                    }
                    return response.json();
                })
                .then(data => {
                    renderResults(data);
                })
                .catch(error => {
                    console.error('Error fetching or parsing test_results.json:', error);
                    document.getElementById('results-container').innerHTML = '<p style="color: red;">Could not load test results. Please ensure test_results.json is in the same directory and is valid JSON.</p>';
                });
        });

        function renderResults(data) {
            const container = document.getElementById('results-container');
            if (!data || data.length === 0) {
                container.innerHTML = '<p>No test results found.</p>';
                return;
            }

            // Calculate summary
            const totalRuns = data.length;
            const successfulRuns = data.filter(run => run.status === 'success').length;
            const successRate = totalRuns > 0 ? (successfulRuns / totalRuns * 100).toFixed(2) : 0;
            document.getElementById('total-tests').textContent = totalRuns;
            document.getElementById('success-rate').textContent = successRate;

            // Group first by module, then by test name
            const groupedByModule = data.reduce((acc, run) => {
                const moduleName = run.module_name || 'Unknown Module';
                const testName = run.test_name;
                
                if (!acc[moduleName]) {
                    acc[moduleName] = {};
                }
                if (!acc[moduleName][testName]) {
                    acc[moduleName][testName] = [];
                }
                acc[moduleName][testName].push(run);
                return acc;
            }, {});

            // Render modules
            for (const moduleName in groupedByModule) {
                const moduleGroupEl = document.createElement('div');
                moduleGroupEl.className = 'module-group';
                moduleGroupEl.innerHTML = `<h2>${moduleName}</h2>`;

                const tests = groupedByModule[moduleName];
                
                for (const testName in tests) {
                    const testGroupEl = document.createElement('div');
                    testGroupEl.className = 'test-group';
                    testGroupEl.innerHTML = `<h3>${testName}</h3>`;

                    const groupedByModel = tests[testName].reduce((acc, run) => {
                        const modelName = run.model_name;
                        if (!acc[modelName]) {
                            acc[modelName] = [];
                        }
                        acc[modelName].push(run);
                        return acc;
                    }, {});

                    for (const modelName in groupedByModel) {
                        const modelGroupEl = document.createElement('div');
                        modelGroupEl.className = 'model-group';
                        modelGroupEl.innerHTML = `<h4>${modelName}</h4>`;

                        const runGridEl = document.createElement('div');
                        runGridEl.className = 'run-grid';

                        groupedByModel[modelName].forEach(run => {
                            const runEl = document.createElement('div');
                            runEl.className = `test-run ${run.status}`;
                            let runContent = `<h5>Run ${run.run_number} - ${run.status.toUpperCase()}</h5>`;
                            
                            if (run.status === 'failure' && run.failure_reason) {
                                runContent += `<p><strong>Failure Reason:</strong></p><pre class="failure-reason"><code>${escapeHtml(run.failure_reason)}</code></pre>`;
                            }

                            runContent += `
                                <details>
                                    <summary>Agent Result</summary>
                                    <div class="agent-result"><pre><code>${escapeHtml(run.agent_result || 'No result')}</code></pre></div>
                                </details>
                            `;

                            if (run.tools_used && run.tools_used.length > 0) {
                                runContent += `
                                    <details>
                                        <summary>Tools Used (${run.tools_used.length})</summary>
                                        <div class="tools-content">
                                            <pre><code>${escapeHtml(JSON.stringify(run.tools_used, null, 2))}</code></pre>
                                        </div>
                                    </details>
                                `;
                            } else {
                                runContent += `<p>No tools were used.</p>`;
                            }

                            runEl.innerHTML = runContent;
                            runGridEl.appendChild(runEl);
                        });

                        modelGroupEl.appendChild(runGridEl);
                        testGroupEl.appendChild(modelGroupEl);
                    }

                    moduleGroupEl.appendChild(testGroupEl);
                }

                container.appendChild(moduleGroupEl);
            }
        }

        function escapeHtml(unsafe) {
            if (unsafe === null || typeof unsafe === 'undefined') {
                return '';
            }
            return unsafe
                .toString()
                .replace(/&/g, "&amp;")
                .replace(/</g, "&lt;")
                .replace(/>/g, "&gt;")
                .replace(/"/g, "&quot;")
                .replace(/'/g, "&#039;");
        }

    </script>
</body>
</html> 
```

--------------------------------------------------------------------------------
/falcon_mcp/resources/hosts.py:
--------------------------------------------------------------------------------

```python
"""
Contains Hosts resources.
"""

from falcon_mcp.common.utils import generate_md_table

# List of tuples containing filter options data: (name, type, operators, description)
SEARCH_HOSTS_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Operators",
        "Description"
    ),
    (
        "device_id",
        "String",
        "No",
        """
        The ID of the device.

        Ex: 061a51ec742c44624a176f079d742052
        """
    ),
    (
        "agent_load_flags",
        "String",
        "No",
        """
        Agent configuration field
        """
    ),
    (
        "agent_version",
        "String",
        "No",
        """
        Agent version.

        Ex: 7.26.17905.0
        """
    ),
    (
        "bios_manufacturer",
        "String",
        "No",
        """
        BIOS manufacturer.

        Ex: Phoenix Technologies LTD
        """
    ),
    (
        "bios_version",
        "String",
        "No",
        """
        BIOS version.

        Ex: 6.00
        """
    ),
    (
        "config_id_base",
        "String",
        "No",
        """
        Agent configuration field
        """
    ),
    (
        "config_id_build",
        "String",
        "No",
        """
        Agent configuration field
        """
    ),
    (
        "config_id_platform",
        "String",
        "No",
        """
        Agent configuration field
        """
    ),
    (
        "cpu_signature",
        "String",
        "Yes",
        """
        CPU signature.

        Ex: GenuineIntel
        """
    ),
    (
        "cid",
        "String",
        "No",
        """
        Customer ID
        """
    ),
    (
        "deployment_type",
        "String",
        "Yes",
        """
        Linux deployment type: Standard, DaemonSet
        """
    ),
    (
        "external_ip",
        "IP Address",
        "Yes",
        """
        External IP address.

        Ex: 192.0.2.100
        """
    ),
    (
        "first_seen",
        "Timestamp",
        "Yes",
        """
        First connection timestamp (UTC).

        Ex: first_seen:>'2016-07-19T11:14:15Z'
        """
    ),
    (
        "groups",
        "String",
        "No",
        """
        Host group ID.

        Ex: groups:'0bd018b7bd8b47cc8834228a294eabf2'
        """
    ),
    (
        "hostname",
        "String",
        "No",
        """
        The name of the machine. ⚠️ LIMITED wildcard support:
        - hostname:'PC*' (prefix) - ✅ WORKS
        - hostname:'*-01' (suffix) - ✅ WORKS
        - hostname:'*server*' (contains) - ❌ FAILS

        Ex: hostname:'WinPC9251' or hostname:'PC*'
        """
    ),
    (
        "instance_id",
        "String",
        "No",
        """
        Cloud resource information (EC2 instance ID, Azure VM ID,
        GCP instance ID, etc.).

        Ex: instance_id:'i-0dc41d0939384cd15'
        Ex: instance_id:'f9d3cef9-0123-4567-8901-123456789def'
        """
    ),
    (
        "kernel_version",
        "String",
        "No",
        """
        Kernel version of the host OS.

        Ex: kernel_version:'6.1.7601.18741'
        """
    ),
    (
        "last_login_timestamp",
        "Timestamp",
        "Yes",
        """
        User logon event timestamp, once a week.
        """
    ),
    (
        "last_seen",
        "Timestamp",
        "Yes",
        """
        Last connection timestamp (UTC).

        Ex: last_seen:<'2016-07-19T11:14:15Z'
        """
    ),
    (
        "linux_sensor_mode",
        "String",
        "Yes",
        """
        Linux sensor mode: Kernel Mode, User Mode
        """
    ),
    (
        "local_ip",
        "IP Address",
        "No",
        """
        Local IP address.

        Ex: 192.0.2.1
        """
    ),
    (
        "local_ip.raw",
        "IP Address with wildcards",
        "No",
        """
        Local IP with wildcard support. Use * prefix:

        Ex: local_ip.raw:*'192.0.2.*'
        Ex: local_ip.raw:*'*.0.2.100'
        """
    ),
    (
        "mac_address",
        "String",
        "No",
        """
        The MAC address of the device

        Ex: 2001:db8:ffff:ffff:ffff:ffff:ffff:ffff
        """
    ),
    (
        "machine_domain",
        "String",
        "No",
        """
        Active Directory domain name.
        """
    ),
    (
        "major_version",
        "String",
        "No",
        """
        Major version of the Operating System
        """
    ),
    (
        "minor_version",
        "String",
        "No",
        """
        Minor version of the Operating System
        """
    ),
    (
        "modified_timestamp",
        "Timestamp",
        "Yes",
        """
        Last record update timestamp (UTC)
        """
    ),
    (
        "os_version",
        "String",
        "No",
        """
        Operating system version.

        Ex: Windows 7
        """
    ),
    (
        "ou",
        "String",
        "No",
        """
        Active Directory organizational unit name
        """
    ),
    (
        "platform_id",
        "String",
        "No",
        """
        Agent configuration field
        """
    ),
    (
        "platform_name",
        "String",
        "No",
        """
        Operating system platform:
        Windows, Mac, Linux
        """
    ),
    (
        "product_type_desc",
        "String",
        "No",
        """
        Product type: Server, Workstation
        """
    ),
    (
        "reduced_functionality_mode",
        "String",
        "Yes",
        """
        Reduced functionality mode status: yes, no, or ""

        Ex: reduced_functionality_mode:'no'
        """
    ),
    (
        "release_group",
        "String",
        "No",
        """
        Deployment group name
        """
    ),
    (
        "serial_number",
        "String",
        "Yes",
        """
        Serial number of the device.

        Ex: C42AFKEBM563
        """
    ),
    (
        "service_provider",
        "String",
        "No",
        """
        The cloud service provider.

        Available options:
        - AWS_EC2_V2
        - AZURE
        - GCP

        Ex: service_provider:'AZURE'
        """
    ),
    (
        "service_provider_account_id",
        "String",
        "No",
        """
        The cloud account ID (AWS Account ID, Azure Subscription ID,
        GCP Project ID, etc.).

        Ex: service_provider_account_id:'99841e6a-b123-4567-8901-123456789abc'
        """
    ),
    (
        "site_name",
        "String",
        "No",
        """
        Active Directory site name.
        """
    ),
    (
        "status",
        "String",
        "No",
        """
        Containment Status of the machine. "Normal" denotes good
        operations; other values might mean reduced functionality
        or support.

        Possible values:
        - normal
        - containment_pending
        - contained
        - lift_containment_pending
        """
    ),
    (
        "system_manufacturer",
        "String",
        "No",
        """
        Name of system manufacturer

        Ex: VMware, Inc.
        """
    ),
    (
        "system_product_name",
        "String",
        "No",
        """
        Name of system product

        Ex: VMware Virtual Platform
        """
    ),
    (
        "tags",
        "String",
        "No",
        """
        Falcon grouping tags
        """
    ),
]

SEARCH_HOSTS_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Search Hosts Guide

=== BASIC SYNTAX ===
property_name:[operator]'value'

=== AVAILABLE OPERATORS ===

✅ **WORKING OPERATORS:**
• No operator = equals (default) - ALL FIELDS
• ! = not equal to - ALL FIELDS
• > = greater than - TIMESTAMP FIELDS ONLY
• >= = greater than or equal - TIMESTAMP FIELDS ONLY
• < = less than - TIMESTAMP FIELDS ONLY
• <= = less than or equal - TIMESTAMP FIELDS ONLY
• ~ = text match (case insensitive) - TEXT FIELDS ONLY
• * = wildcard matching - LIMITED SUPPORT (see examples below)

❌ **NON-WORKING OPERATORS:**
• !~ = does not text match - NOT SUPPORTED
• Simple wildcards (field:*) - NOT SUPPORTED

=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)
• Wildcards: 'partial*' or '*partial' or '*partial*'

=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions

=== falcon_search_hosts FQL filter options ===

""" + generate_md_table(SEARCH_HOSTS_FQL_FILTERS) + """

=== ✅ WORKING PATTERNS ===

**Basic Equality:**
• platform_name:'Windows', platform_name:'Linux', platform_name:'Mac'
• product_type_desc:'Server', product_type_desc:'Workstation'
• status:'normal', reduced_functionality_mode:'no'
• service_provider:'AZURE', service_provider:'AWS_EC2_V2', service_provider:'GCP'

**Combined Conditions:**
• service_provider:'AZURE'+platform_name:'Linux'
• platform_name:'Linux'+product_type_desc:'Server'
• (service_provider:'AZURE',service_provider:'AWS_EC2_V2')+platform_name:'Linux'

**Timestamp Comparisons:**
• first_seen:>'2020-01-01T00:00:00Z'
• first_seen:>='2020-01-01T00:00:00Z'
• last_seen:<='2024-12-31T23:59:59Z'

**Inequality Filters:**
• platform_name:!'Windows' (non-Windows hosts)
• service_provider_account_id:!'' (not empty)
• instance_id:!'' (not empty)

**Hostname Wildcards (Limited):**
• hostname:'PC*' (prefix) ✅
• hostname:'*-01' (suffix) ✅
• hostname:'*server*' (contains) ❌ Does NOT work

**IP Address Wildcards:**
• local_ip.raw:*'192.168.*'
• local_ip.raw:*'10.*'

**Text Match:**
• hostname:~'server'
• os_version:~'windows'

=== ❌ PATTERNS TO AVOID ===
• Simple wildcards: service_provider_account_id:*, hostname:*, etc.
• Contains wildcards: hostname:'*server*'
• Wrong IP syntax: local_ip:*

=== 💡 SYNTAX RULES ===
• Use single quotes around string values: 'value'
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
• Combine conditions with + (AND) or , (OR)
• Use parentheses for grouping: (condition1,condition2)+condition3
"""

```

--------------------------------------------------------------------------------
/tests/modules/test_intel.py:
--------------------------------------------------------------------------------

```python
"""
Tests for the Intel module.
"""

import unittest

from falcon_mcp.modules.intel import IntelModule
from tests.modules.utils.test_modules import TestModules


class TestIntelModule(TestModules):
    """Test cases for the Intel module."""

    def setUp(self):
        """Set up test fixtures."""
        self.setup_module(IntelModule)

    def test_register_tools(self):
        """Test registering tools with the server."""
        expected_tools = [
            "falcon_search_actors",
            "falcon_search_indicators",
            "falcon_search_reports",
        ]
        self.assert_tools_registered(expected_tools)

    def test_register_resources(self):
        """Test registering resources with the server."""
        expected_resources = [
            "falcon_search_actors_fql_guide",
            "falcon_search_indicators_fql_guide",
            "falcon_search_reports_fql_guide",
        ]
        self.assert_resources_registered(expected_resources)

    def test_search_actors_success(self):
        """Test searching actors with successful response."""
        # Setup mock response with sample actors
        mock_response = {
            "status_code": 200,
            "body": {
                "resources": [
                    {"id": "actor1", "name": "Actor 1", "description": "Description 1"},
                    {"id": "actor2", "name": "Actor 2", "description": "Description 2"},
                ]
            },
        }
        self.mock_client.command.return_value = mock_response

        # Call search_actors with test parameters
        result = self.module.query_actor_entities(
            filter="name:'Actor*'",
            limit=100,
            offset=0,
            sort="name.asc",
            q="test",
        )

        # Verify client command was called correctly
        self.mock_client.command.assert_called_once_with(
            "QueryIntelActorEntities",
            parameters={
                "filter": "name:'Actor*'",
                "limit": 100,
                "offset": 0,
                "sort": "name.asc",
                "q": "test",
            },
        )

        # Verify result contains expected values
        self.assertEqual(len(result), 2)
        self.assertEqual(result[0]["id"], "actor1")
        self.assertEqual(result[1]["id"], "actor2")

    def test_search_actors_empty_response(self):
        """Test searching actors with empty response."""
        # Setup mock response with empty resources
        mock_response = {"status_code": 200, "body": {"resources": []}}
        self.mock_client.command.return_value = mock_response

        # Call search_actors
        result = self.module.query_actor_entities()

        # Verify client command was called with the correct operation
        self.assertEqual(self.mock_client.command.call_count, 1)
        call_args = self.mock_client.command.call_args
        self.assertEqual(call_args[0][0], "QueryIntelActorEntities")

        # Verify result is an empty list
        self.assertEqual(result, [])

    def test_search_actors_error(self):
        """Test searching actors with API error."""
        # Setup mock response with error
        mock_response = {
            "status_code": 400,
            "body": {"errors": [{"message": "Invalid query"}]},
        }
        self.mock_client.command.return_value = mock_response

        # Call search_actors
        results = self.module.query_actor_entities(filter="invalid query")
        result = results[0]

        # Verify result contains error
        self.assertIn("error", result)
        self.assertIn("details", result)
        # Check that the error message starts with the expected prefix
        self.assertTrue(result["error"].startswith("Failed to search actors"))

    def test_query_indicator_entities_success(self):
        """Test querying indicator entities with successful response."""
        # Setup mock response with sample indicators
        mock_response = {
            "status_code": 200,
            "body": {
                "resources": [
                    {
                        "id": "indicator1",
                        "indicator": "malicious.com",
                        "type": "domain",
                    },
                    {
                        "id": "indicator2",
                        "indicator": "192.168.1.1",
                        "type": "ip_address",
                    },
                ]
            },
        }
        self.mock_client.command.return_value = mock_response

        # Call query_indicator_entities with test parameters
        result = self.module.query_indicator_entities(
            filter="type:'domain'",
            limit=100,
            offset=0,
            sort="published_date.desc",
            q="malicious",
            include_deleted=True,
            include_relations=True,
        )

        # Verify client command was called correctly
        self.mock_client.command.assert_called_once_with(
            "QueryIntelIndicatorEntities",
            parameters={
                "filter": "type:'domain'",
                "limit": 100,
                "offset": 0,
                "sort": "published_date.desc",
                "q": "malicious",
                "include_deleted": True,
                "include_relations": True,
            },
        )

        # Verify result contains expected values
        self.assertEqual(len(result), 2)
        self.assertEqual(result[0]["id"], "indicator1")
        self.assertEqual(result[1]["id"], "indicator2")

    def test_query_indicator_entities_empty_response(self):
        """Test querying indicator entities with empty response."""
        # Setup mock response with empty resources
        mock_response = {"status_code": 200, "body": {"resources": []}}
        self.mock_client.command.return_value = mock_response

        # Call query_indicator_entities
        result = self.module.query_indicator_entities()

        # Verify client command was called with the correct operation
        self.assertEqual(self.mock_client.command.call_count, 1)
        call_args = self.mock_client.command.call_args
        self.assertEqual(call_args[0][0], "QueryIntelIndicatorEntities")

        # Verify result is an empty list
        self.assertEqual(result, [])

    def test_query_indicator_entities_error(self):
        """Test querying indicator entities with API error."""
        # Setup mock response with error
        mock_response = {
            "status_code": 400,
            "body": {"errors": [{"message": "Invalid query"}]},
        }
        self.mock_client.command.return_value = mock_response

        # Call query_indicator_entities
        result = self.module.query_indicator_entities(filter="invalid query")

        # Verify result contains error
        self.assertEqual(len(result), 1)
        self.assertIn("error", result[0])
        self.assertIn("details", result[0])
        # Check that the error message starts with the expected prefix
        self.assertTrue(result[0]["error"].startswith("Failed to search indicators"))

    def test_query_report_entities_success(self):
        """Test querying report entities with successful response."""
        # Setup mock response with sample reports
        mock_response = {
            "status_code": 200,
            "body": {
                "resources": [
                    {
                        "id": "report1",
                        "name": "Report 1",
                        "description": "Description 1",
                    },
                    {
                        "id": "report2",
                        "name": "Report 2",
                        "description": "Description 2",
                    },
                ]
            },
        }
        self.mock_client.command.return_value = mock_response

        # Call query_report_entities with test parameters
        result = self.module.query_report_entities(
            filter="name:'Report*'",
            limit=100,
            offset=0,
            sort="created_date.desc",
            q="test",
        )

        # Verify client command was called correctly
        self.mock_client.command.assert_called_once_with(
            "QueryIntelReportEntities",
            parameters={
                "filter": "name:'Report*'",
                "limit": 100,
                "offset": 0,
                "sort": "created_date.desc",
                "q": "test",
            },
        )

        # Verify result contains expected values
        self.assertEqual(len(result), 2)
        self.assertEqual(result[0]["id"], "report1")
        self.assertEqual(result[1]["id"], "report2")

    def test_query_report_entities_empty_response(self):
        """Test querying report entities with empty response."""
        # Setup mock response with empty resources
        mock_response = {"status_code": 200, "body": {"resources": []}}
        self.mock_client.command.return_value = mock_response

        # Call query_report_entities
        result = self.module.query_report_entities()

        # Verify client command was called with the correct operation
        self.assertEqual(self.mock_client.command.call_count, 1)
        call_args = self.mock_client.command.call_args
        self.assertEqual(call_args[0][0], "QueryIntelReportEntities")

        # Verify result is an empty list
        self.assertEqual(result, [])

    def test_query_report_entities_error(self):
        """Test querying report entities with API error."""
        # Setup mock response with error
        mock_response = {
            "status_code": 400,
            "body": {"errors": [{"message": "Invalid query"}]},
        }
        self.mock_client.command.return_value = mock_response

        # Call query_report_entities
        result = self.module.query_report_entities(filter="invalid query")

        # Verify result contains error
        self.assertEqual(len(result), 1)
        self.assertIn("error", result[0])
        self.assertIn("details", result[0])
        # Check that the error message starts with the expected prefix
        self.assertTrue(result[0]["error"].startswith("Failed to search reports"))


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/tests/test_client.py:
--------------------------------------------------------------------------------

```python
"""
Tests for the Falcon API client.
"""

import platform
import sys
import unittest
from unittest.mock import MagicMock, patch

from falcon_mcp.client import FalconClient


class TestFalconClient(unittest.TestCase):
    """Test cases for the Falcon API client."""

    @patch("falcon_mcp.client.os.environ.get")
    @patch("falcon_mcp.client.APIHarnessV2")
    def test_client_initialization(self, mock_apiharness, mock_environ_get):
        """Test client initialization with base URL."""
        # Setup mock environment variables
        mock_environ_get.side_effect = lambda key, default=None: {
            "FALCON_CLIENT_ID": "test-client-id",
            "FALCON_CLIENT_SECRET": "test-client-secret",
        }.get(key, default)

        # Create client with base URL
        _client = FalconClient(base_url="https://api.test.crowdstrike.com", debug=True)

        # Verify APIHarnessV2 was initialized correctly with config values
        mock_apiharness.assert_called_once()
        call_args = mock_apiharness.call_args[1]
        self.assertEqual(call_args["client_id"], "test-client-id")
        self.assertEqual(call_args["client_secret"], "test-client-secret")
        self.assertEqual(call_args["base_url"], "https://api.test.crowdstrike.com")
        self.assertTrue(call_args["debug"])

    @patch("falcon_mcp.client.os.environ.get")
    @patch("falcon_mcp.client.APIHarnessV2")
    def test_client_initialization_with_env_vars(
        self, mock_apiharness, mock_environ_get
    ):
        """Test client initialization with environment variables."""
        # Setup mock environment variables
        mock_environ_get.side_effect = lambda key, default=None: {
            "FALCON_CLIENT_ID": "env-client-id",
            "FALCON_CLIENT_SECRET": "env-client-secret",
            "FALCON_BASE_URL": "https://api.env.crowdstrike.com",
        }.get(key, default)

        # Create client with environment variables
        _client = FalconClient()

        # Verify APIHarnessV2 was initialized correctly
        mock_apiharness.assert_called_once()
        call_args = mock_apiharness.call_args[1]
        self.assertEqual(call_args["client_id"], "env-client-id")
        self.assertEqual(call_args["client_secret"], "env-client-secret")
        self.assertEqual(call_args["base_url"], "https://api.env.crowdstrike.com")
        self.assertFalse(call_args["debug"])

    @patch("falcon_mcp.client.os.environ.get")
    def test_client_initialization_missing_credentials(self, mock_environ_get):
        """Test client initialization with missing credentials."""
        # Setup mock environment variables (missing credentials)
        mock_environ_get.return_value = None

        # Verify ValueError is raised when credentials are missing
        with self.assertRaises(ValueError):
            FalconClient()

    @patch("falcon_mcp.client.os.environ.get")
    @patch("falcon_mcp.client.APIHarnessV2")
    def test_authenticate(self, mock_apiharness, mock_environ_get):
        """Test authenticate method."""
        # Setup mock environment variables
        mock_environ_get.side_effect = lambda key, default=None: {
            "FALCON_CLIENT_ID": "test-client-id",
            "FALCON_CLIENT_SECRET": "test-client-secret",
        }.get(key, default)

        # Setup mock
        mock_instance = MagicMock()
        mock_instance.login.return_value = True
        mock_apiharness.return_value = mock_instance

        # Create client and authenticate
        client = FalconClient()
        result = client.authenticate()

        # Verify login was called and result is correct
        mock_instance.login.assert_called_once()
        self.assertTrue(result)

    @patch("falcon_mcp.client.os.environ.get")
    @patch("falcon_mcp.client.APIHarnessV2")
    def test_is_authenticated(self, mock_apiharness, mock_environ_get):
        """Test is_authenticated method."""
        # Setup mock environment variables
        mock_environ_get.side_effect = lambda key, default=None: {
            "FALCON_CLIENT_ID": "test-client-id",
            "FALCON_CLIENT_SECRET": "test-client-secret",
        }.get(key, default)

        # Setup mock
        mock_instance = MagicMock()
        mock_instance.token_valid = True
        mock_apiharness.return_value = mock_instance

        # Create client and check authentication status
        client = FalconClient()
        result = client.is_authenticated()

        # Verify result is correct
        self.assertTrue(result)

    @patch("falcon_mcp.client.os.environ.get")
    @patch("falcon_mcp.client.APIHarnessV2")
    def test_get_headers(self, mock_apiharness, mock_environ_get):
        """Test get_headers method."""
        # Setup mock environment variables
        mock_environ_get.side_effect = lambda key, default=None: {
            "FALCON_CLIENT_ID": "test-client-id",
            "FALCON_CLIENT_SECRET": "test-client-secret",
        }.get(key, default)

        # Setup mock
        mock_instance = MagicMock()
        mock_instance.auth_headers = {"Authorization": "Bearer test-token"}
        mock_apiharness.return_value = mock_instance

        # Create client and get headers
        client = FalconClient()
        headers = client.get_headers()

        # Verify headers are correct
        self.assertEqual(headers, {"Authorization": "Bearer test-token"})

    @patch("falcon_mcp.client.os.environ.get")
    @patch("falcon_mcp.client.APIHarnessV2")
    def test_command(self, mock_apiharness, mock_environ_get):
        """Test command method."""
        # Setup mock environment variables
        mock_environ_get.side_effect = lambda key, default=None: {
            "FALCON_CLIENT_ID": "test-client-id",
            "FALCON_CLIENT_SECRET": "test-client-secret",
        }.get(key, default)

        # Setup mock
        mock_instance = MagicMock()
        mock_instance.command.return_value = {
            "status_code": 200,
            "body": {"resources": [{"id": "test"}]},
        }
        mock_apiharness.return_value = mock_instance

        # Create client and execute command
        client = FalconClient()
        response = client.command("TestOperation", parameters={"filter": "test"})

        # Verify command was called with correct arguments
        mock_instance.command.assert_called_once_with(
            "TestOperation", parameters={"filter": "test"}
        )

        # Verify response is correct
        self.assertEqual(response["status_code"], 200)
        self.assertEqual(response["body"]["resources"][0]["id"], "test")

    @patch("falcon_mcp.client.version")
    @patch("falcon_mcp.client.os.environ.get")
    @patch("falcon_mcp.client.APIHarnessV2")
    def test_get_user_agent_best_case_scenario(
        self, mock_apiharness, mock_environ_get, mock_version
    ):
        """Test get_user_agent method in the best case scenario with all packages installed."""
        # Setup mock environment variables
        mock_environ_get.side_effect = lambda key, default=None: {
            "FALCON_CLIENT_ID": "test-client-id",
            "FALCON_CLIENT_SECRET": "test-client-secret",
        }.get(key, default)

        # Setup mock version calls for best case scenario
        def version_side_effect(package_name):
            if package_name == "falcon-mcp":
                return "1.2.3"
            if package_name == "crowdstrike-falconpy":
                return "1.3.4"
            raise ValueError(f"Unexpected package: {package_name}")

        mock_version.side_effect = version_side_effect
        mock_apiharness.return_value = MagicMock()

        # Create client and get user agent
        client = FalconClient()
        user_agent = client.get_user_agent()

        # Verify user agent format and content
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
        platform_info = f"{platform.system()}/{platform.release()}"
        expected = f"falcon-mcp/1.2.3 (falconpy/1.3.4; Python/{python_version}; {platform_info})"

        self.assertEqual(user_agent, expected)

        # Verify user agent is properly used in APIHarnessV2 initialization
        mock_apiharness.assert_called_once()
        call_args = mock_apiharness.call_args[1]
        self.assertEqual(call_args["user_agent"], expected)

        # Verify format components
        self.assertTrue(user_agent.startswith("falcon-mcp/1.2.3"))
        self.assertIn(f"Python/{python_version}", user_agent)
        self.assertIn(platform_info, user_agent)
        self.assertIn("falconpy/1.3.4", user_agent)

    @patch("falcon_mcp.client.version")
    @patch("falcon_mcp.client.os.environ.get")
    @patch("falcon_mcp.client.APIHarnessV2")
    def test_get_user_agent_with_user_agent_comment(
        self, mock_apiharness, mock_environ_get, mock_version
    ):
        """Test get_user_agent method with a user agent comment."""
        # Setup mock environment variables
        mock_environ_get.side_effect = lambda key, default=None: {
            "FALCON_CLIENT_ID": "test-client-id",
            "FALCON_CLIENT_SECRET": "test-client-secret",
        }.get(key, default)

        # Setup mock version calls
        def version_side_effect(package_name):
            if package_name == "falcon-mcp":
                return "1.2.3"
            if package_name == "crowdstrike-falconpy":
                return "1.3.4"
            raise ValueError(f"Unexpected package: {package_name}")

        mock_version.side_effect = version_side_effect
        mock_apiharness.return_value = MagicMock()

        # Create client with user agent comment
        client = FalconClient(user_agent_comment="CustomApp/1.0")
        user_agent = client.get_user_agent()

        # Verify user agent format and content (RFC-compliant format)
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
        platform_info = f"{platform.system()}/{platform.release()}"
        expected = f"falcon-mcp/1.2.3 (CustomApp/1.0; falconpy/1.3.4; Python/{python_version}; {platform_info})"

        self.assertEqual(user_agent, expected)

        # Verify user agent is properly used in APIHarnessV2 initialization
        mock_apiharness.assert_called_once()
        call_args = mock_apiharness.call_args[1]
        self.assertEqual(call_args["user_agent"], expected)


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/falcon_mcp/modules/intel.py:
--------------------------------------------------------------------------------

```python
"""
Intel module for Falcon MCP Server

This module provides tools for accessing and analyzing CrowdStrike Falcon intelligence data.
"""

from typing import Any, Dict, List

from mcp.server import FastMCP
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl, Field

from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.logging import get_logger
from falcon_mcp.common.utils import prepare_api_parameters
from falcon_mcp.modules.base import BaseModule
from falcon_mcp.resources.intel import (
    QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION,
    QUERY_INDICATOR_ENTITIES_FQL_DOCUMENTATION,
    QUERY_REPORT_ENTITIES_FQL_DOCUMENTATION,
)

logger = get_logger(__name__)


class IntelModule(BaseModule):
    """Module for accessing and analyzing CrowdStrike Falcon intelligence data."""

    def register_tools(self, server: FastMCP) -> None:
        """Register tools with the MCP server.

        Args:
            server: MCP server instance
        """
        # Register tools
        self._add_tool(
            server=server,
            method=self.query_actor_entities,
            name="search_actors",
        )

        self._add_tool(
            server=server,
            method=self.query_indicator_entities,
            name="search_indicators",
        )

        self._add_tool(
            server=server,
            method=self.query_report_entities,
            name="search_reports",
        )

    def register_resources(self, server: FastMCP) -> None:
        """Register resources with the MCP server.

        Args:
            server: MCP server instance
        """
        search_actors_fql_resource = TextResource(
            uri=AnyUrl("falcon://intel/actors/fql-guide"),
            name="falcon_search_actors_fql_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_actors` tool.",
            text=QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION,
        )

        search_indicators_fql_resource = TextResource(
            uri=AnyUrl("falcon://intel/indicators/fql-guide"),
            name="falcon_search_indicators_fql_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_indicators` tool.",
            text=QUERY_INDICATOR_ENTITIES_FQL_DOCUMENTATION,
        )

        search_reports_fql_resource = TextResource(
            uri=AnyUrl("falcon://intel/reports/fql-guide"),
            name="falcon_search_reports_fql_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_reports` tool.",
            text=QUERY_REPORT_ENTITIES_FQL_DOCUMENTATION,
        )

        self._add_resource(
            server,
            search_actors_fql_resource,
        )
        self._add_resource(
            server,
            search_indicators_fql_resource,
        )
        self._add_resource(
            server,
            search_reports_fql_resource,
        )

    def query_actor_entities(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL query expression that should be used to limit the results. IMPORTANT: use the `falcon://intel/actors/fql-guide` resource when building this filter parameter.",
        ),
        limit: int = Field(
            default=10,
            ge=1,
            le=5000,
            description="Maximum number of records to return. Max 5000",
            examples={10, 20, 100},
        ),
        offset: int | None = Field(
            default=None,
            description="Starting index of overall result set from which to return ids.",
            examples=[0, 10],
        ),
        sort: str | None = Field(
            default=None,
            description="The field and direction to sort results on. The format is {field}|{asc/desc}. Valid values include: name, target_countries, target_industries, type, created_date, last_activity_date and last_modified_date. Ex: created_date|desc",
            examples={"created_date|desc"},
        ),
        q: str | None = Field(
            default=None,
            description="Free text search across all indexed fields.",
            examples={"BEAR"},
        ),
    ) -> List[Dict[str, Any]]:
        """Research threat actors and adversary groups tracked by CrowdStrike intelligence.

        IMPORTANT: You must use the `falcon://intel/actors/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_actors` tool.
        """
        # Prepare parameters
        params = prepare_api_parameters(
            {
                "filter": filter,
                "limit": limit,
                "offset": offset,
                "sort": sort,
                "q": q,
            }
        )

        # Define the operation name
        operation = "QueryIntelActorEntities"

        logger.debug("Searching actors with params: %s", params)

        # Make the API request
        command_response = self.client.command(operation, parameters=params)

        # Handle the response
        api_response = handle_api_response(
            command_response,
            operation=operation,
            error_message="Failed to search actors",
            default_result=[],
        )

        if self._is_error(api_response):
            return [api_response]

        return api_response

    def query_indicator_entities(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL query expression that should be used to limit the results. IMPORTANT: use the `falcon://intel/indicators/fql-guide` resource when building this filter parameter.",
        ),
        limit: int = Field(
            default=10,
            ge=1,
            le=5000,
            description="Maximum number of records to return. (Max: 5000)",
        ),
        offset: int | None = Field(
            default=None,
            description="Starting index of overall result set from which to return ids.",
        ),
        sort: str | None = Field(
            default=None,
            description="The field and direction to sort results on. The format is {field}|{asc/desc}. Valid values are: id, indicator, type, published_date, last_updated, and _marker. Ex: published_date|desc",
            examples={"published_date|desc"},
        ),
        q: str | None = Field(
            default=None,
            description="Free text search across all indexed fields.",
        ),
        include_deleted: bool = Field(
            default=False,
            description="Flag indicating if both published and deleted indicators should be returned.",
        ),
        include_relations: bool = Field(
            default=False,
            description="Flag indicating if related indicators should be returned.",
        ),
    ) -> List[Dict[str, Any]]:
        """Search for threat indicators and indicators of compromise (IOCs) from CrowdStrike intelligence.

        IMPORTANT: You must use the `falcon://intel/indicators/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_indicators` tool.
        """
        # Prepare parameters
        params = prepare_api_parameters(
            {
                "filter": filter,
                "limit": limit,
                "offset": offset,
                "sort": sort,
                "q": q,
                "include_deleted": include_deleted,
                "include_relations": include_relations,
            }
        )

        # Define the operation name
        operation = "QueryIntelIndicatorEntities"

        logger.debug("Searching indicators with params: %s", params)

        # Make the API request
        command_response = self.client.command(operation, parameters=params)

        # Handle the response
        api_response = handle_api_response(
            command_response,
            operation=operation,
            error_message="Failed to search indicators",
            default_result=[],
        )

        if self._is_error(api_response):
            return [api_response]

        return api_response

    def query_report_entities(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL query expression that should be used to limit the results. IMPORTANT: use the `falcon://intel/reports/fql-guide` resource when building this filter parameter.",
        ),
        limit: int = Field(
            default=10,
            ge=1,
            le=5000,
            description="Maximum number of records to return. (Max: 5000)",
        ),
        offset: int | None = Field(
            default=None,
            description="Starting index of overall result set from which to return ids.",
        ),
        sort: str | None = Field(
            default=None,
            description="The field and direction to sort results on in the format of: {field}.{asc}or {field}.{desc}. Valid values include: name, target_countries, target_industries, type, created_date, last_modified_date. Ex: created_date|desc",
            examples={"created_date|desc"},
        ),
        q: str | None = Field(
            default=None,
            description="Free text search across all indexed fields.",
        ),
    ) -> List[Dict[str, Any]]:
        """Access CrowdStrike intelligence publications and threat reports.

        This tool returns comprehensive intelligence report details based on your search criteria.
        Use this when you need to find CrowdStrike intelligence publications matching specific conditions.
        For guidance on building FQL filters, use the `falcon://intel/reports/fql-guide` resource.

        IMPORTANT: You must use the `falcon://intel/reports/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_reports` tool.
        """
        # Prepare parameters
        params = prepare_api_parameters(
            {
                "filter": filter,
                "limit": limit,
                "offset": offset,
                "sort": sort,
                "q": q,
            }
        )

        # Define the operation name
        operation = "QueryIntelReportEntities"

        logger.debug("Searching reports with params: %s", params)

        # Make the API request
        command_response = self.client.command(operation, parameters=params)

        # Handle the response
        api_response = handle_api_response(
            command_response,
            operation=operation,
            error_message="Failed to search reports",
            default_result=[],
        )

        # If handle_api_response returns an error dict instead of a list,
        # it means there was an error, so we return it wrapped in a list
        if self._is_error(api_response):
            return [api_response]

        return api_response

```

--------------------------------------------------------------------------------
/falcon_mcp/server.py:
--------------------------------------------------------------------------------

```python
"""
Falcon MCP Server - Main entry point

This module provides the main server class for the Falcon MCP server
and serves as the entry point for the application.
"""

import argparse
import os
import sys
from typing import Dict, List, Optional, Set

import uvicorn
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

from falcon_mcp import registry
from falcon_mcp.client import FalconClient
from falcon_mcp.common.logging import configure_logging, get_logger

logger = get_logger(__name__)


class FalconMCPServer:
    """Main server class for the Falcon MCP server."""

    def __init__(
        self,
        base_url: Optional[str] = None,
        debug: bool = False,
        enabled_modules: Optional[Set[str]] = None,
        user_agent_comment: Optional[str] = None,
    ):
        """Initialize the Falcon MCP server.

        Args:
            base_url: Falcon API base URL
            debug: Enable debug logging
            enabled_modules: Set of module names to enable (defaults to all modules)
            user_agent_comment: Additional information to include in the User-Agent comment section
        """
        # Store configuration
        self.base_url = base_url
        self.debug = debug
        self.user_agent_comment = user_agent_comment

        self.enabled_modules = enabled_modules or set(registry.get_module_names())

        # Configure logging
        configure_logging(debug=self.debug)
        logger.info("Initializing Falcon MCP Server")

        # Initialize the Falcon client
        self.falcon_client = FalconClient(
            base_url=self.base_url,
            debug=self.debug,
            user_agent_comment=self.user_agent_comment,
        )

        # Authenticate with the Falcon API
        if not self.falcon_client.authenticate():
            logger.error("Failed to authenticate with the Falcon API")
            raise RuntimeError("Failed to authenticate with the Falcon API")

        # Initialize the MCP server
        self.server = FastMCP(
            name="Falcon MCP Server",
            instructions="This server provides access to CrowdStrike Falcon capabilities.",
            debug=self.debug,
            log_level="DEBUG" if self.debug else "INFO",
        )

        # Initialize and register modules
        self.modules = {}
        available_modules = registry.get_available_modules()
        for module_name in self.enabled_modules:
            if module_name in available_modules:
                module_class = available_modules[module_name]
                self.modules[module_name] = module_class(self.falcon_client)
                logger.debug("Initialized module: %s", module_name)

        # Register tools and resources from modules
        tool_count = self._register_tools()
        tool_word = "tool" if tool_count == 1 else "tools"

        resource_count = self._register_resources()
        resource_word = "resource" if resource_count == 1 else "resources"

        # Count modules and tools with proper grammar
        module_count = len(self.modules)
        module_word = "module" if module_count == 1 else "modules"

        logger.info(
            "Initialized %d %s with %d %s and %d %s",
            module_count,
            module_word,
            tool_count,
            tool_word,
            resource_count,
            resource_word,
        )

    def _register_tools(self) -> int:
        """Register tools from all modules.

        Returns:
            int: Number of tools registered
        """
        # Register core tools directly
        self.server.add_tool(
            self.falcon_check_connectivity,
            name="falcon_check_connectivity",
        )

        self.server.add_tool(
            self.list_enabled_modules,
            name="falcon_list_enabled_modules",
        )

        self.server.add_tool(
            self.list_modules,
            name="falcon_list_modules",
        )

        tool_count = 3  # the tools added above

        # Register tools from modules
        for module in self.modules.values():
            module.register_tools(self.server)

        tool_count += sum(len(getattr(m, "tools", [])) for m in self.modules.values())

        return tool_count

    def _register_resources(self) -> int:
        """Register resources from all modules.

        Returns:
            int: Number of resources registered
        """
        # Register resources from modules
        for module in self.modules.values():
            # Check if the module has a register_resources method
            if hasattr(module, "register_resources") and callable(module.register_resources):
                module.register_resources(self.server)

        return sum(len(getattr(m, "resources", [])) for m in self.modules.values())

    def falcon_check_connectivity(self) -> Dict[str, bool]:
        """Check connectivity to the Falcon API."""
        return {"connected": self.falcon_client.is_authenticated()}

    def list_enabled_modules(self) -> Dict[str, List[str]]:
        """Lists enabled modules in the falcon-mcp server.

        These modules are determined by the --modules flag when starting the server.
        If no modules are specified, all available modules are enabled.
        """
        return {"modules": list(self.modules.keys())}

    def list_modules(self) -> Dict[str, List[str]]:
        """Lists all available modules in the falcon-mcp server."""
        return {"modules": registry.get_module_names()}

    def run(self, transport: str = "stdio", host: str = "127.0.0.1", port: int = 8000):
        """Run the MCP server.

        Args:
            transport: Transport protocol to use ("stdio", "sse", or "streamable-http")
            host: Host to bind to for HTTP transports (default: 127.0.0.1)
            port: Port to listen on for HTTP transports (default: 8000)
        """
        if transport == "streamable-http":
            # For streamable-http, use uvicorn directly for custom host/port
            logger.info("Starting streamable-http server on %s:%d", host, port)

            # Get the ASGI app from FastMCP (handles /mcp path automatically)
            app = self.server.streamable_http_app()

            # Run with uvicorn for custom host/port configuration
            uvicorn.run(
                app,
                host=host,
                port=port,
                log_level="info" if not self.debug else "debug",
            )
        elif transport == "sse":
            # For sse, use uvicorn directly for custom host/port (same pattern as streamable-http)
            logger.info("Starting sse server on %s:%d", host, port)

            # Get the ASGI app from FastMCP
            app = self.server.sse_app()

            # Run with uvicorn for custom host/port configuration
            uvicorn.run(
                app,
                host=host,
                port=port,
                log_level="info" if not self.debug else "debug",
            )
        else:
            # For stdio, use the default FastMCP run method (no host/port needed)
            self.server.run(transport)


def parse_modules_list(modules_string):
    """Parse and validate comma-separated module list.

    Args:
        modules_string: Comma-separated string of module names

    Returns:
        List of validated module names (returns all available modules if empty string)

    Raises:
        argparse.ArgumentTypeError: If any module names are invalid
    """
    # Get available modules
    available_modules = registry.get_module_names()

    # If empty string, return all available modules (default behavior)
    if not modules_string:
        return available_modules

    # Split by comma and clean up whitespace
    modules = [m.strip() for m in modules_string.split(",") if m.strip()]

    # Validate against available modules
    invalid_modules = [m for m in modules if m not in available_modules]
    if invalid_modules:
        raise argparse.ArgumentTypeError(
            f"Invalid modules: {', '.join(invalid_modules)}. "
            f"Available modules: {', '.join(available_modules)}"
        )

    return modules


def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description="Falcon MCP Server")

    # Transport options
    parser.add_argument(
        "--transport",
        "-t",
        choices=["stdio", "sse", "streamable-http"],
        default=os.environ.get("FALCON_MCP_TRANSPORT", "stdio"),
        help="Transport protocol to use (default: stdio, env: FALCON_MCP_TRANSPORT)",
    )

    # Module selection
    available_modules = registry.get_module_names()

    parser.add_argument(
        "--modules",
        "-m",
        type=parse_modules_list,
        default=parse_modules_list(os.environ.get("FALCON_MCP_MODULES", "")),
        metavar="MODULE1,MODULE2,...",
        help=f"Comma-separated list of modules to enable. Available: [{', '.join(available_modules)}] "
        f"(default: all modules, env: FALCON_MCP_MODULES)",
    )

    # Debug mode
    parser.add_argument(
        "--debug",
        "-d",
        action="store_true",
        default=os.environ.get("FALCON_MCP_DEBUG", "").lower() == "true",
        help="Enable debug logging (env: FALCON_MCP_DEBUG)",
    )

    # API base URL
    parser.add_argument(
        "--base-url",
        default=os.environ.get("FALCON_BASE_URL"),
        help="Falcon API base URL (env: FALCON_BASE_URL)",
    )

    # HTTP transport configuration
    parser.add_argument(
        "--host",
        default=os.environ.get("FALCON_MCP_HOST", "127.0.0.1"),
        help="Host to bind to for HTTP transports (default: 127.0.0.1, env: FALCON_MCP_HOST)",
    )

    parser.add_argument(
        "--port",
        "-p",
        type=int,
        default=int(os.environ.get("FALCON_MCP_PORT", "8000")),
        help="Port to listen on for HTTP transports (default: 8000, env: FALCON_MCP_PORT)",
    )

    parser.add_argument(
        "--user-agent-comment",
        default=os.environ.get("FALCON_MCP_USER_AGENT_COMMENT"),
        help="Additional information to include in the User-Agent comment section (env: FALCON_MCP_USER_AGENT_COMMENT)",
    )

    return parser.parse_args()


def main():
    """Main entry point for the Falcon MCP server."""
    # Load environment variables
    load_dotenv()

    # Parse command line arguments (includes environment variable defaults)
    args = parse_args()

    try:
        # Create and run the server
        server = FalconMCPServer(
            base_url=args.base_url,
            debug=args.debug,
            enabled_modules=set(args.modules),
            user_agent_comment=args.user_agent_comment,
        )
        logger.info("Starting server with %s transport", args.transport)
        server.run(args.transport, host=args.host, port=args.port)
    except RuntimeError as e:
        logger.error("Runtime error: %s", e)
        sys.exit(1)
    except ValueError as e:
        logger.error("Configuration error: %s", e)
        sys.exit(1)
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
        sys.exit(0)
    except Exception as e:
        # Catch any other exceptions to ensure graceful shutdown
        logger.error("Unexpected error running server: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/falcon_mcp/modules/incidents.py:
--------------------------------------------------------------------------------

```python
"""
Incidents module for Falcon MCP Server

This module provides tools for accessing and analyzing CrowdStrike Falcon incidents.
"""

from typing import Any, Dict, List

from mcp.server import FastMCP
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl, Field

from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.utils import prepare_api_parameters
from falcon_mcp.modules.base import BaseModule
from falcon_mcp.resources.incidents import (
    CROWD_SCORE_FQL_DOCUMENTATION,
    SEARCH_BEHAVIORS_FQL_DOCUMENTATION,
    SEARCH_INCIDENTS_FQL_DOCUMENTATION,
)


class IncidentsModule(BaseModule):
    """Module for accessing and analyzing CrowdStrike Falcon incidents."""

    def register_tools(self, server: FastMCP) -> None:
        """Register tools with the MCP server.

        Args:
            server: MCP server instance
        """
        # Register tools
        self._add_tool(
            server=server,
            method=self.show_crowd_score,
            name="show_crowd_score",
        )

        self._add_tool(
            server=server,
            method=self.search_incidents,
            name="search_incidents",
        )

        self._add_tool(
            server=server,
            method=self.get_incident_details,
            name="get_incident_details",
        )

        self._add_tool(
            server=server,
            method=self.search_behaviors,
            name="search_behaviors",
        )

        self._add_tool(
            server=server,
            method=self.get_behavior_details,
            name="get_behavior_details",
        )

    def register_resources(self, server: FastMCP) -> None:
        """Register resources with the MCP server.

        Args:
            server: MCP server instance
        """
        crowd_score_fql_resource = TextResource(
            uri=AnyUrl("falcon://incidents/crowd-score/fql-guide"),
            name="falcon_show_crowd_score_fql_guide",
            description="Contains the guide for the `filter` param of the `falcon_show_crowd_score` tool.",
            text=CROWD_SCORE_FQL_DOCUMENTATION,
        )

        search_incidents_fql_resource = TextResource(
            uri=AnyUrl("falcon://incidents/search/fql-guide"),
            name="falcon_search_incidents_fql_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_incidents` tool.",
            text=SEARCH_INCIDENTS_FQL_DOCUMENTATION,
        )

        search_behaviors_fql_resource = TextResource(
            uri=AnyUrl("falcon://incidents/behaviors/fql-guide"),
            name="falcon_search_behaviors_fql_guide",
            description="Contains the guide for the `filter` param of the `falcon_search_behaviors` tool.",
            text=SEARCH_BEHAVIORS_FQL_DOCUMENTATION,
        )

        self._add_resource(
            server,
            crowd_score_fql_resource,
        )
        self._add_resource(
            server,
            search_incidents_fql_resource,
        )
        self._add_resource(
            server,
            search_behaviors_fql_resource,
        )

    def show_crowd_score(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://incidents/crowd-score/fql-guide` resource when building this filter parameter.",
        ),
        limit: int = Field(
            default=10,
            ge=1,
            le=2500,
            description="Maximum number of records to return. (Max: 2500)",
        ),
        offset: int | None = Field(
            default=None,
            description="Starting index of overall result set from which to return ids.",
        ),
        sort: str | None = Field(
            default=None,
            description="The property to sort by. (Ex: modified_timestamp.desc)",
            examples={"modified_timestamp.desc"},
        ),
    ) -> Dict[str, Any]:
        """View calculated CrowdScores and security posture metrics for your environment.

        IMPORTANT: You must use the `falcon://incidents/crowd-score/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_show_crowd_score` tool.
        """
        # Prepare parameters
        params = prepare_api_parameters(
            {
                "filter": filter,
                "limit": limit,
                "offset": offset,
                "sort": sort,
            }
        )

        # Define the operation name (used for error handling)
        operation = "CrowdScore"

        # Make the API request
        response = self.client.command(operation, parameters=params)

        # Handle the response
        api_response = handle_api_response(
            response,
            operation=operation,
            error_message="Failed to perform operation",
            default_result=[],
        )

        # Check if we received an error response
        if self._is_error(api_response):
            # Return the error response as is
            return api_response

        # Initialize result with all scores
        result = {
            "average_score": 0,
            "average_adjusted_score": 0,
            "scores": api_response,  # Include all the scores in the result
        }

        if api_response:  # If we have scores (list of score objects)
            score_sum = 0
            adjusted_score_sum = 0
            count = len(api_response)

            for item in api_response:
                score_sum += item.get("score", 0)
                adjusted_score_sum += item.get("adjusted_score", 0)

            if count > 0:
                # Round to ensure integer output
                result["average_score"] = round(score_sum / count)
                result["average_adjusted_score"] = round(adjusted_score_sum / count)

        return result

    def search_incidents(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://incidents/search/fql-guide` resource when building this filter parameter.",
        ),
        limit: int = Field(
            default=10,
            ge=1,
            le=500,
            description="Maximum number of records to return. (Max: 500)",
        ),
        offset: int | None = Field(
            default=None,
            description="Starting index of overall result set from which to return ids.",
        ),
        sort: str | None = Field(
            default=None,
            description="The property to sort by. FQL syntax. Ex: state.asc, name.desc",
        ),
    ) -> List[Dict[str, Any]]:
        """Find and analyze security incidents to understand coordinated activity in your environment.

        IMPORTANT: You must use the `falcon://incidents/search/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_incidents` tool.
        """
        incident_ids = self._base_query(
            operation="QueryIncidents",
            filter=filter,
            limit=limit,
            offset=offset,
            sort=sort,
        )

        if self._is_error(incident_ids):
            return [incident_ids]

        # If we have incident IDs, get the details for each one
        if incident_ids:
            return self.get_incident_details(incident_ids)

        return []

    def get_incident_details(
        self,
        ids: List[str] = Field(description="Incident ID(s) to retrieve."),
    ) -> List[Dict[str, Any]]:
        """Get comprehensive incident details to understand attack patterns and coordinated activities.

        This tool returns comprehensive incident details for one or more incident IDs.
        Use this when you already have specific incident IDs and need their full details.
        For searching/discovering incidents, use the `falcon_search_incidents` tool instead.
        """
        incidents = self._base_get_by_ids(
            operation="GetIncidents",
            ids=ids,
        )

        if self._is_error(incidents):
            return [incidents]

        return incidents

    def search_behaviors(
        self,
        filter: str | None = Field(
            default=None,
            description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://incidents/behaviors/fql-guide` resource when building this filter parameter.",
        ),
        limit: int = Field(
            default=10,
            ge=1,
            le=500,
            description="Maximum number of records to return. (Max: 500)",
        ),
        offset: int | None = Field(
            default=None,
            description="Starting index of overall result set from which to return ids.",
        ),
        sort: str | None = Field(
            default=None,
            description="The property to sort by. (Ex: modified_timestamp.desc)",
        ),
    ) -> List[Dict[str, Any]]:
        """Find and analyze behaviors to understand suspicious activity in your environment.

        Use this when you need to find behaviors matching certain criteria rather than retrieving specific behaviors by ID.
        For retrieving details of known behavior IDs, use falcon_get_behavior_details instead.

        IMPORTANT: You must use the `falcon://incidents/behaviors/fql-guide` resource when you need to use the `filter` parameter.
        This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_behaviors` tool.
        """
        behavior_ids = self._base_query(
            operation="QueryBehaviors",
            filter=filter,
            limit=limit,
            offset=offset,
            sort=sort,
        )

        if self._is_error(behavior_ids):
            return [behavior_ids]

        # If we have behavior IDs, get the details for each one
        if behavior_ids:
            return self.get_behavior_details(behavior_ids)

        return []

    def get_behavior_details(
        self,
        ids: List[str] = Field(description="Behavior ID(s) to retrieve."),
    ) -> List[Dict[str, Any]]:
        """Get detailed behavior information to understand attack techniques and tactics.

        Use this when you already know the specific behavior ID(s) and need to retrieve their details.
        For searching behaviors based on criteria, use the `falcon_search_behaviors` tool instead.
        """
        behaviors = self._base_get_by_ids(
            operation="GetBehaviors",
            ids=ids,
        )

        if self._is_error(behaviors):
            return [behaviors]

        return behaviors

    def _base_query(
        self,
        operation: str,
        filter: str | None = None,
        limit: int = 100,
        offset: int | None = None,
        sort: str | None = None,
    ) -> List[str] | Dict[str, Any]:
        # Prepare parameters
        params = prepare_api_parameters(
            {
                "filter": filter,
                "limit": limit,
                "offset": offset,
                "sort": sort,
            }
        )

        # Make the API request
        response = self.client.command(operation, parameters=params)

        # Handle the response
        return handle_api_response(
            response,
            operation=operation,
            error_message="Failed to perform operation",
            default_result=[],
        )

```
Page 2/4FirstPrevNextLast