This is page 4 of 4. Use http://codebase.md/crowdstrike/falcon-mcp?page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.dev.example
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug.yaml
│   │   ├── config.yml
│   │   ├── feature-request.yaml
│   │   └── question.yaml
│   └── workflows
│       ├── docker-build-push.yml
│       ├── docker-build-test.yml
│       ├── markdown-lint.yml
│       ├── python-lint.yml
│       ├── python-test-e2e.yml
│       ├── python-test.yml
│       └── release.yml
├── .gitignore
├── .markdownlint.json
├── CHANGELOG.md
├── Dockerfile
├── docs
│   ├── CODE_OF_CONDUCT.md
│   ├── CONTRIBUTING.md
│   ├── deployment
│   │   ├── amazon_bedrock_agentcore.md
│   │   └── google_cloud.md
│   ├── e2e_testing.md
│   ├── module_development.md
│   ├── resource_development.md
│   └── SECURITY.md
├── examples
│   ├── adk
│   │   ├── adk_agent_operations.sh
│   │   ├── falcon_agent
│   │   │   ├── __init__.py
│   │   │   ├── agent.py
│   │   │   ├── env.properties
│   │   │   └── requirements.txt
│   │   └── README.md
│   ├── basic_usage.py
│   ├── mcp_config.json
│   ├── sse_usage.py
│   └── streamable_http_usage.py
├── falcon_mcp
│   ├── __init__.py
│   ├── client.py
│   ├── common
│   │   ├── __init__.py
│   │   ├── api_scopes.py
│   │   ├── errors.py
│   │   ├── logging.py
│   │   └── utils.py
│   ├── modules
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── cloud.py
│   │   ├── detections.py
│   │   ├── discover.py
│   │   ├── hosts.py
│   │   ├── idp.py
│   │   ├── incidents.py
│   │   ├── intel.py
│   │   ├── sensor_usage.py
│   │   ├── serverless.py
│   │   └── spotlight.py
│   ├── registry.py
│   ├── resources
│   │   ├── __init__.py
│   │   ├── cloud.py
│   │   ├── detections.py
│   │   ├── discover.py
│   │   ├── hosts.py
│   │   ├── incidents.py
│   │   ├── intel.py
│   │   ├── sensor_usage.py
│   │   ├── serverless.py
│   │   └── spotlight.py
│   └── server.py
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── generate_e2e_report.py
│   └── test_results_viewer.html
├── SUPPORT.md
├── tests
│   ├── __init__.py
│   ├── common
│   │   ├── __init__.py
│   │   ├── test_api_scopes.py
│   │   ├── test_errors.py
│   │   ├── test_logging.py
│   │   └── test_utils.py
│   ├── conftest.py
│   ├── e2e
│   │   ├── __init__.py
│   │   ├── modules
│   │   │   ├── __init__.py
│   │   │   ├── test_cloud.py
│   │   │   ├── test_detections.py
│   │   │   ├── test_discover.py
│   │   │   ├── test_hosts.py
│   │   │   ├── test_idp.py
│   │   │   ├── test_incidents.py
│   │   │   ├── test_intel.py
│   │   │   ├── test_sensor_usage.py
│   │   │   ├── test_serverless.py
│   │   │   └── test_spotlight.py
│   │   └── utils
│   │       ├── __init__.py
│   │       └── base_e2e_test.py
│   ├── modules
│   │   ├── __init__.py
│   │   ├── test_base.py
│   │   ├── test_cloud.py
│   │   ├── test_detections.py
│   │   ├── test_discover.py
│   │   ├── test_hosts.py
│   │   ├── test_idp.py
│   │   ├── test_incidents.py
│   │   ├── test_intel.py
│   │   ├── test_sensor_usage.py
│   │   ├── test_serverless.py
│   │   ├── test_spotlight.py
│   │   └── utils
│   │       └── test_modules.py
│   ├── test_client.py
│   ├── test_registry.py
│   ├── test_server.py
│   └── test_streamable_http_transport.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/falcon_mcp/resources/detections.py:
--------------------------------------------------------------------------------
```python
"""
Contains Detections resources.
"""
from falcon_mcp.common.utils import generate_md_table
# List of tuples containing filter options data: (name, type, description)
SEARCH_DETECTIONS_FQL_FILTERS = [
(
"Name",
"Type",
"Description"
),
(
"agent_id",
"String",
"""
Agent ID associated with the alert.
Ex: 77d11725xxxxxxxxxxxxxxxxxxxxc48ca19
"""
),
(
"aggregate_id",
"String",
"""
Unique identifier linking multiple related alerts
that represent a logical grouping (like legacy
detection_id). Use this to correlate related alerts.
Ex: aggind:77d1172532c8xxxxxxxxxxxxxxxxxxxx49030016385
"""
),
(
"composite_id",
"String",
"""
Global unique identifier for the individual alert.
This replaces the legacy detection_id for individual
alerts in the new Alerts API.
Ex: d615:ind:77d1172xxxxxxxxxxxxxxxxx6c48ca19
"""
),
(
"cid",
"String",
"""
Customer ID.
Ex: d61501xxxxxxxxxxxxxxxxxxxxa2da2158
"""
),
(
"pattern_id",
"Number",
"""
Detection pattern identifier.
Ex: 67
"""
),
(
"assigned_to_name",
"String",
"""
Name of assigned Falcon user.
Ex: Alice Anderson
"""
),
(
"assigned_to_uid",
"String",
"""
User ID of assigned Falcon user.
Ex: [email protected]
"""
),
(
"assigned_to_uuid",
"String",
"""
UUID of assigned Falcon user.
Ex: dc54xxxxxxxxxxxxxxxx1658
"""
),
(
"status",
"String",
"""
Alert status. Possible values:
- new: Newly detected, needs triage
- in_progress: Being investigated
- closed: Investigation completed
- reopened: Previously closed, now active again
Ex: new
"""
),
(
"created_timestamp",
"Timestamp",
"""
When alert was created in UTC format.
Ex: 2024-02-22T14:16:04.973070837Z
"""
),
(
"updated_timestamp",
"Timestamp",
"""
Last modification time in UTC format.
Ex: 2024-02-22T15:15:05.637481021Z
"""
),
(
"timestamp",
"Timestamp",
"""
Alert occurrence timestamp in UTC format.
Ex: 2024-02-22T14:15:03.112Z
"""
),
(
"crawled_timestamp",
"Timestamp",
"""
Internal timestamp for processing in UTC format.
Ex: 2024-02-22T15:15:05.637684718Z
"""
),
(
"confidence",
"Number",
"""
Confidence level (1-100). Higher values indicate
greater confidence in the detection.
Ex: 80
"""
),
(
"severity",
"Number",
"""
Security risk level (1-100). Use numeric values.
Ex: 90
"""
),
(
"severity_name",
"String",
"""
Human-readable severity level name. Easier to use
than numeric ranges. Possible values:
- Informational: Low-priority alerts
- Low: Minor security concerns
- Medium: Moderate security risks
- High: Significant security threats
- Critical: Severe security incidents
Ex: High
"""
),
(
"tactic",
"String",
"""
MITRE ATT&CK tactic name.
Ex: Credential Access
"""
),
(
"tactic_id",
"String",
"""
MITRE ATT&CK tactic identifier.
Ex: TA0006
"""
),
(
"technique",
"String",
"""
MITRE ATT&CK technique name.
Ex: OS Credential Dumping
"""
),
(
"technique_id",
"String",
"""
MITRE ATT&CK technique identifier.
Ex: T1003
"""
),
(
"objective",
"String",
"""
Attack objective description.
Ex: Gain Access
"""
),
(
"scenario",
"String",
"""
Detection scenario classification.
Ex: credential_theft
"""
),
(
"product",
"String",
"""
Source Falcon product. Possible values:
- epp: Endpoint Protection Platform
- idp: Identity Protection
- mobile: Mobile Device Protection
- xdr: Extended Detection and Response
- overwatch: Managed Threat Hunting
- cwpp: Cloud Workload Protection
- ngsiem: Next-Gen SIEM
- thirdparty: Third-party integrations
- data-protection: Data Loss Prevention
Ex: epp
"""
),
(
"platform",
"String",
"""
Operating system platform.
Ex: Windows, Linux, Mac
"""
),
(
"data_domains",
"Array",
"""
Domain to which this alert belongs. Possible
values: Endpoint, Identity, Cloud, Email, Web,
Network (array field).
Ex: ["Endpoint"]
"""
),
(
"source_products",
"Array",
"""
Products associated with the source of this alert
(array field).
Ex: ["Falcon Insight"]
"""
),
(
"source_vendors",
"Array",
"""
Vendors associated with the source of this alert
(array field).
Ex: ["CrowdStrike"]
"""
),
(
"name",
"String",
"""
Detection pattern name.
Ex: NtdsFileAccessedViaVss
"""
),
(
"display_name",
"String",
"""
Human-readable detection name.
Ex: NtdsFileAccessedViaVss
"""
),
(
"description",
"String",
"""
Detection description.
Ex: Process accessed credential-containing NTDS.dit
in a Volume Shadow Snapshot
"""
),
(
"type",
"String",
"""
Detection type classification. Possible values:
- ldt: Legacy Detection Technology
- ods: On-sensor Detection System
- xdr: Extended Detection and Response
- ofp: Offline Protection
- ssd: Suspicious Script Detection
- windows_legacy: Windows Legacy Detection
Ex: ldt
"""
),
(
"show_in_ui",
"Boolean",
"""
Whether detection appears in UI.
Ex: true
"""
),
(
"email_sent",
"Boolean",
"""
Whether email was sent for this detection.
Ex: true
"""
),
(
"seconds_to_resolved",
"Number",
"""
Time in seconds to move from new to closed status.
Ex: 3600
"""
),
(
"seconds_to_triaged",
"Number",
"""
Time in seconds to move from new to in_progress.
Ex: 1800
"""
),
(
"comments.value",
"String",
"""
A single term in an alert comment. Matching is
case sensitive. Partial match and wildcard search
are not supported.
Ex: suspicious
"""
),
(
"tags",
"Array",
"""
Contains a combined list of FalconGroupingTags
and SensorGroupingTags (array field).
Ex: ["fc/offering/falcon_complete",
"fc/exclusion/pre-epp-migration", "fc/exclusion/nonlive"]
"""
),
(
"alleged_filetype",
"String",
"""
The alleged file type of the executable.
Ex: exe
"""
),
(
"cmdline",
"String",
"""
Command line arguments used to start the process.
Ex: powershell.exe -ExecutionPolicy Bypass
"""
),
(
"filename",
"String",
"""
Process filename without path.
Ex: powershell.exe
"""
),
(
"filepath",
"String",
"""
Full file path of the executable.
Ex: C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\powershell.exe
"""
),
(
"process_id",
"String",
"""
Process identifier.
Ex: pid:12345:abcdef
"""
),
(
"parent_process_id",
"String",
"""
Parent process identifier.
Ex: pid:12344:ghijkl
"""
),
(
"local_process_id",
"Number",
"""
Local process ID number.
Ex: 12345
"""
),
(
"process_start_time",
"Number",
"""
Process start timestamp (epoch).
Ex: 1724347200
"""
),
(
"process_end_time",
"Number",
"""
Process end timestamp (epoch).
Ex: 1724347800
"""
),
(
"tree_id",
"String",
"""
Process tree identifier.
Ex: tree:77d11725:abcd1234
"""
),
(
"tree_root",
"String",
"""
Process tree root identifier.
Ex: root:77d11725:efgh5678
"""
),
(
"device.agent_load_flags",
"String",
"""
Agent load flags configuration.
Ex: 0x00000001
"""
),
(
"device.agent_local_time",
"Timestamp",
"""
Agent local timestamp in UTC format.
Ex: 2024-02-22T14:15:03.112Z
"""
),
(
"device.agent_version",
"String",
"""
CrowdStrike Falcon agent version.
Ex: 7.10.19103.0
"""
),
(
"device.bios_manufacturer",
"String",
"""
System BIOS manufacturer name.
Ex: Dell Inc.
"""
),
(
"device.bios_version",
"String",
"""
System BIOS version information.
Ex: 2.18.0
"""
),
(
"device.config_id_base",
"String",
"""
Base configuration identifier.
Ex: 65994753
"""
),
(
"device.config_id_build",
"String",
"""
Build configuration identifier.
Ex: 19103
"""
),
(
"device.config_id_platform",
"String",
"""
Platform configuration identifier.
Ex: 3
"""
),
(
"device.device_id",
"String",
"""
Unique device identifier.
Ex: 77d11725xxxxxxxxxxxxxxxxxxxxc48ca19
"""
),
(
"device.external_ip",
"String",
"""
Device external/public IP address.
Ex: 203.0.113.5
"""
),
(
"device.first_seen",
"Timestamp",
"""
First time device was seen in UTC format.
Ex: 2024-01-15T10:30:00.000Z
"""
),
(
"device.hostname",
"String",
"""
Device hostname or computer name.
Ex: DESKTOP-ABC123
"""
),
(
"device.last_seen",
"Timestamp",
"""
Last time device was seen in UTC format.
Ex: 2024-02-22T14:15:03.112Z
"""
),
(
"device.local_ip",
"String",
"""
Device local/private IP address.
Ex: 192.168.1.100
"""
),
(
"device.major_version",
"String",
"""
Operating system major version.
Ex: 10
"""
),
(
"device.minor_version",
"String",
"""
Operating system minor version.
Ex: 0
"""
),
(
"device.modified_timestamp",
"Timestamp",
"""
Device record last modified timestamp in UTC format.
Ex: 2024-02-22T15:15:05.637Z
"""
),
(
"device.os_version",
"String",
"""
Complete operating system version string.
Ex: Windows 10
"""
),
(
"device.ou",
"String",
"""
Organizational unit or domain path.
Ex: OU=Computers,DC=example,DC=com
"""
),
(
"device.platform_id",
"String",
"""
Platform identifier code.
Ex: 0
"""
),
(
"device.platform_name",
"String",
"""
Operating system platform name.
Ex: Windows
"""
),
(
"device.product_type",
"String",
"""
Product type identifier.
Ex: 1
"""
),
(
"device.product_type_desc",
"String",
"""
Product type description.
Ex: Workstation
"""
),
(
"device.status",
"String",
"""
Device connection status.
Ex: normal
"""
),
(
"device.system_manufacturer",
"String",
"""
System hardware manufacturer.
Ex: Dell Inc.
"""
),
(
"device.system_product_name",
"String",
"""
System product model name.
Ex: OptiPlex 7090
"""
),
(
"md5",
"String",
"""
MD5 hash of the file.
Ex: 5d41402abc4b2a76b9719d911017c592
"""
),
(
"sha1",
"String",
"""
SHA1 hash of the file.
Ex: aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d
"""
),
(
"sha256",
"String",
"""
SHA256 hash of the file.
Ex: 13550350a8681c84c861aac2e5b440161c2b33a3e4f302ac680ca5b686de48de
"""
),
(
"global_prevalence",
"String",
"""
Global prevalence rating of the file.
Ex: rare
"""
),
(
"local_prevalence",
"String",
"""
Local prevalence rating within the organization.
Ex: common
"""
),
(
"charlotte.can_triage",
"Boolean",
"""
Whether alert can be triaged automatically.
Ex: true
"""
),
(
"charlotte.triage_status",
"String",
"""
Automated triage status.
Ex: triaged
"""
),
(
"incident.created",
"Timestamp",
"""
Incident creation timestamp in UTC format.
Ex: 2024-02-22T14:15:03.112Z
"""
),
(
"incident.end",
"Timestamp",
"""
Incident end timestamp in UTC format.
Ex: 2024-02-22T14:45:03.112Z
"""
),
(
"incident.id",
"String",
"""
Unique incident identifier.
Ex: inc_12345abcdef
"""
),
(
"incident.score",
"Number",
"""
Incident severity score (1-100).
Ex: 85
"""
),
(
"incident.start",
"Timestamp",
"""
Incident start timestamp in UTC format.
Ex: 2024-02-22T14:15:03.112Z
"""
),
(
"indicator_id",
"String",
"""
Threat indicator identifier.
Ex: ind_67890wxyz
"""
),
(
"parent_details.*",
"Object",
"""
Parent process information object. Use dot notation
for specific fields like parent_details.cmdline,
parent_details.filename, parent_details.filepath,
parent_details.process_id, etc.
Ex: parent_details.filename:'explorer.exe'
"""
),
(
"grandparent_details.*",
"Object",
"""
Grandparent process information object. Use dot
notation for specific fields like
grandparent_details.cmdline,
grandparent_details.filename, etc.
Ex: grandparent_details.filepath:'*winlogon*'
"""
),
(
"child_process_ids",
"Array",
"""
List of child process identifiers spawned by this
process (array field).
Ex: ["pid:12346:abcdef", "pid:12347:ghijkl"]
"""
),
(
"triggering_process_graph_id",
"String",
"""
Process graph identifier for the triggering process
in the attack chain.
Ex: graph:77d11725:trigger123
"""
),
(
"ioc_context",
"Array",
"""
IOC context information and metadata (array field).
Ex: ["malware_family", "apt_group"]
"""
),
(
"ioc_values",
"Array",
"""
IOC values associated with the alert (array field).
Ex: ["192.168.1.100", "malicious.exe"]
"""
),
(
"falcon_host_link",
"String",
"""
Direct link to Falcon console for this host.
Ex: https://falcon.crowdstrike.com/hosts/detail/77d11725xxxxxxxxxxxxxxxxxxxxc48ca19
"""
),
(
"user_id",
"String",
"""
User identifier associated with the process.
Ex: S-1-5-21-1234567890-987654321-1122334455-1001
"""
),
(
"user_name",
"String",
"""
Username associated with the process.
Ex: administrator
"""
),
(
"logon_domain",
"String",
"""
Logon domain name for the user.
Ex: CORP
"""
),
]
SEARCH_DETECTIONS_FQL_DOCUMENTATION = r"""Falcon Query Language (FQL) - Search Detections/Alerts Guide
=== BASIC SYNTAX ===
field_name:[operator]'value'
=== OPERATORS ===
• = (default): field_name:'value'
• !: field_name:!'value' (not equal)
• >, >=, <, <=: field_name:>50 (comparison)
• ~: field_name:~'partial' (text match, case insensitive)
• !~: field_name:!~'exclude' (not text match)
• *: field_name:'prefix*' or field_name:'*suffix*' (wildcards)
=== DATA TYPES ===
• String: 'value'
• Number: 123 (no quotes)
• Boolean: true/false (no quotes)
• Timestamp: 'YYYY-MM-DDTHH:MM:SSZ'
• Array: ['value1', 'value2']
=== WILDCARDS ===
✅ **String & Number fields**: field_name:'pattern*' (prefix), field_name:'*pattern' (suffix), field_name:'*pattern*' (contains)
❌ **Timestamp fields**: Not supported (causes errors)
⚠️ **Number wildcards**: Require quotes: pattern_id:'123*'
=== COMBINING ===
• + = AND: status:'new'+severity:>=70
• , = OR: product:'epp',product:'xdr'
• () = GROUPING: status:'new'+(severity:>=60+severity:<80)+product:'epp'
=== COMMON PATTERNS ===
🔍 SORT OPTIONS:
• timestamp: Timestamp when the detection occurred
• created_timestamp: When the detection was created
• updated_timestamp: When the detection was last modified
• severity: Severity level of the detection (recommended when sorting by severity)
• confidence: Confidence level of the detection
• agent_id: Agent ID associated with the detection
Sort either asc (ascending) or desc (descending).
Both formats are supported: 'severity.desc' or 'severity|desc'
When searching for high severity detections, use 'severity.desc' to get the highest severity detections first.
For chronological ordering, use 'timestamp.desc' for most recent detections first.
Examples: 'severity.desc', 'timestamp.desc'
🔍 SEVERITY RANGES:
**Numeric Ranges (for precise filtering):**
• Informational: severity:<20
• Low: severity:>=20+severity:<40
• Medium: severity:>=40+severity:<60
• High: severity:>=60+severity:<80
• Critical: severity:>=80
**Name-based (easier to use):**
• severity_name:'Informational' (severity 1-19)
• severity_name:'Low' (severity 20-39)
• severity_name:'Medium' (severity 40-59)
• severity_name:'High' (severity 60-79)
• severity_name:'Critical' (severity 80-100)
**Range Examples:**
• Medium severity and above: severity:>=40 OR severity_name:'Medium',severity_name:'High',severity_name:'Critical'
• High severity and above: severity:>=60 OR severity_name:'High',severity_name:'Critical'
• Critical alerts only: severity:>=80 OR severity_name:'Critical'
🔍 ESSENTIAL FILTERS:
• Status: status:'new' | status:'in_progress' | status:'closed' | status:'reopened'
• Severity (by name): severity_name:'High' | severity_name:'Critical' | severity_name:'Medium' | severity_name:'Low' | severity_name:'Informational'
• Severity (by range): severity:>=80 (Critical+) | severity:>=60 (High+) | severity:>=40 (Medium+) | severity:>=20 (Low+)
• Product: product:'epp' | product:'idp' | product:'xdr' | product:'overwatch' (see field table for all)
• Assignment: assigned_to_name:!'*' (unassigned) | assigned_to_name:'user.name'
• Timestamps: created_timestamp:>'2025-01-01T00:00:00Z' | created_timestamp:>='date1'+created_timestamp:<='date2'
• Wildcards: name:'EICAR*' | description:'*credential*' | agent_id:'77d11725*' | pattern_id:'301*'
• Combinations: status:'new'+severity_name:'High'+product:'epp' | status:'new'+severity:>=70+product:'epp' | product:'epp',product:'xdr'
🔍 EPP-SPECIFIC PATTERNS:
• Device targeting: product:'epp'+device.hostname:'DC*' | product:'epp'+device.external_ip:'192.168.*'
• Process analysis: product:'epp'+filename:'*cmd*'+cmdline:'*password*' | product:'epp'+filepath:'*system32*'
• Hash investigation: product:'epp'+sha256:'abc123...' | product:'epp'+md5:'def456...'
• Incident correlation: product:'epp'+incident.id:'inc_12345' | product:'epp'+incident.score:>=80
• User activity: product:'epp'+user_name:'admin*' | product:'epp'+logon_domain:'CORP'
• Nested queries: product:'epp'+device.agent_version:'7.*' | product:'epp'+parent_details.filename:'*explorer*'
=== falcon_search_detections FQL filter available fields ===
""" + generate_md_table(SEARCH_DETECTIONS_FQL_FILTERS) + """
=== COMPLEX FILTER EXAMPLES ===
# New high-severity endpoint alerts (numeric approach)
status:'new'+(severity:>=60+severity:<80)+product:'epp'
# New high-severity endpoint alerts (name-based approach)
status:'new'+severity_name:'High',severity_name:'Critical'+product:'epp'
# Unassigned critical alerts from last 24 hours (numeric)
assigned_to_name:!'*'+severity:>=90+created_timestamp:>'2025-01-19T00:00:00Z'
# Unassigned critical alerts from last 24 hours (name-based)
assigned_to_name:!'*'+severity_name:'Critical'+created_timestamp:>'2025-01-19T00:00:00Z'
# Medium severity and above endpoint alerts (numeric - easier for ranges)
severity:>=40+product:'epp'+status:'new'
# Medium severity and above endpoint alerts (name-based - more explicit)
severity_name:'Medium',severity_name:'High',severity_name:'Critical'+product:'epp'+status:'new'
# OverWatch alerts with credential access tactics
product:'overwatch'+tactic:'Credential Access'
# XDR high severity alerts with specific technique (name-based)
product:'xdr'+severity_name:'High'+technique_id:'T1003'
# XDR high severity alerts with specific technique (numeric)
product:'xdr'+severity:>=60+technique_id:'T1003'
# Find alerts by aggregate_id (related alerts)
aggregate_id:'aggind:77d1172532c8xxxxxxxxxxxxxxxxxxxx49030016385'
# Find alerts from multiple products
product:['epp', 'xdr', 'overwatch']
# Recently updated critical alerts assigned to specific analyst
assigned_to_name:'alice.anderson'+updated_timestamp:>'2025-01-18T12:00:00Z'+severity_name:'Critical'
# Find low-priority informational alerts for cleanup
severity_name:'Informational'+status:'closed'+assigned_to_name:!'*'
# Find alerts with specific MITRE ATT&CK tactics and medium+ severity
tactic:['Credential Access', 'Persistence', 'Privilege Escalation']+severity:>=40
# Closed alerts resolved quickly (under 1 hour) - high severity only
status:'closed'+seconds_to_resolved:<3600+severity_name:'High',severity_name:'Critical'
# Date range with multiple products and high+ severity (name-based)
created_timestamp:>='2025-01-15T00:00:00Z'+created_timestamp:<='2025-01-20T00:00:00Z'+product:'epp',product:'xdr'+severity_name:'High',severity_name:'Critical'
# Date range with multiple products and high+ severity (numeric)
created_timestamp:>='2025-01-15T00:00:00Z'+created_timestamp:<='2025-01-20T00:00:00Z'+product:'epp',product:'xdr'+severity:>=60
# All unassigned alerts except informational (name-based exclusion)
assigned_to_name:!'*'+severity_name:!'Informational'
# All unassigned alerts except informational (numeric approach)
assigned_to_name:!'*'+severity:>=20
"""
```
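The constant above ends by concatenating a markdown table rendered from the `(name, type, description)` tuples via `generate_md_table` from `falcon_mcp/common/utils.py`. That helper's implementation is not shown on this page; the following is a minimal sketch of what it plausibly does, assuming the first tuple is the header row and that multi-line descriptions are collapsed into single table cells (the real implementation may differ):

```python
# Minimal sketch of a generate_md_table-style helper (assumed behavior; the
# actual helper lives in falcon_mcp/common/utils.py and may format differently).
def generate_md_table(rows: list[tuple]) -> str:
    header, *body = rows

    def clean(cell: object) -> str:
        # Collapse the triple-quoted, multi-line descriptions into one line.
        return " ".join(str(cell).split())

    lines = [
        "| " + " | ".join(clean(c) for c in header) + " |",
        "|" + "|".join(" --- " for _ in header) + "|",
    ]
    lines.extend("| " + " | ".join(clean(c) for c in row) + " |" for row in body)
    return "\n".join(lines)

# generate_md_table(SEARCH_DETECTIONS_FQL_FILTERS) would then yield rows like:
# | agent_id | String | Agent ID associated with the alert. Ex: 77d11725... |
```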
--------------------------------------------------------------------------------
/tests/modules/test_idp.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the IDP (Identity Protection) module.
"""
import unittest
from falcon_mcp.modules.idp import IdpModule
from tests.modules.utils.test_modules import TestModules
class TestIdpModule(TestModules):
"""Test cases for the IDP module."""
def setUp(self):
"""Set up test fixtures."""
self.setup_module(IdpModule)
def test_register_tools(self):
"""Test registering tools with the server."""
expected_tools = [
"falcon_idp_investigate_entity",
]
self.assert_tools_registered(expected_tools)
def test_investigate_entity_basic_functionality(self):
"""Test basic entity investigation functionality."""
# Setup mock GraphQL response for entity resolution
mock_response = {
"status_code": 200,
"body": {
"data": {
"entities": {
"nodes": [
{
"entityId": "test-entity-123",
"primaryDisplayName": "Test User",
"secondaryDisplayName": "[email protected]",
"type": "USER",
"riskScore": 75,
"riskScoreSeverity": "MEDIUM",
}
]
}
}
},
}
self.mock_client.command.return_value = mock_response
# Call investigate_entity with basic parameters
result = self.module.investigate_entity(
entity_names=["Test User"],
investigation_types=["entity_details"],
limit=10,
)
# Verify client command was called (at least for entity resolution)
self.assertTrue(self.mock_client.command.called)
# Verify result structure
self.assertIn("investigation_summary", result)
self.assertIn("entity_details", result)
self.assertEqual(result["investigation_summary"]["status"], "completed")
self.assertGreater(result["investigation_summary"]["entity_count"], 0)
def test_investigate_entity_with_multiple_investigation_types(self):
"""Test entity investigation with multiple investigation types."""
# Setup mock GraphQL responses for different investigation types
mock_responses = [
# Entity resolution response
{
"status_code": 200,
"body": {
"data": {
"entities": {
"nodes": [
{
"entityId": "test-entity-456",
"primaryDisplayName": "Admin User",
"secondaryDisplayName": "[email protected]",
}
]
}
}
},
},
# Entity details response
{
"status_code": 200,
"body": {
"data": {
"entities": {
"nodes": [
{
"entityId": "test-entity-456",
"primaryDisplayName": "Admin User",
"secondaryDisplayName": "[email protected]",
"type": "USER",
"riskScore": 85,
"riskScoreSeverity": "HIGH",
"riskFactors": [
{
"type": "PRIVILEGED_ACCESS",
"severity": "HIGH",
}
],
}
]
}
}
},
},
# Timeline response
{
"status_code": 200,
"body": {
"data": {
"timeline": {
"nodes": [
{
"eventId": "event-123",
"eventType": "AUTHENTICATION",
"timestamp": "2024-01-01T12:00:00Z",
}
],
"pageInfo": {"hasNextPage": False},
}
}
},
},
# Relationship analysis response
{
"status_code": 200,
"body": {
"data": {
"entities": {
"nodes": [
{
"entityId": "test-entity-456",
"primaryDisplayName": "Admin User",
"associations": [
{
"bindingType": "OWNERSHIP",
"entity": {
"entityId": "server-789",
"primaryDisplayName": "Test Server",
},
}
],
}
]
}
}
},
},
# Risk assessment response
{
"status_code": 200,
"body": {
"data": {
"entities": {
"nodes": [
{
"entityId": "test-entity-456",
"primaryDisplayName": "Admin User",
"riskScore": 85,
"riskScoreSeverity": "HIGH",
"riskFactors": [
{
"type": "PRIVILEGED_ACCESS",
"severity": "HIGH",
}
],
}
]
}
}
},
},
]
self.mock_client.command.side_effect = mock_responses
# Call investigate_entity with multiple investigation types
result = self.module.investigate_entity(
email_addresses=["[email protected]"],
investigation_types=[
"entity_details",
"timeline_analysis",
"relationship_analysis",
"risk_assessment",
],
limit=50,
include_associations=True,
include_accounts=True,
include_incidents=True,
)
# Verify multiple client commands were called
self.assertGreaterEqual(self.mock_client.command.call_count, 2)
# Verify result structure contains all investigation types
self.assertIn("investigation_summary", result)
self.assertIn("entity_details", result)
self.assertIn("timeline_analysis", result)
self.assertIn("relationship_analysis", result)
self.assertIn("risk_assessment", result)
# Verify investigation summary
self.assertEqual(result["investigation_summary"]["status"], "completed")
self.assertGreater(result["investigation_summary"]["entity_count"], 0)
self.assertEqual(len(result["investigation_summary"]["investigation_types"]), 4)
def test_investigate_entity_no_identifiers_error(self):
"""Test error handling when no entity identifiers are provided."""
# Call investigate_entity without any identifiers
result = self.module.investigate_entity()
# Verify error response
self.assertIn("error", result)
self.assertIn("investigation_summary", result)
self.assertEqual(result["investigation_summary"]["status"], "failed")
self.assertEqual(result["investigation_summary"]["entity_count"], 0)
# Verify no API calls were made
self.assertFalse(self.mock_client.command.called)
def test_investigate_entity_no_entities_found(self):
"""Test handling when no entities are found matching criteria."""
# Setup mock response with no entities
mock_response = {
"status_code": 200,
"body": {"data": {"entities": {"nodes": []}}},
}
self.mock_client.command.return_value = mock_response
# Call investigate_entity
result = self.module.investigate_entity(entity_names=["NonExistent User"])
# Verify result indicates no entities found
self.assertIn("error", result)
self.assertIn("investigation_summary", result)
self.assertEqual(result["investigation_summary"]["status"], "failed")
self.assertEqual(result["investigation_summary"]["entity_count"], 0)
self.assertIn("search_criteria", result)
def test_investigate_entity_with_geographic_location_data(self):
"""Test entity investigation includes geographic location data in timeline analysis."""
# Setup mock GraphQL responses with geographic location data
mock_responses = [
# Entity resolution response
{
"status_code": 200,
"body": {
"data": {
"entities": {
"nodes": [
{
"entityId": "test-entity-geo-123",
"primaryDisplayName": "Global User",
"secondaryDisplayName": "[email protected]",
}
]
}
}
},
},
# Timeline response with geographic location data
{
"status_code": 200,
"body": {
"data": {
"timeline": {
"nodes": [
{
"eventId": "auth-event-123",
"eventType": "AUTHENTICATION",
"eventSeverity": "MEDIUM",
"timestamp": "2024-01-01T12:00:00Z",
"sourceEntity": {
"entityId": "test-entity-geo-123",
"primaryDisplayName": "Global User",
},
"targetEntity": {
"entityId": "server-456",
"primaryDisplayName": "Corporate Server",
},
"geoLocation": {
"country": "United States",
"countryCode": "US",
"city": "New York",
"cityCode": "NYC",
"latitude": 40.7128,
"longitude": -74.0060,
},
"locationAssociatedWithUser": True,
"userDisplayName": "Global User",
"endpointDisplayName": "NYC-Workstation-01",
"ipAddress": "192.168.1.100",
},
{
"eventId": "auth-event-456",
"eventType": "AUTHENTICATION",
"eventSeverity": "HIGH",
"timestamp": "2024-01-02T08:30:00Z",
"sourceEntity": {
"entityId": "test-entity-geo-123",
"primaryDisplayName": "Global User",
},
"targetEntity": {
"entityId": "server-789",
"primaryDisplayName": "Remote Server",
},
"geoLocation": {
"country": "Germany",
"countryCode": "DE",
"city": "Berlin",
"cityCode": "BER",
"latitude": 52.5200,
"longitude": 13.4050,
},
"locationAssociatedWithUser": True,
"userDisplayName": "Global User",
"endpointDisplayName": "BER-Laptop-02",
"ipAddress": "10.0.0.50",
},
],
"pageInfo": {"hasNextPage": False},
}
}
},
},
]
self.mock_client.command.side_effect = mock_responses
# Call investigate_entity with timeline analysis to get geographic data
result = self.module.investigate_entity(
entity_names=["Global User"],
investigation_types=["timeline_analysis"],
timeline_start_time="2024-01-01T00:00:00Z",
timeline_end_time="2024-01-02T23:59:59Z",
limit=50,
)
# Verify result structure
self.assertIn("investigation_summary", result)
self.assertIn("timeline_analysis", result)
self.assertEqual(result["investigation_summary"]["status"], "completed")
# Verify geographic location data is present in timeline events
timeline_data = result["timeline_analysis"]["timelines"][0]["timeline"]
self.assertGreater(len(timeline_data), 0)
# Check first event has geographic location data
first_event = timeline_data[0]
self.assertIn("geoLocation", first_event)
self.assertIn("country", first_event["geoLocation"])
self.assertIn("countryCode", first_event["geoLocation"])
self.assertIn("city", first_event["geoLocation"])
self.assertIn("cityCode", first_event["geoLocation"])
self.assertIn("latitude", first_event["geoLocation"])
self.assertIn("longitude", first_event["geoLocation"])
# Verify geographic location values
self.assertEqual(first_event["geoLocation"]["country"], "United States")
self.assertEqual(first_event["geoLocation"]["countryCode"], "US")
self.assertEqual(first_event["geoLocation"]["city"], "New York")
self.assertEqual(first_event["geoLocation"]["cityCode"], "NYC")
# Check additional location fields
self.assertIn("locationAssociatedWithUser", first_event)
self.assertIn("userDisplayName", first_event)
self.assertIn("endpointDisplayName", first_event)
self.assertIn("ipAddress", first_event)
# Verify second event has different country (multi-location user)
second_event = timeline_data[1]
self.assertEqual(second_event["geoLocation"]["country"], "Germany")
self.assertEqual(second_event["geoLocation"]["countryCode"], "DE")
def test_investigate_entity_with_geo_location_associations(self):
"""Test entity investigation includes geographic location associations."""
# Setup mock GraphQL responses with geographic location associations
mock_responses = [
# Entity resolution response
{
"status_code": 200,
"body": {
"data": {
"entities": {
"nodes": [
{
"entityId": "test-entity-assoc-456",
"primaryDisplayName": "Travel User",
"secondaryDisplayName": "[email protected]",
}
]
}
}
},
},
# Entity details response with geographic associations
{
"status_code": 200,
"body": {
"data": {
"entities": {
"nodes": [
{
"entityId": "test-entity-assoc-456",
"primaryDisplayName": "Travel User",
"secondaryDisplayName": "[email protected]",
"type": "USER",
"riskScore": 60,
"riskScoreSeverity": "MEDIUM",
"associations": [
{
"bindingType": "LOCATION_ACCESS",
"geoLocation": {
"country": "France",
"countryCode": "FR",
"city": "Paris",
"cityCode": "PAR",
"latitude": 48.8566,
"longitude": 2.3522,
},
},
{
"bindingType": "LOCATION_ACCESS",
"geoLocation": {
"country": "Japan",
"countryCode": "JP",
"city": "Tokyo",
"cityCode": "TYO",
"latitude": 35.6762,
"longitude": 139.6503,
},
},
],
}
]
}
}
},
},
]
self.mock_client.command.side_effect = mock_responses
# Call investigate_entity with entity details to get geographic associations
result = self.module.investigate_entity(
entity_names=["Travel User"],
investigation_types=["entity_details"],
include_associations=True,
limit=50,
)
# Verify result structure
self.assertIn("investigation_summary", result)
self.assertIn("entity_details", result)
self.assertEqual(result["investigation_summary"]["status"], "completed")
# Verify geographic location associations are present
entity_data = result["entity_details"]["entities"][0]
self.assertIn("associations", entity_data)
associations = entity_data["associations"]
self.assertGreater(len(associations), 0)
# Check geographic location associations
geo_associations = [
assoc for assoc in associations if "geoLocation" in assoc
]
self.assertGreater(len(geo_associations), 0)
# Verify first geographic association
first_geo_assoc = geo_associations[0]
self.assertIn("geoLocation", first_geo_assoc)
geo_location = first_geo_assoc["geoLocation"]
self.assertIn("country", geo_location)
self.assertIn("countryCode", geo_location)
self.assertIn("city", geo_location)
self.assertIn("cityCode", geo_location)
self.assertIn("latitude", geo_location)
self.assertIn("longitude", geo_location)
# Verify geographic location values
self.assertEqual(geo_location["country"], "France")
self.assertEqual(geo_location["countryCode"], "FR")
def test_investigate_entity_multi_country_detection(self):
"""Test detection of users active in multiple countries."""
# Setup mock GraphQL responses simulating user activity in 4+ countries
mock_responses = [
# Entity resolution response
{
"status_code": 200,
"body": {
"data": {
"entities": {
"nodes": [
{
"entityId": "multi-country-user-789",
"primaryDisplayName": "Global Executive",
"secondaryDisplayName": "[email protected]",
}
]
}
}
},
},
# Timeline response with activities from multiple countries
{
"status_code": 200,
"body": {
"data": {
"timeline": {
"nodes": [
{
"eventId": "event-us-001",
"eventType": "SUCCESSFUL_AUTHENTICATION",
"timestamp": "2024-01-01T09:00:00Z",
"geoLocation": {
"country": "United States",
"countryCode": "US",
"city": "San Francisco",
"cityCode": "SFO",
},
"locationAssociatedWithUser": True,
},
{
"eventId": "event-uk-002",
"eventType": "SUCCESSFUL_AUTHENTICATION",
"timestamp": "2024-01-02T14:30:00Z",
"geoLocation": {
"country": "United Kingdom",
"countryCode": "GB",
"city": "London",
"cityCode": "LDN",
},
"locationAssociatedWithUser": True,
},
{
"eventId": "event-sg-003",
"eventType": "SUCCESSFUL_AUTHENTICATION",
"timestamp": "2024-01-03T22:15:00Z",
"geoLocation": {
"country": "Singapore",
"countryCode": "SG",
"city": "Singapore",
"cityCode": "SIN",
},
"locationAssociatedWithUser": True,
},
{
"eventId": "event-au-004",
"eventType": "SUCCESSFUL_AUTHENTICATION",
"timestamp": "2024-01-04T05:45:00Z",
"geoLocation": {
"country": "Australia",
"countryCode": "AU",
"city": "Sydney",
"cityCode": "SYD",
},
"locationAssociatedWithUser": True,
},
],
"pageInfo": {"hasNextPage": False},
}
}
},
},
]
self.mock_client.command.side_effect = mock_responses
# Call investigate_entity with timeline analysis
result = self.module.investigate_entity(
entity_names=["Global Executive"],
investigation_types=["timeline_analysis"],
timeline_start_time="2024-01-01T00:00:00Z",
timeline_end_time="2024-01-04T23:59:59Z",
limit=100,
)
# Verify result structure
self.assertIn("timeline_analysis", result)
timeline_events = result["timeline_analysis"]["timelines"][0]["timeline"]
self.assertEqual(len(timeline_events), 4)
# Extract unique countries from timeline events
countries = set()
for event in timeline_events:
if "geoLocation" in event and "country" in event["geoLocation"]:
countries.add(event["geoLocation"]["country"])
# Verify user has been active in 4 different countries
expected_countries = {"United States", "United Kingdom", "Singapore", "Australia"}
self.assertEqual(countries, expected_countries)
self.assertEqual(len(countries), 4)
# Verify each event has proper geographic location structure
for event in timeline_events:
self.assertIn("geoLocation", event)
geo_loc = event["geoLocation"]
self.assertIn("country", geo_loc)
self.assertIn("countryCode", geo_loc)
self.assertIn("city", geo_loc)
self.assertIn("cityCode", geo_loc)
self.assertTrue(event.get("locationAssociatedWithUser", False))
def test_investigate_entity_file_operation_geographic_data(self):
"""Test geographic location data in file operation events (targetEntity only)."""
# Setup mock response for file operation event with geographic data
mock_responses = [
# Entity resolution response
{
"status_code": 200,
"body": {
"data": {
"entities": {
"nodes": [
{
"entityId": "file-user-123",
"primaryDisplayName": "File User",
}
]
}
}
},
},
# Timeline response with file operation event
{
"status_code": 200,
"body": {
"data": {
"timeline": {
"nodes": [
{
"eventId": "file-op-001",
"eventType": "FILE_OPERATION",
"timestamp": "2024-01-01T15:30:00Z",
"targetEntity": {
"entityId": "file-server-456",
"primaryDisplayName": "Shared File Server",
},
"geoLocation": {
"country": "Canada",
"countryCode": "CA",
"city": "Toronto",
"cityCode": "YYZ",
"latitude": 43.6532,
"longitude": -79.3832,
},
"locationAssociatedWithUser": True,
"userDisplayName": "File User",
"endpointDisplayName": "TOR-Desktop-01",
"ipAddress": "172.16.0.25",
}
],
"pageInfo": {"hasNextPage": False},
}
}
},
},
]
self.mock_client.command.side_effect = mock_responses
# Call investigate_entity with timeline analysis
result = self.module.investigate_entity(
entity_names=["File User"],
investigation_types=["timeline_analysis"],
limit=50,
)
# Verify file operation event has geographic data
timeline_events = result["timeline_analysis"]["timelines"][0]["timeline"]
file_event = timeline_events[0]
# Verify event structure (should have targetEntity but no sourceEntity for file operations)
self.assertIn("targetEntity", file_event)
self.assertNotIn("sourceEntity", file_event)
# Verify geographic location data is present
self.assertIn("geoLocation", file_event)
geo_loc = file_event["geoLocation"]
self.assertEqual(geo_loc["country"], "Canada")
self.assertEqual(geo_loc["countryCode"], "CA")
self.assertEqual(geo_loc["city"], "Toronto")
# Verify additional location context
self.assertIn("locationAssociatedWithUser", file_event)
self.assertIn("userDisplayName", file_event)
self.assertIn("endpointDisplayName", file_event)
self.assertIn("ipAddress", file_event)
if __name__ == "__main__":
unittest.main()
```
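Each multi-step test above queues one canned GraphQL response per expected API call through `side_effect`, so the module under test consumes them in order (entity resolution first, then one response per investigation type). Here is a stripped-down illustration of that pattern, independent of the repo's `TestModules` fixtures; the payloads are hypothetical:

```python
# Stand-alone sketch of the side_effect sequencing used in the tests above.
from unittest.mock import MagicMock

client = MagicMock()
client.command.side_effect = [
    {"status_code": 200, "body": {"data": {"entities": {"nodes": [{"entityId": "e-1"}]}}}},
    {"status_code": 200, "body": {"data": {"timeline": {"nodes": [], "pageInfo": {"hasNextPage": False}}}}},
]

# Each call pops the next queued response, mirroring resolution -> timeline.
resolution = client.command("api_preempt_proxy_post_graphql", body={"query": "..."})
timeline = client.command("api_preempt_proxy_post_graphql", body={"query": "..."})
assert resolution["body"]["data"]["entities"]["nodes"][0]["entityId"] == "e-1"
assert timeline["body"]["data"]["timeline"]["pageInfo"]["hasNextPage"] is False
```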
--------------------------------------------------------------------------------
/falcon_mcp/modules/idp.py:
--------------------------------------------------------------------------------
```python
"""
Identity Protection (IDP) module for Falcon MCP Server
This module provides tools for accessing and managing CrowdStrike Falcon Identity Protection capabilities.
Core use cases:
1. Entity Lookup & Investigation
"""
import json
from datetime import datetime
from typing import Any, Dict, List
from mcp.server import FastMCP
from pydantic import Field
from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.logging import get_logger
from falcon_mcp.common.utils import sanitize_input
from falcon_mcp.modules.base import BaseModule
logger = get_logger(__name__)
class IdpModule(BaseModule):
"""Module for accessing and managing CrowdStrike Falcon Identity Protection."""
def register_tools(self, server: FastMCP) -> None:
"""Register IDP tools with the MCP server.
Args:
server: MCP server instance
"""
# Entity Investigation Tool
self._add_tool(
server=server,
method=self.investigate_entity,
name="idp_investigate_entity",
)
# ==========================================
# Entity Investigation Tool
# ==========================================
def investigate_entity(
self,
# Entity Identification (Required - at least one)
entity_ids: list[str] | None = Field(
default=None,
description="List of specific entity IDs to investigate (e.g., ['entity-001'])",
),
entity_names: list[str] | None = Field(
default=None,
description="List of entity names to search for (e.g., ['Administrator', 'John Doe']). When combined with other parameters, uses AND logic.",
),
email_addresses: list[str] | None = Field(
default=None,
description="List of email addresses to investigate (e.g., ['[email protected]']). When combined with other parameters, uses AND logic.",
),
ip_addresses: list[str] | None = Field(
default=None,
description="List of IP addresses/endpoints to investigate (e.g., ['1.1.1.1']). When combined with other parameters, uses AND logic.",
),
domain_names: list[str] | None = Field(
default=None,
description="List of domain names to search for (e.g., ['XDRHOLDINGS.COM', 'CORP.LOCAL']). When combined with other parameters, uses AND logic. Example: entity_names=['Administrator'] + domain_names=['DOMAIN.COM'] finds Administrator user in that specific domain.",
),
# Investigation Scope Control
investigation_types: list[str] = Field(
default=["entity_details"],
description="Types of investigation to perform: 'entity_details', 'timeline_analysis', 'relationship_analysis', 'risk_assessment'. Use multiple for comprehensive analysis.",
),
# Timeline Parameters (when timeline_analysis is included)
timeline_start_time: str | None = Field(
default=None,
description="Start time for timeline analysis in ISO format (e.g., '2024-01-01T00:00:00Z')",
),
timeline_end_time: str | None = Field(
default=None,
description="End time for timeline analysis in ISO format",
),
timeline_event_types: list[str] | None = Field(
default=None,
description="Filter timeline by event types: 'ACTIVITY', 'NOTIFICATION', 'THREAT', 'ENTITY', 'AUDIT', 'POLICY', 'SYSTEM'",
),
# Relationship Parameters (when relationship_analysis is included)
relationship_depth: int = Field(
default=2,
ge=1,
le=3,
description="Depth of relationship analysis (1-3 levels)",
),
# General Parameters
limit: int = Field(
default=10,
ge=1,
le=200,
description="Maximum number of results to return",
),
include_associations: bool = Field(
default=True,
description="Include entity associations and relationships in results",
),
include_accounts: bool = Field(
default=True,
description="Include account information in results",
),
include_incidents: bool = Field(
default=True,
description="Include open security incidents in results",
),
) -> Dict[str, Any]:
"""Comprehensive entity investigation tool.
This tool provides complete entity investigation capabilities including:
- Entity search and details lookup
- Activity timeline analysis
- Relationship and association mapping
- Risk assessment
"""
logger.debug("Starting comprehensive entity investigation")
# Step 1: Validate inputs
validation_error = self._validate_entity_identifiers(
entity_ids,
entity_names,
email_addresses,
ip_addresses,
domain_names,
investigation_types,
)
if validation_error:
return validation_error
# Step 2: Entity Resolution - Find entities from various identifiers
logger.debug("Resolving entities from provided identifiers")
search_criteria = {
"entity_ids": entity_ids,
"entity_names": entity_names,
"email_addresses": email_addresses,
"ip_addresses": ip_addresses,
"domain_names": domain_names,
}
resolved_entity_ids = self._resolve_entities(
{**search_criteria, "limit": limit}
)
# Check if entity resolution failed
if isinstance(resolved_entity_ids, dict) and "error" in resolved_entity_ids:
return self._create_error_response(
resolved_entity_ids["error"],
0,
investigation_types,
search_criteria,
)
if not resolved_entity_ids:
return self._create_error_response(
"No entities found matching the provided criteria",
0,
investigation_types,
search_criteria,
)
logger.debug(f"Resolved {len(resolved_entity_ids)} entities for investigation")
# Step 3: Execute investigations based on requested types
investigation_results = {}
investigation_params = {
"include_associations": include_associations,
"include_accounts": include_accounts,
"include_incidents": include_incidents,
"timeline_start_time": timeline_start_time,
"timeline_end_time": timeline_end_time,
"timeline_event_types": timeline_event_types,
"relationship_depth": relationship_depth,
"limit": limit,
}
for investigation_type in investigation_types:
result = self._execute_single_investigation(
investigation_type, resolved_entity_ids, investigation_params
)
if "error" in result:
logger.error(f"Error in {investigation_type} investigation: {result['error']}")
return self._create_error_response(
f"Investigation failed during {investigation_type}: {result['error']}",
len(resolved_entity_ids),
investigation_types,
)
investigation_results[investigation_type] = result
# Step 4: Synthesize comprehensive response
return self._synthesize_investigation_response(
resolved_entity_ids,
investigation_results,
{
"investigation_types": investigation_types,
"search_criteria": search_criteria,
},
)
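# Illustrative result shape (as exercised by tests/modules/test_idp.py): a
# completed run returns an "investigation_summary" plus one key per requested
# investigation type, e.g.
#   {
#       "investigation_summary": {"status": "completed", "entity_count": 1, ...},
#       "entity_details": {...},
#       "timeline_analysis": {...},
#   }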
# ==========================================
# Investigation Helper Methods
# ==========================================
def _validate_entity_identifiers(
self,
entity_ids,
entity_names,
email_addresses,
ip_addresses,
domain_names,
investigation_types,
):
"""Validate that at least one entity identifier is provided."""
if not any(
[
entity_ids,
entity_names,
email_addresses,
ip_addresses,
domain_names,
]
):
return {
"error": "At least one entity identifier must be provided (entity_ids, entity_names, email_addresses, ip_addresses, or domain_names)",
"investigation_summary": {
"entity_count": 0,
"investigation_types": investigation_types,
"timestamp": datetime.utcnow().isoformat(),
"status": "failed",
},
}
return None
def _create_error_response(
self,
error_message,
entity_count,
investigation_types,
search_criteria=None,
):
"""Create a standardized error response."""
response = {
"error": error_message,
"investigation_summary": {
"entity_count": entity_count,
"investigation_types": investigation_types,
"timestamp": datetime.utcnow().isoformat(),
"status": "failed",
},
}
if search_criteria:
response["search_criteria"] = search_criteria
return response
def _execute_single_investigation(
self,
investigation_type,
resolved_entity_ids,
params,
):
"""Execute a single investigation type and return results or error."""
logger.debug(f"Executing {investigation_type} investigation")
if investigation_type == "entity_details":
return self._get_entity_details_batch(
resolved_entity_ids,
{
"include_associations": params.get("include_associations", True),
"include_accounts": params.get("include_accounts", True),
"include_incidents": params.get("include_incidents", True),
},
)
if investigation_type == "timeline_analysis":
return self._get_entity_timelines_batch(
resolved_entity_ids,
{
"start_time": params.get("timeline_start_time"),
"end_time": params.get("timeline_end_time"),
"event_types": params.get("timeline_event_types"),
"limit": params.get("limit", 50),
},
)
if investigation_type == "relationship_analysis":
return self._analyze_relationships_batch(
resolved_entity_ids,
{
"relationship_depth": params.get("relationship_depth", 2),
"include_risk_context": True,
"limit": params.get("limit", 50),
},
)
if investigation_type == "risk_assessment":
return self._assess_risks_batch(
resolved_entity_ids,
{"include_risk_factors": True},
)
logger.warning(f"Unknown investigation type: {investigation_type}")
return {"error": f"Unknown investigation type: {investigation_type}"}
# ==========================================
# GraphQL Query Building Helper Methods
# ==========================================
def _build_entity_details_query(
self,
entity_ids: List[str],
include_risk_factors: bool,
include_associations: bool,
include_incidents: bool,
include_accounts: bool,
) -> str:
"""Build GraphQL query for detailed entity information."""
entity_ids_json = json.dumps(entity_ids)
# Start with minimal safe fields
fields = [
"entityId",
"primaryDisplayName",
"secondaryDisplayName",
"type",
"riskScore",
"riskScoreSeverity",
]
if include_risk_factors:
fields.append("""
riskFactors {
type
severity
}
""")
if include_associations:
fields.append("""
associations {
bindingType
... on EntityAssociation {
entity {
entityId
primaryDisplayName
secondaryDisplayName
type
}
}
... on LocalAdminLocalUserAssociation {
accountName
}
... on LocalAdminDomainEntityAssociation {
entityType
entity {
entityId
primaryDisplayName
secondaryDisplayName
}
}
... on GeoLocationAssociation {
geoLocation {
country
countryCode
city
cityCode
latitude
longitude
}
}
}
""")
if include_incidents:
fields.append("""
openIncidents(first: 10) {
nodes {
type
startTime
endTime
compromisedEntities {
entityId
primaryDisplayName
}
}
}
""")
if include_accounts:
fields.append("""
accounts {
... on ActiveDirectoryAccountDescriptor {
domain
samAccountName
ou
servicePrincipalNames
passwordAttributes {
lastChange
strength
}
expirationTime
}
... on SsoUserAccountDescriptor {
dataSource
mostRecentActivity
title
creationTime
passwordAttributes {
lastChange
}
}
... on AzureCloudServiceAdapterDescriptor {
registeredTenantType
appOwnerOrganizationId
publisherDomain
signInAudience
}
... on CloudServiceAdapterDescriptor {
dataSourceParticipantIdentifier
}
}
""")
fields_string = "\n".join(fields)
return f"""
query {{
entities(entityIds: {entity_ids_json}, first: 50) {{
nodes {{
{fields_string}
}}
}}
}}
"""
def _build_timeline_query(
self,
entity_id: str,
start_time: str | None,
end_time: str | None,
event_types: list[str] | None,
limit: int,
) -> str:
"""Build GraphQL query for entity timeline."""
filters = [f'sourceEntityQuery: {{entityIds: ["{entity_id}"]}}']
if start_time and isinstance(start_time, str):
filters.append(f'startTime: "{start_time}"')
if end_time and isinstance(end_time, str):
filters.append(f'endTime: "{end_time}"')
if event_types and isinstance(event_types, list):
# Format event types as unquoted GraphQL enums
categories_str = "[" + ", ".join(event_types) + "]"
filters.append(f"categories: {categories_str}")
filter_string = ", ".join(filters)
return f"""
query {{
timeline({filter_string}, first: {limit}) {{
nodes {{
eventId
eventType
eventSeverity
timestamp
... on TimelineUserOnEndpointActivityEvent {{
sourceEntity {{
entityId
primaryDisplayName
}}
targetEntity {{
entityId
primaryDisplayName
}}
geoLocation {{
country
countryCode
city
cityCode
latitude
longitude
}}
locationAssociatedWithUser
userDisplayName
endpointDisplayName
ipAddress
}}
... on TimelineAuthenticationEvent {{
sourceEntity {{
entityId
primaryDisplayName
}}
targetEntity {{
entityId
primaryDisplayName
}}
geoLocation {{
country
countryCode
city
cityCode
latitude
longitude
}}
locationAssociatedWithUser
userDisplayName
endpointDisplayName
ipAddress
}}
... on TimelineAlertEvent {{
sourceEntity {{
entityId
primaryDisplayName
}}
}}
... on TimelineDceRpcEvent {{
sourceEntity {{
entityId
primaryDisplayName
}}
targetEntity {{
entityId
primaryDisplayName
}}
geoLocation {{
country
countryCode
city
cityCode
latitude
longitude
}}
locationAssociatedWithUser
userDisplayName
endpointDisplayName
ipAddress
}}
... on TimelineFailedAuthenticationEvent {{
sourceEntity {{
entityId
primaryDisplayName
}}
targetEntity {{
entityId
primaryDisplayName
}}
geoLocation {{
country
countryCode
city
cityCode
latitude
longitude
}}
locationAssociatedWithUser
userDisplayName
endpointDisplayName
ipAddress
}}
... on TimelineSuccessfulAuthenticationEvent {{
sourceEntity {{
entityId
primaryDisplayName
}}
targetEntity {{
entityId
primaryDisplayName
}}
geoLocation {{
country
countryCode
city
cityCode
latitude
longitude
}}
locationAssociatedWithUser
userDisplayName
endpointDisplayName
ipAddress
}}
... on TimelineServiceAccessEvent {{
sourceEntity {{
entityId
primaryDisplayName
}}
targetEntity {{
entityId
primaryDisplayName
}}
geoLocation {{
country
countryCode
city
cityCode
latitude
longitude
}}
locationAssociatedWithUser
userDisplayName
endpointDisplayName
ipAddress
}}
... on TimelineFileOperationEvent {{
targetEntity {{
entityId
primaryDisplayName
}}
geoLocation {{
country
countryCode
city
cityCode
latitude
longitude
}}
locationAssociatedWithUser
userDisplayName
endpointDisplayName
ipAddress
}}
... on TimelineLdapSearchEvent {{
sourceEntity {{
entityId
primaryDisplayName
}}
targetEntity {{
entityId
primaryDisplayName
}}
geoLocation {{
country
countryCode
city
cityCode
latitude
longitude
}}
locationAssociatedWithUser
userDisplayName
endpointDisplayName
ipAddress
}}
... on TimelineRemoteCodeExecutionEvent {{
sourceEntity {{
entityId
primaryDisplayName
}}
targetEntity {{
entityId
primaryDisplayName
}}
geoLocation {{
country
countryCode
city
cityCode
latitude
longitude
}}
locationAssociatedWithUser
userDisplayName
endpointDisplayName
ipAddress
}}
... on TimelineConnectorConfigurationEvent {{
category
}}
... on TimelineConnectorConfigurationAddedEvent {{
category
}}
... on TimelineConnectorConfigurationDeletedEvent {{
category
}}
... on TimelineConnectorConfigurationModifiedEvent {{
category
}}
}}
pageInfo {{
hasNextPage
endCursor
}}
}}
}}
"""
def _build_relationship_analysis_query(
self,
entity_id: str,
relationship_depth: int,
include_risk_context: bool,
limit: int,
) -> str:
"""Build GraphQL query for relationship analysis."""
risk_fields = ""
if include_risk_context:
risk_fields = """
riskScore
riskScoreSeverity
riskFactors {
type
severity
}
"""
# Build nested association fields based on relationship_depth
def build_association_fields(depth: int) -> str:
if depth <= 0:
return ""
nested_associations = ""
if depth > 1:
nested_associations = build_association_fields(depth - 1)
return f"""
associations {{
bindingType
... on EntityAssociation {{
entity {{
entityId
primaryDisplayName
secondaryDisplayName
type
{risk_fields}
{nested_associations}
}}
}}
... on LocalAdminLocalUserAssociation {{
accountName
}}
... on LocalAdminDomainEntityAssociation {{
entityType
entity {{
entityId
primaryDisplayName
secondaryDisplayName
type
{risk_fields}
{nested_associations}
}}
}}
... on GeoLocationAssociation {{
geoLocation {{
country
countryCode
city
cityCode
latitude
longitude
}}
}}
}}
"""
association_fields = build_association_fields(relationship_depth)
return f"""
query {{
entities(entityIds: ["{entity_id}"], first: {limit}) {{
nodes {{
entityId
primaryDisplayName
secondaryDisplayName
type
{risk_fields}
{association_fields}
}}
}}
}}
"""
def _build_risk_assessment_query(
self, entity_ids: List[str], include_risk_factors: bool
) -> str:
"""Build GraphQL query for risk assessment."""
entity_ids_json = json.dumps(entity_ids)
risk_fields = """
riskScore
riskScoreSeverity
"""
if include_risk_factors:
risk_fields += """
riskFactors {
type
severity
}
"""
return f"""
query {{
entities(entityIds: {entity_ids_json}, first: 50) {{
nodes {{
entityId
primaryDisplayName
{risk_fields}
}}
}}
}}
"""
def _resolve_entities(self, identifiers: Dict[str, Any]) -> List[str] | Dict[str, Any]:
"""Resolve entity IDs from various identifier types using unified AND-based query.
All provided identifiers are combined using AND logic in a single GraphQL query.
For example: entity_names=["Administrator"] + domain_names=["XDRHOLDINGS.COM"]
will find entities that match BOTH criteria.
Returns:
List[str]: List of resolved entity IDs on success
Dict[str, Any]: Error response on failure
"""
resolved_ids = []
# Direct entity IDs - no resolution needed
entity_ids = identifiers.get("entity_ids")
if entity_ids and isinstance(entity_ids, list):
resolved_ids.extend(entity_ids)
# Check if we have conflicting entity types (USER vs ENDPOINT)
email_addresses = identifiers.get("email_addresses")
ip_addresses = identifiers.get("ip_addresses")
has_user_criteria = bool(email_addresses)
has_endpoint_criteria = bool(ip_addresses)
# If we have both USER and ENDPOINT criteria, we need separate queries
if has_user_criteria and has_endpoint_criteria:
# This is a conflict - we cannot search for both USER and ENDPOINT in the same query
# For now, prioritize USER entities (emails) over ENDPOINT entities (IPs)
logger.warning(
"Cannot combine email addresses (USER) and IP addresses (ENDPOINT) in single query. Prioritizing USER entities."
)
ip_addresses = None
# Build unified GraphQL query with AND logic
query_filters = []
query_fields = []
# Add entity names filter
self._add_entity_filters(identifiers, query_fields, query_filters)
# Add email addresses filter (USER entities)
self._add_email_filter(email_addresses, query_fields, query_filters)
# Add IP addresses filter (ENDPOINT entities) - only if no USER criteria
self._add_ip_filter(has_user_criteria, ip_addresses, query_fields, query_filters)
# Add domain names filter
domain_names = self._add_domain_filter(identifiers, query_fields, query_filters)
# If we have filters to apply, execute unified query
if query_filters:
            # Remove duplicate fields (GraphQL selection order is not significant)
query_fields = list(set(query_fields))
fields_string = "\n".join(query_fields)
# Add account information for domain context
if domain_names:
fields_string += """
accounts {
... on ActiveDirectoryAccountDescriptor {
domain
samAccountName
}
}"""
filters_string = ", ".join(query_filters)
limit = identifiers.get("limit", 50)
query = f"""
query {{
entities({filters_string}, first: {limit}) {{
nodes {{
entityId
{fields_string}
}}
}}
}}
"""
response = self.client.command("api_preempt_proxy_post_graphql", body={"query": query})
result = handle_api_response(
response,
operation="api_preempt_proxy_post_graphql",
error_message="Failed to resolve entities with combined filters",
default_result=None,
)
if self._is_error(result):
return result
# Extract entities from GraphQL response structure
data = response.get("body", {}).get("data", {})
entities = data.get("entities", {}).get("nodes", [])
resolved_ids.extend([entity["entityId"] for entity in entities])
        # Remove duplicates and return (note: set() does not preserve order)
return list(set(resolved_ids))

    def _add_domain_filter(
self,
identifiers,
query_fields,
query_filters,
):
domain_names = identifiers.get("domain_names")
if domain_names and isinstance(domain_names, list):
sanitized_domains = [sanitize_input(domain) for domain in domain_names]
domains_json = json.dumps(sanitized_domains)
query_filters.append(f"domains: {domains_json}")
query_fields.extend(["primaryDisplayName", "secondaryDisplayName"])
return domain_names

    def _add_ip_filter(
self,
has_user_criteria,
ip_addresses,
query_fields,
query_filters,
):
if ip_addresses and isinstance(ip_addresses, list) and not has_user_criteria:
sanitized_ips = [sanitize_input(ip) for ip in ip_addresses]
ips_json = json.dumps(sanitized_ips)
query_filters.append(f"primaryDisplayNames: {ips_json}")
query_filters.append("types: [ENDPOINT]")
query_fields.append("primaryDisplayName")

    def _add_email_filter(
self,
email_addresses,
query_fields,
query_filters,
):
if email_addresses and isinstance(email_addresses, list):
sanitized_emails = [sanitize_input(email) for email in email_addresses]
emails_json = json.dumps(sanitized_emails)
query_filters.append(f"secondaryDisplayNames: {emails_json}")
query_filters.append("types: [USER]")
query_fields.extend(["primaryDisplayName", "secondaryDisplayName"])

    def _add_entity_filters(
self,
identifiers,
query_fields,
query_filters,
):
entity_names = identifiers.get("entity_names")
if entity_names and isinstance(entity_names, list):
sanitized_names = [sanitize_input(name) for name in entity_names]
names_json = json.dumps(sanitized_names)
query_filters.append(f"primaryDisplayNames: {names_json}")
query_fields.append("primaryDisplayName")

    def _get_entity_details_batch(
self,
entity_ids: List[str],
options: Dict[str, Any],
) -> Dict[str, Any]:
"""Get detailed entity information for multiple entities."""
graphql_query = self._build_entity_details_query(
entity_ids=entity_ids,
include_risk_factors=True,
include_associations=options.get("include_associations", True),
include_incidents=options.get("include_incidents", True),
include_accounts=options.get("include_accounts", True),
)
response = self.client.command(
"api_preempt_proxy_post_graphql",
body={"query": graphql_query},
)
result = handle_api_response(
response,
operation="api_preempt_proxy_post_graphql",
error_message="Failed to get entity details",
default_result=None,
)
if self._is_error(result):
return result
# Extract entities from GraphQL response structure
data = response.get("body", {}).get("data", {})
entities = data.get("entities", {}).get("nodes", [])
return {"entities": entities, "entity_count": len(entities)}

    def _get_entity_timelines_batch(
self, entity_ids: List[str], options: Dict[str, Any]
) -> Dict[str, Any]:
"""Get timeline analysis for multiple entities."""
timeline_results = []
for entity_id in entity_ids:
graphql_query = self._build_timeline_query(
entity_id=entity_id,
start_time=options.get("start_time"),
end_time=options.get("end_time"),
event_types=options.get("event_types"),
limit=options.get("limit", 50),
)
response = self.client.command(
"api_preempt_proxy_post_graphql",
body={"query": graphql_query},
)
result = handle_api_response(
response,
operation="api_preempt_proxy_post_graphql",
error_message=f"Failed to get timeline for entity '{entity_id}'",
default_result=None,
)
if self._is_error(result):
return result
# Extract timeline from GraphQL response structure
data = response.get("body", {}).get("data", {})
timeline_data = data.get("timeline", {})
timeline_results.append(
{
"entity_id": entity_id,
"timeline": timeline_data.get("nodes", []),
"page_info": timeline_data.get("pageInfo", {}),
}
)
return {"timelines": timeline_results, "entity_count": len(entity_ids)}

    def _analyze_relationships_batch(
self,
entity_ids: List[str],
options: Dict[str, Any],
) -> Dict[str, Any]:
"""Analyze relationships for multiple entities."""
        relationship_results = []

        # Handle FieldInfo objects (e.g. a pydantic FieldInfo default passed
        # through unchanged) - extract the actual value once, outside the loop.
        relationship_depth = options.get("relationship_depth", 2)
        if hasattr(relationship_depth, "default"):
            relationship_depth = relationship_depth.default

        for entity_id in entity_ids:
graphql_query = self._build_relationship_analysis_query(
entity_id=entity_id,
relationship_depth=relationship_depth,
include_risk_context=options.get("include_risk_context", True),
limit=options.get("limit", 50),
)
response = self.client.command(
"api_preempt_proxy_post_graphql",
body={"query": graphql_query},
)
result = handle_api_response(
response,
operation="api_preempt_proxy_post_graphql",
error_message=f"Failed to analyze relationships for entity '{entity_id}'",
default_result=None,
)
if self._is_error(result):
return result
# Extract entities from GraphQL response structure
data = response.get("body", {}).get("data", {})
entities = data.get("entities", {}).get("nodes", [])
if entities:
entity_data = entities[0]
relationship_results.append(
{
"entity_id": entity_id,
"associations": entity_data.get("associations", []),
"relationship_count": len(entity_data.get("associations", [])),
}
)
else:
relationship_results.append(
{
"entity_id": entity_id,
"associations": [],
"relationship_count": 0,
}
)
return {"relationships": relationship_results, "entity_count": len(entity_ids)}

    def _assess_risks_batch(
self,
entity_ids: List[str],
options: Dict[str, Any],
) -> Dict[str, Any]:
"""Perform risk assessment for multiple entities."""
graphql_query = self._build_risk_assessment_query(
entity_ids=entity_ids,
include_risk_factors=options.get("include_risk_factors", True),
)
response = self.client.command(
"api_preempt_proxy_post_graphql",
body={"query": graphql_query},
)
result = handle_api_response(
response,
operation="api_preempt_proxy_post_graphql",
error_message="Failed to assess risks",
default_result=None,
)
if self._is_error(result):
return result
# Extract entities from GraphQL response structure
data = response.get("body", {}).get("data", {})
entities = data.get("entities", {}).get("nodes", [])
risk_assessments = []
for entity in entities:
risk_assessments.append(
{
"entityId": entity.get("entityId"),
"primaryDisplayName": entity.get("primaryDisplayName"),
"riskScore": entity.get("riskScore", 0),
"riskScoreSeverity": entity.get("riskScoreSeverity", "LOW"),
"riskFactors": entity.get("riskFactors", []),
}
)
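
        # Each element is shaped like (values illustrative):
        #   {"entityId": "...", "primaryDisplayName": "jdoe",
        #    "riskScore": 85, "riskScoreSeverity": "HIGH",
        #    "riskFactors": [{"type": "...", "severity": "..."}]}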
return {
"risk_assessments": risk_assessments,
"entity_count": len(risk_assessments),
}

    def _synthesize_investigation_response(
self,
entity_ids: List[str],
investigation_results: Dict[str, Any],
metadata: Dict[str, Any],
) -> Dict[str, Any]:
"""Synthesize comprehensive investigation response from multiple API results."""
# Build investigation summary
investigation_summary = {
"entity_count": len(entity_ids),
"resolved_entity_ids": entity_ids,
"investigation_types": metadata.get("investigation_types", []),
"timestamp": datetime.utcnow().isoformat(),
"status": "completed",
}
# Add search criteria to summary
search_criteria = metadata.get("search_criteria", {})
if any(search_criteria.values()):
investigation_summary["search_criteria"] = search_criteria
# Start building comprehensive response
response = {
"investigation_summary": investigation_summary,
"entities": entity_ids,
}
# Add investigation results based on what was requested
for investigation_type, results in investigation_results.items():
response[investigation_type] = results
# Generate cross-investigation insights
insights = self._generate_investigation_insights(investigation_results, entity_ids)
if insights:
response["cross_investigation_insights"] = insights
return response

    def _generate_investigation_insights(
self,
investigation_results: Dict[str, Any],
entity_ids: List[str],
) -> Dict[str, Any]:
"""Generate insights by analyzing results across different investigation types."""
insights = {}
# Timeline and relationship correlation
if (
"timeline_analysis" in investigation_results
and "relationship_analysis" in investigation_results
):
insights["activity_relationship_correlation"] = self._analyze_activity_relationships(
investigation_results["timeline_analysis"],
investigation_results["relationship_analysis"],
)
# Multi-entity patterns (if investigating multiple entities)
if len(entity_ids) > 1:
insights["multi_entity_patterns"] = self._analyze_multi_entity_patterns(
investigation_results, entity_ids
)
return insights

    def _analyze_activity_relationships(
self,
timeline_analysis: Dict[str, Any],
relationship_analysis: Dict[str, Any],
) -> Dict[str, Any]:
"""Analyze correlation between timeline activities and entity relationships."""
correlation = {"related_entity_activities": [], "suspicious_patterns": []}
        # A full implementation would correlate timeline events with entity
        # relationships; for now, return basic counts as the structure.
timelines = timeline_analysis.get("timelines", [])
relationships = relationship_analysis.get("relationships", [])
correlation["timeline_count"] = len(timelines)
correlation["relationship_count"] = len(relationships)
return correlation

    def _analyze_multi_entity_patterns(
self,
investigation_results: Dict[str, Any],
entity_ids: List[str],
) -> Dict[str, Any]:
"""Analyze patterns across multiple entities being investigated."""
patterns = {
"common_risk_factors": [],
"shared_relationships": [],
"coordinated_activities": [],
}
# Analyze common risk factors across entities
if "risk_assessment" in investigation_results:
risk_assessments = investigation_results["risk_assessment"].get("risk_assessments", [])
            risk_factor_counts = {}
            for assessment in risk_assessments:
                for risk_factor in assessment.get("riskFactors", []):
                    risk_type = risk_factor.get("type")
                    risk_factor_counts[risk_type] = risk_factor_counts.get(risk_type, 0) + 1
# Find common risk factors (present in multiple entities)
for risk_type, count in risk_factor_counts.items():
if count > 1:
patterns["common_risk_factors"].append(
{
"risk_type": risk_type,
"entity_count": count,
"percentage": round((count / len(entity_ids)) * 100, 1),
}
)
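
        # Example "common_risk_factors" element (values illustrative; the
        # risk_type string is hypothetical):
        #   {"risk_type": "STALE_ACCOUNT", "entity_count": 2, "percentage": 66.7}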
return patterns
```