# Directory Structure
```
├── .dockerignore
├── .env.dev.example
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug.yaml
│   │   ├── config.yml
│   │   ├── feature-request.yaml
│   │   └── question.yaml
│   └── workflows
│       ├── docker-build-push.yml
│       ├── docker-build-test.yml
│       ├── markdown-lint.yml
│       ├── python-lint.yml
│       ├── python-test-e2e.yml
│       ├── python-test.yml
│       └── release.yml
├── .gitignore
├── .markdownlint.json
├── CHANGELOG.md
├── Dockerfile
├── docs
│   ├── CODE_OF_CONDUCT.md
│   ├── CONTRIBUTING.md
│   ├── deployment
│   │   ├── amazon_bedrock_agentcore.md
│   │   └── google_cloud.md
│   ├── e2e_testing.md
│   ├── module_development.md
│   ├── resource_development.md
│   └── SECURITY.md
├── examples
│   ├── adk
│   │   ├── adk_agent_operations.sh
│   │   ├── falcon_agent
│   │   │   ├── __init__.py
│   │   │   ├── agent.py
│   │   │   ├── env.properties
│   │   │   └── requirements.txt
│   │   └── README.md
│   ├── basic_usage.py
│   ├── mcp_config.json
│   ├── sse_usage.py
│   └── streamable_http_usage.py
├── falcon_mcp
│   ├── __init__.py
│   ├── client.py
│   ├── common
│   │   ├── __init__.py
│   │   ├── api_scopes.py
│   │   ├── errors.py
│   │   ├── logging.py
│   │   └── utils.py
│   ├── modules
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── cloud.py
│   │   ├── detections.py
│   │   ├── discover.py
│   │   ├── hosts.py
│   │   ├── idp.py
│   │   ├── incidents.py
│   │   ├── intel.py
│   │   ├── sensor_usage.py
│   │   ├── serverless.py
│   │   └── spotlight.py
│   ├── registry.py
│   ├── resources
│   │   ├── __init__.py
│   │   ├── cloud.py
│   │   ├── detections.py
│   │   ├── discover.py
│   │   ├── hosts.py
│   │   ├── incidents.py
│   │   ├── intel.py
│   │   ├── sensor_usage.py
│   │   ├── serverless.py
│   │   └── spotlight.py
│   └── server.py
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── generate_e2e_report.py
│   └── test_results_viewer.html
├── SUPPORT.md
├── tests
│   ├── __init__.py
│   ├── common
│   │   ├── __init__.py
│   │   ├── test_api_scopes.py
│   │   ├── test_errors.py
│   │   ├── test_logging.py
│   │   └── test_utils.py
│   ├── conftest.py
│   ├── e2e
│   │   ├── __init__.py
│   │   ├── modules
│   │   │   ├── __init__.py
│   │   │   ├── test_cloud.py
│   │   │   ├── test_detections.py
│   │   │   ├── test_discover.py
│   │   │   ├── test_hosts.py
│   │   │   ├── test_idp.py
│   │   │   ├── test_incidents.py
│   │   │   ├── test_intel.py
│   │   │   ├── test_sensor_usage.py
│   │   │   ├── test_serverless.py
│   │   │   └── test_spotlight.py
│   │   └── utils
│   │       ├── __init__.py
│   │       └── base_e2e_test.py
│   ├── modules
│   │   ├── __init__.py
│   │   ├── test_base.py
│   │   ├── test_cloud.py
│   │   ├── test_detections.py
│   │   ├── test_discover.py
│   │   ├── test_hosts.py
│   │   ├── test_idp.py
│   │   ├── test_incidents.py
│   │   ├── test_intel.py
│   │   ├── test_sensor_usage.py
│   │   ├── test_serverless.py
│   │   ├── test_spotlight.py
│   │   └── utils
│   │       └── test_modules.py
│   ├── test_client.py
│   ├── test_registry.py
│   ├── test_server.py
│   └── test_streamable_http_transport.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/examples/adk/adk_agent_operations.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# This script runs or deploys an AI agent based on the provided operation mode.
# It loads environment variables from ./falcon_agent/.env and validates required variables
# for the chosen mode.
# IMPORTANT: If you want the exported environment variables to persist in your
# calling shell (e.g., your terminal session) after this script finishes,
# you MUST 'source' this script, like:
# source ./adk_agent_operations.sh local_run
# --- Configuration ---
ENV_DIR="./falcon_agent"
ENV_FILE="${ENV_DIR}/.env"
ENV_PROPERTIES_TEMPLATE="${ENV_DIR}/env.properties"
INVALID_VALUE="NOT_SET" # The literal string considered an invalid value
ENV_BACKUP_FILE="${ENV_FILE}.bak"
# Define required variables for each operation mode
# These are comma-separated lists of variable names
vars_for_local_run="GOOGLE_GENAI_USE_VERTEXAI,GOOGLE_API_KEY,GOOGLE_MODEL,FALCON_CLIENT_ID,FALCON_CLIENT_SECRET,FALCON_BASE_URL,FALCON_AGENT_PROMPT"
vars_for_cloudrun_deploy="GOOGLE_GENAI_USE_VERTEXAI,GOOGLE_MODEL,FALCON_CLIENT_ID,FALCON_CLIENT_SECRET,FALCON_BASE_URL,FALCON_AGENT_PROMPT,PROJECT_ID,REGION"
vars_for_agent_engine_deploy="GOOGLE_GENAI_USE_VERTEXAI,GOOGLE_MODEL,FALCON_CLIENT_ID,FALCON_CLIENT_SECRET,FALCON_BASE_URL,FALCON_AGENT_PROMPT,PROJECT_ID,REGION,AGENT_ENGINE_STAGING_BUCKET"
vars_for_agentspace_register="GOOGLE_GENAI_USE_VERTEXAI,GOOGLE_MODEL,FALCON_CLIENT_ID,FALCON_CLIENT_SECRET,FALCON_BASE_URL,FALCON_AGENT_PROMPT,PROJECT_ID,REGION,PROJECT_NUMBER,AGENT_LOCATION,REASONING_ENGINE_NUMBER,AGENT_SPACE_APP_NAME" # Added AGENT_SPACE_APP_NAME
# --- Functions ---
# Function to display script usage
usage() {
  echo "Usage: $0 <operation_mode>"
  echo ""
  echo "Operation Modes:"
  echo "  local_run"
  echo "  cloudrun_deploy"
  echo "  agent_engine_deploy"
  echo "  agentspace_register"
  echo ""
  echo "Example: source $0 local_run"
  exit 1 # Exit with error code
}
# Function to validate required environment variables
# Arguments: $1 = comma-separated string of required variable names
validate_required_vars() {
  local required_vars_string="$1"
  IFS=',' read -r -a required_vars_array <<< "$required_vars_string" # Split string into array
  local all_vars_valid=true
  echo "--- Validating required environment variables for '$OPERATION_MODE' mode ---"
  for var_name in "${required_vars_array[@]}"; do
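    # "${!var_name}" below is bash indirect expansion: it dereferences the
    # variable whose name is stored in var_name.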
    # Check if variable is set and not empty
    if [[ -z "${!var_name}" ]]; then
      echo "ERROR: Required environment variable '$var_name' is missing or empty."
      all_vars_valid=false
    # Check if variable's value is the literal INVALID_VALUE string
    elif [[ "${!var_name}" == "$INVALID_VALUE" ]]; then
      echo "ERROR: Required environment variable '$var_name' has an invalid value: '$INVALID_VALUE'."
      all_vars_valid=false
    else
      echo "INFO: Variable '$var_name' is set and valid."
    fi
  done
  if ! $all_vars_valid; then
    echo "--- Validation FAILED. Please check your '$ENV_FILE' file. ---"
    return 1 # Indicate validation failure within the function
  fi
  echo "--- All required environment variables are VALID. ---"
  return 0 # Indicate validation success within the function
}
# Function to backup, modify, and restore .env file
# This function is intended to be called with 'trap' for cleanup.
cleanup_env_on_exit() {
    if [[ -f "$ENV_BACKUP_FILE" ]]; then
        echo "INFO: Restoring .env file from backup: '$ENV_BACKUP_FILE'."
        mv "$ENV_BACKUP_FILE" "$ENV_FILE" || echo "WARNING: Failed to restore .env file from backup. Manual intervention may be required."
    fi
}
# --- Main Script Logic ---
# Handle case: no arguments provided
if [ "$#" -eq 0 ]; then
    if [ ! -f "$ENV_FILE" ]; then
        echo "INFO: No operation mode provided and '$ENV_FILE' is not found."
        echo "INFO: Attempting to copy template '$ENV_PROPERTIES_TEMPLATE' to '$ENV_FILE'."
        # Ensure the directory exists
        mkdir -p "$ENV_DIR" || { echo "ERROR: Failed to create directory '$ENV_DIR'."; exit 1; }
        if [ -f "$ENV_PROPERTIES_TEMPLATE" ]; then
            cp "$ENV_PROPERTIES_TEMPLATE" "$ENV_FILE" || { echo "ERROR: Failed to copy '$ENV_PROPERTIES_TEMPLATE' to '$ENV_FILE'."; exit 1; }
            echo "SUCCESS: '$ENV_PROPERTIES_TEMPLATE' copied to '$ENV_FILE'."
            echo "ACTION REQUIRED: Please update the variables in '$ENV_FILE' before running this script with an operation mode."
            exit 0 # Exit after setup, user needs to edit the file
        else
            echo "ERROR: Template file '$ENV_PROPERTIES_TEMPLATE' not found. Cannot create '$ENV_FILE'."
            exit 1
        fi
    else
        # .env file exists but no arguments provided, so show usage
        echo "ERROR: No operation mode argument provided."
        usage # usage function calls exit 1
    fi
fi
OPERATION_MODE="$1"
# Validate the provided operation mode
case "$OPERATION_MODE" in
  local_run|cloudrun_deploy|agent_engine_deploy|agentspace_register)
    echo "INFO: Operation mode selected: '$OPERATION_MODE'."
    ;;
  *)
    echo "ERROR: Invalid operation mode: '$OPERATION_MODE'."
    usage # usage function calls exit 1
    ;;
esac
# Check if the .env file exists (after potential creation in no-arg case)
if [ ! -f "$ENV_FILE" ]; then
  echo "ERROR: Environment file '$ENV_FILE' not found after initial checks. This should not happen."
  exit 1 # Terminate the script
fi
echo "--- Loading environment variables from '$ENV_FILE' ---"
# Load all valid variables from the .env file into the current shell environment.
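# Each KEY=VALUE line is rewritten to: export KEY="VALUE"
# e.g. (illustrative) FALCON_BASE_URL=https://api.crowdstrike.com
# becomes: export FALCON_BASE_URL="https://api.crowdstrike.com"
# Assumption: values are simple strings; embedded double quotes or '$' would be
# interpreted by eval and require escaping.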
eval "$(grep -v '^[[:space:]]*#' "$ENV_FILE" | grep -E '^[[:alnum:]_]+=.*$' | sed -E 's/^([[:alnum:]_]+)=(.*)$/export \1="\2"/' )"
echo "--- Environment variables loaded. ---"
# Perform validation and execute mode-specific logic
case "$OPERATION_MODE" in
  local_run)
    validate_required_vars "$vars_for_local_run" || exit 1 # Exit if validation fails
    echo "INFO: Running ADK Agent for local development..."
    # Execute the local run command
    adk web
    local_run_status=$? # Capture exit status of the command
    if [ $local_run_status -eq 0 ]; then
        echo "SUCCESS: 'adk web' command completed successfully."
    else
        echo "ERROR: 'adk web' command failed with exit status $local_run_status."
        exit $local_run_status
    fi
    ;;
  cloudrun_deploy)
    validate_required_vars "$vars_for_cloudrun_deploy" || exit 1 # Exit if validation fails
    echo "INFO: Preparing for Cloud Run deployment..."
    # Backup .env file
    echo "INFO: Backing up '$ENV_FILE' to '$ENV_BACKUP_FILE'."
    cp "$ENV_FILE" "$ENV_BACKUP_FILE" || { echo "ERROR: Failed to backup .env file."; exit 1; }
    # Set trap to restore .env file on script exit (success or failure)
    trap cleanup_env_on_exit EXIT ERR
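    # cleanup_env_on_exit is idempotent (it only acts if the backup file still
    # exists), so registering it for both EXIT and ERR is safe.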
    # Modify .env file variables
    echo "INFO: Modifying '$ENV_FILE': Deleting GOOGLE_API_KEY and setting GOOGLE_GENAI_USE_VERTEXAI=True."
    # Use sed -i for in-place editing.
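    # (GNU sed syntax; BSD/macOS sed would need a suffix argument, e.g. sed -i '' ...)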
    # Delete line containing GOOGLE_API_KEY
    sed -i '/^GOOGLE_API_KEY=/d' "$ENV_FILE" || { echo "WARNING: Failed to delete GOOGLE_API_KEY from .env."; }
    # Replace GOOGLE_GENAI_USE_VERTEXAI value
    sed -i 's/^GOOGLE_GENAI_USE_VERTEXAI=.*/GOOGLE_GENAI_USE_VERTEXAI=True/' "$ENV_FILE" || { echo "WARNING: Failed to set GOOGLE_GENAI_USE_VERTEXAI=True in .env."; }
    # Re-load modified variables into current shell environment
    echo "INFO: Re-loading modified environment variables."
    eval "$(grep -v '^[[:space:]]*#' "$ENV_FILE" | grep -E '^[[:alnum:]_]+=.*$' | sed -E 's/^([[:alnum:]_]+)=(.*)$/export \1="\2"/' )"
    echo "INFO: Deploying ADK Agent to Cloud Run..."
    # Execute the Cloud Run deployment command
    adk deploy cloud_run --project="$PROJECT_ID" --region="$REGION" --service_name="falcon-agent-service" --with_ui ./falcon_agent
    deploy_status=$? # Capture exit status of the command
    # Trap will handle restoration on exit
    if [ $deploy_status -eq 0 ]; then
        echo "SUCCESS: Cloud Run deployment completed successfully."
    else
        echo "ERROR: Cloud Run deployment failed with exit status $deploy_status."
        exit $deploy_status
    fi
    ;;
  agent_engine_deploy)
    validate_required_vars "$vars_for_agent_engine_deploy" || exit 1 # Exit if validation fails
    echo "INFO: Preparing for Agent Engine deployment..."
    # Backup .env file
    echo "INFO: Backing up '$ENV_FILE' to '$ENV_BACKUP_FILE'."
    cp "$ENV_FILE" "$ENV_BACKUP_FILE" || { echo "ERROR: Failed to backup .env file."; exit 1; }
    # Set trap to restore .env file on script exit (success or failure)
    trap cleanup_env_on_exit EXIT ERR
    # Modify .env file variables (same as cloudrun_deploy for now)
    echo "INFO: Modifying '$ENV_FILE': Deleting GOOGLE_API_KEY and setting GOOGLE_GENAI_USE_VERTEXAI=True."
    sed -i '/^GOOGLE_API_KEY=/d' "$ENV_FILE" || { echo "WARNING: Failed to delete GOOGLE_API_KEY from .env."; }
    sed -i 's/^GOOGLE_GENAI_USE_VERTEXAI=.*/GOOGLE_GENAI_USE_VERTEXAI=True/' "$ENV_FILE" || { echo "WARNING: Failed to set GOOGLE_GENAI_USE_VERTEXAI=True in .env."; }
    # Re-load modified variables into current shell environment
    echo "INFO: Re-loading modified environment variables."
    eval "$(grep -v '^[[:space:]]*#' "$ENV_FILE" | grep -E '^[[:alnum:]_]+=.*$' | sed -E 's/^([[:alnum:]_]+)=(.*)$/export \1="\2"/' )"
    echo "INFO: Deploying ADK Agent to Agent Engine..."
    # Execute the Agent Engine deployment command
    adk deploy agent_engine --project="$PROJECT_ID" --region="$REGION" --staging_bucket="$AGENT_ENGINE_STAGING_BUCKET" --display_name=falcon_agent ./falcon_agent
    deploy_status=$? # Capture exit status of the command
    # Trap will handle restoration on exit
    if [ $deploy_status -eq 0 ]; then
        echo "SUCCESS: Agent Engine deployment completed successfully."
    else
        echo "ERROR: Agent Engine deployment failed with exit status $deploy_status."
        exit $deploy_status
    fi
    ;;
  agentspace_register)
    validate_required_vars "$vars_for_agentspace_register" || exit 1 # Exit if validation fails
    echo "INFO: Registering ADK Agent with AgentSpace..."
    TARGET_URL="https://discoveryengine.googleapis.com/v1alpha/projects/$PROJECT_ID/locations/$AGENT_LOCATION/collections/default_collection/engines/$AGENT_SPACE_APP_NAME/assistants/default_assistant/agents"
    # Construct JSON data using a here-document
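    # The EOF delimiter is unquoted, so $PROJECT_NUMBER, $REGION, and
    # $REASONING_ENGINE_NUMBER are expanded inside the here-document.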
    JSON_DATA=$(cat <<EOF
{
    "displayName": "CrowdStrike Falcon Agent",
    "description": "Allows users interact with CrowdStrike Falcon backend",
    "adk_agent_definition":
    {
        "tool_settings": {
            "tool_description": "CrowdStrike Falcon tools"
        },
        "provisioned_reasoning_engine": {
            "reasoning_engine":"projects/$PROJECT_NUMBER/locations/$REGION/reasoningEngines/$REASONING_ENGINE_NUMBER"
        }
    }
}
EOF
)
    echo "INFO: Sending POST request to: $TARGET_URL"
    echo "DEBUG: Request Body :"
    echo "$JSON_DATA"
    echo "..."
    # Perform the POST request using curl
    # Note: the X-Goog-User-Project header is set to the project ID value from $PROJECT_ID
    curl -X POST \
         -H "Content-Type: application/json" \
         -H "Authorization: Bearer $(gcloud auth print-access-token)" \
         -H "X-Goog-User-Project: $PROJECT_ID" \
         -d "$JSON_DATA" \
         "$TARGET_URL"
    curl_status=$? # Capture exit status of curl
    echo "" # Add a newline after curl output for better readability
    if [ $curl_status -eq 0 ]; then
        echo "SUCCESS: cURL command completed successfully for AgentSpace registration."
    else
        echo "ERROR: cURL command failed with exit status $curl_status during AgentSpace registration."
        exit $curl_status
    fi
    ;;
esac
echo "--- Operation '$OPERATION_MODE' complete. ---"
```
--------------------------------------------------------------------------------
/tests/common/test_utils.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the utility functions.
"""
import unittest
from unittest.mock import patch
from falcon_mcp.common.utils import (
    extract_first_resource,
    extract_resources,
    filter_none_values,
    generate_md_table,
    prepare_api_parameters,
)
class TestUtilFunctions(unittest.TestCase):
    """Test cases for the utility functions."""
    def test_filter_none_values(self):
        """Test filter_none_values function."""
        # Dictionary with None values
        data = {
            "key1": "value1",
            "key2": None,
            "key3": 0,
            "key4": False,
            "key5": "",
            "key6": None,
        }
        filtered = filter_none_values(data)
        # Verify None values were removed
        self.assertEqual(
            filtered,
            {
                "key1": "value1",
                "key3": 0,
                "key4": False,
                "key5": "",
            },
        )
        # Empty dictionary
        self.assertEqual(filter_none_values({}), {})
        # Dictionary without None values
        data = {"key1": "value1", "key2": 2}
        self.assertEqual(filter_none_values(data), data)
    def test_prepare_api_parameters(self):
        """Test prepare_api_parameters function."""
        # Parameters with None values
        params = {
            "filter": "name:test",
            "limit": 100,
            "offset": None,
            "sort": None,
        }
        prepared = prepare_api_parameters(params)
        # Verify None values were removed
        self.assertEqual(prepared, {"filter": "name:test", "limit": 100})
        # Empty parameters
        self.assertEqual(prepare_api_parameters({}), {})
        # Parameters without None values
        params = {"filter": "name:test", "limit": 100}
        self.assertEqual(prepare_api_parameters(params), params)
    def test_extract_resources(self):
        """Test extract_resources function."""
        # Success response with resources
        response = {
            "status_code": 200,
            "body": {
                "resources": [
                    {"id": "resource1", "name": "Resource 1"},
                    {"id": "resource2", "name": "Resource 2"},
                ]
            },
        }
        resources = extract_resources(response)
        # Verify resources were extracted
        self.assertEqual(
            resources,
            [
                {"id": "resource1", "name": "Resource 1"},
                {"id": "resource2", "name": "Resource 2"},
            ],
        )
        # Success response with empty resources
        response = {"status_code": 200, "body": {"resources": []}}
        resources = extract_resources(response)
        # Verify empty list was returned
        self.assertEqual(resources, [])
        # Success response with empty resources and default
        default = [{"id": "default", "name": "Default Resource"}]
        resources = extract_resources(response, default=default)
        # Verify default was returned
        self.assertEqual(resources, default)
        # Error response
        response = {
            "status_code": 400,
            "body": {"errors": [{"message": "Bad request"}]},
        }
        resources = extract_resources(response)
        # Verify empty list was returned
        self.assertEqual(resources, [])
        # Error response with default
        resources = extract_resources(response, default=default)
        # Verify default was returned
        self.assertEqual(resources, default)
    @patch("falcon_mcp.common.utils._format_error_response")
    def test_extract_first_resource(self, mock_format_error):
        """Test extract_first_resource function."""
        # Mock format_error_response
        mock_format_error.return_value = {"error": "Resource not found"}
        # Success response with resources
        response = {
            "status_code": 200,
            "body": {
                "resources": [
                    {"id": "resource1", "name": "Resource 1"},
                    {"id": "resource2", "name": "Resource 2"},
                ]
            },
        }
        resource = extract_first_resource(response, "TestOperation")
        # Verify first resource was returned
        self.assertEqual(resource, {"id": "resource1", "name": "Resource 1"})
        # Success response with empty resources
        response = {"status_code": 200, "body": {"resources": []}}
        resource = extract_first_resource(
            response, "TestOperation", not_found_error="Custom error"
        )
        # Verify error response was returned
        mock_format_error.assert_called_with("Custom error", operation="TestOperation")
        self.assertEqual(resource, {"error": "Resource not found"})
        # Error response
        response = {
            "status_code": 400,
            "body": {"errors": [{"message": "Bad request"}]},
        }
        resource = extract_first_resource(response, "TestOperation")
        # Verify error response was returned
        mock_format_error.assert_called_with(
            "Resource not found", operation="TestOperation"
        )
        self.assertEqual(resource, {"error": "Resource not found"})
    def test_generate_md_table(self):
        """Test generate_md_table function."""
        # Test data with headers as the first row
        data = [
            # Header row
            (" Name", "    Type", "Operators ", "Description    ", "Extra"),
            # Data rows
            (
                "test_string",
                "String",
                "Yes",
                """
This is a test description.
    
It has multiple lines.
For testing purposes.
"""
            ),
            (
                "test_bool",
                "\nBoolean", 
                "\nYes",
                "This is a test description.\nIt has multiple lines.\nFor testing purposes.",
                True,
            ),
            (
                "test_none",
                " None",
                "   No",
                """
                    Multi line description.
                    Hello
                """,
                None,
            ),
            (
                "test_number",
                "Number ",
                "No   ",
                "Single line description.",
                42,
            )
        ]
        # Generate table
        table = generate_md_table(data)
        # Expected table format (with exact spacing and formatting)
        expected_table = """|Name|Type|Operators|Description|Extra|
|-|-|-|-|-|
|test_string|String|Yes|This is a test description. It has multiple lines. For testing purposes.||
|test_bool|Boolean|Yes|This is a test description. It has multiple lines. For testing purposes.|true|
|test_none|None|No|Multi line description. Hello||
|test_number|Number|No|Single line description.|42|"""
        # Compare the generated table with the expected table
        self.assertEqual(table, expected_table)
        # Split into lines for easier assertion
        lines = table.split('\n')
        # Check basic structure
        self.assertEqual(len(lines), 6)  # header + separator + 4 data rows
        # Check header row exists and contains all headers (stripped of spaces)
        header_row = lines[0]
        for header in data[0]:
            self.assertIn(header.strip(), header_row)
        # Check for multi-line handling - descriptions should be combined with spaces
        self.assertIn("This is a test description. It has multiple lines. For testing purposes.", lines[2])
        # Check for proper pipe character usage
        for i in range(6):  # Check all lines
            self.assertTrue(lines[i].startswith('|'))
            self.assertTrue(lines[i].endswith('|'))
            # Should have exactly 6 | characters (start, end, and 4 column separators)
            self.assertEqual(lines[i].count('|'), 6)
    def test_generate_table_with_non_string_headers(self):
        """Test generate_table function with non-string headers."""
        # Test data with non-string headers
        data = [
            # Header row with a non-string value
            ("Name", 123, "Operators", "Description", "Extra"),
            # Data rows
            (
                "test_string",
                "String",
                "Yes",
                "This is a test description.",
                None,
            ),
        ]
        # Verify that TypeError is raised
        with self.assertRaises(TypeError) as context:
            generate_md_table(data)
        # Check the error message
        self.assertIn("Header values must be strings", str(context.exception))
        self.assertIn("got int", str(context.exception))
    def test_generate_table_with_single_column(self):
        """Test generate_table function with a single column."""
        # Test data with a single column
        data = [
            # Header row with a single value
            ("Name",),
            # Data rows with a single value
            ("test_string",),
            ("test_bool",),
            ("test_none",),
        ]
        # Generate table
        table = generate_md_table(data)
        # Expected table format (with exact spacing and formatting)
        expected_table = """|Name|
|-|
|test_string|
|test_bool|
|test_none|"""
        # Compare the generated table with the expected table
        self.assertEqual(table, expected_table)
        # Split into lines for easier assertion
        lines = table.split('\n')
        # Check basic structure
        self.assertEqual(len(lines), 5)  # header + separator + 3 data rows
        # Check header row exists and contains the header
        header_row = lines[0]
        self.assertEqual(header_row, "|Name|")
        # Check separator row
        self.assertEqual(lines[1], "|-|")
        # Check data rows exist with correct content
        self.assertEqual(lines[2], "|test_string|")
        self.assertEqual(lines[3], "|test_bool|")
        self.assertEqual(lines[4], "|test_none|")
        # Check for proper pipe character usage
        for i in range(5):  # Check all lines
            self.assertTrue(lines[i].startswith('|'))
            self.assertTrue(lines[i].endswith('|'))
            # Should have exactly 2 | characters (start and end)
            self.assertEqual(lines[i].count('|'), 2)
            
    def test_generate_table_with_empty_header_row(self):
        """Test generate_table function with an empty header row."""
        # Test data with an empty header row
        data = [
            # Empty header row
            (),
            # Data rows
            ("test_string",),
        ]
        # Verify that ValueError is raised
        with self.assertRaises(ValueError) as context:
            generate_md_table(data)
        
        # Check the error message
        self.assertIn("Header row cannot be empty", str(context.exception))
        
    def test_generate_table_with_insufficient_data(self):
        """Test generate_table function with insufficient data."""
        # Test data with only a header row and no data rows
        data = [
            # Header row
            ("Name", "Type"),
        ]
        # Verify that TypeError is raised
        with self.assertRaises(TypeError) as context:
            generate_md_table(data)
        
        # Check the error message
        self.assertIn("Need at least 2 items", str(context.exception))
        
        # Test with empty data
        with self.assertRaises(TypeError) as context:
            generate_md_table([])
        
        # Check the error message
        self.assertIn("Need at least 2 items", str(context.exception))
if __name__ == "__main__":
    unittest.main()
```
--------------------------------------------------------------------------------
/CHANGELOG.md:
--------------------------------------------------------------------------------
```markdown
# Changelog
## [0.3.0](https://github.com/CrowdStrike/falcon-mcp/compare/v0.2.0...v0.3.0) (2025-09-08)
### Features
* **module/discover:** Add unmanaged assets search tool to Discover module ([#132](https://github.com/CrowdStrike/falcon-mcp/issues/132)) ([1c7a798](https://github.com/CrowdStrike/falcon-mcp/commit/1c7a7985637fe81c789ac7b0912f748d135238a3))
* **modules/discover:** add new discover module ([#131](https://github.com/CrowdStrike/falcon-mcp/issues/131)) ([2862361](https://github.com/CrowdStrike/falcon-mcp/commit/2862361b8d0402ab7db4458794eb2b9bf62ef829))
* **modules/idp:** Add geolocation info to entities and timeline in i… ([#124](https://github.com/CrowdStrike/falcon-mcp/issues/124)) ([31bb268](https://github.com/CrowdStrike/falcon-mcp/commit/31bb268070a55cd9a0dc52cc3eab566a65dd5ac3))
* **modules/idp:** Add geolocation info to entities and timeline in idp module ([#121](https://github.com/CrowdStrike/falcon-mcp/issues/121)) ([31bb268](https://github.com/CrowdStrike/falcon-mcp/commit/31bb268070a55cd9a0dc52cc3eab566a65dd5ac3))
* **modules/serverless:** add serverless module ([#127](https://github.com/CrowdStrike/falcon-mcp/issues/127)) ([0d7b7b3](https://github.com/CrowdStrike/falcon-mcp/commit/0d7b7b3e33b05541a9507278861d37621d32dfaa))
### Bug Fixes
* fix incorrect module registration assumptions ([#153](https://github.com/CrowdStrike/falcon-mcp/issues/153)) ([bd3aa95](https://github.com/CrowdStrike/falcon-mcp/commit/bd3aa95706a2a35004d6c3c95dbbddd9e8fcffcf))
* **modules/identity:** add missing scope for Identity Protection module ([#148](https://github.com/CrowdStrike/falcon-mcp/issues/148)) ([791a262](https://github.com/CrowdStrike/falcon-mcp/commit/791a2621ed97d20553c0b0d98c6e0690a165208a))
## [0.2.0](https://github.com/CrowdStrike/falcon-mcp/compare/v0.1.0...v0.2.0) (2025-08-07)
### Features
* add origins to intel fql guide ([#89](https://github.com/CrowdStrike/falcon-mcp/issues/89)) ([c9a147e](https://github.com/CrowdStrike/falcon-mcp/commit/c9a147eef3f1c991eebc5c2e63781f8ab0eda311))
* disable telemetry ([#102](https://github.com/CrowdStrike/falcon-mcp/issues/102)) ([feb4507](https://github.com/CrowdStrike/falcon-mcp/commit/feb450797b981f9b9dd768e54cb7419f42cdfc90))
* **modules/sensorusage:** add new sensor usage module ([#101](https://github.com/CrowdStrike/falcon-mcp/issues/101)) ([ad97eb8](https://github.com/CrowdStrike/falcon-mcp/commit/ad97eb853f45b3d37af1a9b447531eb859201a0d))
* **resources/spotlight:** FQL filter as tuples ([#91](https://github.com/CrowdStrike/falcon-mcp/issues/91)) ([d9664a6](https://github.com/CrowdStrike/falcon-mcp/commit/d9664a6e37bafa102e1fea1ff109843c4ba9437d))
* **server:** add distinct tools for active vs available modules ([#103](https://github.com/CrowdStrike/falcon-mcp/issues/103)) ([f5f941a](https://github.com/CrowdStrike/falcon-mcp/commit/f5f941a28e9f2e6765d9de0fd060580274d7baab))
### Bug Fixes
* **resources/detections:** added severity_name over severity level and cleaned up example filters ([#93](https://github.com/CrowdStrike/falcon-mcp/issues/93)) ([5f4b775](https://github.com/CrowdStrike/falcon-mcp/commit/5f4b7750ad87475a3ec59f2b493db82193b7358d))
### Refactoring
* remove all return statements from tool docstrings ([#117](https://github.com/CrowdStrike/falcon-mcp/issues/117)) ([80250bb](https://github.com/CrowdStrike/falcon-mcp/commit/80250bb23da4029f0c8bb812cc6334aa7b36673d))
* remove mention to Host from FQL guide ([cf82392](https://github.com/CrowdStrike/falcon-mcp/commit/cf82392cc9f299334ae5cf7a07bd42a81b01f607))
* **resources/cloud:** remove mention to Host from FQL guide ([#76](https://github.com/CrowdStrike/falcon-mcp/issues/76)) ([81ec4de](https://github.com/CrowdStrike/falcon-mcp/commit/81ec4de3c121d407290dde6965942da26478f652))
* **resources/cloud:** use new tuple methodology to create filters ([#95](https://github.com/CrowdStrike/falcon-mcp/issues/95)) ([fd5cce7](https://github.com/CrowdStrike/falcon-mcp/commit/fd5cce7ed458b99f6aa89c4f9cfed0823e51290f))
* **resources/detections:** update guide to be more accurate ([#83](https://github.com/CrowdStrike/falcon-mcp/issues/83)) ([4ff2144](https://github.com/CrowdStrike/falcon-mcp/commit/4ff2144bbf2af3c2db3d2d8e5351c075cee7f610))
* **resources/detections:** use new tuple method for fql detections table ([#97](https://github.com/CrowdStrike/falcon-mcp/issues/97)) ([f328b79](https://github.com/CrowdStrike/falcon-mcp/commit/f328b79cbdcac9e5a1e29cbf11fc517c19e24606))
* **resources/hosts:** tested and updated fql filters and operator support for hosts module ([#63](https://github.com/CrowdStrike/falcon-mcp/issues/63)) ([e0b971c](https://github.com/CrowdStrike/falcon-mcp/commit/e0b971c6b4e4dcda693ea7f8407a21a3e847a1dc))
* **resources/hosts:** use new tuple methodology to create filters ([#96](https://github.com/CrowdStrike/falcon-mcp/issues/96)) ([da38d69](https://github.com/CrowdStrike/falcon-mcp/commit/da38d6904d25ccf8fcdfc8aef62a762acc89507d))
* **resources/incidents:** use new tuple methodology to create filters ([#98](https://github.com/CrowdStrike/falcon-mcp/issues/98)) ([a9ba2f7](https://github.com/CrowdStrike/falcon-mcp/commit/a9ba2f7ba94fe1b7b6108d5e89e4c767afad5657))
* **resources/intel:** use new tuple methodology to create filters ([#99](https://github.com/CrowdStrike/falcon-mcp/issues/99)) ([cf0c19e](https://github.com/CrowdStrike/falcon-mcp/commit/cf0c19ea77b21b8e1590c5642a6aa3de6dbd1a14))
* standardize parameter consistency across all modules ([#106](https://github.com/CrowdStrike/falcon-mcp/issues/106)) ([3c9c299](https://github.com/CrowdStrike/falcon-mcp/commit/3c9c29946942941b50d1fbcf9d640329ea8bc84a))
## 0.1.0 (2025-07-16)
### Features
* add Docker support ([#19](https://github.com/crowdstrike/falcon-mcp/issues/19)) ([f60adc1](https://github.com/crowdstrike/falcon-mcp/commit/f60adc1c1e7e0a441a57d671fa44bb430b66280d))
* add E2E testing ([#16](https://github.com/crowdstrike/falcon-mcp/issues/16)) ([c8a1d18](https://github.com/crowdstrike/falcon-mcp/commit/c8a1d18400fc5d89ef26c7cbe01fe4d46628fdff))
* add filter guide for all tools which have filter param ([#46](https://github.com/crowdstrike/falcon-mcp/issues/46)) ([61ffde9](https://github.com/crowdstrike/falcon-mcp/commit/61ffde90062644bb6014bb89c8b50ec904c728d5))
* add hosts module ([#42](https://github.com/crowdstrike/falcon-mcp/issues/42)) ([9375f4b](https://github.com/crowdstrike/falcon-mcp/commit/9375f4b2399b3ed793d548a498dc132e69ef6081))
* add intel module ([#22](https://github.com/crowdstrike/falcon-mcp/issues/22)) ([6da3359](https://github.com/crowdstrike/falcon-mcp/commit/6da3359e3890d6ee218b105f4342a1ae13690e79))
* add resources infrastructure ([#39](https://github.com/crowdstrike/falcon-mcp/issues/39)) ([2629eae](https://github.com/crowdstrike/falcon-mcp/commit/2629eaef671f75d244f355d43c3e18cad47ee488))
* add spotlight module ([#58](https://github.com/crowdstrike/falcon-mcp/issues/58)) ([713b551](https://github.com/crowdstrike/falcon-mcp/commit/713b55193141fc5d71f3bdc273d960c20e99bff8))
* add streamable-http transport with Docker support and testing ([#24](https://github.com/crowdstrike/falcon-mcp/issues/24)) ([5e44e97](https://github.com/crowdstrike/falcon-mcp/commit/5e44e9708bcccd2580444ffcaf27b03fb6716c9d))
* add user agent ([#68](https://github.com/crowdstrike/falcon-mcp/issues/68)) ([824a69f](https://github.com/crowdstrike/falcon-mcp/commit/824a69f23211cb1e0699332fa07b453bbf0401b4))
* average CrowdScore ([#20](https://github.com/crowdstrike/falcon-mcp/issues/20)) ([6580663](https://github.com/crowdstrike/falcon-mcp/commit/65806634d49248c6b59ef509eadbf4d2b64145f1))
* cloud module ([#56](https://github.com/crowdstrike/falcon-mcp/issues/56)) ([7f563c2](https://github.com/crowdstrike/falcon-mcp/commit/7f563c2e0b5afa35af3d9dbfb778f07b014812ab))
* convert fql guides to resources ([#62](https://github.com/crowdstrike/falcon-mcp/issues/62)) ([63bff7d](https://github.com/crowdstrike/falcon-mcp/commit/63bff7d3a87ea6c07b290f0c610e95e3a4c8423d))
* create _is_error method ([ee7bd01](https://github.com/crowdstrike/falcon-mcp/commit/ee7bd01d691a2cd6a74c2a9c50f406f3bd6e09de))
* flexible tool input parsing ([#41](https://github.com/crowdstrike/falcon-mcp/issues/41)) ([06287fe](https://github.com/crowdstrike/falcon-mcp/commit/06287feaccf41f4c41d587c9ab2f0a874382455b))
* idp support domain lookup and input sanitization ([#73](https://github.com/crowdstrike/falcon-mcp/issues/73)) ([9d6858c](https://github.com/crowdstrike/falcon-mcp/commit/9d6858cd7d0f97a1fbcca3858cafccf688e73da6))
* implement lazy module discovery ([#37](https://github.com/crowdstrike/falcon-mcp/issues/37)) ([a38c949](https://github.com/crowdstrike/falcon-mcp/commit/a38c94973aae3ebdc5b5f51f0980b0266c287680))
* implement lazy module discovery approach ([a38c949](https://github.com/crowdstrike/falcon-mcp/commit/a38c94973aae3ebdc5b5f51f0980b0266c287680))
* initial implementation for the falcon-mcp server ([#4](https://github.com/crowdstrike/falcon-mcp/issues/4)) ([773ecb5](https://github.com/crowdstrike/falcon-mcp/commit/773ecb54f5c7ef7760933a5c12b473df953ca85c))
* refactor to use falcon_mcp name and absolute imports ([#52](https://github.com/crowdstrike/falcon-mcp/issues/52)) ([8fe3f2d](https://github.com/crowdstrike/falcon-mcp/commit/8fe3f2d28573258a620c50270cd23c56aaf4d5fb))
### Bug Fixes
* conversational incidents ([#21](https://github.com/crowdstrike/falcon-mcp/issues/21)) ([ee7bd01](https://github.com/crowdstrike/falcon-mcp/commit/ee7bd01d691a2cd6a74c2a9c50f406f3bd6e09de))
* count number of tools correctly ([#72](https://github.com/crowdstrike/falcon-mcp/issues/72)) ([6c2284e](https://github.com/crowdstrike/falcon-mcp/commit/6c2284e2bac220bfc55b9aea1b416300dbceffb6))
* discover modules in examples ([#31](https://github.com/crowdstrike/falcon-mcp/issues/31)) ([e443fc8](https://github.com/crowdstrike/falcon-mcp/commit/e443fc8348b8aa8c79c17733833b0cb3509d7451))
* ensures proper lists are passed to module arg + ENV VAR support for args ([#54](https://github.com/crowdstrike/falcon-mcp/issues/54)) ([9820310](https://github.com/crowdstrike/falcon-mcp/commit/982031012184b4fe5d5054ace41a4abcac0ff86b))
* freshen up e2e tests ([#40](https://github.com/crowdstrike/falcon-mcp/issues/40)) ([7ba3d86](https://github.com/crowdstrike/falcon-mcp/commit/7ba3d86faed06b4033074bbed0eb5410d87f117f))
* improve error handling and fix lint issue ([#69](https://github.com/crowdstrike/falcon-mcp/issues/69)) ([31672ad](https://github.com/crowdstrike/falcon-mcp/commit/31672ad20a7a78f9edb5e7d5f7e5d610bf8aafb6))
* lock version for mcp-use to 1.3.1 ([#47](https://github.com/crowdstrike/falcon-mcp/issues/47)) ([475fe0a](https://github.com/crowdstrike/falcon-mcp/commit/475fe0a59879a5c53198ebd5e9b548d2fdfd9538))
* make api scope names the UI name to prevent confusion ([#67](https://github.com/crowdstrike/falcon-mcp/issues/67)) ([0089fec](https://github.com/crowdstrike/falcon-mcp/commit/0089fec425c5d1a58e15ebb3d6262cfa21b61931))
* return types for incidents ([ee7bd01](https://github.com/crowdstrike/falcon-mcp/commit/ee7bd01d691a2cd6a74c2a9c50f406f3bd6e09de))
### Documentation
* major refinements to README  ([#55](https://github.com/crowdstrike/falcon-mcp/issues/55)) ([c98dde4](https://github.com/crowdstrike/falcon-mcp/commit/c98dde4a35491806a27bc1ef3ec53e184810b7b9))
* minor readme updates ([7ad3285](https://github.com/crowdstrike/falcon-mcp/commit/7ad3285a942917502cebd8bf1bf067db12a0d6c6))
* provide better clarity around using .env ([#71](https://github.com/crowdstrike/falcon-mcp/issues/71)) ([2e5ec0c](https://github.com/crowdstrike/falcon-mcp/commit/2e5ec0cfd5ba918625481b0c4ea75bf161a3a606))
* update descriptions for better clarity ([#49](https://github.com/crowdstrike/falcon-mcp/issues/49)) ([1fceee1](https://github.com/crowdstrike/falcon-mcp/commit/1fceee1070d04da20fea8e1c19c0c4e286e67828))
* update readme ([#64](https://github.com/crowdstrike/falcon-mcp/issues/64)) ([7b21c1b](https://github.com/crowdstrike/falcon-mcp/commit/7b21c1b8f42a33c3704e116a56e13af6108609aa))
```
--------------------------------------------------------------------------------
/falcon_mcp/resources/cloud.py:
--------------------------------------------------------------------------------
```python
"""
Contains Cloud resources.
"""
from falcon_mcp.common.utils import generate_md_table
FQL_DOCUMENTATION = """Falcon Query Language (FQL)
=== BASIC SYNTAX ===
property_name:[operator]'value'
=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = not text match
• * = wildcard (one or more characters)
• !* = not wildcard (one or more characters)
=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions
=== DATA TYPES & SUPPORTED OPERATORS ===
• String: equal, not equal, wildcard.
• Date, Timestamp: equal, not equal, less than, less than or equal, greater than, greater than or equal.
• Boolean: equal, not equal.
• Number: equal, not equal, less than, less than or equal, greater than, greater than or equal.
=== DATA TYPES & SYNTAX ===
• String: 'value' or ['value1', 'value2'] for a list of values. Wildcards: 'partial*' or '*partial' or '*partial*'.
• Date, Timestamp: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format).
• Boolean: true or false (no quotes).
• Number: 123 (no quotes).
=== IMPORTANT NOTES ===
• Use single quotes around string values: 'value'
• Use square brackets for list of string values: ['value 1', 'value 2']
• Use the wildcard operator to test whether a property does or does not contain a substring. Ex: `property:*'*sub*'`, `property:!*'*sub*'`
• Dates and timestamps format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
"""
# List of tuples containing filter options data: (name, type, description)
KUBERNETES_CONTAINERS_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Description"
    ),
    (
        "agent_id",
        "String",
        """
        The sensor agent ID running in the container.
        Ex: agent_id:'3c1ca4a114504ca89af51fd126991efd'
        """
    ),
    (
        "agent_type",
        "String",
        """
        The sensor agent type running in the container.
        Ex: agent_type:'Falcon sensor for linux'
        """
    ),
    (
        "ai_related",
        "Boolean",
        """
        Determines if the container hosts AI-related packages.
        Ex: ai_related:true
        """
    ),
    (
        "cloud_account_id",
        "String",
        """
        The cloud provider account ID.
        Ex: cloud_account_id:'171998889118'
        """
    ),
    (
        "cloud_name",
        "String",
        """
        The cloud provider name.
        Ex: cloud_name:'AWS'
        """
    ),
    (
        "cloud_region",
        "String",
        """
        The cloud region.
        Ex: cloud_region:'us-1'
        """
    ),
    (
        "cluster_id",
        "String",
        """
        The kubernetes cluster ID of the container.
        Ex: cluster_id:'6055bde7-acfe-48ae-9ee0-0ac1a60d8eac'
        """
    ),
    (
        "cluster_name",
        "String",
        """
        The kubernetes cluster that manages the container.
        Ex: cluster_name:'prod-cluster'
        """
    ),
    (
        "container_id",
        "String",
        """
        The kubernetes container ID.
        Ex: container_id:'c30c45f9-4702-4663-bce8-cca9f2237d1d'
        """
    ),
    (
        "container_name",
        "String",
        """
        The kubernetes container name.
        Ex: container_name:'nginx'
        """
    ),
    (
        "cve_id",
        "String",
        """
        The CVE ID found in the container image.
        Ex: cve_id:'CVE-2025-1234'
        """
    ),
    (
        "detection_name",
        "String",
        """
        The name of the detection found in the container image.
        Ex: detection_name:'RunningAsRootContainer'
        """
    ),
    (
        "first_seen",
        "Timestamp",
        """
        Timestamp when the kubernetes container was first seen in UTC date format ("YYYY-MM-DDTHH:MM:SSZ").
        Ex: first_seen:'2025-01-19T11:14:15Z'
        """
    ),
    (
        "image_detection_count",
        "Number",
        """
        Number of image detections found in the container image.
        Ex: image_detection_count:5
        """
    ),
    (
        "image_digest",
        "String",
        """
        The digest of the container image.
        Ex: image_digest:'sha256:a08d3ee8ee68ebd8a78525a710c6479270692259e'
        """
    ),
    (
        "image_has_been_assessed",
        "Boolean",
        """
        Tells whether the container image has been assessed.
        Ex: image_has_been_assessed:true
        """
    ),
    (
        "image_id",
        "String",
        """
        The ID of the container image.
        Ex: image_id:'a90f484d134848af858cd409801e213e'
        """
    ),
    (
        "image_registry",
        "String",
        """
        The registry of the container image.
        """
    ),
    (
        "image_repository",
        "String",
        """
        The repository of the container image.
        Ex: image_repository:'my-app'
        """
    ),
    (
        "image_tag",
        "String",
        """
        The tag of the container image.
        Ex: image_tag:'v1.0.0'
        """
    ),
    (
        "image_vulnerability_count",
        "Number",
        """
        Number of image vulnerabilities found in the container image.
        Ex: image_vulnerability_count:1
        """
    ),
    (
        "insecure_mount_source",
        "String",
        """
        File path of the insecure mount in the container.
        Ex: insecure_mount_source:'/var/data'
        """
    ),
    (
        "insecure_mount_type",
        "String",
        """
        Type of the insecure mount in the container.
        Ex: insecure_mount_type:'hostPath'
        """
    ),
    (
        "insecure_propagation_mode",
        "Boolean",
        """
        Tells whether the container has an insecure mount propagation mode.
        Ex: insecure_propagation_mode:false
        """
    ),
    (
        "interactive_mode",
        "Boolean",
        """
        Tells whether the container is running in interactive mode.
        Ex: interactive_mode:true
        """
    ),
    (
        "ipv4",
        "String",
        """
        The IPv4 of the container.
        Ex: ipv4:'10.10.1.5'
        """
    ),
    (
        "ipv6",
        "String",
        """
        The IPv6 of the container.
        Ex: ipv6:'2001:db8::ff00:42:8329'
        """
    ),
    (
        "last_seen",
        "Timestamp",
        """
        Timestamp when the kubernetes container was last seen in UTC date format ("YYYY-MM-DDTHH:MM:SSZ").
        Ex: last_seen:'2025-01-19T11:14:15Z'
        """
    ),
    (
        "namespace",
        "String",
        """
        The kubernetes namespace name.
        Ex: namespace:'default'
        """
    ),
    (
        "node_name",
        "String",
        """
        The name of the kubernetes node.
        Ex: node_name:'k8s-pool'
        """
    ),
    (
        "node_uid",
        "String",
        """
        The kubernetes node UID of the container.
        Ex: node_uid:'79f1741e7db542bdaaecac11a7f7b7ae'
        """
    ),
    (
        "pod_id",
        "String",
        """
        The kubernetes pod ID of the container.
        Ex: pod_id:'6ab0fffa-2662-440b-8e95-2be93e11da3c'
        """
    ),
    (
        "pod_name",
        "String",
        """
        The kubernetes pod name of the container.
        """
    ),
    (
        "port",
        "String",
        """
        The port that the container exposes.
        """
    ),
    (
        "privileged",
        "Boolean",
        """
        Tells whether the container is running with elevated privileges.
        Ex: privileged:false
        """
    ),
    (
        "root_write_access",
        "Boolean",
        """
        Tells whether the container has root write access.
        Ex: root_write_access:false
        """
    ),
    (
        "run_as_root_group",
        "Boolean",
        """
        Tells whether the container is running as root group.
        """
    ),
    (
        "run_as_root_user",
        "Boolean",
        """
        Tells whether the container is running as root user.
        """
    ),
    (
        "running_status",
        "Boolean",
        """
        Tells whether the container is running.
        Ex: running_status:true
        """
    ),
]
KUBERNETES_CONTAINERS_FQL_DOCUMENTATION = (
    FQL_DOCUMENTATION
    + """
=== falcon_search_kubernetes_containers FQL filter available fields ===
""" + generate_md_table(KUBERNETES_CONTAINERS_FQL_FILTERS) + """
=== falcon_search_kubernetes_containers FQL filter examples ===
# Find kubernetes containers that are running and have 1 or more image vulnerabilities
image_vulnerability_count:>0+running_status:true
# Find kubernetes containers seen in the last 7 days and by the CVE ID found in their container images
cve_id:'CVE-2025-1234'+last_seen:>'2025-03-15T00:00:00Z'
# Find kubernetes containers whose cloud_name is in a list
cloud_name:['AWS', 'Azure']
# Find kubernetes containers whose names starts with "app-"
container_name:*'app-*'
# Find kubernetes containers whose cluster or namespace name is "prod"
cluster_name:'prod',namespace:'prod'
=== falcon_count_kubernetes_containers FQL filter examples ===
# Count kubernetes containers by cluster name
cluster_name:'staging'
# Count kubernetes containers by agent type
agent_type:'Kubernetes'
"""
)
# List of tuples containing filter options data: (name, type, description)
IMAGES_VULNERABILITIES_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Description"
    ),
    (
        "ai_related",
        "Boolean",
        """
        Tells whether the image has AI-related packages.
        Ex: ai_related:true
        """
    ),
    (
        "base_os",
        "String",
        """
        The base operating system of the image.
        Ex: base_os:'ubuntu'
        """
    ),
    (
        "container_id",
        "String",
        """
        The kubernetes container ID in which the image vulnerability was detected.
        Ex: container_id:'515f976c43eaa3edf51590e7217ac8191a7e50c59'
        """
    ),
    (
        "container_running_status",
        "Boolean",
        """
        The running status of the kubernetes container in which the image vulnerability was detected.
        Ex: container_running_status:true
        """
    ),
    (
        "cps_rating",
        "String",
        """
        The CPS rating of the image vulnerability.
        Possible values: Low, Medium, High, Critical
        Ex: cps_rating:'Critical'
        """
    ),
    (
        "cve_id",
        "String",
        """
        The CVE ID of the image vulnerability.
        Ex: cve_id:'CVE-2025-1234'
        """
    ),
    (
        "cvss_score",
        "Number",
        """
        The CVSS Score of the image vulnerability. The value must be between 0 and 10.
        Ex: cvss_score:8
        """
    ),
    (
        "image_digest",
        "String",
        """
        The digest of the image.
        Ex: image_digest:'sha256:a08d3ee8ee68ebd8a78525a710c6479270692259e'
        """
    ),
    (
        "image_id",
        "String",
        """
        The ID of the image.
        Ex: image_id:'a90f484d134848af858cd409801e213e'
        """
    ),
    (
        "registry",
        "String",
        """
        The image registry of the image in which the vulnerability was detected.
        Ex: registry:'docker.io'
        """
    ),
    (
        "repository",
        "String",
        """
        The image repository of the image in which the vulnerability was detected.
        Ex: repository:'my-app'
        """
    ),
    (
        "severity",
        "String",
        """
        The severity of the vulnerability.
        Available values: Low, Medium, High, Critical.
        Ex: severity:'High'
        """
    ),
    (
        "tag",
        "String",
        """
        The image tag of the image in which the vulnerability was detected.
        Ex: tag:'v1.0.0'
        """
    ),
]
IMAGES_VULNERABILITIES_FQL_DOCUMENTATION = (
    FQL_DOCUMENTATION
    + """
=== falcon_search_images_vulnerabilities FQL filter options ===
""" + generate_md_table(IMAGES_VULNERABILITIES_FQL_FILTERS) + """
=== falcon_search_images_vulnerabilities FQL filter examples ===
# Find images vulnerabilities by container ID
container_id:'12341223'
# Find images vulnerabilities by a list of container IDs
container_id:['12341223', '199929292', '1000101']
# Find images vulnerabilities by CVSS score and container with running status true
cvss_score:>5+container_running_status:true
# Find images vulnerabilities by image registry using wildcard
registry:*'*docker*'
"""
)
```
--------------------------------------------------------------------------------
/falcon_mcp/resources/spotlight.py:
--------------------------------------------------------------------------------
```python
"""
Contains Spotlight Vulnerabilities resources.
"""
from falcon_mcp.common.utils import generate_md_table
# List of tuples containing filter options data: (name, type, operators, description)
SEARCH_VULNERABILITIES_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Operators",
        "Description"
    ),
    (
        "aid",
        "String",
        "No",
        """
        Unique agent identifier (AID) of the sensor where the vulnerability was found.
        For assets without a Falcon sensor installed, this field matches the asset ID field.
        Ex: aid:'abcde6b9a3427d8c4a1af416424d6231'
        """
    ),
    (
        "apps.remediation.ids",
        "String",
        "Yes",
        """
        Unique identifier of a remediation. Supports multiple values and negation.
        Ex: apps.remediation.ids:'7bba2e543744a92962be7afeb6484858'
        Ex: apps.remediation.ids:['ID1','ID2','ID3']
        """
    ),
    (
        "cid",
        "String",
        "No",
        """
        Unique system-generated customer identifier (CID) of the account. In multi-CID environments, you can filter by both parent and child CIDs.
        Ex: cid:'0123456789ABCDEFGHIJKLMNOPQRSTUV'
        """
    ),
    (
        "closed_timestamp",
        "Timestamp",
        "Yes",
        """
        Date and time a vulnerability was set to a status of CLOSED.
        Ex: closed_timestamp:>'2021-06-25T10:32'
        Ex: closed_timestamp:<'2021-10-18'
        """
    ),
    (
        "confidence",
        "String",
        "Yes",
        """
        Whether or not the vulnerability has been confirmed.
        Values: confirmed, potential
        Ex: confidence:'potential'
        """
    ),
    (
        "created_timestamp",
        "Timestamp",
        "Yes",
        """
        Date and time when this vulnerability was found in your environment. Use this to get vulnerabilities created after the timestamp you last pulled data on.
        Ex: created_timestamp:<'2021-09-25T13:22'
        Ex: created_timestamp:>'2021-02-12'
        """
    ),
    (
        "cve.base_score",
        "Number",
        "Yes",
        """
        CVE base score.
        Ex: cve.base_score:>5.0
        """
    ),
    (
        "cve.cwes",
        "String",
        "Yes",
        """
        Unique identifier for a vulnerability from the Common Weakness Enumeration (CWE) list.
        Ex: cve.cwes:['CWE-787','CWE-699']
        """
    ),
    (
        "cve.exploit_status",
        "String",
        "Yes",
        """
        Numeric value of the most severe known exploit. Supports multiple values and negation.
        Values: 0=Unproven, 30=Available, 60=Easily accessible, 90=Actively used
        Ex: cve.exploit_status:'60'
        Ex: cve.exploit_status:!'0'
        """
    ),
    (
        "cve.exprt_rating",
        "String",
        "Yes",
        """
        ExPRT rating assigned by CrowdStrike's predictive AI rating system. Value must be in all caps. Supports multiple values and negation.
        Values: UNKNOWN, LOW, MEDIUM, HIGH, CRITICAL
        Ex: cve.exprt_rating:'HIGH'
        Ex: cve.exprt_rating:['HIGH','CRITICAL']
        """
    ),
    (
        "cve.id",
        "String",
        "Yes",
        """
        Unique identifier for a vulnerability as cataloged in the National Vulnerability Database (NVD). Supports multiple values and negation. For case-insensitive filtering, add .insensitive to the field name.
        Note: All values must be enclosed in brackets.
        Ex: cve.id:['CVE-2022-1234']
        Ex: cve.id:['CVE-2022-1234','CVE-2023-1234']
        """
    ),
    (
        "cve.is_cisa_kev",
        "Boolean",
        "Yes",
        """
        Filter for vulnerabilities that are in the CISA Known Exploited Vulnerabilities (KEV) catalog. Supports negation.
        Ex: cve.is_cisa_kev:true
        """
    ),
    (
        "cve.remediation_level",
        "String",
        "Yes",
        """
        CVSS remediation level of the vulnerability. Supports multiple values and negation.
        Ex: cve.remediation_level:'O' (official fix)
        Ex: cve.remediation_level:'U' (no available fix)
        """
    ),
    (
        "cve.severity",
        "String",
        "Yes",
        """
        CVSS severity rating of the vulnerability. Value must be in all caps. Supports multiple values and negation.
        Values: UNKNOWN, NONE, LOW, MEDIUM, HIGH, CRITICAL
        Ex: cve.severity:'LOW'
        Ex: cve.severity:!'UNKNOWN'
        """
    ),
    (
        "cve.types",
        "String",
        "Yes",
        """
        Vulnerability type.
        Values: Vulnerability, Misconfiguration, Unsupported software
        Ex: cve.types:!'Misconfiguration'
        """
    ),
    (
        "data_providers.ports",
        "String",
        "Yes",
        """
        Ports on the host where the vulnerability was found by the third-party provider.
        Ex: data_providers.ports:'53'
        Ex: data_providers.ports:!'0' (any port)
        """
    ),
    (
        "data_providers.provider",
        "String",
        "No",
        """
        Name of the data provider.
        Ex: data_providers.provider:'{provider name}'
        """
    ),
    (
        "data_providers.rating",
        "String",
        "Yes",
        """
        Third-party provider rating.
        Values: UNKNOWN, NONE, LOW, MEDIUM, HIGH, CRITICAL
        Ex: data_providers.rating:'CRITICAL'
        """
    ),
    (
        "data_providers.scan_time",
        "Timestamp",
        "Yes",
        """
        UTC date and time when the vulnerability was most recently identified by the third-party provider.
        Ex: data_providers.scan_time:>'2023-08-03'
        """
    ),
    (
        "data_providers.scanner_id",
        "String",
        "No",
        """
        ID of the third-party scanner that identified the vulnerability.
        Ex: data_providers.scanner_id:'{scanner id}'
        """
    ),
    (
        "host_info.asset_criticality",
        "String",
        "Yes",
        """
        Assigned criticality level of the asset.
        Values: Critical, High, Noncritical, Unassigned
        Ex: host_info.asset_criticality:['Critical','High']
        Ex: host_info.asset_criticality:!'Unassigned'
        """
    ),
    (
        "host_info.groups",
        "String",
        "Yes",
        """
        Unique system-assigned ID of a host group. Supports multiple values and negation. All values must be enclosed in brackets.
        Ex: host_info.groups:['03f0b54af2692e99c4cec945818fbef7']
        Ex: host_info.groups:!['03f0b54af2692e99c4cec945818fbef7']
        """
    ),
    (
        "host_info.has_run_container",
        "Boolean",
        "No",
        """
        Whether or not the host is running Kubernetes containers.
        Ex: host_info.has_run_container:true
        """
    ),
    (
        "host_info.internet_exposure",
        "String",
        "No",
        """
        Whether or not the asset is internet-facing.
        Values: Yes, No, Pending
        Ex: host_info.internet_exposure:'Yes'
        """
    ),
    (
        "host_info.managed_by",
        "String",
        "Yes",
        """
        Indicates if the asset has the Falcon sensor installed.
        Values: Falcon sensor, Unmanaged
        Supports multiple values and negation.
        Ex: host_info.managed_by:'Unmanaged'
        """
    ),
    (
        "host_info.platform_name",
        "String",
        "Yes",
        """
        Operating system platform. Supports negation.
        Values: Windows, Mac, Linux
        Ex: host_info.platform_name:'Windows'
        Ex: host_info.platform_name:!'Linux'
        """
    ),
    (
        "host_info.product_type_desc",
        "String",
        "Yes",
        """
        Type of host a sensor is running on. Supports multiple values and negation. For case-insensitive filtering, add .insensitive to the field name. Enter values with first letter capitalized.
        Values: Workstation, Server, Domain Controller
        Ex: host_info.product_type_desc:'Workstation'
        Ex: host_info.product_type_desc:!'Workstation'
        """
    ),
    (
        "host_info.tags",
        "String",
        "Yes",
        """
        Name of a tag assigned to a host. Supports multiple values and negation. All values must be enclosed in brackets.
        Ex: host_info.tags:['ephemeral']
        Ex: host_info.tags:!['search','ephemeral']
        """
    ),
    (
        "host_info.third_party_asset_ids",
        "String",
        "Yes",
        """
        Asset IDs assigned to the host by third-party providers in the format: {data_provider}: {data_provider_asset_id}
        Supports multiple values and negation.
        Ex: host_info.third_party_asset_ids:'{provider}: {asset_id}'
        """
    ),
    (
        "last_seen_within",
        "Number",
        "No",
        """
        Filter for vulnerabilities based on the number of days since a host last connected to Falcon. Enter a numeric value from 3 to 45 to indicate the number of days to look back.
        Ex: last_seen_within:'10'
        """
    ),
    (
        "services.port",
        "String",
        "No",
        """
        Port on the host where a vulnerability was found by Falcon EASM or a third-party provider.
        Ex: services.port:'443'
        """
    ),
    (
        "services.protocol",
        "String",
        "No",
        """
        Network protocols recognized by Falcon EASM.
        Ex: services.protocol:'pop3'
        """
    ),
    (
        "services.transport",
        "String",
        "No",
        """
        Transport methods recognized by Falcon EASM.
        Ex: services.transport:'tcp'
        """
    ),
    (
        "status",
        "String",
        "Yes",
        """
        Status of a vulnerability. Value must be in all lowercase letters. Supports multiple values and negation.
        Values: open, closed, reopen, expired
        Ex: status:'open'
        Ex: status:!'closed'
        Ex: status:['open','reopen']
        """
    ),
    (
        "suppression_info.is_suppressed",
        "Boolean",
        "No",
        """
        Indicates whether the vulnerability is suppressed by a suppression rule.
        Ex: suppression_info.is_suppressed:true
        """
    ),
    (
        "suppression_info.reason",
        "String",
        "Yes",
        """
        Attribute assigned to a suppression rule. Supports multiple values and negation. All values must be enclosed in brackets.
        Values: ACCEPT_RISK, COMPENSATING_CONTROL, FALSE_POSITIVE
        Ex: suppression_info.reason:['ACCEPT_RISK']
        Ex: suppression_info.reason:!['FALSE_POSITIVE']
        """
    ),
    (
        "updated_timestamp",
        "Timestamp",
        "Yes",
        """
        UTC date and time of the last update made on a vulnerability.
        Ex: updated_timestamp:<'2021-10-20T22:36'
        Ex: updated_timestamp:>'2021-09-15'
        """
    ),
    (
        "vulnerability_id",
        "String",
        "Yes",
        """
        CVE ID of the vulnerability. If there's no CVE ID, this is the CrowdStrike or third-party ID of the vulnerability.
        For case-insensitive filtering, add .insensitive to the field name. Supports multiple values and negation.
        Ex: vulnerability_id:['CVE-2022-1234']
        Ex: vulnerability_id:['CVE-2022-1234','CVE-2023-4321']
        """
    ),
]
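# The tuples above are rendered into the markdown filter table embedded in the
# guide below via generate_md_table().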
SEARCH_VULNERABILITIES_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Search Vulnerabilities Guide
=== BASIC SYNTAX ===
property_name:[operator]'value'
=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal to
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = does not text match
=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)
=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions
=== falcon_search_vulnerabilities FQL filter options ===
""" + generate_md_table(SEARCH_VULNERABILITIES_FQL_FILTERS) + """
=== IMPORTANT NOTES ===
• Use single quotes around string values: 'value'
• Use square brackets for exact matches and multiple values: ['value1','value2']
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
• For case-insensitive filtering, add .insensitive to field names
• Boolean values: true or false (no quotes)
• Wildcards (*) are unsupported in this API
• Some fields require specific capitalization (check individual field descriptions)
=== COMMON FILTER EXAMPLES ===
• High severity vulnerabilities: cve.severity:'HIGH'
• Recent vulnerabilities: created_timestamp:>'2024-01-01'
• Windows vulnerabilities: host_info.platform_name:'Windows'
• Open vulnerabilities with exploits: status:'open'+cve.exploit_status:!'0'
• Critical ExPRT rated vulnerabilities: cve.exprt_rating:'CRITICAL'
• CISA KEV vulnerabilities: cve.is_cisa_kev:true
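• Grouped conditions: (status:'open',status:'reopen')+cve.exprt_rating:'CRITICAL'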
"""
```
--------------------------------------------------------------------------------
/tests/modules/test_hosts.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the Hosts module.
"""
import unittest
from falcon_mcp.modules.hosts import HostsModule
from tests.modules.utils.test_modules import TestModules
class TestHostsModule(TestModules):
    """Test cases for the Hosts module."""
    def setUp(self):
        """Set up test fixtures."""
        self.setup_module(HostsModule)
    def test_register_tools(self):
        """Test registering tools with the server."""
        expected_tools = [
            "falcon_search_hosts",
            "falcon_get_host_details",
        ]
        self.assert_tools_registered(expected_tools)
    def test_register_resources(self):
        """Test registering resources with the server."""
        expected_resources = [
            "falcon_search_hosts_fql_guide",
        ]
        self.assert_resources_registered(expected_resources)
    def test_search_hosts(self):
        """Test searching for hosts."""
        # Setup mock responses for both API calls
        query_response = {
            "status_code": 200,
            "body": {"resources": ["device1", "device2"]},
        }
        details_response = {
            "status_code": 200,
            "body": {"resources": []},  # Empty resources for PostDeviceDetailsV2
        }
        self.mock_client.command.side_effect = [query_response, details_response]
        # Call search_hosts
        result = self.module.search_hosts(filter="platform_name:'Windows'", limit=50)
        # Verify client commands were called correctly
        self.assertEqual(self.mock_client.command.call_count, 2)
        # Check that the first call was to QueryDevicesByFilter with the right filter and limit
        first_call = self.mock_client.command.call_args_list[0]
        self.assertEqual(first_call[0][0], "QueryDevicesByFilter")
        self.assertEqual(
            first_call[1]["parameters"]["filter"], "platform_name:'Windows'"
        )
        self.assertEqual(first_call[1]["parameters"]["limit"], 50)
        self.mock_client.command.assert_any_call(
            "PostDeviceDetailsV2", body={"ids": ["device1", "device2"]}
        )
        # Verify result
        self.assertEqual(
            result, []
        )  # Empty list because PostDeviceDetailsV2 returned empty resources
    def test_search_hosts_with_details(self):
        """Test searching for hosts with details."""
        # Setup mock responses
        query_response = {
            "status_code": 200,
            "body": {"resources": ["device1", "device2"]},
        }
        details_response = {
            "status_code": 200,
            "body": {
                "resources": [
                    {
                        "device_id": "device1",
                        "hostname": "TEST-HOST-1",
                        "platform_name": "Windows",
                    },
                    {
                        "device_id": "device2",
                        "hostname": "TEST-HOST-2",
                        "platform_name": "Linux",
                    },
                ]
            },
        }
        self.mock_client.command.side_effect = [query_response, details_response]
        # Call search_hosts
        result = self.module.search_hosts(filter="platform_name:'Windows'", limit=50)
        # Verify client commands were called correctly
        self.assertEqual(self.mock_client.command.call_count, 2)
        # Check that the first call was to QueryDevicesByFilter with the right filter and limit
        first_call = self.mock_client.command.call_args_list[0]
        self.assertEqual(first_call[0][0], "QueryDevicesByFilter")
        self.assertEqual(
            first_call[1]["parameters"]["filter"], "platform_name:'Windows'"
        )
        self.assertEqual(first_call[1]["parameters"]["limit"], 50)
        self.mock_client.command.assert_any_call(
            "PostDeviceDetailsV2", body={"ids": ["device1", "device2"]}
        )
        # Verify result
        expected_result = [
            {
                "device_id": "device1",
                "hostname": "TEST-HOST-1",
                "platform_name": "Windows",
            },
            {
                "device_id": "device2",
                "hostname": "TEST-HOST-2",
                "platform_name": "Linux",
            },
        ]
        self.assertEqual(result, expected_result)
    def test_search_hosts_error(self):
        """Test searching for hosts with API error."""
        # Setup mock response with error
        mock_response = {
            "status_code": 400,
            "body": {"errors": [{"message": "Invalid filter"}]},
        }
        self.mock_client.command.return_value = mock_response
        # Call search_hosts
        result = self.module.search_hosts(filter="invalid_filter")
        # Verify result contains error
        self.assertEqual(len(result), 1)
        self.assertIn("error", result[0])
        self.assertIn("details", result[0])
    def test_search_hosts_no_results(self):
        """Test searching for hosts with no results."""
        # Setup mock response with empty resources
        mock_response = {"status_code": 200, "body": {"resources": []}}
        self.mock_client.command.return_value = mock_response
        # Call search_hosts
        result = self.module.search_hosts(filter="hostname:'NONEXISTENT'")
        # Verify result is empty list
        self.assertEqual(result, [])
        # Only one API call should be made (QueryDevicesByFilter)
        self.assertEqual(self.mock_client.command.call_count, 1)
    def test_search_hosts_with_all_parameters(self):
        """Test searching for hosts with all parameters."""
        # Setup mock response with empty resources
        mock_response = {"status_code": 200, "body": {"resources": []}}
        self.mock_client.command.return_value = mock_response
        # Call search_hosts with all parameters
        result = self.module.search_hosts(
            filter="platform_name:'Linux'", limit=25, offset=10, sort="hostname.desc"
        )
        # Verify API call with all parameters
        self.mock_client.command.assert_called_once_with(
            "QueryDevicesByFilter",
            parameters={
                "filter": "platform_name:'Linux'",
                "limit": 25,
                "offset": 10,
                "sort": "hostname.desc",
            },
        )
        # Verify result
        self.assertEqual(result, [])
    def test_get_host_details(self):
        """Test getting host details."""
        # Setup mock response
        mock_response = {
            "status_code": 200,
            "body": {
                "resources": [
                    {
                        "device_id": "device1",
                        "hostname": "TEST-HOST-1",
                        "platform_name": "Windows",
                    }
                ]
            },
        }
        self.mock_client.command.return_value = mock_response
        # Call get_host_details
        result = self.module.get_host_details(["device1"])
        # Verify client command was called correctly
        self.mock_client.command.assert_called_once_with(
            "PostDeviceDetailsV2", body={"ids": ["device1"]}
        )
        # Verify result
        expected_result = [
            {
                "device_id": "device1",
                "hostname": "TEST-HOST-1",
                "platform_name": "Windows",
            }
        ]
        self.assertEqual(result, expected_result)
    def test_get_host_details_multiple_ids(self):
        """Test getting host details for multiple IDs."""
        # Setup mock response
        mock_response = {
            "status_code": 200,
            "body": {
                "resources": [
                    {
                        "device_id": "device1",
                        "hostname": "TEST-HOST-1",
                        "platform_name": "Windows",
                    },
                    {
                        "device_id": "device2",
                        "hostname": "TEST-HOST-2",
                        "platform_name": "Linux",
                    },
                ]
            },
        }
        self.mock_client.command.return_value = mock_response
        # Call get_host_details
        result = self.module.get_host_details(["device1", "device2"])
        # Verify client command was called correctly
        self.mock_client.command.assert_called_once_with(
            "PostDeviceDetailsV2", body={"ids": ["device1", "device2"]}
        )
        # Verify result
        expected_result = [
            {
                "device_id": "device1",
                "hostname": "TEST-HOST-1",
                "platform_name": "Windows",
            },
            {
                "device_id": "device2",
                "hostname": "TEST-HOST-2",
                "platform_name": "Linux",
            },
        ]
        self.assertEqual(result, expected_result)
    def test_get_host_details_not_found(self):
        """Test getting host details for non-existent host."""
        # Setup mock response with empty resources
        mock_response = {"status_code": 200, "body": {"resources": []}}
        self.mock_client.command.return_value = mock_response
        # Call get_host_details
        result = self.module.get_host_details(["nonexistent"])
        # For empty resources, handle_api_response returns the default_result (empty list)
        self.assertEqual(result, [])
    def test_get_host_details_error(self):
        """Test getting host details with API error."""
        # Setup mock response with error
        mock_response = {
            "status_code": 404,
            "body": {"errors": [{"message": "Device not found"}]},
        }
        self.mock_client.command.return_value = mock_response
        # Call get_host_details
        result = self.module.get_host_details(["invalid-id"])
        # Verify result contains error
        self.assertIsInstance(result, dict)
        self.assertIn("error", result)
        self.assertIn("details", result)
    def test_get_host_details_empty_list(self):
        """Test getting host details with empty ID list."""
        # Call get_host_details with empty list
        result = self.module.get_host_details([])
        # Should return empty list without making API call
        self.assertEqual(result, [])
        self.mock_client.command.assert_not_called()
    def test_search_hosts_windows_platform(self):
        """Test searching for Windows hosts."""
        # Setup mock responses
        query_response = {
            "status_code": 200,
            "body": {"resources": ["win-host-1", "win-host-2"]},
        }
        details_response = {
            "status_code": 200,
            "body": {
                "resources": [
                    {
                        "device_id": "win-host-1",
                        "platform_name": "Windows",
                        "hostname": "WIN-01",
                    },
                    {
                        "device_id": "win-host-2",
                        "platform_name": "Windows",
                        "hostname": "WIN-02",
                    },
                ]
            },
        }
        self.mock_client.command.side_effect = [query_response, details_response]
        # Call search_hosts
        result = self.module.search_hosts(filter="platform_name:'Windows'")
        # Verify result
        self.assertEqual(len(result), 2)
        self.assertEqual(result[0]["platform_name"], "Windows")
        self.assertEqual(result[1]["platform_name"], "Windows")
        # Verify filter was applied correctly
        first_call = self.mock_client.command.call_args_list[0]
        self.assertEqual(
            first_call[1]["parameters"]["filter"], "platform_name:'Windows'"
        )
    def test_search_hosts_linux_platform(self):
        """Test searching for Linux hosts."""
        # Setup mock responses
        query_response = {"status_code": 200, "body": {"resources": ["linux-host-1"]}}
        details_response = {
            "status_code": 200,
            "body": {
                "resources": [
                    {
                        "device_id": "linux-host-1",
                        "platform_name": "Linux",
                        "hostname": "LINUX-01",
                    }
                ]
            },
        }
        self.mock_client.command.side_effect = [query_response, details_response]
        # Call search_hosts
        result = self.module.search_hosts(filter="platform_name:'Linux'")
        # Verify result
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]["platform_name"], "Linux")
        # Verify filter was applied correctly
        first_call = self.mock_client.command.call_args_list[0]
        self.assertEqual(first_call[1]["parameters"]["filter"], "platform_name:'Linux'")
    def test_search_hosts_mac_platform_no_results(self):
        """Test searching for Mac hosts with no results."""
        # Setup mock response with empty resources
        mock_response = {"status_code": 200, "body": {"resources": []}}
        self.mock_client.command.return_value = mock_response
        # Call search_hosts
        result = self.module.search_hosts(filter="platform_name:'Mac'")
        # Verify result
        self.assertEqual(len(result), 0)
        # Verify filter was applied correctly
        first_call = self.mock_client.command.call_args_list[0]
        self.assertEqual(first_call[1]["parameters"]["filter"], "platform_name:'Mac'")
if __name__ == "__main__":
    unittest.main()
```
--------------------------------------------------------------------------------
/tests/e2e/utils/base_e2e_test.py:
--------------------------------------------------------------------------------
```python
"""Base class for E2E tests."""
import asyncio
import atexit
import json
import os
import threading
import time
import unittest
from typing import Any
from unittest.mock import MagicMock, patch
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient
from falcon_mcp.server import FalconMCPServer
# Load environment variables from .env file for local development
load_dotenv()
# Default models to test against
DEFAULT_MODELS_TO_TEST = ["gpt-4.1-mini", "gpt-4o-mini"]
# Default number of times to run each test
DEFAULT_RUNS_PER_TEST = 2
# Default success threshold for passing a test
DEFAULT_SUCCESS_THRESHOLD = 0.7
# Models to test against
MODELS_TO_TEST = os.getenv("MODELS_TO_TEST", ",".join(DEFAULT_MODELS_TO_TEST)).split(
    ","
)
# Number of times to run each test
RUNS_PER_TEST = int(os.getenv("RUNS_PER_TEST", str(DEFAULT_RUNS_PER_TEST)))
# Success threshold for passing a test
SUCCESS_THRESHOLD = float(os.getenv("SUCCESS_THRESHOLD", str(DEFAULT_SUCCESS_THRESHOLD)))
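# Example (illustrative): override the defaults via environment variables, e.g.
#   MODELS_TO_TEST="gpt-4.1-mini,gpt-4o-mini" RUNS_PER_TEST=3 SUCCESS_THRESHOLD=0.8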
# Module-level singleton for shared server resources
class SharedTestServer:
    """Singleton class to manage shared test server resources."""
    instance = None
    initialized = False
    def __new__(cls):
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance
    def __init__(self):
        if not self.initialized:
            # Group server-related attributes
            self.server_config = {
                "thread": None,
                "client": None,
                "loop": None,
            }
            # Group patching-related attributes
            self.patchers = {
                "env": None,
                "api": None,
                "mock_api_instance": None,
            }
            # Group test configuration
            self.test_config = {
                "results": [],
                "verbosity_level": 0,
                "base_url": os.getenv("OPENAI_BASE_URL"),
                "models_to_test": MODELS_TO_TEST,
            }
            self._cleanup_registered = False
    def initialize(self):
        """Initialize the shared server and test environment."""
        if self.initialized:
            return
        print("Initializing shared FalconMCP server for E2E tests...")
        self.server_config["loop"] = asyncio.new_event_loop()
        asyncio.set_event_loop(self.server_config["loop"])
        self.patchers["env"] = patch.dict(
            os.environ,
            {
                "FALCON_CLIENT_ID": "test-client-id",
                "FALCON_CLIENT_SECRET": "test-client-secret",
                "FALCON_BASE_URL": "https://api.test.crowdstrike.com",
                "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", "test-openai-key"),
            },
        )
        self.patchers["env"].start()
        self.patchers["api"] = patch("falcon_mcp.client.APIHarnessV2")
        mock_apiharness_class = self.patchers["api"].start()
        self.patchers["mock_api_instance"] = MagicMock()
        self.patchers["mock_api_instance"].login.return_value = True
        self.patchers["mock_api_instance"].token_valid.return_value = True
        mock_apiharness_class.return_value = self.patchers["mock_api_instance"]
        server = FalconMCPServer(debug=False)
        self.server_config["thread"] = threading.Thread(
            target=server.run, args=("sse",)
        )
        self.server_config["thread"].daemon = True
        self.server_config["thread"].start()
        time.sleep(2)  # Wait for the server to initialize
        server_config = {"mcpServers": {"falcon": {"url": "http://127.0.0.1:8000/sse"}}}
        self.server_config["client"] = MCPClient(config=server_config)
        self.__class__.initialized = True
        # Register cleanup function to run when Python exits (only once)
        if not self._cleanup_registered:
            atexit.register(self.cleanup)
            self._cleanup_registered = True
        print("Shared FalconMCP server initialized successfully.")
    def cleanup(self):
        """Clean up the shared server and test environment."""
        if not self.initialized:
            return
        print("Cleaning up shared FalconMCP server...")
        try:
            # Write test results to file
            with open("test_results.json", "w", encoding="utf-8") as f:
                json.dump(self.test_config["results"], f, indent=4)
            if self.patchers["api"]:
                try:
                    self.patchers["api"].stop()
                except (RuntimeError, AttributeError) as e:
                    print(f"Warning: API patcher cleanup error: {e}")
            if self.patchers["env"]:
                try:
                    self.patchers["env"].stop()
                except (RuntimeError, AttributeError) as e:
                    print(f"Warning: Environment patcher cleanup error: {e}")
            if (
                self.server_config["loop"]
                and not self.server_config["loop"].is_closed()
            ):
                try:
                    self.server_config["loop"].close()
                    asyncio.set_event_loop(None)
                except RuntimeError as e:
                    print(f"Warning: Event loop cleanup error: {e}")
            # Reset state
            self.__class__.initialized = False
            self._cleanup_registered = False
            print("Shared FalconMCP server cleanup completed.")
        except (IOError, OSError) as e:
            print(f"Error during cleanup: {e}")
            # Still reset the state even if cleanup partially failed
            self.__class__.initialized = False
            self._cleanup_registered = False
# Global singleton instance
_shared_server = SharedTestServer()
def ensure_dict(data: Any) -> dict:
    """
    Return the input if it is already a dict; otherwise, attempt to convert it to a dict using json.loads.
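
    Example: ensure_dict('{"a": 1}') -> {"a": 1}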
    """
    if isinstance(data, dict):
        return data
    return json.loads(data)
class BaseE2ETest(unittest.TestCase):
    """
    Base class for end-to-end tests for the Falcon MCP Server.
    This class sets up a live server in a separate thread, mocks the Falcon API,
    and provides helper methods for running tests with an MCP client and agent.
    The server is shared across all test classes that inherit from this base class.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.llm = None
        self.agent = None
    @classmethod
    def setUpClass(cls):
        """Set up the test environment for the entire class."""
        # Initialize the shared server
        _shared_server.initialize()
        # Set instance variables to point to shared resources
        cls.test_results = _shared_server.test_config["results"]
        cls._server_thread = _shared_server.server_config["thread"]
        cls._env_patcher = _shared_server.patchers["env"]
        cls._api_patcher = _shared_server.patchers["api"]
        cls._mock_api_instance = _shared_server.patchers["mock_api_instance"]
        cls.models_to_test = _shared_server.test_config["models_to_test"]
        cls.base_url = _shared_server.test_config["base_url"]
        cls.verbosity_level = _shared_server.test_config["verbosity_level"]
        cls.client = _shared_server.server_config["client"]
        cls.loop = _shared_server.server_config["loop"]
    @classmethod
    def tearDownClass(cls):
        """Tear down the test environment for the current class."""
        # Don't cleanup here - let atexit handle it
    def setUp(self):
        """Set up test fixtures before each test method."""
        self.assertTrue(
            self._server_thread.is_alive(), "Server thread did not start correctly."
        )
        self._mock_api_instance.reset_mock()
    async def _run_agent_stream(self, prompt: str) -> tuple[list, str]:
        """
        Run the agent stream for a given prompt and return the tools used and the final result.
        Args:
            prompt: The input prompt to send to the agent.
        Returns:
            A tuple containing the list of tool calls and the final string result from the agent.
        """
        result = ""
        tools = []
        await self.agent.initialize()
        async for event in self.agent.stream_events(prompt, manage_connector=False):
            event_type = event.get("event")
            data = event.get("data", {})
            name = event.get("name")
            if event_type == "on_tool_end" and name == "use_tool_from_server":
                tools.append(data)
            elif event_type == "on_chat_model_stream" and data.get("chunk"):
                result += str(data["chunk"].content)
        return tools, result
    def run_test_with_retries(
        self,
        test_name: str,
        test_logic_coro: callable,
        assertion_logic: callable,
    ):
        """
        Run a given test logic multiple times against different models and check for a success threshold.
        Args:
            test_name: The name of the test being run.
            test_logic_coro: An asynchronous function that runs the agent and returns tools and result.
            assertion_logic: A function that takes tools and result and performs assertions.
        """
        # Extract module name from the test class name
        module_name = self._get_module_name()
        success_count = 0
        total_runs = len(self.models_to_test) * RUNS_PER_TEST
        for model_name in self.models_to_test:
            self._setup_model_and_agent(model_name)
            success_count += self._run_model_tests(
                test_name, module_name, model_name, test_logic_coro, assertion_logic
            )
        self._assert_success_threshold(success_count, total_runs)
    def _setup_model_and_agent(self, model_name: str):
        """Set up the LLM and agent for a specific model."""
        # Initialize ChatOpenAI with base_url only if it's provided
        kwargs = {"model": model_name, "temperature": 0.7}
        if self.base_url:
            kwargs["base_url"] = self.base_url
        self.llm = ChatOpenAI(**kwargs)
        # Set agent verbosity based on pytest verbosity
        verbose_mode = self.verbosity_level > 0
        self.agent = MCPAgent(
            llm=self.llm,
            client=self.client,
            max_steps=20,
            verbose=verbose_mode,
            use_server_manager=True,
            memory_enabled=False,
        )
    def _run_model_tests(
        self,
        test_name: str,
        module_name: str,
        model_name: str,
        test_logic_coro: callable,
        assertion_logic: callable,
    ) -> int:
        """Run tests for a specific model and return success count."""
        model_success_count = 0
        for i in range(RUNS_PER_TEST):
            print(
                f"Running test {test_name} with model {model_name}, try {i + 1}/{RUNS_PER_TEST}"
            )
            run_result = {
                "test_name": test_name,
                "module_name": module_name,
                "model_name": model_name,
                "run_number": i + 1,
                "status": "failure",
                "failure_reason": None,
                "tools_used": None,
                "agent_result": None,
            }
            try:
                # Each test logic run needs a clean slate.
                self._mock_api_instance.reset_mock()
                tools, result = self.loop.run_until_complete(test_logic_coro())
                run_result.update(
                    {
                        "tools_used": tools,
                        "agent_result": result,
                    }
                )
                assertion_logic(tools, result)
                run_result["status"] = "success"
                model_success_count += 1
            except AssertionError as e:
                run_result["failure_reason"] = f"Assertion failed: {str(e)}"
                print(f"Assertion failed with model {model_name}, try {i + 1}: {e}")
            except Exception as e:
                # Catch any other exception that might occur during agent streaming or test execution
                # fmt: off
                run_result["failure_reason"] = f"Test execution failed: {type(e).__name__}: {str(e)}"
                print(f"Test execution failed with model {model_name}, try {i + 1}: {type(e).__name__}: {e}")
            finally:
                self.test_results.append(run_result)
        return model_success_count
    def _assert_success_threshold(self, success_count: int, total_runs: int):
        """Assert that the success rate meets the threshold."""
        success_rate = success_count / total_runs if total_runs > 0 else 0
        print(f"Success rate: {success_rate * 100:.2f}% ({success_count}/{total_runs})")
        self.assertGreaterEqual(
            success_rate,
            SUCCESS_THRESHOLD,
            f"Success rate of {success_rate * 100:.2f}% is below the required {SUCCESS_THRESHOLD * 100:.2f}% threshold.",
        )
    def _get_module_name(self) -> str:
        """
        Extract the module name from the test class name.
        Expected pattern: Test{ModuleName}ModuleE2E -> {ModuleName}
        """
        class_name = self.__class__.__name__
        # Remove 'Test' prefix and 'ModuleE2E' suffix
        if class_name.startswith("Test") and class_name.endswith("ModuleE2E"):
            module_name = class_name[
                4:-9
            ]  # Remove 'Test' (4 chars) and 'ModuleE2E' (9 chars)
            return module_name
        # Fallback: use the class name as-is if it doesn't match the expected pattern
        return class_name
    def _create_mock_api_side_effect(self, fixtures: list) -> callable:
        """Create a side effect function for the `mock API` based on a list of fixtures."""
        def mock_api_side_effect(operation: str, **kwargs: dict) -> dict:
            print(f"Mock API called with: operation={operation}, kwargs={kwargs}")
            for fixture in fixtures:
                if fixture["operation"] == operation and fixture["validator"](kwargs):
                    print(
                        f"Found matching fixture for {operation}, returning {fixture['response']}"
                    )
                    return fixture["response"]
            print(f"No matching fixture found for {operation}")
            return {"status_code": 200, "body": {"resources": []}}
        return mock_api_side_effect
```
--------------------------------------------------------------------------------
/tests/e2e/modules/test_incidents.py:
--------------------------------------------------------------------------------
```python
"""
E2E tests for the Incidents module.
"""
import json
import unittest
import pytest
from tests.e2e.utils.base_e2e_test import BaseE2ETest, ensure_dict
@pytest.mark.e2e
class TestIncidentsModuleE2E(BaseE2ETest):
    """
    End-to-end test suite for the Falcon MCP Server Incidents Module.
    """
    def test_crowd_score_default_parameters(self):
        """Verify the agent can retrieve CrowdScore with default parameters."""
        async def test_logic():
            fixtures = [
                {
                    "operation": "CrowdScore",
                    "validator": lambda kwargs: kwargs.get("parameters", {}).get(
                        "limit"
                    )
                    == 100,
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {"id": "score-1", "score": 50, "adjusted_score": 60},
                                {"id": "score-2", "score": 70, "adjusted_score": 80},
                                {"id": "score-3", "score": 40, "adjusted_score": 50},
                            ]
                        },
                    },
                }
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = "What is our current CrowdScore?"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[-1]
            self.assertEqual(used_tool["input"]["tool_name"], "falcon_show_crowd_score")
            # Verify the output contains the expected data
            output = json.loads(used_tool["output"])
            self.assertEqual(
                output["average_score"], 53
            )  # (50+70+40)/3 = 53.33 rounded to 53
            self.assertEqual(
                output["average_adjusted_score"], 63
            )  # (60+80+50)/3 = 63.33 rounded to 63
            self.assertEqual(len(output["scores"]), 3)
            # Verify API call parameters
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count,
                1,
                "Expected at least 1 API call",
            )
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            self.assertEqual(api_call_params.get("limit"), 100)  # Default limit
            self.assertEqual(api_call_params.get("offset"), 0)  # Default offset
            # Verify result contains CrowdScore information
            self.assertIn("CrowdScore", result)
            self.assertIn("53", result)  # Average score should be mentioned
        self.run_test_with_retries(
            "test_crowd_score_default_parameters", test_logic, assertions
        )
    def test_search_incidents_with_filter(self):
        """Verify the agent can search for incidents with a filter."""
        async def test_logic():
            fixtures = [
                {
                    "operation": "QueryIncidents",
                    "validator": lambda kwargs: "state:'open'"
                    in kwargs.get("parameters", {}).get("filter", ""),
                    "response": {
                        "status_code": 200,
                        "body": {"resources": ["incident-1", "incident-2"]},
                    },
                },
                {
                    "operation": "GetIncidents",
                    "validator": lambda kwargs: "incident-1"
                    in kwargs.get("body", {}).get("ids", []),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "incident-1",
                                    "name": "Test Incident 1",
                                    "description": "This is a test incident",
                                    "status": 20,  # New
                                    "state": "open",
                                    "final_score": 80,
                                    "start": "2023-01-01T00:00:00Z",
                                    "end": "2023-01-02T00:00:00Z",
                                },
                                {
                                    "id": "incident-2",
                                    "name": "Test Incident 2",
                                    "description": "This is another test incident",
                                    "status": 30,  # In Progress
                                    "state": "open",
                                    "final_score": 65,
                                    "start": "2023-01-03T00:00:00Z",
                                    "end": "2023-01-04T00:00:00Z",
                                },
                            ]
                        },
                    },
                },
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = "Find all open incidents"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[-1]
            self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_incidents")
            # Verify the tool input contains the filter
            tool_input = ensure_dict(used_tool["input"]["tool_input"])
            self.assertIn("open", tool_input.get("filter", "").lower())
            # Verify API call parameters
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count,
                2,
                "Expected at least 2 API calls",
            )
            # Check QueryIncidents call
            api_call_1_params = self._mock_api_instance.command.call_args_list[0][
                1
            ].get("parameters", {})
            self.assertIn("state:'open'", api_call_1_params.get("filter", ""))
            # Check GetIncidents call
            api_call_2_body = self._mock_api_instance.command.call_args_list[1][1].get(
                "body", {}
            )
            self.assertEqual(api_call_2_body.get("ids"), ["incident-1", "incident-2"])
            # Verify result contains incident information
            self.assertIn("incident-1", result)
            self.assertIn("Test Incident 1", result)
            self.assertIn("incident-2", result)
            self.assertIn("Test Incident 2", result)
        self.run_test_with_retries(
            "test_search_incidents_with_filter", test_logic, assertions
        )
    def test_get_incident_details(self):
        """Verify the agent can get details for specific incidents."""
        async def test_logic():
            fixtures = [
                {
                    "operation": "GetIncidents",
                    "validator": lambda kwargs: "incident-3"
                    in kwargs.get("body", {}).get("ids", []),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "incident-3",
                                    "name": "High Priority Incident",
                                    "description": "Critical security incident requiring immediate attention",
                                    "status": 30,  # In Progress
                                    "state": "open",
                                    "final_score": 95,
                                    "start": "2023-02-01T00:00:00Z",
                                    "end": "2023-02-02T00:00:00Z",
                                    "tags": ["Critical", "Security Breach"],
                                    "host_ids": ["host-1", "host-2"],
                                }
                            ]
                        },
                    },
                }
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = "Get details for incident with ID incident-3"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[-1]
            self.assertEqual(
                used_tool["input"]["tool_name"], "falcon_get_incident_details"
            )
            # Verify the tool input contains the incident ID
            tool_input = ensure_dict(used_tool["input"]["tool_input"])
            self.assertIn("incident-3", tool_input.get("ids", []))
            # Verify API call parameters
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count,
                1,
                "Expected at least 1 API call",
            )
            api_call_body = self._mock_api_instance.command.call_args_list[0][1].get(
                "body", {}
            )
            self.assertEqual(api_call_body.get("ids"), ["incident-3"])
            # Verify result contains incident information
            self.assertIn("incident-3", result)
            self.assertIn("High Priority Incident", result)
            self.assertIn("Critical security incident", result)
            self.assertIn("95", result)  # Score
        self.run_test_with_retries("test_get_incident_details", test_logic, assertions)
    def test_search_behaviors(self):
        """Verify the agent can search for behaviors with a filter."""
        async def test_logic():
            fixtures = [
                {
                    "operation": "QueryBehaviors",
                    "validator": lambda kwargs: "tactic:'Defense Evasion'"
                    in kwargs.get("parameters", {}).get("filter", ""),
                    "response": {
                        "status_code": 200,
                        "body": {"resources": ["behavior-1", "behavior-2"]},
                    },
                },
                {
                    "operation": "GetBehaviors",
                    "validator": lambda kwargs: "behavior-1"
                    in kwargs.get("body", {}).get("ids", []),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "behavior-1",
                                    "tactic": "Defense Evasion",
                                },
                                {
                                    "id": "behavior-2",
                                    "tactic": "Defense Evasion",
                                },
                            ]
                        },
                    },
                },
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = "Find behaviors with the tactic 'Defense Evasion'"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[-1]
            self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_behaviors")
            # Verify the tool input contains the filter
            tool_input = ensure_dict(used_tool["input"]["tool_input"])
            self.assertIn("tactic", tool_input.get("filter", "").lower())
            # Verify API call parameters
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count,
                2,
                "Expected at least 2 API calls",
            )
            # Check QueryBehaviors call
            api_call_1_params = self._mock_api_instance.command.call_args_list[0][
                1
            ].get("parameters", {})
            self.assertIn(
                "tactic:'Defense Evasion'", api_call_1_params.get("filter", "")
            )
            # Check GetBehaviors call
            api_call_2_body = self._mock_api_instance.command.call_args_list[1][1].get(
                "body", {}
            )
            self.assertEqual(api_call_2_body.get("ids"), ["behavior-1", "behavior-2"])
            # Verify result contains behavior information
            self.assertIn("behavior-1", result)
            self.assertIn("behavior-2", result)
            self.assertIn("Defense Evasion", result)
        self.run_test_with_retries("test_search_behaviors", test_logic, assertions)
    def test_get_behavior_details(self):
        """Verify the agent can get details for specific behaviors."""
        async def test_logic():
            fixtures = [
                {
                    "operation": "GetBehaviors",
                    "validator": lambda kwargs: "behavior-3"
                    in kwargs.get("body", {}).get("ids", []),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "behavior-3",
                                    "tactic": "Exfiltration",
                                }
                            ]
                        },
                    },
                }
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = "Get details for behavior with ID behavior-3"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[-1]
            self.assertEqual(
                used_tool["input"]["tool_name"], "falcon_get_behavior_details"
            )
            # Verify the tool input contains the behavior ID
            tool_input = ensure_dict(used_tool["input"]["tool_input"])
            self.assertIn("behavior-3", tool_input.get("ids", []))
            # Verify API call parameters
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count,
                1,
                "Expected at least 1 API call",
            )
            api_call_body = self._mock_api_instance.command.call_args_list[0][1].get(
                "body", {}
            )
            self.assertEqual(api_call_body.get("ids"), ["behavior-3"])
            # Verify result contains behavior information
            self.assertIn("behavior-3", result)
            self.assertIn("Exfiltration", result)
        self.run_test_with_retries("test_get_behavior_details", test_logic, assertions)
if __name__ == "__main__":
    unittest.main()
```
--------------------------------------------------------------------------------
/docs/module_development.md:
--------------------------------------------------------------------------------
```markdown
# Falcon MCP Server Module Development Guide
This guide provides instructions for implementing new modules for the Falcon MCP server.
## Module Structure
Each module should:
1. Inherit from the `BaseModule` class
2. Implement the `register_tools` method
3. Define tool methods that interact with the Falcon API
4. Use common utilities for configuration, logging, error handling, and API interactions
## Step-by-Step Implementation Guide
### 1. Create a New Module File
Create a new file in the `falcon_mcp/modules` directory:
```python
"""
[Module Name] module for Falcon MCP Server
This module provides tools for [brief description].
"""
from typing import Dict, List, Optional, Any
from mcp.server import FastMCP
from falcon_mcp.common.logging import get_logger
from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.utils import prepare_api_parameters, extract_first_resource
from falcon_mcp.modules.base import BaseModule
class YourModule(BaseModule):
    """Module for [description]."""
    def register_tools(self, server: FastMCP) -> None:
        """Register tools with the MCP server.
        Args:
            server: MCP server instance
        """
        # Register tools
        self._add_tool(
            server=server,
            method=self.your_tool_method,
            name="your_tool_name",
        )
        # Add more tools as needed
    def your_tool_method(self, param1: str, param2: Optional[int] = None) -> Dict[str, Any]:
        """Description of what your tool does.
        Args:
            param1: Description of param1
            param2: Description of param2
        Returns:
            Tool result description
        """
        # Prepare parameters
        params = prepare_api_parameters({
            "param1": param1,
            "param2": param2,
        })
        # Define the operation name (used for error handling)
        operation = "YourFalconAPIOperation"
        # Make the API request
        response = self.client.command(operation, parameters=params)
        # Handle the response
        return handle_api_response(
            response,
            operation=operation,
            error_message="Failed to perform operation",
            default_result={},
        )
```
### 2. Update API Scope Requirements
Add your API operations to the `API_SCOPE_REQUIREMENTS` dictionary in `falcon_mcp/common/errors.py`:
```python
API_SCOPE_REQUIREMENTS = {
    # Existing operations...
    "YourFalconAPIOperation": ["required:scope"],
    # Add more operations as needed
}
```
### 3. Module Auto-Discovery
Modules are automatically discovered by the registry system. You don't need to call any registration functions or add imports:
1. Create your module class in the `falcon_mcp/modules` directory (e.g., `your_module.py`)
2. Make sure it inherits from `BaseModule`
3. **Modules are automatically discovered** - no manual imports or registration needed
The server will automatically discover and register your module during initialization. The module name will be derived
from the class name (e.g., `YourModule` becomes `your`).
During server initialization, the registry system will:
1. Scan the modules directory using `pkgutil.iter_modules()`
2. Dynamically import each module file using `importlib.import_module()`  
3. Find classes that end with "Module" (excluding BaseModule) via introspection
4. Register them in the `AVAILABLE_MODULES` dictionary
5. Make them available to the server
This approach eliminates manual registration while maintaining a clean architecture that avoids cyclic imports.
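As a rough sketch, the discovery step amounts to something like the following (illustrative only; names and details beyond the five steps above are assumptions, and the real logic lives in `falcon_mcp/registry.py`):
```python
import importlib
import inspect
import pkgutil

from falcon_mcp import modules
from falcon_mcp.modules.base import BaseModule

AVAILABLE_MODULES = {}

# Scan the modules package and import each module file.
for info in pkgutil.iter_modules(modules.__path__):
    imported = importlib.import_module(f"falcon_mcp.modules.{info.name}")
    # Find classes that end with "Module" (excluding BaseModule).
    for cls_name, cls in inspect.getmembers(imported, inspect.isclass):
        if cls_name.endswith("Module") and cls is not BaseModule:
            # Derive the module name from the class name: "YourModule" -> "your"
            AVAILABLE_MODULES[cls_name.removesuffix("Module").lower()] = cls
```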
### 4. Add Tests
Create a test file in the `tests/modules` directory that inherits from the `TestModules` base class:
```python
"""
Tests for the YourModule module.
"""
from falcon_mcp.modules.your_module import YourModule
from tests.modules.utils.test_modules import TestModules
class TestYourModule(TestModules):
    """Test cases for the YourModule module."""
    def setUp(self):
        """Set up test fixtures."""
        self.setup_module(YourModule)
    def test_register_tools(self):
        """Test registering tools with the server."""
        expected_tools = [
            "falcon_your_tool_name",
            # Add other tools here
        ]
        self.assert_tools_registered(expected_tools)
    def test_your_tool_method(self):
        """Test your tool method."""
        # Setup mock response
        mock_response = {
            "status_code": 200,
            "body": {
                "resources": [{"id": "test", "name": "Test Resource"}]
            }
        }
        self.mock_client.command.return_value = mock_response
        # Call your tool method
        result = self.module.your_tool_method("test_param", 123)
        # Verify client command was called correctly
        self.mock_client.command.assert_called_once_with(
            "YourFalconAPIOperation",
            parameters={"param1": "test_param", "param2": 123}
        )
        # Verify result
        expected_result = [{"id": "test", "name": "Test Resource"}]
        self.assertEqual(result, expected_result)
    def test_your_tool_method_error(self):
        """Test your tool method with API error."""
        # Setup mock response with error
        mock_response = {
            "status_code": 403,
            "body": {
                "errors": [{"message": "Access denied"}]
            }
        }
        self.mock_client.command.return_value = mock_response
        # Call your tool method
        result = self.module.your_tool_method("test_param")
        # Verify result contains error
        self.assertIn("error", result)
        self.assertIn("details", result)
```
The `TestModules` base class provides:
1. A `setup_module()` method that handles the common setup of mocking the client and server
2. An `assert_tools_registered()` helper method to verify tool registration
This approach simplifies test code and ensures consistency across all module tests.
## Contributing Module Changes
When contributing new modules or changes to existing modules, please follow these guidelines:
### Conventional Commits for Modules
This project uses [Conventional Commits](https://www.conventionalcommits.org/) for automated releases and clear commit history. When contributing module-related changes, use these commit message patterns:
**Adding New Modules:**
```bash
git commit -m "feat(modules): add [module-name] module for [functionality]"
# Examples:
git commit -m "feat(modules): add spotlight module for vulnerability management"
git commit -m "feat(modules): add intel module for threat intelligence analysis"
```
**Adding Tools to Existing Modules (Preferred - More Specific Scoping):**
```bash
git commit -m "feat(modules/[module-name]): add [specific-functionality]"
# Examples:
git commit -m "feat(modules/cloud): add list kubernetes clusters tool"
git commit -m "feat(modules/hosts): add list devices tool"
git commit -m "feat(modules/detections): add advanced filtering capability"
```
**Modifying Existing Modules:**
```bash
git commit -m "feat(modules/[module-name]): enhance [specific-functionality]"
git commit -m "fix(modules/[module-name]): resolve [specific-issue]"
# Examples:
git commit -m "feat(modules/detections): enhance FQL filtering with new operators"
git commit -m "fix(modules/hosts): resolve authentication error in search function"
```
**General Module Changes (Less Preferred but Acceptable):**
```bash
git commit -m "feat(modules): enhance [module-name] with [new-functionality]"
git commit -m "fix(modules): resolve [issue] in [module-name] module"
# Examples:
git commit -m "feat(modules): enhance detections module with FQL filtering"
git commit -m "fix(modules): resolve authentication error in hosts module"
```
**Module Tests and Documentation:**
```bash
git commit -m "test(modules): add comprehensive tests for [module-name] module"
git commit -m "docs(modules): update [module-name] module documentation"
```
See the main [CONTRIBUTING.md](CONTRIBUTING.md) guide for complete conventional commits guidelines.
## Best Practices
### Error Handling
1. **Use Common Error Utilities**: Always use `handle_api_response` for API responses instead of manual status code checks
2. **Provide Operation Names**: Include the operation name for better error messages and permission handling
3. **Custom Error Messages**: Use descriptive error messages for each operation
4. **Consistent Error Format**: Ensure error responses follow the standard format with `error` and optional `details` fields
Example of proper error handling:
```python
# Make the API request
response = self.client.command(operation, parameters=params)
# Use handle_api_response to process the response
result = handle_api_response(
    response,
    operation=operation,
    error_message="Failed to perform operation",
    default_result=[]
)
# Check if the result is an error response
if isinstance(result, dict) and "error" in result:
    # Handle error case
    return result  # or wrap it in a list if returning to a tool expecting a list
```
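For reference, a hedged sketch of the standardized error shape these utilities produce; the exact fields come from `_format_error_response` in `falcon_mcp/common/errors.py`, so treat the `details` payload below as an assumption:
```python
# Assumed shape only -- _format_error_response defines the exact fields.
error_response = {
    "error": "Failed to perform operation",     # always present on failure
    "details": [{"message": "Access denied"}],  # optional raw API errors
}
```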
### Parameter Handling
1. **Use prepare_api_parameters**: Filter out None values and format parameters (see the sketch below)
2. **Type Annotations**: Always include type annotations for parameters and return values
3. **Default Values**: Provide sensible defaults for optional parameters
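A minimal sketch of the None-filtering behavior, with the output shape assumed from the utilities reference below:
```python
from falcon_mcp.common.utils import prepare_api_parameters

# "sort" is None, so it should be dropped and never sent to the Falcon API.
params = prepare_api_parameters({
    "filter": "platform_name:'Windows'",
    "limit": 50,
    "sort": None,
})
# Assumed result: {"filter": "platform_name:'Windows'", "limit": 50}
```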
### Response Processing
1. **Use extract_resources**: Extract resources from API responses (illustrated below)
2. **Handle Empty Results**: Provide appropriate defaults for empty results
3. **Return Structured Data**: Return well-structured data that follows consistent patterns
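A hedged sketch of the pattern (the helper below is invented for illustration; check `falcon_mcp/common/utils.py` for the real `extract_resources` signature):
```python
from typing import Any, Dict, List

from falcon_mcp.common.utils import extract_resources

def resources_or_empty(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract the resources list from a raw API response, falling back
    to a well-structured empty default for empty results (assumed usage)."""
    return extract_resources(response) or []
```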
### Documentation
1. **Docstrings**: Include detailed docstrings for all classes and methods. Tool descriptions are derived from method docstrings, so make sure they are comprehensive and well-written.
2. **Parameter Descriptions**: Document all parameters and return values
3. **Examples**: Include examples in docstrings where helpful
### Testing
1. **Test All Tools**: Write tests for all tools in your module
2. **Test Error Cases**: Include tests for error scenarios
3. **Mock API Responses**: Use mock responses for testing
## Common Utilities Reference
### Configuration (`falcon_mcp/common/config.py`)
- `FalconConfig`: Configuration class for the Falcon MCP server
- `load_config`: Load configuration from environment variables and arguments
### Logging (`falcon_mcp/common/logging.py`)
- `configure_logging`: Configure logging for the Falcon MCP server
- `get_logger`: Get a logger with the specified name
### Error Handling (`falcon_mcp/common/errors.py`)
- `is_success_response`: Check if an API response indicates success
- `get_required_scopes`: Get the required API scopes for a specific operation
- `_format_error_response`: Format an error as a standardized response
- `handle_api_response`: Handle an API response, returning either the result or an error
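A short sketch tying two of these helpers together; the return shapes are assumptions in the style of the `API_SCOPE_REQUIREMENTS` examples in this guide, not the authoritative API:
```python
from falcon_mcp.common.errors import get_required_scopes, is_success_response

response = {"status_code": 403, "body": {"errors": [{"message": "Access denied"}]}}

if not is_success_response(response):
    # Assumed mapping in the style of the scope examples in this guide:
    # get_required_scopes("QueryDevices") -> ["hosts:read"]
    missing_scopes = get_required_scopes("QueryDevices")
```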
### Utilities (`falcon_mcp/common/utils.py`)
- `filter_none_values`: Remove None values from a dictionary
- `prepare_api_parameters`: Prepare parameters for Falcon API requests
- `extract_resources`: Extract resources from an API response
- `extract_first_resource`: Extract the first resource from an API response
## Example: Implementing a Hosts Module
Here's an example of implementing a Hosts module that provides tools for accessing and managing hosts in the Falcon platform:
```python
"""
Hosts module for Falcon MCP Server
This module provides tools for accessing and managing CrowdStrike Falcon hosts.
"""
from typing import Dict, List, Optional, Any
from mcp.server import FastMCP
from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.utils import prepare_api_parameters, extract_resources, extract_first_resource
from falcon_mcp.modules.base import BaseModule
class HostsModule(BaseModule):
    """Module for accessing and managing CrowdStrike Falcon hosts."""
    def register_tools(self, server: FastMCP) -> None:
        """Register tools with the MCP server.
        Args:
            server: MCP server instance
        """
        # Register tools
        self._add_tool(
            server=server,
            method=self.search_hosts,
            name="search_hosts",
        )
        self._add_tool(
            server=server,
            method=self.get_host_details,
            name="get_host_details",
        )
        self._add_tool(
            server=server,
            method=self.get_host_count,
            name="get_host_count",
        )
    def search_hosts(self, query: Optional[str] = None, limit: int = 100) -> List[Dict[str, Any]]:
        """Search for hosts in your CrowdStrike environment.
        Args:
            query: FQL query string to filter hosts
            limit: Maximum number of results to return
        Returns:
            List of host details
        """
        # Prepare parameters
        params = prepare_api_parameters({
            "filter": query,
            "limit": limit,
        })
        # Define the operation name
        operation = "QueryDevices"
        # Make the API request
        response = self.client.command(operation, parameters=params)
        # Handle the response
        host_ids = handle_api_response(
            response,
            operation=operation,
            error_message="Failed to search hosts",
            default_result=[],
        )
        # If we have host IDs, get the details for each one
        if host_ids:
            details_operation = "GetDeviceDetails"
            details_response = self.client.command(
                details_operation,
                body={"ids": host_ids}
            )
            return handle_api_response(
                details_response,
                operation=details_operation,
                error_message="Failed to get host details",
                default_result=[],
            )
        return []
    def get_host_details(self, host_id: str) -> Dict[str, Any]:
        """Get detailed information about a specific host.
        Args:
            host_id: The ID of the host to retrieve
        Returns:
            Host details
        """
        # Define the operation name
        operation = "GetDeviceDetails"
        # Make the API request
        response = self.client.command(
            operation,
            body={"ids": [host_id]},
        )
        # Extract the first resource
        return extract_first_resource(
            response,
            operation=operation,
            not_found_error="Host not found",
        )
    def get_host_count(self, query: Optional[str] = None) -> Dict[str, Any]:
        """Get the count of hosts matching a query.
        Args:
            query: FQL query string to filter hosts
        Returns:
            Dictionary with host count
        """
        # Prepare parameters
        params = prepare_api_parameters({
            "filter": query,
        })
        # Define the operation name
        operation = "QueryDevices"
        # Make the API request
        response = self.client.command(operation, parameters=params)
        # Use handle_api_response to get host IDs
        host_ids = handle_api_response(
            response,
            operation=operation,
            error_message="Failed to get host count",
            default_result=[],
        )
        # If handle_api_response returns an error dict instead of a list,
        # it means there was an error, so we return it with a count of 0
        if isinstance(host_ids, dict) and "error" in host_ids:
            return {"count": 0, **host_ids}
        return {"count": len(host_ids)}
```
Don't forget to update the `API_SCOPE_REQUIREMENTS` dictionary in `falcon_mcp/common/errors.py`:
```python
API_SCOPE_REQUIREMENTS = {
    # Existing operations...
    "QueryDevices": ["hosts:read"],
    "GetDeviceDetails": ["hosts:read"],
    # Add more operations as needed
}
```
The module will be automatically discovered by the registry system - no manual imports or registration needed.
```
--------------------------------------------------------------------------------
/falcon_mcp/resources/intel.py:
--------------------------------------------------------------------------------
```python
"""
Contains Intel resources.
"""
from falcon_mcp.common.utils import generate_md_table
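# Each *_FQL_FILTERS list below encodes a table: the first tuple is the
# header row and each following tuple is one row. generate_md_table()
# renders these into the markdown tables embedded in the FQL documentation
# strings further down.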
QUERY_ACTOR_ENTITIES_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Description",
    ),
    (
        "id",
        "Number",
        """
        The adversary's ID.
        Ex: 2583
        """
    ),
    (
        "actor_type",
        "String",
        """
        The type of adversary.
        Ex: "targeted"
        """
    ),
    (
        "actors.id",
        "Number",
        """
        The ID of an associated actor.
        Ex: 1823
        """
    ),
    (
        "actors.name",
        "String",
        """
        The name of an associated actor.
        Ex: "VENOMOUS BEAR"
        """
    ),
    (
        "actors.slug",
        "String",
        """
        The URL-friendly identifier of an associated actor.
        Ex: "venomous-bear"
        """
    ),
    (
        "actors.url",
        "String",
        """
        The URL to the actor's profile page.
        Ex: "https://falcon.crowdstrike.com/intelligence/actors/venomous-bear/"
        """
    ),
    (
        "animal_classifier",
        "String",
        """
        The animal classification assigned to the adversary.
        Ex: "BEAR"
        """
    ),
    (
        "capability.value",
        "String",
        """
        The adversary's capability.
        Ex: "average"
        """
    ),
    (
        "created_date",
        "Timestamp",
        """
        Timestamp when the actor entity was created.
        Ex: 1441729727
        """
    ),
    (
        "description",
        "String",
        """
        A detailed description of the adversary.
        Ex: "VENOMOUS BEAR is a sophisticated Russia-based adversary..."
        """
    ),
    (
        "first_activity_date",
        "Timestamp",
        """
        First activity date.
        Ex: 1094660880
        """
    ),
    (
        "known_as",
        "String",
        """
        The adversary's alias.
        Ex: "dridex"
        """
    ),
    (
        "last_activity_date",
        "Timestamp",
        """
        Last activity date.
        Ex: 1749427200
        """
    ),
    (
        "last_modified_date",
        "Timestamp",
        """
        Timestamp when the actor entity was last modified.
        Ex: 1754320661
        """
    ),
    (
        "motivations.id",
        "Number",
        """
        The ID of a motivation associated with the adversary.
        Ex: 1001485
        """
    ),
    (
        "motivations.slug",
        "String",
        """
        The URL-friendly identifier of a motivation.
        Ex: "state-sponsored"
        """
    ),
    (
        "motivations.value",
        "String",
        """
        The display name of a motivation.
        Ex: "State-Sponsored"
        """
    ),
    (
        "name",
        "String",
        """
        The adversary's name.
        Ex: "FANCY BEAR"
        """
    ),
    (
        "origins.slug",
        "String",
        """
        The adversary's country of origin slug.
        Ex: "ru"
        """
    ),
    (
        "origins.value",
        "String",
        """
        The adversary's country of origin.
        Ex: "Afghanistan"
        """
    ),
    (
        "short_description",
        "String",
        """
        A truncated version of the adversary's description.
        Ex: "VENOMOUS BEAR is a sophisticated Russia-based adversary..."
        """
    ),
    (
        "slug",
        "String",
        """
        The URL-friendly identifier of the adversary.
        Ex: "fancy-bear"
        """
    ),
    (
        "target_countries.id",
        "Number",
        """
        The ID of a target country.
        Ex: 1
        """
    ),
    (
        "target_countries.slug",
        "String",
        """
        The URL-friendly identifier of a target country.
        Ex: "us"
        """
    ),
    (
        "target_countries.value",
        "String",
        """
        The display name of a target country.
        Ex: "United States"
        """
    ),
    (
        "target_industries.id",
        "Number",
        """
        The ID of a target industry.
        Ex: 344
        """
    ),
    (
        "target_industries.slug",
        "String",
        """
        The URL-friendly identifier of a target industry.
        Ex: "government"
        """
    ),
    (
        "target_industries.value",
        "String",
        """
        The display name of a target industry.
        Ex: "Government"
        """
    ),
    (
        "url",
        "String",
        """
        The URL to the adversary's profile page.
        Ex: "https://falcon.crowdstrike.com/intelligence/actors/fancy-bear/"
        """
    ),
]
QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Intel Query Actor Entities Guide
=== BASIC SYNTAX ===
property_name:[operator]'value'
=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal to
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = does not text match
• * = wildcard matching (one or more characters)
=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)
• Wildcards: 'partial*' or '*partial' or '*partial*'
=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions
=== falcon_search_actors FQL filter options ===
""" + generate_md_table(QUERY_ACTOR_ENTITIES_FQL_FILTERS) + """
=== EXAMPLE USAGE ===
• animal_classifier:'BEAR'
• name:'FANCY BEAR'
• animal_classifier:'BEAR',animal_classifier:'SPIDER'
=== IMPORTANT NOTES ===
• Use single quotes around string values: 'value'
• Use square brackets for exact matches: ['exact_value']
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
"""
QUERY_INDICATOR_ENTITIES_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Description"
    ),
    (
        "id",
        "String",
        """
        The indicator ID. It follows the format: {type}_{indicator}
        """
    ),
    (
        "created_date",
        "Timestamp",
        """
        Timestamp in standard Unix time, UTC when the indicator was created.
        Ex: 1753022288
        """
    ),
    (
        "deleted",
        "Boolean",
        """
        If true, include only deleted indicators.
        If false, include only published indicators.
        Ex: false
        """
    ),
    (
        "domain_types",
        "String",
        """
        The domain type of domain indicators.
        Possible values include:
        - ActorControlled
        - DGA
        - DynamicDNS
        - KnownGood
        - LegitimateCompromised
        - PhishingDomain
        - Sinkholed
        - StrategicWebCompromise
        - Unregistered
        """
    ),
    (
        "indicator",
        "String",
        """
        The indicator that was queried.
        Ex: "all-deutsch.gl.at.ply.gg"
        """
    ),
    (
        "ip_address_types",
        "String",
        """
        The address type of ip_address indicators.
        Possible values include:
        - HtranDestinationNode
        - HtranProxy
        - LegitimateCompromised
        - Parking
        - PopularSite
        - SharedWebHost
        - Sinkhole
        - TorProxy
        """
    ),
    (
        "kill_chains",
        "String",
        """
        The point in the kill chain with which an indicator is associated.
        Possible values include:
        - reconnaissance
        - weaponization
        - delivery
        - exploitation
        - installation
        - c2 (Command and Control)
        - actionOnObjectives
        Ex: "delivery"
        """
    ),
    (
        "last_updated",
        "Timestamp",
        """
        Timestamp in standard Unix time, UTC when the indicator was last updated in the internal database.
        Ex: 1753027269
        """
    ),
    (
        "malicious_confidence",
        "String",
        """
        Indicates a confidence level by which an indicator is considered to be malicious.
        Possible values:
        - high: If indicator is an IP or domain, it has been associated with malicious activity within the last 60 days.
        - medium: If indicator is an IP or domain, it has been associated with malicious activity within the last 60-120 days.
        - low: If indicator is an IP or domain, it has been associated with malicious activity exceeding 120 days.
        - unverified: This indicator has not been verified by a CrowdStrike Intelligence analyst or an automated system.
        Ex: "high"
        """
    ),
    (
        "malware_families",
        "String",
        """
        Indicates the malware family an indicator has been associated with. An indicator might be associated with more than one malware family.
        Ex: "Xworm", "njRATLime"
        """
    ),
    (
        "published_date",
        "Timestamp",
        """
        Timestamp in standard Unix time, UTC when the indicator was first published to the API.
        Ex: 1753022288
        """
    ),
    (
        "reports",
        "String",
        """
        The report ID that the indicator is associated with (such as CSIT-XXXX or CSIR-XXXX).
        The report list is also represented under the labels list in the JSON data structure.
        """
    ),
    (
        "targets",
        "String",
        """
        The indicator's targeted industries.
        Possible values include sectors like:
        - Aerospace
        - Agricultural
        - Chemical
        - Defense
        - Dissident
        - Energy
        - Financial
        - Government
        - Healthcare
        - Technology
        """
    ),
    (
        "threat_types",
        "String",
        """
        Types of threats.
        Ex: "ddos", "mineware", "banking"
        """
    ),
    (
        "type",
        "String",
        """
        Possible indicator types include:
        - binary_string
        - compile_time
        - device_name
        - domain
        - email_address
        - email_subject
        - event_name
        - file_mapping
        - file_name
        - file_path
        - hash_ion
        - hash_md5
        - hash_sha256
        - ip_address
        - ip_address_block
        - mutex_name
        - password
        - persona_name
        - phone_number
        - port
        - registry
        - semaphore_name
        - service_name
        - url
        - user_agent
        - username
        - x509_serial
        - x509_subject
        Ex: "domain"
        """
    ),
    (
        "vulnerabilities",
        "String",
        """
        Associated vulnerabilities (CVEs).
        Ex: "CVE-2023-1234"
        """
    ),
]
QUERY_INDICATOR_ENTITIES_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Intel Query Indicator Entities Guide
=== BASIC SYNTAX ===
property_name:[operator]'value'
=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal to
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = does not text match
• * = wildcard matching (one or more characters)
=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)
• Wildcards: 'partial*' or '*partial' or '*partial*'
=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions
=== falcon_search_indicators FQL filter options ===
""" + generate_md_table(QUERY_INDICATOR_ENTITIES_FQL_FILTERS) + """
=== EXAMPLE USAGE ===
• type:'domain'
• malicious_confidence:'high'
• type:'hash_md5'+malicious_confidence:'high'
• created_date:>'2023-01-01T00:00:00Z'
=== IMPORTANT NOTES ===
• Use single quotes around string values: 'value'
• Use square brackets for exact matches: ['exact_value']
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
"""
QUERY_REPORT_ENTITIES_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Description",
    ),
    (
        "id",
        "Number",
        """
        The report's ID.
        Ex: 2583
        """
    ),
    (
        "actors",
        "String",
        """
        Names of adversaries included in a report.
        Ex: "FANCY BEAR"
        """
    ),
    (
        "created_date",
        "Timestamp",
        """
        Timestamp in Unix epoch format when the report was created.
        Ex: 1754075803
        """
    ),
    (
        "description",
        "String",
        """
        A detailed description of the report.
        Ex: "In mid-July 2025, CrowdStrike Intelligence identified infrastructure..."
        """
    ),
    (
        "last_modified_date",
        "Timestamp",
        """
        Timestamp in Unix epoch format when the report was last modified.
        Ex: 1754076191
        """
    ),
    (
        "motivations.value",
        "String",
        """
        Motivations included in the report.
        Ex: "Criminal", "State-Sponsored"
        """
    ),
    (
        "name",
        "String",
        """
        The report's name.
        Ex: "CSA-250861 Newly Identified HAYWIRE KITTEN Infrastructure Associated with Microsoft Phishing Campaign"
        """
    ),
    (
        "type",
        "String",
        """
        The type of report.
        Ex: "notice", "tipper", "periodic-report"
        """
    ),
    (
        "short_description",
        "String",
        """
        A truncated version of the report's description.
        Ex: "Adversary: HAYWIRE KITTEN || Target Industry: Technology, Renewable Energy..."
        """
    ),
    (
        "slug",
        "String",
        """
        The URL-friendly identifier of the report.
        Ex: "csa-250861", "csit-25151"
        """
    ),
    (
        "sub_type",
        "String",
        """
        The subtype of the report.
        Ex: "daily", "yara"
        """
    ),
    (
        "tags",
        "String",
        """
        The report's tags.
        Ex: "ransomware", "espionage", "vulnerabilities"
        """
    ),
    (
        "target_countries",
        "String",
        """
        Targeted countries included in the report.
        Ex: "United States", "Taiwan", "Western Europe"
        """
    ),
    (
        "target_industries",
        "String",
        """
        Targeted industries included in the report.
        Ex: "Technology", "Government", "Healthcare"
        """
    ),
    (
        "url",
        "String",
        """
        The URL to the report's page.
        Ex: "https://falcon.crowdstrike.com/intelligence/reports/csa-250861"
        """
    ),
]
QUERY_REPORT_ENTITIES_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Intel Query Report Entities Guide
=== BASIC SYNTAX ===
property_name:[operator]'value'
=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal to
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = does not text match
• * = wildcard matching (one or more characters)
=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)
• Wildcards: 'partial*' or '*partial' or '*partial*'
=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions
=== falcon_search_reports FQL filter options ===
""" + generate_md_table(QUERY_REPORT_ENTITIES_FQL_FILTERS) + """
=== EXAMPLE USAGE ===
• type:'periodic-report'
• name:'*ransomware*'
• created_date:>'2023-01-01T00:00:00Z'
• target_industries:'Healthcare'
=== IMPORTANT NOTES ===
• Use single quotes around string values: 'value'
• Use square brackets for exact matches: ['exact_value']
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
"""
```
--------------------------------------------------------------------------------
/tests/e2e/modules/test_cloud.py:
--------------------------------------------------------------------------------
```python
"""
E2E tests for the Cloud module.
"""
import json
import unittest
import pytest
from tests.e2e.utils.base_e2e_test import BaseE2ETest
@pytest.mark.e2e
class TestCloudModuleE2E(BaseE2ETest):
    """
    End-to-end test suite for the Falcon MCP Server Cloud Module.
    """
    def test_search_kubernetes_containers_running(self):
        """Verify the agent can search for kubernetes containers that are running."""
        async def test_logic():
            fixtures = [
                {
                    "operation": "ReadContainerCombined",
                    "validator": lambda kwargs: "running_status"
                    in kwargs.get("parameters", {}).get("filter", "").lower(),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "container_id": "container-001",
                                    "agents": [
                                        {
                                            "aid": "558ce490595748d6a67b16969797d655",
                                            "build": "0000",
                                            "type": "Falcon sensor for linux",
                                        },
                                    ],
                                    "cloud_name": "AWS",
                                    "cloud_account_id": "00001",
                                    "cloud_region": "ca-central-1",
                                    "cluster_name": "production",
                                    "first_seen": "2025-05-27T03:04:10Z",
                                    "image_registry": "docker.amazonaws.com",
                                    "image_repository": "myservice",
                                    "image_tag": "v1.0.0",
                                    "image_vulnerability_count": 361,
                                    "last_seen": "2025-07-13T19:53:07Z",
                                    "container_name": "myservice",
                                    "namespace": "default",
                                    "running_status": True,
                                },
                                {
                                    "container_id": "container-002",
                                    "agents": [
                                        {
                                            "aid": "523c3113363845d4a6da493a29caa924",
                                            "build": "0000",
                                            "type": "Falcon sensor for linux",
                                        },
                                    ],
                                    "cloud_name": "AWS",
                                    "cloud_account_id": "00001",
                                    "cloud_region": "us-1",
                                    "cluster_name": "production",
                                    "first_seen": "2025-06-27T03:04:10Z",
                                    "image_registry": "docker.amazonaws.com",
                                    "image_repository": "myservice",
                                    "image_tag": "v1.0.0",
                                    "image_vulnerability_count": 361,
                                    "last_seen": "2025-07-13T19:53:07Z",
                                    "container_name": "myservice",
                                    "namespace": "default",
                                    "running_status": True,
                                },
                            ],
                        },
                    },
                },
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = "Find all kubernetes containers that are running"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[len(tools) - 1]
            self.assertEqual(
                used_tool["input"]["tool_name"], "falcon_search_kubernetes_containers"
            )
            # Check for the filter for running status
            tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
            self.assertTrue(
                "running_status" in tool_input_str,
                f"Expected running status filtering in tool input: {tool_input_str}",
            )
            self.assertIn("container-001", used_tool["output"])
            self.assertIn("container-002", used_tool["output"])
            # Verify API calls were made correctly
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 1, "Expected at least 1 API call"
            )
            # Check API call (ReadContainerCombined)
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            filter_str = api_call_params.get("filter", "").lower()
            self.assertTrue(
                "running_status" in filter_str,
                f"Expected running_status filtering in API call: {filter_str}",
            )
            # Verify result contains expected information
            self.assertIn("container-001", result)
            self.assertIn("container-002", result)
        self.run_test_with_retries(
            "test_search_kubernetes_containers_running", test_logic, assertions
        )
    def test_search_kubernetes_container_with_vulnerabilities(self):
        """Verify the agent can search for kubernetes containers have image vulnerabilities and sort them
        by image_vulnerability_count in descending order.
        """
        async def test_logic():
            fixtures = [
                {
                    "operation": "ReadContainerCombined",
                    "validator": lambda kwargs: "image_vulnerability_count"
                    in kwargs.get("parameters", {}).get("filter", "").lower(),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "container_id": "container-001",
                                    "agents": [
                                        {
                                            "aid": "558ce490595748d6a67b16969797d655",
                                            "build": "0000",
                                            "type": "Falcon sensor for linux",
                                        },
                                    ],
                                    "cloud_name": "AWS",
                                    "cloud_account_id": "00001",
                                    "cloud_region": "ca-central-1",
                                    "cluster_name": "production",
                                    "first_seen": "2025-05-27T03:04:10Z",
                                    "image_registry": "docker.amazonaws.com",
                                    "image_repository": "myservice",
                                    "image_tag": "v1.0.0",
                                    "image_vulnerability_count": 361,
                                    "last_seen": "2025-07-13T19:53:07Z",
                                    "container_name": "myservice",
                                    "namespace": "default",
                                    "running_status": True,
                                },
                            ],
                        },
                    },
                },
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = "Find top 1 kubernetes container that is running and have image vulnerabilities."  # fmt: skip
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[len(tools) - 1]
            self.assertEqual(
                used_tool["input"]["tool_name"], "falcon_search_kubernetes_containers"
            )
            # Check for the filter for image_vulnerability_count
            tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
            self.assertTrue(
                "image_vulnerability_count" in tool_input_str,
                f"Expected image_vulnerability_count filtering in tool input: {tool_input_str}",
            )
            self.assertIn("container-001", used_tool["output"])
            # Verify API calls were made correctly
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 1, "Expected at least 1 API call"
            )
            # Check API call (ReadContainerCombined)
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            filter_str = api_call_params.get("filter", "").lower()
            self.assertTrue(
                "image_vulnerability_count:>0" in filter_str,
                f"Expected image_vulnerability_count filtering in API call: {filter_str}",
            )
            # Verify result contains expected information
            self.assertIn("container-001", result)
            self.assertIn("361", result)  # vulnerability count
        self.run_test_with_retries(
            "test_search_kubernetes_container_with_vulnerabilities",
            test_logic,
            assertions,
        )
    def test_count_kubernetes_containers_by_cloud_name(self):
        """Verify the agent can aggregate kubernetes containers by cloud name."""
        async def test_logic():
            fixtures = [
                {
                    "operation": "ReadContainerCount",
                    "validator": lambda kwargs: "cloud_name"
                    in kwargs.get("parameters", {}).get("filter", "").lower(),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "count": 333,
                                },
                            ],
                        },
                    },
                },
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = "How many kubernetes containers do I have in cloud provider AWS?"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[len(tools) - 1]
            self.assertEqual(
                used_tool["input"]["tool_name"], "falcon_count_kubernetes_containers"
            )
            # Check for the filter for cloud_name
            tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
            self.assertTrue(
                "cloud_name" in tool_input_str,
                f"Expected cloud_name filtering in tool input: {tool_input_str}",
            )
            self.assertIn("333", used_tool["output"])
            # Verify API calls were made correctly
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 1, "Expected at least 1 API call"
            )
            # Check API call (ReadContainerCount)
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            filter_str = api_call_params.get("filter", "").lower()
            self.assertTrue(
                "cloud_name" in filter_str,
                f"Expected cloud_name filtering in API call: {filter_str}",
            )
            # Verify result contains expected information
            self.assertIn("AWS", result)  # cloud name
            self.assertIn("333", result)  # containers count
        self.run_test_with_retries(
            "test_count_kubernetes_containers_by_cloud_name",
            test_logic,
            assertions,
        )
    def test_search_images_vulnerabilities_by_container_id(self):
        """Verify the agent can search images vulnerabilities by container ID."""
        async def test_logic():
            fixtures = [
                {
                    "operation": "ReadCombinedVulnerabilities",
                    "validator": lambda kwargs: "container_id"
                    in kwargs.get("parameters", {}).get("filter", "").lower(),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "cve_id": "CVE-2005-2541",
                                    "severity": "High",
                                    "cvss_score": 10,
                                    "cps_current_rating": "Low",
                                    "description": "Tar 1.15.1 does not properly warn the user when extracting setuid or setgid files, which may allow local users or remote attackers to gain privileges.\n",
                                    "exploit_found": False,
                                    "exploited_status": 0,
                                    "exploited_status_string": "Unproven",
                                    "published_date": "2005-08-10T04:00:00Z",
                                    "images_impacted": 284,
                                    "packages_impacted": 7,
                                    "containers_impacted": 46,
                                    "remediation_available": False,
                                },
                            ],
                        },
                    },
                },
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = 'Search images vulnerabilities for the container "container-001"'
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[len(tools) - 1]
            self.assertEqual(
                used_tool["input"]["tool_name"],
                "falcon_search_images_vulnerabilities",
            )
            # Check for the filter for container_id
            tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
            self.assertTrue(
                "container_id" in tool_input_str,
                f"Expected container_id filtering in tool input: {tool_input_str}",
            )
            # Check for the vulnerability from the API response
            self.assertIn("CVE-2005-2541", used_tool["output"])
            # Verify API calls were made correctly
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 1, "Expected at least 1 API call"
            )
            # Check API call (ReadCombinedVulnerabilities)
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            filter_str = api_call_params.get("filter", "").lower()
            self.assertTrue(
                "container_id:'container-001'" in filter_str,
                f"Expected container_id filtering in API call: {filter_str}",
            )
            # Verify result contains expected information
            self.assertIn("CVE-2005-2541", result)
        self.run_test_with_retries(
            "test_search_images_vulnerabilities_by_container_id",
            test_logic,
            assertions,
        )
if __name__ == "__main__":
    unittest.main()
```
--------------------------------------------------------------------------------
/falcon_mcp/resources/discover.py:
--------------------------------------------------------------------------------
```python
"""
Contains Discover resources for applications and unmanaged assets.
"""
from falcon_mcp.common.utils import generate_md_table
# List of tuples containing filter options data: (name, type, operators, description)
SEARCH_APPLICATIONS_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Operators",
        "Description"
    ),
    (
        "architectures",
        "String",
        "Yes",
        """
        Application architecture. Unavailable for browser extensions.
        Ex: architectures:'x86'
        Ex: architectures:!'x64'
        Ex: architectures:['x86','x64']
        """
    ),
    (
        "category",
        "String",
        "Yes",
        """
        Category the application is in. Unavailable for browser extensions.
        Ex: category:'IT/Security Apps'
        Ex: category:'Web Browsers'
        Ex: category:'Back up and Recovery'
        Ex: category:['IT/Security Apps','Web Browsers']
        """
    ),
    (
        "cid",
        "String",
        "Yes",
        """
        The application's customer ID. In multi-CID environments:
        - You can filter on both parent and child CIDs.
        - If you're in a parent CID and leave this filter empty, the response includes data about the parent CID and all its child CIDs.
        - If you're in a parent CID and use this filter, the response includes data for only the CIDs you filtered on.
        - If you're in a child CID, this property will only show data for that CID.
        Ex: cid:'cxxx4'
        Ex: cid:!'cxxx4'
        Ex: cid:'cxxx4',cid:'dxxx5'
        """
    ),
    (
        "first_seen_timestamp",
        "Timestamp",
        "Yes",
        """
        Date and time the application was first seen.
        Ex: first_seen_timestamp:'2022-12-22T12:41:47.417Z'
        """
    ),
    (
        "groups",
        "String",
        "Yes",
        """
        All application groups the application is assigned to.
        Ex: groups:'ExampleAppGroup'
        Ex: groups:['AppGroup1','AppGroup2']
        """
    ),
    (
        "id",
        "String",
        "Yes",
        """
        Unique ID of the application. Each application ID represents a particular instance of an application on a particular asset.
        Ex: id:'a89xxxxx191'
        Ex: id:'a89xxxxx191',id:'a89xxxxx192'
        """
    ),
    (
        "installation_paths",
        "String",
        "Yes",
        """
        File paths of the application or executable file on the asset.
        Ex: installation_paths:'C:\\Program Files\\Internet Explorer\\iexplore.exe'
        Ex: installation_paths:'C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe'
        Ex: installation_paths:['C:\\Program Files (x86)\\Google*','C:\\Program Files (x86)\\Adobe*']
        """
    ),
    (
        "installation_timestamp",
        "Timestamp",
        "Yes",
        """
        Date and time the application was installed, if available.
        Ex: installation_timestamp:'2023-01-11T00:00:00.000Z'
        """
    ),
    (
        "is_normalized",
        "Boolean",
        "Yes",
        """
        Windows: Whether the application name is normalized (true/false).
        Applications can have different naming variations that result in different records for each variation.
        To avoid this duplication, the most common applications are listed under a single normalized application name.
        Unavailable for browser extensions.
        Ex: is_normalized:true
        """
    ),
    (
        "is_suspicious",
        "Boolean",
        "Yes",
        """
        Whether the application is suspicious based on how often it's been seen in a detection on that asset (true/false).
        Unavailable for browser extensions. See browser_extension.permission_severity instead.
        Ex: is_suspicious:true
        Ex: is_suspicious:!false
        """
    ),
    (
        "last_updated_timestamp",
        "Timestamp",
        "Yes",
        """
        Date and time the installation fields of the application instance most recently changed.
        Ex: last_updated_timestamp:'2022-12-22T12:41:47.417Z'
        """
    ),
    (
        "last_used_file_hash",
        "String",
        "Yes",
        """
        Windows and macOS: Most recent file hash used for the application.
        Ex: last_used_file_hash:'0xxxa'
        Ex: last_used_file_hash:['0xxxa','7xxxx9']
        """
    ),
    (
        "last_used_file_name",
        "String",
        "Yes",
        """
        Windows and macOS: Most recent file name used for the application.
        Ex: last_used_file_name:'setup.exe'
        Ex: last_used_file_name:'putty.exe'
        Ex: last_used_file_name:['setup.exe','putty.exe']
        """
    ),
    (
        "last_used_timestamp",
        "Timestamp",
        "Yes",
        """
        Windows and macOS: Date and time the application was most recently used.
        Ex: last_used_timestamp:'2023-01-10T23:00:00.000Z'
        """
    ),
    (
        "last_used_user_name",
        "String",
        "Yes",
        """
        Windows and macOS: Username of the account that most recently used the application.
        Ex: last_used_user_name:'Administrator'
        Ex: last_used_user_name:'xiany'
        Ex: last_used_user_name:['xiany','dursti']
        """
    ),
    (
        "last_used_user_sid",
        "String",
        "Yes",
        """
        Windows and macOS: Security identifier of the account that most recently used the application.
        Ex: last_used_user_sid:'S-1-x-x-xxxxxxxxxx-xxxxxxxxxx-xxxxxxxxxx-xxx1'
        Ex: last_used_user_sid:['S-x-x-x-x-1','S-x-x-x-7']
        """
    ),
    (
        "name",
        "String",
        "Yes",
        """
        Name of the application.
        Ex: name:'Chrome'
        Ex: name:'Falcon Sensor'
        Ex: name:['Chrome','Edge']
        """
    ),
    (
        "name_vendor",
        "String",
        "Yes",
        """
        To group results by application: The app name and vendor name for all application IDs with this application name.
        Ex: name_vendor:'Chrome-Google'
        Ex: name_vendor:'Tools-VMware'
        Ex: name_vendor:['Chrome-Google','Tools-VMware']
        """
    ),
    (
        "name_vendor_version",
        "String",
        "Yes",
        """
        To group results by application version: The app name, vendor name, and vendor version for all application IDs with this application name.
        Ex: name_vendor_version:'Chrome-Google-108.0.5359.99'
        Ex: name_vendor_version:'Flash Player-Adobe-32.0.0.387'
        Ex: name_vendor_version:['Chrome-Google-108*','Flash Player-Adobe-32*']
        """
    ),
    (
        "software_type",
        "String",
        "Yes",
        """
        The type of software: 'application' or 'browser_extension'.
        Ex: software_type:'application'
        """
    ),
    (
        "vendor",
        "String",
        "Yes",
        """
        Name of the application vendor.
        Ex: vendor:'Microsoft Corporation'
        Ex: vendor:'Google'
        Ex: vendor:'CrowdStrike'
        Ex: vendor:['Microsoft*','Google']
        """
    ),
    (
        "version",
        "String",
        "Yes",
        """
        Application version.
        Ex: version:'4.8.4320.0'
        Ex: version:'108.0.5359.99'
        Ex: version:'6.50.16403.0'
        Ex: version:['6.50.16403.0','6.50.16403.1']
        """
    ),
    (
        "versioning_scheme",
        "String",
        "Yes",
        """
        Versioning scheme of the application. Unavailable for browser extensions.
        Ex: versioning_scheme:'semver'
        Ex: versioning_scheme:['semver','calver']
        """
    ),
]
SEARCH_APPLICATIONS_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Search Applications Guide
=== BASIC SYNTAX ===
property_name:[operator]'value'
=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal to
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = does not text match
=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)
=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions
=== falcon_search_applications FQL filter options ===
""" + generate_md_table(SEARCH_APPLICATIONS_FQL_FILTERS) + """
=== IMPORTANT NOTES ===
• Use single quotes around string values: 'value'
• Use square brackets for exact matches and multiple values: ['value1','value2']
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
• Boolean values: true or false (no quotes)
• Some fields require specific capitalization (check individual field descriptions)
=== COMMON FILTER EXAMPLES ===
• Find Chrome applications: name:'Chrome'
• Find applications from Microsoft: vendor:'Microsoft Corporation'
• Find recently installed applications: installation_timestamp:>'2024-01-01'
• Find suspicious applications: is_suspicious:true
• Find browser extensions: software_type:'browser_extension'
• Find applications used by a specific user: last_used_user_name:'Administrator'
"""
# List of tuples containing filter options for unmanaged assets
SEARCH_UNMANAGED_ASSETS_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Operators",
        "Description"
    ),
    (
        "platform_name",
        "String",
        "Yes",
        """
        Operating system platform of the unmanaged asset.
        Ex: platform_name:'Windows'
        Ex: platform_name:'Linux'
        Ex: platform_name:'Mac'
        Ex: platform_name:['Windows','Linux']
        """
    ),
    (
        "os_version",
        "String",
        "Yes",
        """
        Operating system version of the unmanaged asset.
        Ex: os_version:'Windows 10'
        Ex: os_version:'Ubuntu 20.04'
        Ex: os_version:'macOS 12.3'
        Ex: os_version:*'Windows*'
        """
    ),
    (
        "hostname",
        "String",
        "Yes",
        """
        Hostname of the unmanaged asset.
        Ex: hostname:'PC-001'
        Ex: hostname:*'PC-*'
        Ex: hostname:['PC-001','PC-002']
        """
    ),
    (
        "country",
        "String",
        "Yes",
        """
        Country where the unmanaged asset is located.
        Ex: country:'United States of America'
        Ex: country:'Germany'
        Ex: country:['United States of America','Canada']
        """
    ),
    (
        "city",
        "String",
        "Yes",
        """
        City where the unmanaged asset is located.
        Ex: city:'New York'
        Ex: city:'London'
        Ex: city:['New York','Los Angeles']
        """
    ),
    (
        "product_type_desc",
        "String",
        "Yes",
        """
        Product type description of the unmanaged asset.
        Ex: product_type_desc:'Workstation'
        Ex: product_type_desc:'Server'
        Ex: product_type_desc:'Domain Controller'
        Ex: product_type_desc:['Workstation','Server']
        """
    ),
    (
        "external_ip",
        "String",
        "Yes",
        """
        External IP address of the unmanaged asset.
        Ex: external_ip:'192.0.2.1'
        Ex: external_ip:'192.0.2.0/24'
        Ex: external_ip:['192.0.2.1','203.0.113.1']
        """
    ),
    (
        "local_ip_addresses",
        "String",
        "Yes",
        """
        Local IP addresses of the unmanaged asset.
        Ex: local_ip_addresses:'10.0.1.100'
        Ex: local_ip_addresses:'192.168.1.0/24'
        Ex: local_ip_addresses:['10.0.1.100','192.168.1.50']
        """
    ),
    (
        "mac_addresses",
        "String",
        "Yes",
        """
        MAC addresses of the unmanaged asset.
        Ex: mac_addresses:'AA-BB-CC-DD-EE-FF'
        Ex: mac_addresses:*'AA-BB-CC*'
        Ex: mac_addresses:['AA-BB-CC-DD-EE-FF','11-22-33-44-55-66']
        """
    ),
    (
        "first_seen_timestamp",
        "Timestamp",
        "Yes",
        """
        Date and time when the unmanaged asset was first discovered.
        Ex: first_seen_timestamp:'2024-01-01T00:00:00Z'
        Ex: first_seen_timestamp:>'2024-01-01T00:00:00Z'
        Ex: first_seen_timestamp:>'now-7d'
        """
    ),
    (
        "last_seen_timestamp",
        "Timestamp",
        "Yes",
        """
        Date and time when the unmanaged asset was last seen.
        Ex: last_seen_timestamp:'2024-06-15T12:00:00Z'
        Ex: last_seen_timestamp:>'now-24h'
        Ex: last_seen_timestamp:<'now-30d'
        """
    ),
    (
        "kernel_version",
        "String",
        "Yes",
        """
        Kernel version of the unmanaged asset.
        Linux and Mac: The major version, minor version, and patch version.
        Windows: The build number.
        Ex: kernel_version:'5.4.0'
        Ex: kernel_version:'19041'
        Ex: kernel_version:*'5.4*'
        """
    ),
    (
        "system_manufacturer",
        "String",
        "Yes",
        """
        System manufacturer of the unmanaged asset.
        Ex: system_manufacturer:'Dell Inc.'
        Ex: system_manufacturer:'VMware, Inc.'
        Ex: system_manufacturer:*'Dell*'
        """
    ),
    (
        "system_product_name",
        "String",
        "Yes",
        """
        System product name of the unmanaged asset.
        Ex: system_product_name:'OptiPlex 7090'
        Ex: system_product_name:'VMware Virtual Platform'
        Ex: system_product_name:*'OptiPlex*'
        """
    ),
    (
        "criticality",
        "String",
        "Yes",
        """
        Criticality level assigned to the unmanaged asset.
        Ex: criticality:'Critical'
        Ex: criticality:'High'
        Ex: criticality:'Medium'
        Ex: criticality:'Low'
        Ex: criticality:'Unassigned'
        """
    ),
    (
        "internet_exposure",
        "String",
        "Yes",
        """
        Whether the unmanaged asset is exposed to the internet.
        Ex: internet_exposure:'Yes'
        Ex: internet_exposure:'No'
        Ex: internet_exposure:'Pending'
        Ex: internet_exposure:['Yes','Pending']
        """
    ),
    (
        "discovering_by",
        "String",
        "Yes",
        """
        Method by which the unmanaged asset was discovered.
        Ex: discovering_by:'Passive'
        Ex: discovering_by:'Active'
        Ex: discovering_by:['Passive','Active']
        """
    ),
    (
        "confidence",
        "Number",
        "Yes",
        """
        Confidence level of the unmanaged asset discovery (0-100).
        Higher values indicate higher confidence that the asset is real.
        Ex: confidence:>80
        Ex: confidence:>=90
        Ex: confidence:<50
        Ex: confidence:[80,90,95]
        """
    ),
]
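# Illustrative only (assumed rendering): generate_md_table treats the first tuple as
# the header row and each subsequent tuple as a table row, producing Markdown like:
#
#   | Name          | Type   | Operators | Description                         |
#   | ------------- | ------ | --------- | ----------------------------------- |
#   | platform_name | String | Yes       | Operating system platform of the... |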
SEARCH_UNMANAGED_ASSETS_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Search Unmanaged Assets Guide
=== BASIC SYNTAX ===
property_name:[operator]'value'
=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal to
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = does not text match
=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)
=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions
=== AUTOMATIC FILTERING ===
This tool automatically restricts results to unmanaged assets by adding entity_type:'unmanaged' to every query.
You do not need to (and cannot) specify entity_type in your filter; it is always set to 'unmanaged'.
=== falcon_search_unmanaged_assets FQL filter options ===
""" + generate_md_table(SEARCH_UNMANAGED_ASSETS_FQL_FILTERS) + """
=== IMPORTANT NOTES ===
• entity_type:'unmanaged' is automatically applied - do not include in your filter
• Use single quotes around string values: 'value'
• Use square brackets for exact matches and multiple values: ['value1','value2']
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
• Boolean values: true or false (no quotes)
• Some fields require specific capitalization (check individual field descriptions)
=== COMMON FILTER EXAMPLES ===
• Find Windows unmanaged assets: platform_name:'Windows'
• Find high-confidence unmanaged assets: confidence:>80
• Find recently discovered assets: first_seen_timestamp:>'now-7d'
• Find assets by hostname pattern: hostname:*'PC-*'
• Find critical unmanaged assets: criticality:'Critical'
• Find servers: product_type_desc:'Server'
• Find internet-exposed assets: internet_exposure:'Yes'
• Find assets in specific network: external_ip:'192.168.1.0/24'
• Find assets by manufacturer: system_manufacturer:*'Dell*'
• Find recently seen assets: last_seen_timestamp:>'now-24h'
=== COMPLEX QUERY EXAMPLES ===
• Windows workstations seen recently: platform_name:'Windows'+product_type_desc:'Workstation'+last_seen_timestamp:>'now-7d'
• Critical servers with internet exposure: criticality:'Critical'+product_type_desc:'Server'+internet_exposure:'Yes'
• Dell systems discovered this month: system_manufacturer:*'Dell*'+first_seen_timestamp:>'now-30d'
"""
```
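The syntax rules in the guide above compose mechanically: conditions join with `+` for AND and `,` for OR, and the unmanaged-assets tool always prepends `entity_type:'unmanaged'`. A minimal sketch of that composition, using a hypothetical `build_fql_and` helper that is not part of falcon-mcp:

```python
def build_fql_and(*conditions: str) -> str:
    """Join FQL conditions with '+' (AND), per the guide above."""
    return "+".join(conditions)


# Mirrors the "Critical servers with internet exposure" complex example.
user_filter = build_fql_and(
    "criticality:'Critical'",
    "product_type_desc:'Server'",
    "internet_exposure:'Yes'",
)

# The unmanaged-assets tool enforces the entity type automatically (assumed shape).
effective_filter = build_fql_and("entity_type:'unmanaged'", user_filter)
print(effective_filter)
# entity_type:'unmanaged'+criticality:'Critical'+product_type_desc:'Server'+internet_exposure:'Yes'
```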
--------------------------------------------------------------------------------
/tests/e2e/modules/test_hosts.py:
--------------------------------------------------------------------------------
```python
"""
E2E tests for the Hosts module.
"""
import json
import unittest
import pytest
from tests.e2e.utils.base_e2e_test import BaseE2ETest
@pytest.mark.e2e
class TestHostsModuleE2E(BaseE2ETest):
    """
    End-to-end test suite for the Falcon MCP Server Hosts Module.
    """
    def test_search_linux_servers(self):
        """Verify the agent can search for Linux servers and retrieve their details."""
        async def test_logic():
            fixtures = [
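                # Two-step host lookup: QueryDevicesByFilter returns matching device
                # IDs, then PostDeviceDetailsV2 expands those IDs into full records.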
                {
                    "operation": "QueryDevicesByFilter",
                    "validator": lambda kwargs: "linux"
                    in kwargs.get("parameters", {}).get("filter", "").lower()
                    and "server"
                    in kwargs.get("parameters", {}).get("filter", "").lower(),
                    "response": {
                        "status_code": 200,
                        "body": {"resources": ["host-001", "host-002", "host-003"]},
                    },
                },
                {
                    "operation": "PostDeviceDetailsV2",
                    "validator": lambda kwargs: "host-001"
                    in kwargs.get("body", {}).get("ids", []),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "device_id": "host-001",
                                    "hostname": "linux-server-01",
                                    "platform_name": "Linux",
                                    "product_type_desc": "Server",
                                    "os_version": "Ubuntu 20.04.3 LTS",
                                    "agent_version": "7.26.17905.0",
                                    "status": "normal",
                                    "last_seen": "2024-01-20T10:00:00Z",
                                    "first_seen": "2024-01-15T08:30:00Z",
                                    "external_ip": "203.0.113.10",
                                    "local_ip": "192.168.1.10",
                                    "machine_domain": "company.local",
                                    "system_manufacturer": "Dell Inc.",
                                    "system_product_name": "PowerEdge R740",
                                },
                                {
                                    "device_id": "host-002",
                                    "hostname": "linux-server-02",
                                    "platform_name": "Linux",
                                    "product_type_desc": "Server",
                                    "os_version": "CentOS Linux 8.4",
                                    "agent_version": "7.26.17905.0",
                                    "status": "normal",
                                    "last_seen": "2024-01-20T09:45:00Z",
                                    "first_seen": "2024-01-10T14:20:00Z",
                                    "external_ip": "203.0.113.11",
                                    "local_ip": "192.168.1.11",
                                    "machine_domain": "company.local",
                                    "system_manufacturer": "HPE",
                                    "system_product_name": "ProLiant DL380",
                                },
                                {
                                    "device_id": "host-003",
                                    "hostname": "linux-server-03",
                                    "platform_name": "Linux",
                                    "product_type_desc": "Server",
                                    "os_version": "Red Hat Enterprise Linux 8.5",
                                    "agent_version": "7.25.16803.0",
                                    "status": "normal",
                                    "last_seen": "2024-01-20T09:30:00Z",
                                    "first_seen": "2024-01-12T11:15:00Z",
                                    "external_ip": "203.0.113.12",
                                    "local_ip": "192.168.1.12",
                                    "machine_domain": "company.local",
                                    "system_manufacturer": "Lenovo",
                                    "system_product_name": "ThinkSystem SR650",
                                },
                            ]
                        },
                    },
                },
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = "Find all Linux servers in our environment and show me their hostnames, IP addresses, and agent versions"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[-1]
            self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_hosts")
            # Check for Linux and server filtering
            tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
            self.assertTrue(
                "linux" in tool_input_str and "server" in tool_input_str,
                f"Expected Linux server filtering in tool input: {tool_input_str}",
            )
            # Verify all three hosts are in the output
            self.assertIn("linux-server-01", used_tool["output"])
            self.assertIn("linux-server-02", used_tool["output"])
            self.assertIn("linux-server-03", used_tool["output"])
            # Verify API calls were made correctly
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 2, "Expected at least 2 API calls",
            )
            # Check first API call (QueryDevicesByFilter)
            api_call_1_params = self._mock_api_instance.command.call_args_list[0][
                1
            ].get("parameters", {})
            filter_str = api_call_1_params.get("filter", "").lower()
            self.assertTrue(
                "linux" in filter_str and "server" in filter_str,
                f"Expected Linux server filtering in API call: {filter_str}",
            )
            # Check second API call (PostDeviceDetailsV2)
            api_call_2_body = self._mock_api_instance.command.call_args_list[1][1].get(
                "body", {}
            )
            expected_ids = ["host-001", "host-002", "host-003"]
            self.assertEqual(api_call_2_body.get("ids"), expected_ids)
            # Verify result contains expected information
            self.assertIn("linux-server-01", result)
            self.assertIn("linux-server-02", result)
            self.assertIn("linux-server-03", result)
            self.assertIn("192.168.1.", result)  # Should contain IP addresses
            self.assertIn("7.26.", result)  # Should contain agent versions
        self.run_test_with_retries("test_search_linux_servers", test_logic, assertions)
    def test_get_specific_host_details(self):
        """Verify the agent can get details for specific host IDs."""
        async def test_logic():
            fixtures = [
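                # Single-step lookup: the agent already has the host ID, so only
                # PostDeviceDetailsV2 is exercised (no QueryDevicesByFilter search).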
                {
                    "operation": "PostDeviceDetailsV2",
                    "validator": lambda kwargs: "host-windows-001"
                    in kwargs.get("body", {}).get("ids", []),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "device_id": "host-windows-001",
                                    "hostname": "DESKTOP-WIN10-01",
                                    "platform_name": "Windows",
                                    "product_type_desc": "Workstation",
                                    "os_version": "Windows 10 Enterprise",
                                    "major_version": "10",
                                    "minor_version": "0",
                                    "agent_version": "7.26.17905.0",
                                    "status": "normal",
                                    "last_seen": "2024-01-20T11:15:00Z",
                                    "first_seen": "2024-01-18T09:00:00Z",
                                    "external_ip": "203.0.113.20",
                                    "local_ip": "192.168.1.20",
                                    "mac_address": "00:50:56:C0:00:08",
                                    "machine_domain": "CORPORATE",
                                    "system_manufacturer": "VMware, Inc.",
                                    "system_product_name": "VMware Virtual Platform",
                                    "bios_manufacturer": "Phoenix Technologies LTD",
                                    "bios_version": "6.00",
                                    "serial_number": "VMware-56-4d-xx-xx-xx-xx",
                                    "reduced_functionality_mode": "no",
                                    "filesystem_containment_status": "normal",
                                }
                            ]
                        },
                    },
                }
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = "Get detailed information for host ID 'host-windows-001', including its hostname, platform, and containment status"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            used_tool = tools[-1]
            self.assertEqual(used_tool["input"]["tool_name"], "falcon_get_host_details")
            # Check that the specific host ID was used
            tool_input = used_tool["input"]["tool_input"]
            self.assertIn("host-windows-001", json.dumps(tool_input))
            # Verify host details are in the output
            self.assertIn("DESKTOP-WIN10-01", used_tool["output"])
            self.assertIn("Windows", used_tool["output"])
            self.assertIn("host-windows-001", used_tool["output"])
            # Verify API call was made correctly
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 1, "Expected at least 1 API call"
            )
            # Check API call (PostDeviceDetailsV2)
            api_call_body = self._mock_api_instance.command.call_args_list[0][1].get(
                "body", {}
            )
            self.assertIn("host-windows-001", api_call_body.get("ids", []))
            # Verify result contains expected information
            self.assertIn("DESKTOP-WIN10-01", result)
            self.assertIn("Windows", result)
            self.assertIn("normal", result)  # Status and containment status
            self.assertIn("192.168.1.20", result)  # Local IP
        self.run_test_with_retries(
            "test_get_specific_host_details", test_logic, assertions
        )
    def test_search_azure_cloud_hosts(self):
        """Verify the agent can search for cloud hosts with complex filtering."""
        async def test_logic():
            fixtures = [
                {
                    "operation": "QueryDevicesByFilter",
                    "validator": lambda kwargs: "azure"
                    in kwargs.get("parameters", {}).get("filter", "").lower(),
                    "response": {
                        "status_code": 200,
                        "body": {"resources": ["azure-host-001"]},
                    },
                },
                {
                    "operation": "PostDeviceDetailsV2",
                    "validator": lambda kwargs: "azure-host-001"
                    in kwargs.get("body", {}).get("ids", []),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "device_id": "azure-host-001",
                                    "hostname": "azure-vm-debian",
                                    "platform_name": "Linux",
                                    "product_type_desc": "Server",
                                    "os_version": "Debian GNU 12",
                                    "kernel_version": "6.11.0-1015-azure",
                                    "agent_version": "7.26.17905.0",
                                    "status": "normal",
                                    "last_seen": "2024-01-20T12:00:00Z",
                                    "first_seen": "2024-01-19T10:30:00Z",
                                    "external_ip": "20.45.123.45",
                                    "connection_ip": "172.18.0.2",
                                    "default_gateway_ip": "172.18.0.1",
                                    "service_provider": "AZURE",
                                    "service_provider_account_id": "99841e6a-b123-4567-8901-123456789abc",
                                    "instance_id": "f9d3cef9-0123-4567-8901-123456789def",
                                    "system_manufacturer": "Microsoft Corporation",
                                    "system_product_name": "Virtual Machine",
                                    "deployment_type": "DaemonSet",
                                    "linux_sensor_mode": "User Mode",
                                    "reduced_functionality_mode": "yes",
                                    "k8s_cluster_id": "ecbb9795-9123-4567-8901-123456789ghi",
                                    "tags": ["SensorGroupingTags/daemonset"],
                                }
                            ]
                        },
                    },
                },
            ]
            self._mock_api_instance.command.side_effect = (
                self._create_mock_api_side_effect(fixtures)
            )
            prompt = "Find Azure cloud hosts and show their deployment details including Kubernetes cluster information"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
            # Find a search hosts tool call (may not be the last one)
            search_tool = None
            for tool in tools:
                if tool["input"]["tool_name"] == "falcon_search_hosts":
                    search_tool = tool
                    break
            self.assertIsNotNone(
                search_tool, "Expected at least one falcon_search_hosts tool call"
            )
            # Check for Azure filtering in any tool call
            found_azure_filtering = False
            for tool in tools:
                tool_input_str = json.dumps(tool["input"]["tool_input"]).lower()
                if "azure" in tool_input_str:
                    found_azure_filtering = True
                    break
            self.assertTrue(
                found_azure_filtering, "Expected Azure filtering in tool inputs"
            )
            # Verify Azure host is in the search tool output
            self.assertIn("azure-vm-debian", search_tool["output"])
            self.assertIn("AZURE", search_tool["output"])
            # Verify API calls were made correctly
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 2, "Expected at least 2 API calls"
            )
            # Check that we have QueryDevicesByFilter call with Azure filtering
            found_azure_query = False
            found_details_call = False
            for call in self._mock_api_instance.command.call_args_list:
                if call[0][0] == "QueryDevicesByFilter":
                    filter_str = call[1].get("parameters", {}).get("filter", "").lower()
                    if "azure" in filter_str:
                        found_azure_query = True
                elif call[0][0] == "PostDeviceDetailsV2":
                    if "azure-host-001" in call[1].get("body", {}).get("ids", []):
                        found_details_call = True
            self.assertTrue(
                found_azure_query,
                "Expected QueryDevicesByFilter call with Azure filtering",
            )
            self.assertTrue(
                found_details_call,
                "Expected PostDeviceDetailsV2 call with azure-host-001",
            )
            # Verify result contains expected Azure and Kubernetes information (more flexible matching)
            result_lower = result.lower()
            self.assertIn("azure-vm-debian", result_lower)
            self.assertIn("azure", result_lower)
            self.assertIn("daemonset", result_lower)
            # Check for Kubernetes info (could be "k8s" or "kubernetes")
            self.assertTrue(
                "k8s" in result_lower or "kubernetes" in result_lower,
                f"Expected Kubernetes cluster info in result: {result_lower[:500]}...",
            )
        self.run_test_with_retries(
            "test_search_azure_cloud_hosts", test_logic, assertions
        )
if __name__ == "__main__":
    unittest.main()
```
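These tests lean on `_create_mock_api_side_effect` and `run_test_with_retries` from `tests/e2e/utils/base_e2e_test.py`, which is not reproduced on this page. A plausible minimal sketch of the fixture dispatcher, assuming it matches each FalconPy `command(operation, **kwargs)` call by operation name plus validator and serves the fixture's canned response:

```python
from typing import Any, Callable


def create_mock_api_side_effect(fixtures: list[dict]) -> Callable[..., dict]:
    """Serve each fixture's canned response when operation and validator match."""

    def side_effect(operation: str, **kwargs: Any) -> dict:
        for fixture in fixtures:
            if fixture["operation"] == operation and fixture["validator"](kwargs):
                return fixture["response"]
        # Unmatched calls fail loudly so a drifting prompt is caught by assertions.
        return {
            "status_code": 404,
            "body": {"resources": [], "errors": [{"message": f"no fixture for {operation}"}]},
        }

    return side_effect
```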
--------------------------------------------------------------------------------
/tests/e2e/modules/test_discover.py:
--------------------------------------------------------------------------------
```python
"""
E2E tests for the Discover module.
"""
import json
import unittest
import pytest
from tests.e2e.utils.base_e2e_test import BaseE2ETest
@pytest.mark.e2e
class TestDiscoverModuleE2E(BaseE2ETest):
    """
    End-to-end test suite for the Falcon MCP Server Discover Module.
    """
    def test_search_applications_by_category(self):
        """Verify the agent can search for applications by name."""
        async def test_logic():
            fixtures = [
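                # combined_applications returns full application records in a single
                # call, so no separate ID-to-details expansion step is needed here.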
                {
                    "operation": "combined_applications",
                    "validator": lambda kwargs: "category:'Web Browsers'"
                    in kwargs.get("parameters", {}).get("filter", ""),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "abc123_def456789abcdef123456789abcdef123456789abcdef123456789abcdef",
                                    "cid": "abc123",
                                    "name": "Chrome Browser",
                                    "vendor": "Google",
                                    "version": "120.0.6099.130",
                                    "software_type": "application",
                                    "name_vendor": "Chrome Browser-Google",
                                    "name_vendor_version": "Chrome Browser-Google-120.0.6099.130",
                                    "versioning_scheme": "semver",
                                    "groups": [
                                        "group1",
                                        "group2",
                                        "group3",
                                    ],
                                    "category": "Web Browsers",
                                    "architectures": [
                                        "x64",
                                    ],
                                    "first_seen_timestamp": "2025-02-15T10:30:00Z",
                                    "last_updated_timestamp": "2025-03-01T14:45:22Z",
                                    "is_suspicious": False,
                                    "is_normalized": True,
                                    "host": {
                                        "id": "abc123_xyz789",
                                    },
                                },
                                {
                                    "id": "def456_123456789abcdef123456789abcdef123456789abcdef123456789abcdef",
                                    "cid": "def456",
                                    "name": "Chrome Browser",
                                    "vendor": "Google",
                                    "version": "119.0.6045.199",
                                    "software_type": "application",
                                    "name_vendor": "Chrome Browser-Google",
                                    "name_vendor_version": "Chrome Browser-Google-119.0.6045.199",
                                    "versioning_scheme": "semver",
                                    "groups": [
                                        "group4",
                                        "group5",
                                    ],
                                    "category": "Web Browsers",
                                    "architectures": [
                                        "x64",
                                    ],
                                    "first_seen_timestamp": "2025-01-10T08:15:30Z",
                                    "last_updated_timestamp": "2025-02-20T11:22:45Z",
                                    "is_suspicious": False,
                                    "is_normalized": True,
                                    "host": {
                                        "id": "def456_abc123",
                                    },
                                },
                            ]
                        },
                    },
                }
            ]
            self._mock_api_instance.command.side_effect = self._create_mock_api_side_effect(
                fixtures
            )
            prompt = "Search for all applications categorized as Web Browsers in our environment and show me their details"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            tool_names_called = [tool["input"]["tool_name"] for tool in tools]
            self.assertIn("falcon_search_applications_fql_guide", tool_names_called)
            self.assertIn("falcon_search_applications", tool_names_called)
            used_tool = tools[-1]
            self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_applications")
            # Check for category filtering
            tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
            self.assertTrue(
                "web browsers" in tool_input_str,
                f"Expected web browsers category filtering in tool input: {tool_input_str}",
            )
            # Verify both applications are in the output
            self.assertIn("Chrome Browser", used_tool["output"])
            self.assertIn("Google", used_tool["output"])
            self.assertIn("120.0.6099.130", used_tool["output"])
            self.assertIn("119.0.6045.199", used_tool["output"])
            # Verify API call was made correctly
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 1, "Expected at least 1 API call"
            )
            # Check API call (combined_applications)
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            filter_str = api_call_params.get("filter", "").lower()
            self.assertTrue(
                "category" in filter_str and "web browsers" in filter_str,
                f"Expected category:Web Browsers filtering in API call: {filter_str}",
            )
            # Verify result contains expected information
            self.assertIn("Chrome Browser", result)
            self.assertIn("Google", result)
            self.assertIn("120.0.6099.130", result)
            self.assertIn("119.0.6045.199", result)
            self.assertIn("Web Browsers", result)
        self.run_test_with_retries("test_search_applications_by_category", test_logic, assertions)
    def test_search_unmanaged_assets_by_platform(self):
        """Verify the agent can search for unmanaged assets by platform."""
        async def test_logic():
            fixtures = [
                {
                    "operation": "combined_hosts",
                    "validator": lambda kwargs: "entity_type:'unmanaged'"
                    in kwargs.get("parameters", {}).get("filter", "")
                    and (
                        "platform_name:'Windows'" in kwargs.get("parameters", {}).get("filter", "")
                    ),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "abc123def456789_1234567890abcdef1234567890abcdef1234567890abcdef",
                                    "cid": "abc123def456789",
                                    "entity_type": "unmanaged",
                                    "first_seen_timestamp": "2025-05-16T04:00:00Z",
                                    "last_seen_timestamp": "2025-08-12T23:00:00Z",
                                    "system_manufacturer": "VMware, Inc.",
                                    "hostname": "PC-FINANCE-W11",
                                    "local_ips_count": 1,
                                    "network_interfaces": [
                                        {
                                            "local_ip": "192.168.1.100",
                                            "mac_address": "AA-BB-CC-DD-EE-01",
                                            "network_prefix": "192.168",
                                        }
                                    ],
                                    "os_security": {},
                                    "current_local_ip": "192.168.1.100",
                                    "data_providers": ["Falcon passive discovery"],
                                    "data_providers_count": 1,
                                    "first_discoverer_aid": "abc123456789def0123456789abcdef01",
                                    "last_discoverer_aid": "abc123456789def0123456789abcdef01",
                                    "discoverer_count": 1,
                                    "discoverer_aids": ["abc123456789def0123456789abcdef01"],
                                    "discoverer_tags": [
                                        "FalconGroupingTags/Finance",
                                        "FalconGroupingTags/Workstation",
                                        "FalconGroupingTags/Windows11",
                                    ],
                                    "discoverer_platform_names": ["Windows"],
                                    "discoverer_product_type_descs": ["Workstation"],
                                    "discoverer_hostnames": ["WIN-MGMT-001"],
                                    "last_discoverer_hostname": "WIN-MGMT-001",
                                    "confidence": 75,
                                    "active_discovery": {},
                                },
                                {
                                    "id": "abc123def456789_fedcba0987654321fedcba0987654321fedcba0987654321",
                                    "cid": "abc123def456789",
                                    "entity_type": "unmanaged",
                                    "first_seen_timestamp": "2025-07-16T10:00:00Z",
                                    "last_seen_timestamp": "2025-08-12T23:00:00Z",
                                    "system_manufacturer": "Dell Inc.",
                                    "hostname": "SERVER-HR-002",
                                    "local_ips_count": 1,
                                    "network_interfaces": [
                                        {
                                            "local_ip": "192.168.2.50",
                                            "mac_address": "AA-BB-CC-DD-EE-02",
                                            "network_prefix": "192.168",
                                        }
                                    ],
                                    "os_security": {},
                                    "current_local_ip": "192.168.2.50",
                                    "data_providers": ["Falcon passive discovery"],
                                    "data_providers_count": 1,
                                    "first_discoverer_aid": "def456789abc012def456789abc012de",
                                    "last_discoverer_aid": "def456789abc012def456789abc012de",
                                    "discoverer_count": 1,
                                    "discoverer_aids": ["def456789abc012def456789abc012de"],
                                    "discoverer_tags": [
                                        "FalconGroupingTags/HR",
                                        "FalconGroupingTags/Server",
                                        "FalconGroupingTags/WindowsServer2019",
                                    ],
                                    "discoverer_platform_names": ["Windows"],
                                    "discoverer_product_type_descs": ["Server"],
                                    "discoverer_hostnames": ["WIN-DC-001"],
                                    "last_discoverer_hostname": "WIN-DC-001",
                                    "confidence": 85,
                                    "active_discovery": {},
                                },
                            ]
                        },
                    },
                }
            ]
            self._mock_api_instance.command.side_effect = self._create_mock_api_side_effect(
                fixtures
            )
            prompt = "Search for all unmanaged Windows assets in our environment and show me their details"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            tool_names_called = [tool["input"]["tool_name"] for tool in tools]
            # Agent must consult the FQL guide to learn proper platform filtering syntax
            self.assertIn("falcon_search_unmanaged_assets_fql_guide", tool_names_called)
            self.assertIn("falcon_search_unmanaged_assets", tool_names_called)
            used_tool = tools[-1]
            self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_unmanaged_assets")
            # Note: Agent may interpret platform filtering differently
            # The key behavior is that it successfully finds and returns unmanaged assets
            # Verify both unmanaged assets are in the output
            self.assertIn("PC-FINANCE-W11", used_tool["output"])
            self.assertIn("SERVER-HR-002", used_tool["output"])
            self.assertIn("VMware, Inc.", used_tool["output"])
            self.assertIn("Dell Inc.", used_tool["output"])
            self.assertIn("unmanaged", used_tool["output"])
            # Verify API call was made correctly
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 1, "Expected at least 1 API call"
            )
            # Check API call (combined_hosts)
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            filter_str = api_call_params.get("filter", "").lower()
            # Verify entity_type:'unmanaged' is automatically added
            self.assertTrue(
                "entity_type:'unmanaged'" in filter_str,
                f"Expected entity_type:'unmanaged' in API call filter: {filter_str}",
            )
            # Note: Platform filtering may vary based on agent interpretation
            # The core requirement is that entity_type:'unmanaged' is enforced
            # Verify result contains expected information
            self.assertIn("PC-FINANCE-W11", result)
            self.assertIn("SERVER-HR-002", result)
            self.assertIn("Windows", result)
            self.assertIn("unmanaged", result)
            self.assertIn("Workstation", result)
            self.assertIn("Server", result)
        self.run_test_with_retries(
            "test_search_unmanaged_assets_by_platform", test_logic, assertions
        )
    def test_search_unmanaged_assets_by_confidence(self):
        """Verify the agent can search for unmanaged assets by confidence level."""
        async def test_logic():
            fixtures = [
                {
                    "operation": "combined_hosts",
                    "validator": lambda kwargs: "entity_type:'unmanaged'"
                    in kwargs.get("parameters", {}).get("filter", "")
                    and ("confidence:" in kwargs.get("parameters", {}).get("filter", "")),
                    "response": {
                        "status_code": 200,
                        "body": {
                            "resources": [
                                {
                                    "id": "def789ghi012345_abcdef123456789abcdef123456789abcdef123456789abcdef",
                                    "cid": "def789ghi012345",
                                    "entity_type": "unmanaged",
                                    "first_seen_timestamp": "2025-07-17T08:00:00Z",
                                    "last_seen_timestamp": "2025-08-12T23:00:00Z",
                                    "system_manufacturer": "VMware, Inc.",
                                    "hostname": "PROD-DB-LINUX",
                                    "local_ips_count": 1,
                                    "network_interfaces": [
                                        {
                                            "local_ip": "10.0.1.200",
                                            "mac_address": "AA-BB-CC-DD-EE-03",
                                            "network_prefix": "10.0",
                                        }
                                    ],
                                    "os_security": {},
                                    "current_local_ip": "10.0.1.200",
                                    "data_providers": ["Falcon passive discovery"],
                                    "data_providers_count": 1,
                                    "first_discoverer_aid": "123456789def012345678901234567ab",
                                    "last_discoverer_aid": "123456789def012345678901234567ab",
                                    "discoverer_count": 1,
                                    "discoverer_aids": ["123456789def012345678901234567ab"],
                                    "discoverer_tags": [
                                        "FalconGroupingTags/Production",
                                        "FalconGroupingTags/Database",
                                        "FalconGroupingTags/Linux",
                                        "FalconGroupingTags/Critical-Infrastructure",
                                    ],
                                    "discoverer_platform_names": ["Linux"],
                                    "discoverer_product_type_descs": ["Server"],
                                    "discoverer_hostnames": ["LNX-MGMT-001"],
                                    "last_discoverer_hostname": "LNX-MGMT-001",
                                    "confidence": 95,
                                    "active_discovery": {},
                                }
                            ]
                        },
                    },
                }
            ]
            self._mock_api_instance.command.side_effect = self._create_mock_api_side_effect(
                fixtures
            )
            prompt = "Find all unmanaged assets with high confidence levels (above 80) that are likely real systems"
            return await self._run_agent_stream(prompt)
        def assertions(tools, result):
            tool_names_called = [tool["input"]["tool_name"] for tool in tools]
            self.assertIn("falcon_search_unmanaged_assets", tool_names_called)
            used_tool = tools[-1]
            self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_unmanaged_assets")
            # Note: Agent may interpret confidence filtering differently
            # The key behavior is that it successfully finds and returns unmanaged assets
            # Verify high confidence asset is in the output
            self.assertIn("PROD-DB-LINUX", used_tool["output"])
            self.assertIn("95", used_tool["output"])
            self.assertIn("unmanaged", used_tool["output"])
            # Verify API call was made correctly
            self.assertGreaterEqual(
                self._mock_api_instance.command.call_count, 1, "Expected at least 1 API call"
            )
            # Check API call (combined_hosts)
            api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
                "parameters", {}
            )
            filter_str = api_call_params.get("filter", "").lower()
            # Verify entity_type:'unmanaged' is automatically added
            self.assertTrue(
                "entity_type:'unmanaged'" in filter_str,
                f"Expected entity_type:'unmanaged' in API call filter: {filter_str}",
            )
            # Note: Confidence filtering may vary based on agent interpretation
            # The core requirement is that entity_type:'unmanaged' is enforced
            # Verify result contains expected information
            self.assertIn("PROD-DB-LINUX", result)
            self.assertIn("95", result)
            self.assertIn("unmanaged", result)
            self.assertIn("Linux", result)
        self.run_test_with_retries(
            "test_search_unmanaged_assets_by_confidence", test_logic, assertions
        )
if __name__ == "__main__":
    unittest.main()
```
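Because each fixture validator is a plain predicate over the call kwargs, it can be sanity-checked in isolation. An illustrative standalone check (not part of the suite) mirroring the unmanaged-assets validator above:

```python
def validator(kwargs: dict) -> bool:
    """Predicate over FalconPy call kwargs, as used in the fixtures above."""
    fql = kwargs.get("parameters", {}).get("filter", "")
    return "entity_type:'unmanaged'" in fql and "platform_name:'Windows'" in fql


assert validator({"parameters": {"filter": "entity_type:'unmanaged'+platform_name:'Windows'"}})
assert not validator({"parameters": {"filter": "platform_name:'Windows'"}})
```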