This is page 1 of 4. Use http://codebase.md/crowdstrike/falcon-mcp?page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.dev.example
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug.yaml
│   │   ├── config.yml
│   │   ├── feature-request.yaml
│   │   └── question.yaml
│   └── workflows
│       ├── docker-build-push.yml
│       ├── docker-build-test.yml
│       ├── markdown-lint.yml
│       ├── python-lint.yml
│       ├── python-test-e2e.yml
│       ├── python-test.yml
│       └── release.yml
├── .gitignore
├── .markdownlint.json
├── CHANGELOG.md
├── Dockerfile
├── docs
│   ├── CODE_OF_CONDUCT.md
│   ├── CONTRIBUTING.md
│   ├── deployment
│   │   ├── amazon_bedrock_agentcore.md
│   │   └── google_cloud.md
│   ├── e2e_testing.md
│   ├── module_development.md
│   ├── resource_development.md
│   └── SECURITY.md
├── examples
│   ├── adk
│   │   ├── adk_agent_operations.sh
│   │   ├── falcon_agent
│   │   │   ├── __init__.py
│   │   │   ├── agent.py
│   │   │   ├── env.properties
│   │   │   └── requirements.txt
│   │   └── README.md
│   ├── basic_usage.py
│   ├── mcp_config.json
│   ├── sse_usage.py
│   └── streamable_http_usage.py
├── falcon_mcp
│   ├── __init__.py
│   ├── client.py
│   ├── common
│   │   ├── __init__.py
│   │   ├── api_scopes.py
│   │   ├── errors.py
│   │   ├── logging.py
│   │   └── utils.py
│   ├── modules
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── cloud.py
│   │   ├── detections.py
│   │   ├── discover.py
│   │   ├── hosts.py
│   │   ├── idp.py
│   │   ├── incidents.py
│   │   ├── intel.py
│   │   ├── sensor_usage.py
│   │   ├── serverless.py
│   │   └── spotlight.py
│   ├── registry.py
│   ├── resources
│   │   ├── __init__.py
│   │   ├── cloud.py
│   │   ├── detections.py
│   │   ├── discover.py
│   │   ├── hosts.py
│   │   ├── incidents.py
│   │   ├── intel.py
│   │   ├── sensor_usage.py
│   │   ├── serverless.py
│   │   └── spotlight.py
│   └── server.py
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── generate_e2e_report.py
│   └── test_results_viewer.html
├── SUPPORT.md
├── tests
│   ├── __init__.py
│   ├── common
│   │   ├── __init__.py
│   │   ├── test_api_scopes.py
│   │   ├── test_errors.py
│   │   ├── test_logging.py
│   │   └── test_utils.py
│   ├── conftest.py
│   ├── e2e
│   │   ├── __init__.py
│   │   ├── modules
│   │   │   ├── __init__.py
│   │   │   ├── test_cloud.py
│   │   │   ├── test_detections.py
│   │   │   ├── test_discover.py
│   │   │   ├── test_hosts.py
│   │   │   ├── test_idp.py
│   │   │   ├── test_incidents.py
│   │   │   ├── test_intel.py
│   │   │   ├── test_sensor_usage.py
│   │   │   ├── test_serverless.py
│   │   │   └── test_spotlight.py
│   │   └── utils
│   │       ├── __init__.py
│   │       └── base_e2e_test.py
│   ├── modules
│   │   ├── __init__.py
│   │   ├── test_base.py
│   │   ├── test_cloud.py
│   │   ├── test_detections.py
│   │   ├── test_discover.py
│   │   ├── test_hosts.py
│   │   ├── test_idp.py
│   │   ├── test_incidents.py
│   │   ├── test_intel.py
│   │   ├── test_sensor_usage.py
│   │   ├── test_serverless.py
│   │   ├── test_spotlight.py
│   │   └── utils
│   │       └── test_modules.py
│   ├── test_client.py
│   ├── test_registry.py
│   ├── test_server.py
│   └── test_streamable_http_transport.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.markdownlint.json:
--------------------------------------------------------------------------------
```json
{
"default": true,
"MD013": false,
"MD024": false,
"MD033": false,
"MD041": false
}
```
--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------
```
# Git files
.git
.gitignore
.gitattributes
# Documentation
*.md
docs/
README.md
# Python cache and bytecode
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
.env
.venv/
venv/
ENV/
env/
# IDE and editor files
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store
# Testing
.pytest_cache/
.coverage
.tox/
htmlcov/
.cache
# CI/CD
.github/
.gitlab-ci.yml
.travis.yml
azure-pipelines.yml
# Docker files (except what's needed)
Dockerfile*
docker-compose*
.dockerignore
# Logs
*.log
logs/
# Temporary files
*.tmp
*.temp
.tmp/
# OS files
Thumbs.db
ehthumbs.db
# Security sensitive files
*.pem
*.key
*.p12
*.pfx
.env.local
.env.production
secrets/
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
# =============================================================================
# CrowdStrike Falcon API Credentials
# =============================================================================
# Required: Get these from your CrowdStrike console (Support > API Clients and Keys)
FALCON_CLIENT_ID=your-client-id
FALCON_CLIENT_SECRET=your-client-secret
# =============================================================================
# CrowdStrike Falcon API Base URL
# =============================================================================
# Required: Choose the correct region for your CrowdStrike instance
# US-1 (Default): https://api.crowdstrike.com
# US-2: https://api.us-2.crowdstrike.com
# EU-1: https://api.eu-1.crowdstrike.com
# US-GOV: https://api.laggar.gcw.crowdstrike.com
FALCON_BASE_URL=https://api.us-2.crowdstrike.com
# =============================================================================
# Optional: Server Configuration
# =============================================================================
# Modules to enable (comma-separated list)
# Options: detections,incidents,intel,hosts,spotlight,cloud,idp,sensorusage,serverless,discover
# Default: all modules enabled if not specified
#FALCON_MCP_MODULES=detections,incidents,intel,hosts,spotlight,cloud,idp,sensorusage,serverless,discover
# Transport method to use
# Options: stdio, sse, streamable-http
# Default: stdio
#FALCON_MCP_TRANSPORT=stdio
# Enable debug logging
# Options: true, false
# Default: false
#FALCON_MCP_DEBUG=false
# Host for HTTP transports (sse, streamable-http)
# Default: 127.0.0.1
#FALCON_MCP_HOST=127.0.0.1
# Port for HTTP transports (sse, streamable-http)
# Default: 8000
#FALCON_MCP_PORT=8000
# User agent comment to include in API requests
# This will be added to the User-Agent header comment section
# Example: CustomApp/1.0
#FALCON_MCP_USER_AGENT_COMMENT=
```
--------------------------------------------------------------------------------
/.env.dev.example:
--------------------------------------------------------------------------------
```
# =============================================================================
# CrowdStrike Falcon API Credentials
# =============================================================================
# Required: Get these from your CrowdStrike console (Support > API Clients and Keys)
FALCON_CLIENT_ID=your-client-id
FALCON_CLIENT_SECRET=your-client-secret
# =============================================================================
# CrowdStrike Falcon API Base URL
# =============================================================================
# Required: Choose the correct region for your CrowdStrike instance
# US-1 (Default): https://api.crowdstrike.com
# US-2: https://api.us-2.crowdstrike.com
# EU-1: https://api.eu-1.crowdstrike.com
# US-GOV: https://api.laggar.gcw.crowdstrike.com
FALCON_BASE_URL=https://api.us-2.crowdstrike.com
# =============================================================================
# Optional: Server Configuration
# =============================================================================
# Modules to enable (comma-separated list)
# Options: detections,incidents,intel,hosts,spotlight,cloud,idp,sensorusage,serverless,discover
# Default: all modules enabled if not specified
#FALCON_MCP_MODULES=detections,incidents,intel,hosts,spotlight,cloud,idp,sensorusage,serverless,discover
# Transport method to use
# Options: stdio, sse, streamable-http
# Default: stdio
#FALCON_MCP_TRANSPORT=stdio
# Enable debug logging
# Options: true, false
# Default: false
#FALCON_MCP_DEBUG=false
# Host for HTTP transports (sse, streamable-http)
# Default: 127.0.0.1
#FALCON_MCP_HOST=127.0.0.1
# Port for HTTP transports (sse, streamable-http)
# Default: 8000
#FALCON_MCP_PORT=8000
# User agent comment to include in API requests
# This will be added to the User-Agent header comment section
# Example: CustomApp/1.0
#FALCON_MCP_USER_AGENT_COMMENT=
# =============================================================================
# Development & E2E Testing Configuration
# =============================================================================
# Only needed if you plan to run end-to-end tests or contribute to development
# OpenAI API key for E2E testing (required for running E2E tests)
#OPENAI_API_KEY=your-openai-api-key
# Custom LLM API endpoint for testing (optional)
#OPENAI_BASE_URL=https://your-custom-llm-endpoint.com/v1
# Comma-separated list of models to test (optional)
#MODELS_TO_TEST=example-model-1,example-model-2
# Number of test runs per test case (optional)
#RUNS_PER_TEST=2
# Success threshold for E2E tests (optional)
#SUCCESS_TRESHOLD=0.7
MCP_USE_ANONYMIZED_TELEMETRY=false
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the enitre vscode folder
.vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# AI Assistants
# Various AI coding assistants create local cache, settings, and conversation history
# These contain user-specific data and should not be committed to version control
.cursorignore
.cursorindexingignore
.claude/
CLAUDE.md
.anthropic/
.openai/
.codeium/
.tabnine/
.github-copilot/
.roo/
.aider/
.aider*
.clinerules/
memory-bank/
# E2E artifacts
static_test_report.html
test_results.json
```
--------------------------------------------------------------------------------
/examples/adk/README.md:
--------------------------------------------------------------------------------
```markdown
# Running/Deploying with a prebuilt agent
This repository includes a prebuilt [Google ADK](https://google.github.io/adk-docs/)-based agent integrated with the `falcon-mcp` server.
The goal is to provide customers with an opinionated and validated set of instructions for running falcon-mcp and deploying it for their teams.
## Table of Contents
1. [Setting up and running locally (5 minutes)](#setting-up-and-running-locally-5-minutes)
2. [Deployment - Why Deploy?](#deployment---why-deploy)
3. [Deploying the agent to Cloud Run](#deploying-the-agent-to-cloud-run)
4. [Deploying to Vertex AI Agent Engine and registering on Agentspace](#deploying-to-vertex-ai-agent-engine-and-registering-on-agentspace)
5. [Securing access, Evaluating, Optimizing performance and costs](#securing-access-evaluating-optimizing-performance-and-costs)
### Setting up and running locally (5 minutes)
You can run the following commands locally on Linux / Mac or in Google Cloud Shell.
If you plan to deploy the agent, it is recommended to run in Google Cloud Shell.
```bash
git clone https://github.com/CrowdStrike/falcon-mcp.git
cd falcon-mcp
cd examples/adk
# create and activate python environment
python3 -m venv .venv
. .venv/bin/activate
# install dependencies
pip install -r falcon_agent/requirements.txt
chmod +x adk_agent_operations.sh
./adk_agent_operations.sh
```
The script will create a `.env` file in the `falcon_agent/` directory and prompt you to update it. At a minimum, update the `General Agent Configuration` section.
<details>
<summary><b>Sample Output - Very First Run</b></summary>
```bash
./adk_agent_operations.sh
INFO: No operation mode provided and './falcon_agent/.env' is not found.
INFO: Attempting to copy template './falcon_agent/env.properties' to './falcon_agent/.env'.
SUCCESS: './falcon_agent/env.properties' copied to './falcon_agent/.env'.
ACTION REQUIRED: Please update the variables in './falcon_agent/.env' before running this script with an operation mode.
```
</details>
<br>
> [!NOTE]
> Make sure you get and update the GOOGLE_API_KEY using these [instructions](https://ai.google.dev/gemini-api/docs/api-key).
Now run the script with `local_run` parameter.
```bash
# local run
./adk_agent_operations.sh local_run
```
Here is the sample output
<details>
<summary><b>Sample Output - Local Run</b></summary>
```bash
./adk_agent_operations.sh local_run
INFO: Operation mode selected: 'local_run'.
--- Loading environment variables from './falcon_agent/.env' ---
--- Environment variables loaded. ---
--- Validating required environment variables for 'local_run' mode ---
INFO: Variable 'GOOGLE_GENAI_USE_VERTEXAI' is set and valid.
INFO: Variable 'GOOGLE_API_KEY' is set and valid.
INFO: Variable 'GOOGLE_MODEL' is set and valid.
INFO: Variable 'FALCON_CLIENT_ID' is set and valid.
INFO: Variable 'FALCON_CLIENT_SECRET' is set and valid.
INFO: Variable 'FALCON_BASE_URL' is set and valid.
INFO: Variable 'FALCON_AGENT_PROMPT' is set and valid.
--- All required environment variables are VALID. ---
INFO: Running ADK Agent for local development...
INFO: Started server process [20071]
INFO: Waiting for application startup.
+-----------------------------------------------------------------------------+
| ADK Web Server started |
| |
| For local testing, access at http://localhost:8000. |
+-----------------------------------------------------------------------------+
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
```
</details>
<br>
You can access the agent on http://localhost:8000 🚀
> If running in the Google Cloud Shell - please use the web preview with port 8000.
You can stop the agent with `ctrl+C`
### Deployment - Why Deploy?
You may want to deploy the agent (with the `falcon-mcp` server) for the following reasons:
1. You do not want to hand out credentials to everyone to run the MCP server locally
2. You want to share the ready-to-use agent with your team
3. You want to use it for demos without any setup
You have two distinct paths to deployment:
1. Deploy on Cloud Run
2. Deploy on Vertex AI Agent Engine (and access through Agentspace after registration)
<br>
> [!NOTE]
> For all the following sections - If you are not running in Google Cloud Shell, make sure you have `gcloud` CLI [installed](https://cloud.google.com/sdk/docs/install) and you have authenticated with your username (preferably as owner of the project) on your local computer.
### Deploying the agent to Cloud Run
This section covers deployment to Cloud Run. Make sure you have all the required [APIs enabled](https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service#before-you-begin) on the GCP project.
```bash
cd examples/adk/
./adk_agent_operations.sh cloudrun_deploy
```
In the sample output below, note the lines marked with ➡️
1. You will be prompted with `Allow unauthenticated invocations?` (answer N).
2. Once deployment completes, you get a URL to access your agent.
<details>
<summary><b>Sample Output - Cloud Run Deployment</b></summary>
```bash
INFO: Operation mode selected: 'cloudrun_deploy'.
--- Loading environment variables from './falcon_agent/.env' ---
--- Environment variables loaded. ---
--- Validating required environment variables for 'cloudrun_deploy' mode ---
INFO: Variable 'GOOGLE_GENAI_USE_VERTEXAI' is set and valid.
INFO: Variable 'GOOGLE_MODEL' is set and valid.
INFO: Variable 'FALCON_CLIENT_ID' is set and valid.
INFO: Variable 'FALCON_CLIENT_SECRET' is set and valid.
INFO: Variable 'FALCON_BASE_URL' is set and valid.
INFO: Variable 'FALCON_AGENT_PROMPT' is set and valid.
INFO: Variable 'PROJECT_ID' is set and valid.
INFO: Variable 'REGION' is set and valid.
--- All required environment variables are VALID. ---
INFO: Preparing for Cloud Run deployment...
INFO: Backing up './falcon_agent/.env' to './falcon_agent/.env.bak'.
INFO: Modifying './falcon_agent/.env': Deleting GOOGLE_API_KEY and setting GOOGLE_GENAI_USE_VERTEXAI=True.
INFO: Re-loading modified environment variables.
INFO: Deploying ADK Agent to Cloud Run...
Start generating Cloud Run source files in /tmp/cloud_run_deploy_src/20250801_071151
Copying agent source code...
Copying agent source code complete.
Creating Dockerfile...
Creating Dockerfile complete: /tmp/cloud_run_deploy_src/20250801_071151/Dockerfile
Deploying to Cloud Run...
➡️ Allow unauthenticated invocations to [falcon-agent-service] (y/N)? N
Building using Dockerfile and deploying container to Cloud Run service [falcon-agent-service] in project [crowdstrikexxxxxxx] region [us-central1]
⠛ Building and deploying new service... Uploading sources.
⠛ Uploading sources...
✓ Building and deploying new service... Done.
✓ Uploading sources...
✓ Building Container... Logs are available at [https://console.cloud.google.com/cloud-build/builds;region=us-central1/b1dbfe60-46fe-4cc1-ba6a-xxxx?project=xxxxx].
✓ Creating Revision...
✓ Routing traffic...
✓ Setting IAM Policy...
Done.
Service [falcon-agent-service] revision [falcon-agent-service-00001-abc] has been deployed and is serving 100 percent of traffic.
➡️ Service URL: https://falcon-agent-service-xxxxx.us-central1.run.app
INFO: Display format: "none"
Cleaning up the temp folder: /tmp/cloud_run_deploy_src/20250801_071151
SUCCESS: Cloud Run deployment completed successfully.
--- Operation 'cloudrun_deploy' complete. ---
INFO: Restoring .env file from backup: './falcon_agent/.env.bak'.
```
</details>
<br>
> [!NOTE]
> By default the service has IAM authentication enabled. Follow the steps below to grant access to yourself and your team.
1. In Cloud Run > Services, select `falcon-agent-service` by clicking the checkbox next to it.
2. At the top, click `Permissions`; a `Permissions for falcon-agent-service` pane should open on the right-hand side.
3. Click `Add principal`
4. Add the users you want to grant access to and give them the `Cloud Run Invoker` role.
5. Wait a few minutes for the change to take effect.
**Accessing the service**
1. Ask your users to run the following command (replace the project ID and region with those of the project in which you deployed the service)
```bash
gcloud run services proxy falcon-agent-service --project PROJECT-ID --region YOUR-REGION
```
<details>
<summary><b>Sample Output Accessing Cloud Run service through local proxy</b></summary>
```bash
# You might be asked to install a component, for the proxy to work locally
This command requires the `cloud-run-proxy` component to be installed. Would
you like to install the `cloud-run-proxy` component to continue command
execution? (Y/n)? Y
Proxying to Cloud Run service [falcon-agent-service] in project [crowdstrike-xxx-yyy] region [us-central1]
http://127.0.0.1:8080 proxies to https://falcon-agent-service-abc1234-uc.a.run.app
```
</details>
2. They can now access the Cloud Run service locally at `http://localhost:8080`
### Deploying to Vertex AI Agent Engine and registering on Agentspace
This section covers deployment to Vertex AI Agent Engine. To access the agent and consolidate all your agents under one umbrella, you can also register the deployed agent with Agentspace.
1. Make sure that you create a bucket for staging the Agent Engine artifacts in the same project as the deployment (env variable - `AGENT_ENGINE_STAGING_BUCKET`).
```bash
cd examples/adk/
./adk_agent_operations.sh agent_engine_deploy
```
And here is the sample output.
Make sure you copy the Agent Engine Number from the output (marked by ➡️ for illustration)
<details>
<summary><b>Sample Output - Agent Engine Deployment</b></summary>
```bash
INFO: Operation mode selected: 'agent_engine_deploy'.
--- Loading environment variables from './falcon_agent/.env' ---
--- Environment variables loaded. ---
--- Validating required environment variables for 'agent_engine_deploy' mode ---
INFO: Variable 'GOOGLE_GENAI_USE_VERTEXAI' is set and valid.
INFO: Variable 'GOOGLE_MODEL' is set and valid.
INFO: Variable 'FALCON_CLIENT_ID' is set and valid.
INFO: Variable 'FALCON_CLIENT_SECRET' is set and valid.
INFO: Variable 'FALCON_BASE_URL' is set and valid.
INFO: Variable 'FALCON_AGENT_PROMPT' is set and valid.
INFO: Variable 'PROJECT_ID' is set and valid.
INFO: Variable 'REGION' is set and valid.
INFO: Variable 'AGENT_ENGINE_STAGING_BUCKET' is set and valid.
--- All required environment variables are VALID. ---
INFO: Preparing for Agent Engine deployment...
INFO: Backing up './falcon_agent/.env' to './falcon_agent/.env.bak'.
INFO: Modifying './falcon_agent/.env': Deleting GOOGLE_API_KEY and setting GOOGLE_GENAI_USE_VERTEXAI=True.
INFO: Re-loading modified environment variables.
INFO: Deploying ADK Agent to Agent Engine...
Copying agent source code...
Copying agent source code complete.
Initializing Vertex AI...
Resolving files and dependencies...
Reading environment variables from /tmp/agent_engine_deploy_src/20250801_103024/.env
Vertex AI initialized.
Created /tmp/agent_engine_deploy_src/20250801_103024/agent_engine_app.py
Files and dependencies resolved
Deploying to agent engine...
Reading requirements from requirements='/tmp/agent_engine_deploy_src/20250801_103024/requirements.txt'
Read the following lines: ['google-adk[eval]', 'falcon-mcp', 'google-cloud-aiplatform[agent_engines]', 'cloudpickle']
Identified the following requirements: {'google-cloud-aiplatform': '1.105.0', 'cloudpickle': '3.1.1', 'pydantic': '2.11.7'}
The following requirements are missing: {'pydantic'}
The following requirements are appended: {'pydantic==2.11.7'}
The final list of requirements: ['google-adk[eval]', 'falcon-mcp', 'google-cloud-aiplatform[agent_engines]', 'cloudpickle', 'pydantic==2.11.7']
Using bucket agent-engine-xxyyzz
Wrote to gs://agent-engine-xxyyzz/agent_engine/agent_engine.pkl
Writing to gs://agent-engine-xxyyzz/agent_engine/requirements.txt
Creating in-memory tarfile of extra_packages
Writing to gs://agent-engine-xxyyzz/agent_engine/dependencies.tar.gz
Creating AgentEngine
INFO:vertexai.agent_engines:Creating AgentEngine
Create AgentEngine backing LRO: projects/123456789101/locations/us-central1/reasoningEngines/3670952665795123456/operations/5379102769057612345
INFO:vertexai.agent_engines:Create AgentEngine backing LRO: projects/123456789101/locations/us-central1/reasoningEngines/3670952665795123456/operations/5379102769057612345
View progress and logs at https://console.cloud.google.com/logs/query?project=crowdstrike-xxxx-yyyy
INFO:vertexai.agent_engines:View progress and logs at https://console.cloud.google.com/logs/query?project=crowdstrike-xxxx-yyyy
➡️ AgentEngine created. Resource name: projects/123456789101/locations/us-central1/reasoningEngines/3670952665795123456
INFO:vertexai.agent_engines:AgentEngine created. Resource name: projects/123456789101/locations/us-central1/reasoningEngines/3670952665795123456
To use this AgentEngine in another session:
INFO:vertexai.agent_engines:To use this AgentEngine in another session:
agent_engine = vertexai.agent_engines.get('projects/123456789101/locations/us-central1/reasoningEngines/3670952665795123456')
INFO:vertexai.agent_engines:agent_engine = vertexai.agent_engines.get('projects/123456789101/locations/us-central1/reasoningEngines/3670952665795123456')
Cleaning up the temp folder: /tmp/agent_engine_deploy_src/20250801_103024
SUCCESS: Agent Engine deployment completed successfully.
--- Operation 'agent_engine_deploy' complete. ---
INFO: Restoring .env file from backup: './falcon_agent/.env.bak'.
```
</details>
<br>
Once the agent is deployed on Agent Engine, you can register it on Agentspace so it can be used from an Agentspace application.
Make sure you have the Agent Engine number from the previous step.
1. Go to the Agentspace [page](https://console.cloud.google.com/gen-app-builder/engines) in Google Cloud Console.
2. Create an App (Type - Agentspace)
3. Note down the app details including the app name (e.g. google-security-agent-app_1750057151234)
4. Make sure that you have the Agent Space Admin role while performing the following actions
5. Enable Discovery Engine API for your project
6. Provide the following roles to the Discovery Engine Service Account
- Vertex AI viewer
- Vertex AI user
7. Note that these roles must be granted in the project housing your Agent Engine agent. You also need to enable the option to show Google-provided role grants in order to see the Discovery Engine Service Account.
Update the environment variables `PROJECT_NUMBER`, `AGENT_LOCATION`, `REASONING_ENGINE_NUMBER` and `AGENT_SPACE_APP_NAME` in the `# Agentspace Specific` section.
Now, to register the agent and make it available to your application, use the following command.
```bash
cd examples/adk/
./adk_agent_operations.sh agentspace_register
```
<details>
<summary><b>Sample Output - Agentspace Registration</b></summary>
```bash
INFO: Operation mode selected: 'agentspace_register'.
--- Loading environment variables from './falcon_agent/.env' ---
--- Environment variables loaded. ---
--- Validating required environment variables for 'agentspace_register' mode ---
INFO: Variable 'GOOGLE_GENAI_USE_VERTEXAI' is set and valid.
INFO: Variable 'GOOGLE_MODEL' is set and valid.
INFO: Variable 'FALCON_CLIENT_ID' is set and valid.
INFO: Variable 'FALCON_CLIENT_SECRET' is set and valid.
INFO: Variable 'FALCON_BASE_URL' is set and valid.
INFO: Variable 'FALCON_AGENT_PROMPT' is set and valid.
INFO: Variable 'PROJECT_ID' is set and valid.
INFO: Variable 'REGION' is set and valid.
INFO: Variable 'PROJECT_NUMBER' is set and valid.
INFO: Variable 'AGENT_LOCATION' is set and valid.
INFO: Variable 'REASONING_ENGINE_NUMBER' is set and valid.
INFO: Variable 'AGENT_SPACE_APP_NAME' is set and valid.
--- All required environment variables are VALID. ---
INFO: Registering ADK Agent with AgentSpace...
INFO: Sending POST request to: https://discoveryengine.googleapis.com/v1alpha/projects/security-xyzabc-123456/locations/global/collections/default_collection/engines/google-security-agent-app_1750057112345/assistants/default_assistant/agents
DEBUG: Request Body :
{
"displayName": "Crowdstrike Falcon Agent",
"description": "Allows users interact with Crowdstrike Falcon backend",
"adk_agent_definition":
{
"tool_settings": {
"tool_description": "Crowdstrike Falcon tools"
},
"provisioned_reasoning_engine": {
"reasoning_engine":"projects/707099123456/locations/us-central1/reasoningEngines/5047646776881234567"
}
}
}
...
{
"name": "projects/707099123456/locations/global/collections/default_collection/engines/google-security-agent-app_1750057112345/assistants/default_assistant/agents/2662627860861234567",
"displayName": "Crowdstrike Falcon Agent",
"description": "Allows users interact with Crowdstrike Falcon backend",
"createTime": "2025-08-03T15:39:03.129318186Z",
"adkAgentDefinition": {
"toolSettings": {
"toolDescription": "Crowdstrike Falcon tools"
},
"provisionedReasoningEngine": {
"reasoningEngine": "projects/707099123456/locations/us-central1/reasoningEngines/5047646776881234567"
}
},
"state": "ENABLED"
}
SUCCESS: cURL command completed successfully for AgentSpace registration.
--- Operation 'agentspace_register' complete. ---
```
</details>
<br>
> You can find more about Agentspace registration [here](https://cloud.google.com/agentspace/agentspace-enterprise/docs/assistant#create-assistant-existing-app).
Now you can access the agent in the Agentspace application you created earlier.
In case you want to delete the agent from the Agentspace application, use the following set of commands (replace the variables as needed).
<details>
<summary><b>List and deregister the Agent</b></summary>
```bash
# List the agents for your application
curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
-H "X-Goog-User-Project: $PROJECT_ID" \
"https://discoveryengine.googleapis.com/v1alpha/projects/$PROJECT_ID/locations/global/collections/default_collection/engines/$AGENT_ENGINE_APP_NAME/assistants/default_assistant/agents"
# note down the agent number (export as REASONING_ENGINE_NUMBER) and use that in the next command.
curl -X DELETE -H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
-H "X-Goog-User-Project: $PROJECT_ID" \
https://discoveryengine.googleapis.com/v1alpha/projects/$PROJECT_ID/locations/global/collections/default_collection/engines/$AGENT_ENGINE_APP_NAME/assistants/default_assistant/agents/$REASONING_ENGINE_NUMBER
```
</details>
### Securing access, Evaluating, Optimizing performance and costs
#### Securing access
1. For local runs, make sure that you are not using a shared machine.
2. For Cloud Run deployments, you can use [Control access on an individual service or job](https://cloud.google.com/run/docs/securing/managing-access#control-service-or-job-access) - this is the default behavior for this deployment.
3. For the agent running in Agentspace, you can provide access selectively (predefined role `Discovery Engine User`) by navigating to Agentspace > Apps > Your App > Integration > Grant Permissions.
#### Evaluating
It is advisable to evaluate the agent for the trajectory it takes and the output it produces; you can use the [ADK documentation](https://google.github.io/adk-docs/evaluate/) to evaluate this agent. You can also test with different models.
#### Optimizing performance and costs
Various native performance improvements are already part of the codebase. You can further optimize performance and reduce LLM costs by controlling the value of the environment variable `MAX_PREV_USER_INTERACTIONS`. You can test how many previous conversations (instead of ALL conversations, the default) work for your use case (5 is recommended). You can also use an appropriate [Gemini model](https://ai.google.dev/gemini-api/docs/models#model-variations) for both cost and performance optimization.
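To illustrate the idea behind `MAX_PREV_USER_INTERACTIONS`, here is a small sketch of trimming a conversation history down to the last N user turns before it is sent to the model. The `trim_history` helper is hypothetical and is not the agent's actual code; it only demonstrates the concept.
```python
# Illustrative sketch only, not the agent's actual code: the idea behind
# MAX_PREV_USER_INTERACTIONS is to keep only the last N user turns (and the
# responses that follow them) instead of the full conversation history.
import os


def trim_history(history: list[dict], max_user_turns: int | None = None) -> list[dict]:
    """Keep messages starting from the Nth-most-recent user turn."""
    limit = max_user_turns or int(os.environ.get("MAX_PREV_USER_INTERACTIONS", "5"))
    user_turns = [i for i, msg in enumerate(history) if msg.get("role") == "user"]
    if len(user_turns) <= limit:
        return history
    return history[user_turns[-limit]:]


messages = [
    {"role": "user", "content": "List new detections"},
    {"role": "assistant", "content": "..."},
    {"role": "user", "content": "Show details for the first one"},
    {"role": "assistant", "content": "..."},
]
print(trim_history(messages, max_user_turns=1))  # keeps only the last user turn
```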
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown

# falcon-mcp
[](https://badge.fury.io/py/falcon-mcp)
[](https://pypi.org/project/falcon-mcp/)
[](https://opensource.org/licenses/MIT)
**falcon-mcp** is a Model Context Protocol (MCP) server that connects AI agents with the CrowdStrike Falcon platform, powering intelligent security analysis in your agentic workflows. It delivers programmatic access to essential security capabilities—including detections, incidents, and behaviors—establishing the foundation for advanced security operations and automation.
> [!IMPORTANT]
> **🚧 Public Preview**: This project is currently in public preview and under active development. Features and functionality may change before the stable 1.0 release. While we encourage exploration and testing, please avoid production deployments. We welcome your feedback through [GitHub Issues](https://github.com/crowdstrike/falcon-mcp/issues) to help shape the final release.
## Table of Contents
- [API Credentials \& Required Scopes](#api-credentials--required-scopes)
- [Setting Up CrowdStrike API Credentials](#setting-up-crowdstrike-api-credentials)
- [Required API Scopes by Module](#required-api-scopes-by-module)
- [Available Modules, Tools \& Resources](#available-modules-tools--resources)
- [Cloud Security Module](#cloud-security-module)
- [Core Functionality (Built into Server)](#core-functionality-built-into-server)
- [Detections Module](#detections-module)
- [Discover Module](#discover-module)
- [Hosts Module](#hosts-module)
- [Identity Protection Module](#identity-protection-module)
- [Incidents Module](#incidents-module)
- [Intel Module](#intel-module)
- [Sensor Usage Module](#sensor-usage-module)
- [Serverless Module](#serverless-module)
- [Spotlight Module](#spotlight-module)
- [Installation \& Setup](#installation--setup)
- [Prerequisites](#prerequisites)
- [Environment Configuration](#environment-configuration)
- [Installation](#installation)
- [Usage](#usage)
- [Command Line](#command-line)
- [Module Configuration](#module-configuration)
- [Additional Command Line Options](#additional-command-line-options)
- [As a Library](#as-a-library)
- [Running Examples](#running-examples)
- [Container Usage](#container-usage)
- [Using Pre-built Image (Recommended)](#using-pre-built-image-recommended)
- [Building Locally (Development)](#building-locally-development)
- [Editor/Assistant Integration](#editorassistant-integration)
- [Using `uvx` (recommended)](#using-uvx-recommended)
- [With Module Selection](#with-module-selection)
- [Using Individual Environment Variables](#using-individual-environment-variables)
- [Docker Version](#docker-version)
- [Additional Deployment Options](#additional-deployment-options)
- [Amazon Bedrock AgentCore](#amazon-bedrock-agentcore)
- [Google Cloud (Cloud Run and Vertex AI)](#google-cloud-cloud-run-and-vertex-ai)
- [Contributing](#contributing)
- [Getting Started for Contributors](#getting-started-for-contributors)
- [Running Tests](#running-tests)
- [Developer Documentation](#developer-documentation)
- [License](#license)
- [Support](#support)
## API Credentials & Required Scopes
### Setting Up CrowdStrike API Credentials
Before using the Falcon MCP Server, you need to create API credentials in your CrowdStrike console:
1. **Log into your CrowdStrike console**
2. **Navigate to Support > API Clients and Keys**
3. **Click "Add new API client"**
4. **Configure your API client**:
- **Client Name**: Choose a descriptive name (e.g., "Falcon MCP Server")
- **Description**: Optional description for your records
- **API Scopes**: Select the scopes based on which modules you plan to use (see below)
> **Important**: Ensure your API client has the necessary scopes for the modules you plan to use. You can always update scopes later in the CrowdStrike console.
### Required API Scopes by Module
The Falcon MCP Server supports different modules, each requiring specific API scopes:
| Module | Required API Scopes | Purpose |
|-|-|-|
| **Cloud Security** | `Falcon Container Image:read` | Find and analyze Kubernetes container inventory and container image vulnerabilities |
| **Core** | _No additional scopes_ | Basic connectivity and system information |
| **Detections** | `Alerts:read` | Find and analyze detections to understand malicious activity |
| **Discover** | `Assets:read` | Search and analyze application inventory across your environment |
| **Hosts** | `Hosts:read` | Manage and query host/device information |
| **Identity Protection** | `Identity Protection Entities:read`<br>`Identity Protection Timeline:read`<br>`Identity Protection Detections:read`<br>`Identity Protection Assessment:read`<br>`Identity Protection GraphQL:write` | Comprehensive entity investigation and identity protection analysis |
| **Incidents** | `Incidents:read` | Analyze security incidents and coordinated activities |
| **Intel** | `Actors (Falcon Intelligence):read`<br>`Indicators (Falcon Intelligence):read`<br>`Reports (Falcon Intelligence):read` | Research threat actors, IOCs, and intelligence reports |
| **Sensor Usage** | `Sensor Usage:read` | Access and analyze sensor usage data |
| **Serverless** | `Falcon Container Image:read` | Search for vulnerabilities in serverless functions across cloud service providers |
| **Spotlight** | `Vulnerabilities:read` | Manage and analyze vulnerability data and security assessments |
## Available Modules, Tools & Resources
> [!IMPORTANT]
> ⚠️ **Important Note on FQL Guide Resources**: Several modules include FQL (Falcon Query Language) guide resources that provide comprehensive query documentation and examples. While these resources are designed to assist AI assistants and users with query construction, **FQL has nuanced syntax requirements and field-specific behaviors** that may not be immediately apparent. AI-generated FQL filters should be **tested and validated** before use in production environments. We recommend starting with simple queries and gradually building complexity while verifying results in a test environment first.
**About Tools & Resources**: This server provides both tools (actions you can perform) and resources (documentation and context). Tools execute operations like searching for detections or analyzing threats, while resources provide comprehensive documentation like FQL query guides that AI assistants can reference for context without requiring tool calls.
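As a concrete illustration of the tool/resource split, here is a minimal client sketch (not part of this repository) that uses the MCP Python SDK to start the server over stdio, list the available tools, call the core `falcon_check_connectivity` tool, and read one of the FQL guide resources. It assumes the `mcp` SDK and `falcon-mcp` are installed and that your Falcon credentials are available in the environment.
```python
# Minimal client sketch (not part of this repository): start falcon-mcp over stdio,
# list tools, call a core tool, and read a documentation resource.
# Assumes `pip install mcp falcon-mcp` and Falcon credentials in the environment.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from pydantic import AnyUrl


async def main() -> None:
    # Launch the server as a subprocess using the default stdio transport.
    params = StdioServerParameters(command="falcon-mcp", args=[])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Tools: actions the server can perform.
            tools = await session.list_tools()
            print("Tools:", [tool.name for tool in tools.tools])

            # Core tool listed above: verify connectivity to the Falcon API.
            result = await session.call_tool("falcon_check_connectivity", {})
            print("Connectivity:", result.content)

            # Resource: FQL guide documentation, read without any tool call.
            guide = await session.read_resource(
                AnyUrl("falcon://detections/search/fql-guide")
            )
            print("Guide length:", len(guide.contents[0].text))  # text resource


if __name__ == "__main__":
    asyncio.run(main())
```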
### Cloud Security Module
**API Scopes Required**:
- `Falcon Container Image:read`
Provides tools for accessing and analyzing CrowdStrike Cloud Security resources:
- `falcon_search_kubernetes_containers`: Search for containers from CrowdStrike Kubernetes & Containers inventory
- `falcon_count_kubernetes_containers`: Count containers by filter criteria from CrowdStrike Kubernetes & Containers inventory
- `falcon_search_images_vulnerabilities`: Search for image vulnerabilities from CrowdStrike Image Assessments
**Resources**:
- `falcon://cloud/kubernetes-containers/fql-guide`: Comprehensive FQL documentation and examples for Kubernetes container searches
- `falcon://cloud/images-vulnerabilities/fql-guide`: Comprehensive FQL documentation and examples for image vulnerability searches
**Use Cases**: Kubernetes container inventory management, container image vulnerability analysis
### Core Functionality (Built into Server)
**API Scopes**: _None required beyond basic API access_
The server provides core tools for interacting with the Falcon API:
- `falcon_check_connectivity`: Check connectivity to the Falcon API
- `falcon_list_enabled_modules`: Lists enabled modules in the falcon-mcp server
> These modules are determined by the `--modules` [flag](#module-configuration) when starting the server. If no modules are specified, all available modules are enabled.
- `falcon_list_modules`: Lists all available modules in the falcon-mcp server
### Detections Module
**API Scopes Required**: `Alerts:read`
Provides tools for accessing and analyzing CrowdStrike Falcon detections:
- `falcon_search_detections`: Find and analyze detections to understand malicious activity in your environment
- `falcon_get_detection_details`: Get comprehensive detection details for specific detection IDs to understand security threats
**Resources**:
- `falcon://detections/search/fql-guide`: Comprehensive FQL documentation and examples for detection searches
**Use Cases**: Threat hunting, security analysis, incident response, malware investigation
### Discover Module
**API Scopes Required**: `Assets:read`
Provides tools for accessing and managing CrowdStrike Falcon Discover applications and unmanaged assets:
- `falcon_search_applications`: Search for applications in your CrowdStrike environment
- `falcon_search_unmanaged_assets`: Search for unmanaged assets (systems without Falcon sensor installed) that have been discovered by managed systems
**Resources**:
- `falcon://discover/applications/fql-guide`: Comprehensive FQL documentation and examples for application searches
- `falcon://discover/hosts/fql-guide`: Comprehensive FQL documentation and examples for unmanaged assets searches
**Use Cases**: Application inventory management, software asset management, license compliance, vulnerability assessment, unmanaged asset discovery, security gap analysis
### Hosts Module
**API Scopes Required**: `Hosts:read`
Provides tools for accessing and managing CrowdStrike Falcon hosts/devices:
- `falcon_search_hosts`: Search for hosts in your CrowdStrike environment
- `falcon_get_host_details`: Retrieve detailed information for specified host device IDs
**Resources**:
- `falcon://hosts/search/fql-guide`: Comprehensive FQL documentation and examples for host searches
**Use Cases**: Asset management, device inventory, host monitoring, compliance reporting
### Identity Protection Module
**API Scopes Required**: `Identity Protection Entities:read`, `Identity Protection Timeline:read`, `Identity Protection Detections:read`, `Identity Protection Assessment:read`, `Identity Protection GraphQL:write`
Provides tools for accessing and managing CrowdStrike Falcon Identity Protection capabilities:
- `idp_investigate_entity`: Entity investigation tool for analyzing users, endpoints, and other entities with support for timeline analysis, relationship mapping, and risk assessment
**Use Cases**: Entity investigation, identity protection analysis, user behavior analysis, endpoint security assessment, relationship mapping, risk assessment
### Incidents Module
**API Scopes Required**: `Incidents:read`
Provides tools for accessing and analyzing CrowdStrike Falcon incidents:
- `falcon_show_crowd_score`: View calculated CrowdScores and security posture metrics for your environment
- `falcon_search_incidents`: Find and analyze security incidents to understand coordinated activity in your environment
- `falcon_get_incident_details`: Get comprehensive incident details to understand attack patterns and coordinated activities
- `falcon_search_behaviors`: Find and analyze behaviors to understand suspicious activity in your environment
- `falcon_get_behavior_details`: Get detailed behavior information to understand attack techniques and tactics
**Resources**:
- `falcon://incidents/crowd-score/fql-guide`: Comprehensive FQL documentation for CrowdScore queries
- `falcon://incidents/search/fql-guide`: Comprehensive FQL documentation and examples for incident searches
- `falcon://incidents/behaviors/fql-guide`: Comprehensive FQL documentation and examples for behavior searches
**Use Cases**: Incident management, threat assessment, attack pattern analysis, security posture monitoring
### Intel Module
**API Scopes Required**:
- `Actors (Falcon Intelligence):read`
- `Indicators (Falcon Intelligence):read`
- `Reports (Falcon Intelligence):read`
Provides tools for accessing and analyzing CrowdStrike Intelligence:
- `falcon_search_actors`: Research threat actors and adversary groups tracked by CrowdStrike intelligence
- `falcon_search_indicators`: Search for threat indicators and indicators of compromise (IOCs) from CrowdStrike intelligence
- `falcon_search_reports`: Access CrowdStrike intelligence publications and threat reports
**Resources**:
- `falcon://intel/actors/fql-guide`: Comprehensive FQL documentation and examples for threat actor searches
- `falcon://intel/indicators/fql-guide`: Comprehensive FQL documentation and examples for indicator searches
- `falcon://intel/reports/fql-guide`: Comprehensive FQL documentation and examples for intelligence report searches
**Use Cases**: Threat intelligence research, adversary tracking, IOC analysis, threat landscape assessment
### Sensor Usage Module
**API Scopes Required**: `Sensor Usage:read`
Provides tools for accessing and analyzing CrowdStrike Falcon sensor usage data:
- `falcon_search_sensor_usage`: Search for weekly sensor usage data in your CrowdStrike environment
**Resources**:
- `falcon://sensor-usage/weekly/fql-guide`: Comprehensive FQL documentation and examples for sensor usage searches
**Use Cases**: Sensor deployment monitoring, license utilization analysis, sensor health tracking
### Serverless Module
**API Scopes Required**: `Falcon Container Image:read`
Provides tools for accessing and managing CrowdStrike Falcon Serverless Vulnerabilities:
- `falcon_search_serverless_vulnerabilities`: Search for vulnerabilities in your serverless functions across all cloud service providers
**Resources**:
- `falcon://serverless/vulnerabilities/fql-guide`: Comprehensive FQL documentation and examples for serverless vulnerabilities searches
**Use Cases**: Serverless security assessment, vulnerability management, cloud security monitoring
### Spotlight Module
**API Scopes Required**: `Vulnerabilities:read`
Provides tools for accessing and managing CrowdStrike Spotlight vulnerabilities:
- `falcon_search_vulnerabilities`: Search for vulnerabilities in your CrowdStrike environment
**Resources**:
- `falcon://spotlight/vulnerabilities/fql-guide`: Comprehensive FQL documentation and examples for vulnerability searches
**Use Cases**: Vulnerability management, security assessments, compliance reporting, risk analysis, patch prioritization
## Installation & Setup
### Prerequisites
- Python 3.11 or higher
- [`uv`](https://docs.astral.sh/uv/) or pip
- CrowdStrike Falcon API credentials (see above)
### Environment Configuration
You can configure your CrowdStrike API credentials in several ways:
#### Use a `.env` File
If you prefer using a `.env` file, you have several options:
##### Option 1: Copy from cloned repository (if you've cloned it)
```bash
cp .env.example .env
```
##### Option 2: Download the example file from GitHub
```bash
curl -o .env https://raw.githubusercontent.com/CrowdStrike/falcon-mcp/main/.env.example
```
##### Option 3: Create manually with the following content
```bash
# Required Configuration
FALCON_CLIENT_ID=your-client-id
FALCON_CLIENT_SECRET=your-client-secret
FALCON_BASE_URL=https://api.crowdstrike.com
# Optional Configuration (uncomment and modify as needed)
#FALCON_MCP_MODULES=detections,incidents,intel
#FALCON_MCP_TRANSPORT=stdio
#FALCON_MCP_DEBUG=false
#FALCON_MCP_HOST=127.0.0.1
#FALCON_MCP_PORT=8000
```
#### Environment Variables
Alternatively, you can use environment variables directly.
Set the following environment variables in your shell:
```bash
# Required Configuration
export FALCON_CLIENT_ID="your-client-id"
export FALCON_CLIENT_SECRET="your-client-secret"
export FALCON_BASE_URL="https://api.crowdstrike.com"
# Optional Configuration
export FALCON_MCP_MODULES="detections,incidents,intel" # Comma-separated list (default: all modules)
export FALCON_MCP_TRANSPORT="stdio" # Transport method: stdio, sse, streamable-http
export FALCON_MCP_DEBUG="false" # Enable debug logging: true, false
export FALCON_MCP_HOST="127.0.0.1" # Host for HTTP transports
export FALCON_MCP_PORT="8000" # Port for HTTP transports
```
**CrowdStrike API Region URLs:**
- **US-1 (Default)**: `https://api.crowdstrike.com`
- **US-2**: `https://api.us-2.crowdstrike.com`
- **EU-1**: `https://api.eu-1.crowdstrike.com`
- **US-GOV**: `https://api.laggar.gcw.crowdstrike.com`
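If you want to sanity-check your configuration before starting the server, the following is a minimal sketch that loads a `.env` file and verifies the required settings are present. It assumes `python-dotenv` is installed; the variable names are the ones documented above.
```python
# Minimal sketch: load a .env file and fail fast if required settings are missing.
# Assumes `pip install python-dotenv`; variable names are the ones documented above.
import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory

REQUIRED = ("FALCON_CLIENT_ID", "FALCON_CLIENT_SECRET", "FALCON_BASE_URL")
missing = [name for name in REQUIRED if not os.environ.get(name)]
if missing:
    raise SystemExit(f"Missing required settings: {', '.join(missing)}")

print("Region:", os.environ["FALCON_BASE_URL"])
print("Transport:", os.environ.get("FALCON_MCP_TRANSPORT", "stdio"))
```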
### Installation
> [!NOTE]
> If you just want to interact with falcon-mcp via an agent chat interface rather than running the server itself, take a look at [Additional Deployment Options](#additional-deployment-options). Otherwise, continue to the installation steps below.
#### Install using uv
```bash
uv tool install falcon-mcp
```
#### Install using pip
```bash
pip install falcon-mcp
```
> [!TIP]
> If `falcon-mcp` isn't found, update your shell PATH.
For installation via code editors/assistants, see the [Editor/Assistant](#editorassistant-integration) section below.
## Usage
### Command Line
Run the server with default settings (stdio transport):
```bash
falcon-mcp
```
Run with SSE transport:
```bash
falcon-mcp --transport sse
```
Run with streamable-http transport:
```bash
falcon-mcp --transport streamable-http
```
Run with streamable-http transport on custom port:
```bash
falcon-mcp --transport streamable-http --host 0.0.0.0 --port 8080
```
### Module Configuration
The Falcon MCP Server supports multiple ways to specify which modules to enable:
#### 1. Command Line Arguments (highest priority)
Specify modules using comma-separated lists:
```bash
# Enable specific modules
falcon-mcp --modules detections,incidents,intel,spotlight,idp
# Enable only one module
falcon-mcp --modules detections
```
#### 2. Environment Variable (fallback)
Set the `FALCON_MCP_MODULES` environment variable:
```bash
# Export environment variable
export FALCON_MCP_MODULES=detections,incidents,intel,spotlight,idp
falcon-mcp
# Or set inline
FALCON_MCP_MODULES=detections,incidents,intel,spotlight,idp falcon-mcp
```
#### 3. Default Behavior (all modules)
If no modules are specified via command line or environment variable, all available modules are enabled by default.
**Module Priority Order:**
1. Command line `--modules` argument (overrides all)
2. `FALCON_MCP_MODULES` environment variable (fallback)
3. All modules (default when none specified)
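For clarity, this precedence can be sketched in a few lines of Python. This is an illustration only, not the server's actual implementation; `resolve_modules` and `DEFAULT_MODULES` are hypothetical names, and the module keys mirror the example values shown in `.env.example`.
```python
# Illustration of the precedence described above; not the server's actual code.
# `resolve_modules` and `DEFAULT_MODULES` are hypothetical names.
import os

DEFAULT_MODULES = [
    "detections", "incidents", "intel", "hosts", "spotlight",
    "cloud", "idp", "sensorusage", "serverless", "discover",
]


def resolve_modules(cli_modules: str | None = None) -> list[str]:
    """Return enabled modules: --modules flag > FALCON_MCP_MODULES > all modules."""
    raw = cli_modules or os.environ.get("FALCON_MCP_MODULES", "")
    if not raw.strip():
        return list(DEFAULT_MODULES)
    return [m.strip() for m in raw.split(",") if m.strip()]


print(resolve_modules("detections,incidents"))  # command line wins
print(resolve_modules())                        # env var if set, otherwise all modules
```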
### Additional Command Line Options
For all available options:
```bash
falcon-mcp --help
```
### As a Library
```python
from falcon_mcp.server import FalconMCPServer
# Create and run the server
server = FalconMCPServer(
base_url="https://api.us-2.crowdstrike.com", # Optional, defaults to env var
debug=True, # Optional, enable debug logging
enabled_modules=["detections", "incidents", "spotlight", "idp"] # Optional, defaults to all modules
)
# Run with stdio transport (default)
server.run()
# Or run with SSE transport
server.run("sse")
# Or run with streamable-http transport
server.run("streamable-http")
# Or run with streamable-http transport on custom host/port
server.run("streamable-http", host="0.0.0.0", port=8080)
```
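When the server is started with an HTTP transport as above, an MCP client can connect to it over the network. Below is a minimal sketch using the MCP Python SDK's streamable HTTP client; it assumes the server from the previous example is running with `server.run("streamable-http", host="0.0.0.0", port=8080)` and that it serves the SDK-default `/mcp` endpoint.
```python
# Minimal client sketch (assumption: the server above is running with
# `server.run("streamable-http", host="0.0.0.0", port=8080)` and exposes the
# SDK-default /mcp endpoint).
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def main() -> None:
    async with streamablehttp_client("http://127.0.0.1:8080/mcp") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print("Available tools:", [tool.name for tool in tools.tools])


asyncio.run(main())
```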
### Running Examples
```bash
# Run with stdio transport
python examples/basic_usage.py
# Run with SSE transport
python examples/sse_usage.py
# Run with streamable-http transport
python examples/streamable_http_usage.py
```
## Container Usage
The Falcon MCP Server is available as a pre-built container image for easy deployment:
### Using Pre-built Image (Recommended)
```bash
# Pull the latest pre-built image
docker pull quay.io/crowdstrike/falcon-mcp:latest
# Run with .env file (recommended)
docker run -i --rm --env-file /path/to/.env quay.io/crowdstrike/falcon-mcp:latest
# Run with .env file and SSE transport
docker run --rm -p 8000:8000 --env-file /path/to/.env \
quay.io/crowdstrike/falcon-mcp:latest --transport sse --host 0.0.0.0
# Run with .env file and streamable-http transport
docker run --rm -p 8000:8000 --env-file /path/to/.env \
quay.io/crowdstrike/falcon-mcp:latest --transport streamable-http --host 0.0.0.0
# Run with .env file and custom port
docker run --rm -p 8080:8080 --env-file /path/to/.env \
quay.io/crowdstrike/falcon-mcp:latest --transport streamable-http --host 0.0.0.0 --port 8080
# Run with .env file and specific modules (stdio transport - requires -i flag)
docker run -i --rm --env-file /path/to/.env \
quay.io/crowdstrike/falcon-mcp:latest --modules detections,incidents,spotlight,idp
# Use a specific version instead of latest (stdio transport - requires -i flag)
docker run -i --rm --env-file /path/to/.env \
quay.io/crowdstrike/falcon-mcp:1.2.3
# Alternative: Individual environment variables (stdio transport - requires -i flag)
docker run -i --rm -e FALCON_CLIENT_ID=your_client_id -e FALCON_CLIENT_SECRET=your_secret \
quay.io/crowdstrike/falcon-mcp:latest
```
### Building Locally (Development)
For development or customization purposes, you can build the image locally:
```bash
# Build the Docker image
docker build -t falcon-mcp .
# Run the locally built image
docker run --rm -e FALCON_CLIENT_ID=your_client_id -e FALCON_CLIENT_SECRET=your_secret falcon-mcp
```
> [!NOTE]
> When using HTTP transports in Docker, always set `--host 0.0.0.0` to allow external connections to the container.
## Editor/Assistant Integration
You can integrate the Falcon MCP server with your editor or AI assistant. Here are configuration examples for popular MCP clients:
### Using `uvx` (recommended)
```json
{
"mcpServers": {
"falcon-mcp": {
"command": "uvx",
"args": [
"--env-file",
"/path/to/.env",
"falcon-mcp"
]
}
}
}
```
### With Module Selection
```json
{
"mcpServers": {
"falcon-mcp": {
"command": "uvx",
"args": [
"--env-file",
"/path/to/.env",
"falcon-mcp",
"--modules",
"detections,incidents,intel"
]
}
}
}
```
### Using Individual Environment Variables
```json
{
"mcpServers": {
"falcon-mcp": {
"command": "uvx",
"args": ["falcon-mcp"],
"env": {
"FALCON_CLIENT_ID": "your-client-id",
"FALCON_CLIENT_SECRET": "your-client-secret",
"FALCON_BASE_URL": "https://api.crowdstrike.com"
}
}
}
}
```
### Docker Version
```json
{
"mcpServers": {
"falcon-mcp-docker": {
"command": "docker",
"args": [
"run",
"-i",
"--rm",
"--env-file",
"/full/path/to/.env",
"quay.io/crowdstrike/falcon-mcp:latest"
]
}
}
}
```
> [!NOTE]
> The `-i` flag is required when using the default stdio transport.
## Additional Deployment Options
### Amazon Bedrock AgentCore
To deploy the MCP Server as a tool in Amazon Bedrock AgentCore, please refer to the [following document](./docs/deployment/amazon_bedrock_agentcore.md).
### Google Cloud (Cloud Run and Vertex AI)
To deploy the MCP server as an agent within Cloud Run or Vertex AI Agent Engine (including for registration within Agentspace), refer to the [Google ADK example](./examples/adk/README.md).
## Contributing
### Getting Started for Contributors
1. Clone the repository:
```bash
git clone https://github.com/CrowdStrike/falcon-mcp.git
cd falcon-mcp
```
2. Install in development mode:
```bash
# Create .venv and install dependencies
uv sync --all-extras
# Activate the venv
source .venv/bin/activate
```
> [!IMPORTANT]
> This project uses [Conventional Commits](https://www.conventionalcommits.org/) for automated releases and semantic versioning. Please follow the commit message format outlined in our [Contributing Guide](docs/CONTRIBUTING.md) when submitting changes.
### Running Tests
```bash
# Run all tests
pytest
# Run end-to-end tests
pytest --run-e2e tests/e2e/
# Run end-to-end tests with verbose output (note: -s is required to see output)
pytest --run-e2e -v -s tests/e2e/
```
> **Note**: The `-s` flag is required to see detailed output from E2E tests.
### Developer Documentation
- [Module Development Guide](docs/module_development.md): Instructions for implementing new modules
- [Resource Development Guide](docs/resource_development.md): Instructions for implementing resources
- [End-to-End Testing Guide](docs/e2e_testing.md): Guide for running and understanding E2E tests
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Support
This is a community-driven, open source project. While it is not an official CrowdStrike product, it is actively maintained by CrowdStrike and supported in collaboration with the open source developer community.
For more information, please see our [SUPPORT](SUPPORT.md) file.
```
--------------------------------------------------------------------------------
/docs/SECURITY.md:
--------------------------------------------------------------------------------
```markdown
# Security
This document outlines the security policy and procedures for projects under the CrowdStrike organization.
## Supported Versions
For each project, we aim to release security vulnerability patches for the most recent version at an accelerated cadence. Please refer to the specific project repository for details on supported versions.
## Reporting a Potential Security Vulnerability
We encourage the reporting of security-related vulnerabilities. To report a suspected vulnerability in any CrowdStrike project, please use one of the following methods:
+ Submitting an __issue__ to the relevant project repository.
+ Submitting a __pull request__ with a potential fix to the relevant project repository.
+ Sending an email to __[email protected]__.
## Disclosure and Mitigation Process
Upon receiving a security bug report, the issue will be triaged and assigned to a project maintainer. The maintainer will coordinate the fix and release process, typically involving:
+ Initial communication with the reporter to acknowledge receipt and provide status updates.
+ Confirmation of the issue and determination of affected versions.
+ Codebase audit to identify similar potential vulnerabilities.
+ Preparation of patches for all supported versions.
+ Patches will be submitted through pull requests, flagged as security fixes.
+ After merging and successful post-merge testing, patches will be released accordingly.
## Comments and Suggestions
We welcome suggestions for improving this process. Please share your ideas by creating an issue in the relevant project repository.
```
--------------------------------------------------------------------------------
/docs/CODE_OF_CONDUCT.md:
--------------------------------------------------------------------------------
```markdown
# CrowdStrike Community Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, religion, or sexual identity
and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
* Focusing on what is best not just for us as individuals, but for the
overall community
Examples of unacceptable behavior include:
* The use of sexualized language or imagery, and sexual attention or
advances of any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email
address, without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official e-mail address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Each project has one or more project maintainers. These individuals are
responsible for enforcing the Code of Conduct within a given project.
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the project maintainers at any time. Concerns can also be escalated
directly to community leaders at:
* [[email protected]](mailto:[email protected])
* [Ethics and Compliance Hotline](https://crowdstrike.ethicspoint.com/)
All complaints will be reviewed and investigated promptly and fairly.
Community leaders are obligated to respect the privacy and security of the
reporter of any incident.
### Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
#### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
#### 2. Warning
**Community Impact**: A violation through a single incident or series
of actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or
permanent ban.
#### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
#### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within
the community.
## Attribution
This Code of Conduct is adapted from the Contributor Covenant homepage,
version 2.0, available at
[https://www.contributor-covenant.org/version/2/0/code_of_conduct.html](https://www.contributor-covenant.org/version/2/0/code_of_conduct.html).
Community Impact Guidelines were inspired by [Mozilla's code of conduct
enforcement ladder](https://github.com/mozilla/diversity).
For answers to common questions about this code of conduct, see the FAQ at
[https://www.contributor-covenant.org/faq](https://www.contributor-covenant.org/faq). Translations are available at
[https://www.contributor-covenant.org/translations](https://www.contributor-covenant.org/translations).
## Improving the Code of Conduct
Suggestions welcome on how to improve this code of conduct!
* Have a suggestion or idea to discuss? Open a discussion
at [https://github.com/CrowdStrike/community/discussions](https://github.com/CrowdStrike/community/discussions)!
* Want to submit a pull request with recommended changes? Submit a PR
against [https://github.com/CrowdStrike/community/blob/main/docs/pages/code-of-conduct.md](https://github.com/CrowdStrike/community/blob/main/docs/pages/code-of-conduct.md).
```
--------------------------------------------------------------------------------
/docs/CONTRIBUTING.md:
--------------------------------------------------------------------------------
```markdown
# Welcome
Welcome and thank you for your interest in contributing to a CrowdStrike project! We recognize contributing to a project is no small feat! The guidance here aspires to help onboard new community members into how CrowdStrike-led projects tend to operate, and by extension, make the contribution process easier.
## How do I make a contribution?
Never made an open source contribution before? Wondering how contributions work in CrowdStrike projects? Here is a quick rundown!
1. Find an issue that you are interested in addressing, or a feature you would like to add. These are often documented in the project repositories themselves, frequently in the `issues` section.
1. Fork the repository associated with the project to your GitHub account. This means that you will have a copy of the repository under *your-GitHub-username/repository-name*.
Guidance on how to fork a repository can be found at [https://docs.github.com/en/github/getting-started-with-github/fork-a-repo#fork-an-example-repository](https://docs.github.com/en/github/getting-started-with-github/fork-a-repo#fork-an-example-repository).
1. Clone the repository to your local machine using ``git clone https://github.com/github-username/repository-name.git``.
GitHub provides documentation on this process, including screenshots, here:
[https://docs.github.com/en/github/creating-cloning-and-archiving-repositories/cloning-a-repository#about-cloning-a-repository](https://docs.github.com/en/github/creating-cloning-and-archiving-repositories/cloning-a-repository#about-cloning-a-repository)
1. Create a new branch for your changes. This ensures your modifications can be uniquely identified and can help prevent rebasing and history problems. A local development branch can be created by running a command similar to:
``git checkout -b BRANCH-NAME-HERE``
1. Make the appropriate changes for the issue you are trying to address or the feature you would like to add.
1. Follow [this guide](https://google.github.io/styleguide/pyguide.html#docstrings) for docstrings.
1. Run [`ruff`](https://docs.astral.sh/ruff/) to format your code and check for linting issues. This helps maintain consistent code style across the project.
``ruff check . --select I``
``ruff check .``
1. Add the file contents of the changed files to the "snapshot" git uses to manage the state of the project (also known as the index). Here is the git command that will add your changes:
``git add insert-paths-of-changed-files-here``
1. **Use conventional commits** to store the contents of the index with a descriptive, standardized message. This project uses [Conventional Commits](https://www.conventionalcommits.org/) to enable automated release workflows and maintain proper semantic versioning.
**Conventional Commit Format:**
```text
<type>[optional scope]: <description>
[optional body]
[optional footer(s)]
```
**Common Types Used in This Project:**
- `feat:` - A new feature (triggers minor version bump)
- `fix:` - A bug fix (triggers patch version bump)
- `docs:` - Documentation only changes
- `refactor:` - Code changes that neither fix bugs nor add features
- `test:` - Adding missing tests or correcting existing tests
- `chore:` - Changes to build process, auxiliary tools, or maintenance
**Examples with Good Scoping (Recommended):**
```bash
# Module changes with specific scopes (preferred)
git commit -m "feat(modules/cloud): add list kubernetes clusters tool"
git commit -m "feat(modules/hosts): add list devices tool"
git commit -m "fix(modules/detections): resolve authentication error"
# Resource changes
git commit -m "refactor(resources): reword FQL guide in cloud resource"
git commit -m "feat(resources): add FQL guide for hosts module"
# Documentation changes with scope
git commit -m "docs(contributing): update conventional commits guidance"
git commit -m "docs(modules): enhance module development guide"
# Infrastructure changes
git commit -m "feat(ci): add automated testing workflow"
git commit -m "chore(docker): update container configurations"
```
**How Scoped Commits Improve Changelogs:**
The above commits would generate organized changelog entries like:
```markdown
# Features
- modules/cloud: add list kubernetes clusters tool
- modules/hosts: add list devices tool
- resources: add FQL guide for hosts module
- ci: add automated testing workflow
# Bug Fixes
- modules/detections: resolve authentication error
# Refactors
- resources: reword FQL guide in cloud resource
# Documentation
- contributing: update conventional commits guidance
- modules: enhance module development guide
```
**Basic Examples (Less Preferred but Acceptable):**
```bash
# General examples without specific scopes
git commit -m "feat: add new functionality"
git commit -m "fix: resolve issue in application"
git commit -m "docs: update documentation"
```
**Breaking Changes:**
For breaking changes, add `!` after the type or include `BREAKING CHANGE:` in the footer:
```bash
git commit -m "feat!: change API authentication method"
# or
git commit -m "feat: update authentication system
BREAKING CHANGE: API key format has changed"
```
**Why Conventional Commits?**
- **Automated Releases**: Enables automatic semantic version bumps and changelog generation
- **Clear History**: Makes it easy to understand what type of changes were made
- **Consistent Format**: Standardizes commit messages across all contributors
For more details, see the [Conventional Commits specification](https://www.conventionalcommits.org/).
1. Push your local changes back to your account on github.com:
``git push origin BRANCH-NAME-HERE``
1. Submit a pull request to the upstream project. Documentation on this process, including screen shots, can be found at [https://docs.github.com/en/github/collaborating-with-issues-and-pull-requests/creating-a-pull-request-from-a-fork](https://docs.github.com/en/github/collaborating-with-issues-and-pull-requests/creating-a-pull-request-from-a-fork)
1. Once submitted, a maintainer will review your pull request. They may ask for additional changes, or clarification, so keep an eye out for communication! GitHub automatically sends an email to your email address whenever someone comments on your pull request.
1. Not every pull request is merged, but celebrate your contribution either way! All changes move the project forward, and we thank you for helping the community!
### Rebase Early, Rebase Often
Projects tend to move at a fast pace, which means your fork may fall behind upstream. Keeping your local fork in sync with upstream is called `rebasing`. This ensures your local copy is frequently refreshed with the latest changes from the community.
Frequent rebasing is *strongly* encouraged. If your local copy falls too far behind, you may encounter merge conflicts when submitting a pull request. If this happens, you will have to triage (often by hand!) the differences between your local repository and the changes upstream. A sketch of the typical commands follows the links below.
- Documentation on how to sync/rebase your fork can be found at [https://docs.github.com/en/github/collaborating-with-issues-and-pull-requests/syncing-a-fork](https://docs.github.com/en/github/collaborating-with-issues-and-pull-requests/syncing-a-fork)
- For handling merge conflicts, refer to [https://opensource.com/article/20/4/git-merge-conflict](https://opensource.com/article/20/4/git-merge-conflict)
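A minimal sketch of the typical sync-and-rebase flow (remote and branch names follow common convention and may differ in your setup):
```bash
# One-time setup: add the upstream project as a remote
git remote add upstream https://github.com/CrowdStrike/falcon-mcp.git
# Bring your local branch up to date with upstream
git fetch upstream
git rebase upstream/main
# Update your fork; --force-with-lease is needed after a rebase
git push --force-with-lease origin BRANCH-NAME-HERE
```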
## Where can I go for help?
### Submitting a Ticket
General questions relating to a project should be opened in that project's repository. Examples would be troubleshooting errors, submitting bug reports, or asking a general question or requesting clarification.
If your question is for the broader CrowdStrike community, please [open a community discussion](https://github.com/CrowdStrike/community/discussions/new).
### Submitting a New Project Idea
If you do not see a project or repository for a specific piece of technology that you would like the community to consider working on, please [open a community ticket](https://github.com/CrowdStrike/community/issues/new).
## What does the Code of Conduct mean for me?
Our community Code of Conduct helps us establish community norms and how they'll be enforced. Community members are expected to treat each other with respect and courtesy regardless of their identity.
CrowdStrike open source project maintainers are responsible for enforcing the CrowdStrike Code of Conduct within the project; issues may be raised directly to the maintainers should the need arise.
### Escalation Path
If you do not feel your concern has been addressed, if you are unable to communicate your concern with project maintainers, or if you feel the situation warrants, please escalate to:
- [[email protected]](mailto:[email protected])
- [Ethics and Compliance Hotline](https://crowdstrike.ethicspoint.com/)
```
--------------------------------------------------------------------------------
/falcon_mcp/resources/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/tests/e2e/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/tests/e2e/utils/__init__.py:
--------------------------------------------------------------------------------
```python
"""E2E test utils."""
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/config.yml:
--------------------------------------------------------------------------------
```yaml
blank_issues_enabled: false
```
--------------------------------------------------------------------------------
/examples/adk/falcon_agent/__init__.py:
--------------------------------------------------------------------------------
```python
from . import agent as agent
```
--------------------------------------------------------------------------------
/tests/e2e/modules/__init__.py:
--------------------------------------------------------------------------------
```python
"""E2E tests for modules."""
```
--------------------------------------------------------------------------------
/falcon_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Falcon MCP Server package
"""
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Test package for Falcon MCP Server
"""
```
--------------------------------------------------------------------------------
/tests/common/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Common tests package for Falcon MCP Server
"""
```
--------------------------------------------------------------------------------
/tests/modules/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Module tests package for Falcon MCP Server
"""
```
--------------------------------------------------------------------------------
/falcon_mcp/common/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Common utilities package for Falcon MCP Server
"""
```
--------------------------------------------------------------------------------
/examples/adk/falcon_agent/requirements.txt:
--------------------------------------------------------------------------------
```
google-adk[eval]==1.8.0
falcon-mcp
google-cloud-aiplatform[agent_engines]==1.105.0
cloudpickle==3.1.1
pydantic==2.11.7
```
--------------------------------------------------------------------------------
/docs/deployment/google_cloud.md:
--------------------------------------------------------------------------------
```markdown
# Deploying to Google Cloud (Cloud Run, Vertex AI Agent Engine, and Agentspace)
Refer to the [Google ADK example](../../examples/adk/README.md).
```
--------------------------------------------------------------------------------
/falcon_mcp/modules/__init__.py:
--------------------------------------------------------------------------------
```python
# ruff: noqa: F401
"""
Modules package for Falcon MCP Server
Modules are automatically discovered by the registry system via dynamic import scanning.
No manual imports are required - the registry uses pkgutil.iter_modules() and
importlib.import_module() to find and load all *Module classes at runtime.
"""
```
--------------------------------------------------------------------------------
/examples/mcp_config.json:
--------------------------------------------------------------------------------
```json
{
"servers": [
{
"name": "falcon-stdio",
"transport": {
"type": "stdio",
"command": "python -m falcon_mcp.server"
}
},
{
"name": "falcon-sse",
"transport": {
"type": "sse",
"url": "http://127.0.0.1:8000/sse"
}
},
{
"name": "falcon-streamable-http",
"transport": {
"type": "streamable-http",
"url": "http://127.0.0.1:8000/mcp"
}
}
]
}
```
--------------------------------------------------------------------------------
/.github/workflows/markdown-lint.yml:
--------------------------------------------------------------------------------
```yaml
name: Markdown Lint
on:
push:
branches: [ main ]
paths:
- '**.md'
pull_request:
branches: [ main ]
paths:
- '**.md'
permissions:
contents: read
jobs:
markdown-lint:
runs-on: ubuntu-latest
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
with:
egress-policy: audit
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- name: Lint Markdown files
uses: DavidAnson/markdownlint-cli2-action@992badcdf24e3b8eb7e87ff9287fe931bcb00c6e
with:
config: '.markdownlint.json'
globs: |
README.md
docs/**/*.md
```
--------------------------------------------------------------------------------
/examples/basic_usage.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Basic usage example for Falcon MCP Server.
This script demonstrates how to initialize and run the Falcon MCP server.
"""
import os
from dotenv import load_dotenv
from falcon_mcp.server import FalconMCPServer
def main():
"""Run the Falcon MCP server with default settings."""
# Load environment variables from .env file
load_dotenv()
# Create and run the server with stdio transport
server = FalconMCPServer(
# You can override the base URL if needed
# base_url="https://api.us-2.crowdstrike.com",
debug=os.environ.get("DEBUG", "").lower() == "true",
)
# Run the server with stdio transport (default)
server.run()
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/examples/sse_usage.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
SSE transport example for Falcon MCP Server.
This script demonstrates how to initialize and run the Falcon MCP server with SSE transport.
"""
import os
from dotenv import load_dotenv
from falcon_mcp.server import FalconMCPServer
def main():
"""Run the Falcon MCP server with SSE transport."""
# Load environment variables from .env file
load_dotenv()
# Create and run the server with SSE transport
server = FalconMCPServer(
# You can override the base URL if needed
# base_url="https://api.us-2.crowdstrike.com",
debug=os.environ.get("DEBUG", "").lower() == "true",
)
# Run the server with SSE transport
server.run("sse")
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/examples/adk/falcon_agent/env.properties:
--------------------------------------------------------------------------------
```
# General Agent Configuration
GOOGLE_GENAI_USE_VERTEXAI=False
GOOGLE_API_KEY=NOT_SET
GOOGLE_MODEL=NOT_SET
FALCON_CLIENT_ID=NOT_SET
FALCON_CLIENT_SECRET=NOT_SET
FALCON_BASE_URL=NOT_SET
# Should be single line and only use single quotes.
FALCON_AGENT_PROMPT=NOT_SET
# Cloud Run Specific
PROJECT_ID=NOT_SET
REGION=NOT_SET
# Agent Engine Specific - Should be using format - gs://your-agent-engine-staging-bucket
AGENT_ENGINE_STAGING_BUCKET=NOT_SET
# Agentspace Specific
PROJECT_NUMBER=NOT_SET
# only 'global' location supported
AGENT_LOCATION=NOT_SET
REASONING_ENGINE_NUMBER=NOT_SET
AGENT_SPACE_APP_NAME=NOT_SET
# Other variables
# default -1 means send all user conversations to the LLM
# recommended value - 5. Please check the documentation for more.
MAX_PREV_USER_INTERACTIONS=-1
```
--------------------------------------------------------------------------------
/SUPPORT.md:
--------------------------------------------------------------------------------
```markdown
# Support
This is a community-driven, open source project. While it is not an official CrowdStrike product, it is actively maintained by CrowdStrike and supported in collaboration with the open source developer community.
## How to Request Help
Support for this project is primarily provided through this GitHub repository. When you submit issues publicly, you contribute to our shared knowledge base, improve self-service options for others, and often receive faster solutions.
To request assistance, please open a GitHub Issue. This is the appropriate channel for questions, bug reports, feature requests, enhancement suggestions, and documentation updates.
For CrowdStrike customers who prefer direct engagement, you may alternatively contact CrowdStrike Technical Support through your established support channels.
```
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
"""
Pytest configuration file for the tests.
"""
import pytest
def pytest_addoption(parser):
"""
Add the --run-e2e option to pytest.
"""
parser.addoption(
"--run-e2e",
action="store_true",
default=False,
help="run e2e tests",
)
def pytest_configure(config):
"""
Register the e2e marker.
"""
config.addinivalue_line("markers", "e2e: mark test as e2e to run")
def pytest_collection_modifyitems(config, items):
"""
Skip e2e tests if --run-e2e is not given.
"""
if config.getoption("--run-e2e"):
return
skip_e2e = pytest.mark.skip(reason="need --run-e2e option to run")
for item in items:
if "e2e" in item.keywords:
item.add_marker(skip_e2e)
@pytest.fixture
def verbosity_level(request):
"""Return the verbosity level from pytest config."""
return request.config.option.verbose
```
--------------------------------------------------------------------------------
/.github/dependabot.yml:
--------------------------------------------------------------------------------
```yaml
version: 2
updates:
# Python dependencies configuration (uv ecosystem)
- package-ecosystem: "uv"
directory: "/"
schedule:
interval: "daily"
commit-message:
prefix: "deps(python):"
labels:
- "dependencies"
- "python"
# Backup configuration (pip ecosystem)
# - package-ecosystem: "pip"
# directory: "/"
# schedule:
# interval: "daily"
# commit-message:
# prefix: "deps(python):"
# labels:
# - "dependencies"
# - "python"
# GitHub Actions configuration
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "weekly"
commit-message:
prefix: "deps(actions):"
labels:
- "dependencies"
- "github-actions"
# Docker configuration
- package-ecosystem: "docker"
directory: "/"
schedule:
interval: "weekly"
commit-message:
prefix: "deps(docker):"
labels:
- "dependencies"
- "docker"
```
--------------------------------------------------------------------------------
/.github/workflows/python-lint.yml:
--------------------------------------------------------------------------------
```yaml
name: Python Lint
on:
push:
branches: [main]
pull_request:
branches: [main]
permissions:
contents: read
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
with:
egress-policy: audit
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- name: Set up Python
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065
with:
python-version: "3.11"
cache: "pip"
- name: Install the latest version of uv
uses: astral-sh/setup-uv@39eb6c9dde236bbc368681611e63120a6eb4afac
with:
version: "latest"
activate-environment: true
- name: Install dependencies
run: |
uv sync --extra dev
- name: Lint imports
run: |
ruff check . --select I
- name: Lint with Ruff
run: |
ruff check .
```
--------------------------------------------------------------------------------
/.github/workflows/python-test.yml:
--------------------------------------------------------------------------------
```yaml
name: Python Tests
on:
push:
branches: [main]
pull_request:
branches: [main]
permissions:
contents: read
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11", "3.12"]
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
with:
egress-policy: audit
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065
with:
python-version: ${{ matrix.python-version }}
cache: "pip"
- name: Install the latest version of uv
uses: astral-sh/setup-uv@39eb6c9dde236bbc368681611e63120a6eb4afac
with:
version: "latest"
activate-environment: true
- name: Install dependencies
run: |
uv sync --extra dev
- name: Test with pytest
run: |
pytest
```
--------------------------------------------------------------------------------
/falcon_mcp/common/logging.py:
--------------------------------------------------------------------------------
```python
"""
Logging configuration for Falcon MCP Server
This module provides logging utilities for the Falcon MCP server.
"""
import logging
import sys
from typing import Optional
def configure_logging(debug: bool = False, name: str = "falcon_mcp") -> logging.Logger:
"""Configure logging for the Falcon MCP server.
Args:
debug: Enable debug logging
name: Logger name
Returns:
logging.Logger: Configured logger
"""
log_level = logging.DEBUG if debug else logging.INFO
# Configure root logger
logging.basicConfig(
level=log_level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
# Set third-party loggers to a higher level to reduce noise
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)
# Get and return the logger for this application
logger = logging.getLogger(name)
logger.setLevel(log_level)
return logger
def get_logger(name: Optional[str] = None) -> logging.Logger:
"""Get a logger with the specified name.
Args:
name: Logger name (defaults to "falcon_mcp")
Returns:
logging.Logger: Logger instance
"""
logger_name = name if name else "falcon_mcp"
return logging.getLogger(logger_name)
```
--------------------------------------------------------------------------------
/examples/streamable_http_usage.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Streamable HTTP transport example for Falcon MCP Server.
This script demonstrates how to initialize and run the Falcon MCP server
with streamable-http transport for custom integrations and web-based deployments.
"""
import os
from dotenv import load_dotenv
from falcon_mcp.server import FalconMCPServer
def main():
"""Run the Falcon MCP server with streamable-http transport."""
# Load environment variables from .env file
load_dotenv()
# Create and run the server with streamable-http transport
server = FalconMCPServer(
# You can override the base URL if needed
# base_url="https://api.us-2.crowdstrike.com",
debug=os.environ.get("DEBUG", "").lower() == "true",
)
# Example 1: Run with default settings (port 8000, localhost)
print("Example 1: Default streamable-http configuration")
print(" - Host: 127.0.0.1 (localhost only)")
print(" - Port: 8000")
print(" - Path: /mcp")
print(" - URL: http://127.0.0.1:8000/mcp")
print()
# Uncomment to run with defaults:
# server.run("streamable-http")
# Example 2: Custom configuration
print("Example 2: Custom configuration")
print(" - Host: 0.0.0.0 (external access)")
print(" - Port: 8080")
print(" - URL: http://0.0.0.0:8080/mcp")
print()
# Run with custom configuration
server.run("streamable-http", host="0.0.0.0", port=8080)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "falcon-mcp"
version = "0.3.0"
description = "CrowdStrike Falcon MCP Server"
readme = "README.md"
requires-python = ">=3.11"
license = {text = "MIT"}
authors = [
{name = "CrowdStrike", email = "[email protected]"}
]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
]
dependencies = [
"crowdstrike-falconpy>=1.3.0",
"mcp>=1.12.1,<2.0.0",
"python-dotenv>=1.1.1",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"mypy>=1.0.0",
"langchain-openai>=0.3.28",
"mcp-use[search]>=1.3.7",
"ruff>=0.12.5",
"black>=23.0.0",
]
[project.scripts]
falcon-mcp = "falcon_mcp.server:main"
[tool.black]
line-length = 100
target-version = ["py311"]
[tool.mypy]
python_version = "3.11"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
[tool.ruff]
target-version = "py311"
line-length = 100
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
filterwarnings = [
"ignore::DeprecationWarning:websockets.*:",
"ignore::DeprecationWarning:uvicorn.protocols.websockets.*:",
"ignore::pydantic.PydanticDeprecatedSince20:langchain_core.*:"
]
```
--------------------------------------------------------------------------------
/tests/common/test_api_scopes.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the API scope utilities.
"""
import unittest
from falcon_mcp.common.api_scopes import API_SCOPE_REQUIREMENTS, get_required_scopes
class TestApiScopes(unittest.TestCase):
"""Test cases for the API scope utilities."""
def test_api_scope_requirements_structure(self):
"""Test API_SCOPE_REQUIREMENTS dictionary structure."""
# Verify it's a dictionary
self.assertIsInstance(API_SCOPE_REQUIREMENTS, dict)
# Verify it has entries
self.assertGreater(len(API_SCOPE_REQUIREMENTS), 0)
# Verify structure of entries (keys are strings, values are lists of strings)
for operation, scopes in API_SCOPE_REQUIREMENTS.items():
self.assertIsInstance(operation, str)
self.assertIsInstance(scopes, list)
for scope in scopes:
self.assertIsInstance(scope, str)
def test_get_required_scopes(self):
"""Test get_required_scopes function."""
# Test with known operations
self.assertEqual(get_required_scopes("GetQueriesAlertsV2"), ["Alerts:read"])
self.assertEqual(get_required_scopes("PostEntitiesAlertsV2"), ["Alerts:read"])
self.assertEqual(get_required_scopes("QueryIncidents"), ["Incidents:read"])
# Test with unknown operation
self.assertEqual(get_required_scopes("UnknownOperation"), [])
# Test with empty string
self.assertEqual(get_required_scopes(""), [])
# Test with None (should handle gracefully)
self.assertEqual(get_required_scopes(None), [])
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/.github/workflows/python-test-e2e.yml:
--------------------------------------------------------------------------------
```yaml
name: Manual E2E Tests
on:
workflow_dispatch:
inputs:
models:
description: "Models to test"
required: false
default: "gpt-4.1-mini,gpt-4o-mini"
type: string
permissions:
contents: read
actions: write
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
with:
egress-policy: audit
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- name: Set up Python 3.12
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065
with:
python-version: "3.12"
cache: "pip"
- name: Install the latest version of uv
uses: astral-sh/setup-uv@39eb6c9dde236bbc368681611e63120a6eb4afac
with:
version: "latest"
activate-environment: true
- name: Install dependencies
run: |
uv sync --extra dev
- name: Test with pytest
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
MODELS_TO_TEST: ${{ inputs.models }}
MCP_USE_ANONYMIZED_TELEMETRY: false
run: |
pytest --run-e2e
- name: Generate HTML report
if: always()
run: |
python scripts/generate_e2e_report.py
- name: Upload HTML report artifact
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02
if: always()
with:
name: e2e-test-report
path: ./static_test_report.html
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Use a Python image with uv pre-installed
# ghcr.io/astral-sh/uv:python3.13-alpine (multi-arch: amd64, arm64)
FROM ghcr.io/astral-sh/uv@sha256:3ce89663b5309e77087de25ca805c49988f2716cdb2c6469b1dec2764f58b141 AS uv
# Install the project into `/app`
WORKDIR /app
# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1
# Copy from the cache instead of linking since it's a mounted volume
ENV UV_LINK_MODE=copy
# Generate proper TOML lockfile first
RUN --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
uv lock
# Install the project's dependencies using the lockfile
RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,source=pyproject.toml,target=pyproject.toml \
--mount=type=bind,source=uv.lock,target=uv.lock \
uv sync --frozen --no-install-project --no-dev --no-editable
# Then, add the rest of the project source code and install it
ADD . /app
RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,source=uv.lock,target=uv.lock \
uv sync --frozen --no-dev --no-editable
# Remove unnecessary files from the virtual environment before copying
RUN find /app/.venv -name '__pycache__' -type d -exec rm -rf {} + && \
find /app/.venv -name '*.pyc' -delete && \
find /app/.venv -name '*.pyo' -delete && \
echo "Cleaned up .venv"
# Final stage
# python:3.13-alpine (multi-arch: amd64, arm64)
FROM python@sha256:9ba6d8cbebf0fb6546ae71f2a1c14f6ffd2fdab83af7fa5669734ef30ad48844
# Create a non-root user 'app'
RUN adduser -D -h /home/app -s /bin/sh app
WORKDIR /app
USER app
COPY --from=uv --chown=app:app /app/.venv /app/.venv
# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"
ENTRYPOINT ["falcon-mcp"]
```
--------------------------------------------------------------------------------
/.github/workflows/release.yml:
--------------------------------------------------------------------------------
```yaml
name: release-please
on:
push:
branches: [main]
permissions:
contents: write
pull-requests: write
issues: write
jobs:
release-please:
runs-on: ubuntu-latest
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
with:
egress-policy: audit
- uses: google-github-actions/release-please-action@db8f2c60ee802b3748b512940dde88eabd7b7e01 # v3.7.13
id: release-please
with:
release-type: python
package-name: falcon-mcp
pull-request-header: ':rocket: New Release Incoming! :rocket:'
changelog-types: '[{"type":"feat","section":"Features","hidden":false},{"type":"fix","section":"Bug Fixes","hidden":false},{"type":"refactor","section":"Refactoring","hidden":false},{"type":"chore","section":"Miscellaneous","hidden":true}]'
# Add any extra files that contain version references
# extra-files: |
# README.md
- name: Checkout
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
if: ${{ steps.release-please.outputs.release_created }}
- name: Set up Python
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065
with:
python-version: '3.11'
if: ${{ steps.release-please.outputs.release_created }}
- name: Build and publish to PyPI
run: |
pip install uv
uv pip install --system build twine
python -m build
python -m twine upload dist/* --username __token__ --password ${{ secrets.PYPI_API_TOKEN }}
if: ${{ steps.release-please.outputs.release_created }}
```
--------------------------------------------------------------------------------
/.github/workflows/docker-build-test.yml:
--------------------------------------------------------------------------------
```yaml
name: Docker Build Test
on:
pull_request:
branches: [ main ]
paths:
- 'Dockerfile'
- 'pyproject.toml'
- 'uv.lock'
- 'falcon_mcp/**'
- '.github/workflows/docker-build-test.yml'
permissions:
contents: read
jobs:
docker-build-test:
runs-on: ubuntu-latest
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
with:
egress-policy: audit
- name: Checkout code
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435
- name: Build multi-platform Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83
with:
context: .
platforms: linux/amd64,linux/arm64
push: false
cache-from: type=gha
cache-to: type=gha,mode=max
tags: |
falcon-mcp:test
falcon-mcp:pr-${{ github.event.pull_request.number }}
- name: Test Docker image (amd64)
run: |
# Build for local testing (amd64 only for running tests)
docker buildx build \
--platform linux/amd64 \
--load \
--tag falcon-mcp:test-local \
.
# Test basic functionality - should show help without errors
echo "Testing falcon-mcp --help..."
docker run --rm falcon-mcp:test-local --help
# Test version command | TBD: Add version check
#echo "Testing falcon-mcp --version..."
#docker run --rm falcon-mcp:test-local --version || true
echo "✅ Docker image smoke tests passed!"
```
--------------------------------------------------------------------------------
/falcon_mcp/registry.py:
--------------------------------------------------------------------------------
```python
"""
Module registry for Falcon MCP Server
This module provides a registry of available modules for the Falcon MCP server.
"""
import importlib
import os
import pkgutil
from typing import Dict, List, Type
from falcon_mcp.common.logging import get_logger
logger = get_logger(__name__)
# Forward reference for type hints
# Using string to avoid circular import
MODULE_TYPE = "BaseModule" # type: ignore
# This will be populated by the discovery process
AVAILABLE_MODULES: Dict[str, Type[MODULE_TYPE]] = {}
def discover_modules():
"""Discover available modules by scanning the modules directory."""
# Get the path to the modules directory
current_dir = os.path.dirname(__file__)
modules_path = os.path.join(current_dir, "modules")
# Scan for module files
for _, name, is_pkg in pkgutil.iter_modules([modules_path]):
if not is_pkg and name != "base": # Skip base.py and packages
# Import the module
module = importlib.import_module(f"falcon_mcp.modules.{name}")
# Look for *Module classes
for attr_name in dir(module):
if attr_name.endswith("Module") and attr_name != "BaseModule":
# Get the class
module_class = getattr(module, attr_name)
# Register it
module_name = attr_name.lower().replace("module", "")
AVAILABLE_MODULES[module_name] = module_class
logger.debug("Discovered module: %s", module_name)
def get_available_modules() -> Dict[str, Type[MODULE_TYPE]]:
"""Get available modules dict, discovering if needed (lazy loading).
Returns:
Dict mapping module names to module classes
"""
if not AVAILABLE_MODULES:
logger.debug("No modules discovered yet, performing lazy discovery")
discover_modules()
return AVAILABLE_MODULES
def get_module_names() -> List[str]:
"""Get the names of all registered modules, discovering if needed (lazy loading).
Returns:
List of module names
"""
return list(get_available_modules().keys())
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/question.yaml:
--------------------------------------------------------------------------------
```yaml
name: ❓ Question or Discussion
description: Ask questions or start discussions about falcon-mcp
title: "[Question]: "
labels: ["question"]
body:
- type: markdown
attributes:
value: |
Thank you for your interest in falcon-mcp! 🚀
We're here to help answer your questions.
- type: checkboxes
id: checks
attributes:
label: Initial Checks
description: Just making sure you've checked the basics first.
options:
- label: I searched existing issues and discussions
required: true
- label: I checked the README and documentation
required: true
- type: dropdown
id: question_type
attributes:
label: Question Type
description: What kind of help do you need?
options:
- Installation and setup
- Configuration and authentication
- Module usage and capabilities
- Integration with MCP clients
- Performance and troubleshooting
- Development and contributing
- API scopes and permissions
- Best practices
- Other
validations:
required: true
- type: textarea
id: question
attributes:
label: Your Question
description: |
What would you like to know?
Please be as specific as possible so we can give you the best help. 🙏
validations:
required: true
- type: textarea
id: context
attributes:
label: Context (Optional)
description: |
Any additional context that might help us understand your situation?
Your use case, setup, or what you're trying to achieve.
placeholder: |
I'm trying to...
My setup is...
I've already tried...
- type: textarea
id: environment
attributes:
label: Environment (Optional)
description: |
If relevant, please share basic environment information.
**Please don't include API credentials or sensitive information!**
placeholder: |
- Installation method: pip/uvx/docker
- Python version: 3.11
- Operating System: macOS/Windows/Linux
- MCP Client: Claude Desktop/Cline/etc.
render: text
```
--------------------------------------------------------------------------------
/tests/modules/utils/test_modules.py:
--------------------------------------------------------------------------------
```python
import unittest
from unittest.mock import MagicMock
from mcp.server import FastMCP
from falcon_mcp.client import FalconClient
class TestModules(unittest.TestCase):
def setup_module(self, module_class):
"""
Set up test fixtures with the specified module class.
Args:
module_class: The module class to instantiate
"""
# Create a mock client
self.mock_client = MagicMock(spec=FalconClient)
# Create a mock server
self.mock_server = MagicMock(spec=FastMCP)
# Create the module
self.module = module_class(self.mock_client)
def assert_tools_registered(self, expected_tools):
"""
Helper method to verify that a module correctly registers its tools.
Args:
expected_tools: List of tool names that should be registered
"""
# Call register_tools
self.module.register_tools(self.mock_server)
# Verify that add_tool was called for each tool
self.assertEqual(self.mock_server.add_tool.call_count, len(expected_tools))
# Get the tool names that were registered
registered_tools = [
call.kwargs["name"] for call in self.mock_server.add_tool.call_args_list
]
# Verify that all expected tools were registered
for tool in expected_tools:
self.assertIn(tool, registered_tools)
def assert_resources_registered(self, expected_resources):
"""
Helper method to verify that a module correctly registers its resources.
Args:
expected_resources: List of resource names that should be registered
"""
# Call register_resources
self.module.register_resources(self.mock_server)
# Verify that add_resource was called for each resource
self.assertEqual(
self.mock_server.add_resource.call_count, len(expected_resources)
)
# Get the resource names that were registered
registered_resources = [
call.kwargs["resource"].name
for call in self.mock_server.add_resource.call_args_list
]
# Verify that all expected resources were registered
for resource in expected_resources:
self.assertIn(resource, registered_resources)
```
--------------------------------------------------------------------------------
/falcon_mcp/common/api_scopes.py:
--------------------------------------------------------------------------------
```python
"""
API scope definitions and utilities for Falcon MCP Server
This module provides API scope definitions and related utilities for the Falcon MCP server.
"""
from typing import List, Optional
from .logging import get_logger
logger = get_logger(__name__)
# Map of API operations to required scopes
# This can be expanded as more modules and operations are added
API_SCOPE_REQUIREMENTS = {
# Alerts operations (migrated from detections)
"GetQueriesAlertsV2": ["Alerts:read"],
"PostEntitiesAlertsV2": ["Alerts:read"],
# Hosts operations
"QueryDevicesByFilter": ["Hosts:read"],
"PostDeviceDetailsV2": ["Hosts:read"],
# Incidents operations
"QueryIncidents": ["Incidents:read"],
"GetIncidentDetails": ["Incidents:read"],
"CrowdScore": ["Incidents:read"],
"GetIncidents": ["Incidents:read"],
"GetBehaviors": ["Incidents:read"],
"QueryBehaviors": ["Incidents:read"],
# Intel operations
"QueryIntelActorEntities": ["Actors (Falcon Intelligence):read"],
"QueryIntelIndicatorEntities": ["Indicators (Falcon Intelligence):read"],
"QueryIntelReportEntities": ["Reports (Falcon Intelligence):read"],
# Spotlight operations
"combinedQueryVulnerabilities": ["Vulnerabilities:read"],
# Discover operations
"combined_applications": ["Assets:read"],
"combined_hosts": ["Assets:read"],
# Cloud operations
"ReadContainerCombined": ["Falcon Container Image:read"],
"ReadContainerCount": ["Falcon Container Image:read"],
"ReadCombinedVulnerabilities": ["Falcon Container Image:read"],
# Identity Protection operations
"api_preempt_proxy_post_graphql": [
"Identity Protection Entities:read",
"Identity Protection Timeline:read",
"Identity Protection Detections:read",
"Identity Protection Assessment:read",
"Identity Protection GraphQL:write",
],
# Sensor Usage operations
"GetSensorUsageWeekly": ["Sensor Usage:read"],
# Serverless operations
"GetCombinedVulnerabilitiesSARIF": ["Falcon Container Image:read"],
# Add more mappings as needed
}
def get_required_scopes(operation: Optional[str]) -> List[str]:
"""Get the required API scopes for a specific operation.
Args:
operation: The API operation name
Returns:
List[str]: List of required API scopes
"""
return API_SCOPE_REQUIREMENTS.get(operation, [])
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/feature-request.yaml:
--------------------------------------------------------------------------------
```yaml
name: 🚀 Feature Request
description: Suggest a new feature for falcon-mcp
title: "[Feature Request]: "
labels: ["enhancement"]
body:
- type: markdown
attributes:
value: |
Thank you for contributing to falcon-mcp! 🚀
Your ideas help make this project better for everyone.
- type: textarea
id: description
attributes:
label: Feature Description
description: |
Please describe the feature you'd like to see added.
Be as detailed as possible about what you want and why it would be valuable. 🙏
validations:
required: true
- type: textarea
id: use_case
attributes:
label: Use Case
description: |
What problem would this feature solve? How would you use it?
Real-world examples help us understand the value and priority.
placeholder: |
As a security analyst, I want to...
This would help me...
Currently I have to...
validations:
required: true
- type: dropdown
id: module_area
attributes:
label: Related Module/Area
description: Which part of falcon-mcp would this feature affect?
options:
- Core functionality
- Detections module
- Incidents module
- Intel module
- Hosts module
- Identity Protection module
- Cloud Security module
- Discover module
- Spotlight (vulnerabilities) module
- Sensor Usage module
- Serverless module
- New module needed
- Documentation/Examples
- Not sure
validations:
required: true
- type: textarea
id: proposed_solution
attributes:
label: Proposed Solution (Optional)
description: |
Do you have ideas for how this could be implemented?
Code examples, API endpoints, or similar features in other tools are helpful!
placeholder: |
Maybe something like:
```python
# Example of how it might work
```
- type: textarea
id: alternatives
attributes:
label: Alternatives Considered (Optional)
description: |
Are there other ways to solve this problem?
Have you tried any workarounds or alternative approaches?
- type: textarea
id: additional
attributes:
label: Additional Context (Optional)
description: |
Anything else that might help us understand your request?
Links to documentation, similar features, or related issues are helpful!
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/bug.yaml:
--------------------------------------------------------------------------------
```yaml
name: 🐛 Bug Report
description: Report a bug or unexpected behavior in falcon-mcp
title: "[Bug]: "
labels: ["bug", "triage"]
body:
- type: markdown
attributes:
value: |
Thank you for reporting an issue to falcon-mcp! 🚀
- type: checkboxes
id: checks
attributes:
label: Initial Checks
description: Just making sure you're using the latest version and searched existing issues.
options:
- label: I confirm that I'm using the latest version of falcon-mcp
required: true
- label: I searched existing issues before opening this report
required: true
- type: textarea
id: description
attributes:
label: Bug Description
description: |
Please explain what you're seeing and what you expected to see.
Include any error messages or unexpected behavior. 🙏
validations:
required: true
- type: textarea
id: reproduction
attributes:
label: Steps to Reproduce
description: |
How can we reproduce this issue? Please be as specific as possible.
placeholder: |
1. Set up falcon-mcp with...
2. Run command...
3. Observe error...
validations:
required: true
- type: dropdown
id: installation
attributes:
label: Installation Method
description: How did you install falcon-mcp?
options:
- pip install falcon-mcp
- uvx run falcon-mcp
- Docker container
- Development setup (git clone)
- Other (please specify in description)
validations:
required: true
- type: textarea
id: environment
attributes:
label: Environment Details
description: |
Please share relevant environment information.
**Please don't include API credentials or sensitive information!**
placeholder: |
- Python version: 3.11
- Operating System: macOS 14.1
- MCP Client: Claude Desktop
- Enabled modules: detections,incidents,hosts
- Falcon API region: us-1
render: text
validations:
required: true
- type: textarea
id: logs
attributes:
label: Error Logs (Optional)
description: |
If you have error logs, please include them here.
**Important: Remove any sensitive information like API keys before pasting!**
render: text
- type: textarea
id: additional
attributes:
label: Additional Context (Optional)
description: |
Anything else that might help us understand the issue?
Screenshots, configuration files, or related issues are helpful!
```
--------------------------------------------------------------------------------
/falcon_mcp/resources/sensor_usage.py:
--------------------------------------------------------------------------------
```python
"""
Contains Sensor Usage resources.
"""
from falcon_mcp.common.utils import generate_md_table
# List of tuples containing filter options data: (name, type, operators, description)
SEARCH_SENSOR_USAGE_FQL_FILTERS = [
(
"Name",
"Type",
"Operators",
"Description"
),
(
"event_date",
"Date",
"Yes",
"""
The final date of the results to be returned in ISO 8601 format (YYYY-MM-DD).
Data is available for retrieval starting with the current date minus 2 days
and going back 395 days.
Data is not available for the current date or the current date minus 1 day.
Default: the current date minus 2 days
Ex: event_date:'2024-06-11'
"""
),
(
"period",
"String",
"Yes",
"""
The number of days of data to return. Even though this looks like a number, always quote the period value, for example '3' instead of 3.
Minimum: 1
Maximum: 395
Default: 28
Ex: period:'30'
"""
),
(
"selected_cids",
"String",
"No",
"""
A comma-separated list of up to 100 CID IDs to return data for.
This filter is available to Falcon Flight Control parent CIDs and to CIDs
in multi-CID deployments with the access-account-billing-data feature flag enabled.
Note: This field is case-sensitive and requires the correct input of capital and lowercase letters.
Ex: selected_cids:'cid_1,cid_2,cid_3'
"""
),
]
SEARCH_SENSOR_USAGE_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Sensor Usage Guide
=== BASIC SYNTAX ===
property_name:[operator]'value'
=== AVAILABLE OPERATORS ===
✅ **WORKING OPERATORS:**
• No operator = equals (default) - ALL FIELDS
• ! = not equal to - ALL FIELDS
• > = greater than - DATE AND INTEGER FIELDS
• >= = greater than or equal - DATE AND INTEGER FIELDS
• < = less than - DATE AND INTEGER FIELDS
• <= = less than or equal - DATE AND INTEGER FIELDS
=== DATA TYPES & SYNTAX ===
• Dates: 'YYYY-MM-DD' (ISO 8601 format)
• Integers: 30 (without quotes)
• Strings: 'value' or ['exact_value'] for exact match
=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions
=== falcon_search_sensor_usage FQL filter options ===
""" + generate_md_table(SEARCH_SENSOR_USAGE_FQL_FILTERS) + """
=== ✅ WORKING PATTERNS ===
**Basic Equality:**
• event_date:'2024-06-11'
• period:'30'
• selected_cids:'cid_1,cid_2,cid_3'
**Combined Conditions:**
• event_date:'2024-06-11'+period:'30'
• event_date:'2024-06-11'+selected_cids:'cid_1,cid_2'
**Date Comparisons:**
• event_date:>'2024-01-01'
• event_date:<='2024-06-11'
**Period Comparisons:**
• period:>='14'
• period:<='60'
=== 💡 SYNTAX RULES ===
• Use single quotes around values: 'value'
• Date format must be ISO 8601: 'YYYY-MM-DD'
• Combine conditions with + (AND) or , (OR)
• Use parentheses for grouping: (condition1,condition2)+condition3
"""
```
--------------------------------------------------------------------------------
/tests/e2e/modules/test_sensor_usage.py:
--------------------------------------------------------------------------------
```python
"""
E2E tests for the Sensor Usage module.
"""
import unittest
import pytest
from tests.e2e.utils.base_e2e_test import BaseE2ETest, ensure_dict
@pytest.mark.e2e
class TestSensorUsageModuleE2E(BaseE2ETest):
"""
End-to-end test suite for the Falcon MCP Server Sensor Usage Module.
"""
def test_search_sensor_usage(self):
"""Verify the agent can show sensor usage for a specific event_date"""
async def test_logic():
fixtures = [
{
"operation": "GetSensorUsageWeekly",
"validator": lambda kwargs: "event_date:'2025-08-02'" in kwargs.get("parameters", {}).get("filter", ""),
"response": {
"status_code": 200,
"body": {
"resources": [
{
"containers": 42.5,
"public_cloud_with_containers": 42,
"public_cloud_without_containers": 42.75,
"servers_with_containers": 42.25,
"servers_without_containers": 42.75,
"workstations": 42.75,
"mobile": 42.75,
"lumos": 42.25,
"chrome_os": 0,
"date": "2025-08-02"
}
]
},
},
}
]
self._mock_api_instance.command.side_effect = (
self._create_mock_api_side_effect(fixtures)
)
prompt = "Show me sensor usage on 2025-08-02"
return await self._run_agent_stream(prompt)
def assertions(tools, result):
tool_names_called = [tool["input"]["tool_name"] for tool in tools]
self.assertIn("falcon_search_sensor_usage_fql_guide", tool_names_called)
self.assertIn("falcon_search_sensor_usage", tool_names_called)
used_tool = tools[-1]
# Verify the tool input contains the filter parameter with proper FQL syntax
tool_input = ensure_dict(used_tool["input"]["tool_input"])
self.assertIn("filter", tool_input, "Tool input should contain a 'filter' parameter")
self.assertIn("event_date:'2025-08-02", tool_input.get("filter", ""), "Filter should contain event_date:'2025-08-02' in FQL syntax")
# Verify API call parameters
self.assertGreaterEqual(
self._mock_api_instance.command.call_count,
1,
"Expected at least 1 API call",
)
api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
"parameters", {}
)
self.assertIn("event_date:'2025-08-02'", api_call_params.get("filter", ""))
self.run_test_with_retries(
"test_search_sensor_usage", test_logic, assertions
)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/tests/common/test_logging.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the logging utilities.
"""
import logging
import unittest
from unittest.mock import MagicMock, patch
from falcon_mcp.common.logging import configure_logging, get_logger
class TestLoggingUtils(unittest.TestCase):
"""Test cases for the logging utilities."""
@patch("falcon_mcp.common.logging.logging.basicConfig")
@patch("falcon_mcp.common.logging.logging.getLogger")
def test_configure_logging_debug(self, mock_get_logger, mock_basic_config):
"""Test configuring logging with debug enabled."""
# Setup mock
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
# Call configure_logging with debug=True
logger = configure_logging(debug=True, name="test_logger")
# Verify basicConfig was called with DEBUG level
mock_basic_config.assert_called_once()
_args, kwargs = mock_basic_config.call_args
self.assertEqual(kwargs["level"], logging.DEBUG)
# Verify logger was configured correctly
mock_get_logger.assert_called_with("test_logger")
mock_logger.setLevel.assert_called_with(logging.DEBUG)
# Verify logger was returned
self.assertEqual(logger, mock_logger)
@patch("falcon_mcp.common.logging.logging.basicConfig")
@patch("falcon_mcp.common.logging.logging.getLogger")
def test_configure_logging_info(self, mock_get_logger, mock_basic_config):
"""Test configuring logging with debug disabled."""
# Setup mock
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
# Call configure_logging with debug=False
logger = configure_logging(debug=False, name="test_logger")
# Verify basicConfig was called with INFO level
mock_basic_config.assert_called_once()
_args, kwargs = mock_basic_config.call_args
self.assertEqual(kwargs["level"], logging.INFO)
# Verify logger was configured correctly
mock_get_logger.assert_called_with("test_logger")
mock_logger.setLevel.assert_called_with(logging.INFO)
# Verify logger was returned
self.assertEqual(logger, mock_logger)
@patch("falcon_mcp.common.logging.logging.getLogger")
def test_get_logger_with_name(self, mock_get_logger):
"""Test getting a logger with a specific name."""
# Setup mock
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
# Call get_logger with a name
logger = get_logger("test_logger")
# Verify getLogger was called with the correct name
mock_get_logger.assert_called_with("test_logger")
# Verify logger was returned
self.assertEqual(logger, mock_logger)
@patch("falcon_mcp.common.logging.logging.getLogger")
def test_get_logger_default_name(self, mock_get_logger):
"""Test getting a logger with the default name."""
# Setup mock
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
# Call get_logger without a name
logger = get_logger()
# Verify getLogger was called with the default name
mock_get_logger.assert_called_with("falcon_mcp")
# Verify logger was returned
self.assertEqual(logger, mock_logger)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/falcon_mcp/modules/sensor_usage.py:
--------------------------------------------------------------------------------
```python
"""
Sensor Usage module for Falcon MCP Server
This module provides tools for accessing CrowdStrike Falcon sensor usage data.
"""
from typing import Any, Dict, List
from mcp.server import FastMCP
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl, Field
from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.logging import get_logger
from falcon_mcp.common.utils import prepare_api_parameters
from falcon_mcp.modules.base import BaseModule
from falcon_mcp.resources.sensor_usage import SEARCH_SENSOR_USAGE_FQL_DOCUMENTATION
logger = get_logger(__name__)
class SensorUsageModule(BaseModule):
"""Module for accessing CrowdStrike Falcon sensor usage data."""
def register_tools(self, server: FastMCP) -> None:
"""Register tools with the MCP server.
Args:
server: MCP server instance
"""
# Register tools
self._add_tool(
server=server,
method=self.search_sensor_usage,
name="search_sensor_usage",
)
def register_resources(self, server: FastMCP) -> None:
"""Register resources with the MCP server.
Args:
server: MCP server instance
"""
search_sensor_usage_fql_resource = TextResource(
uri=AnyUrl("falcon://sensor-usage/weekly/fql-guide"),
name="falcon_search_sensor_usage_fql_guide",
description="Contains the guide for the `filter` param of the `falcon_search_sensor_usage` tool.",
text=SEARCH_SENSOR_USAGE_FQL_DOCUMENTATION,
)
self._add_resource(
server,
search_sensor_usage_fql_resource,
)
def search_sensor_usage(
self,
filter: str | None = Field(
default=None,
description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://sensor-usage/weekly/fql-guide` resource when building this filter parameter.",
examples={"event_date:'2024-06-11'", "period:'30'"},
),
) -> List[Dict[str, Any]]:
"""Search for sensor usage data in your CrowdStrike environment.
IMPORTANT: You must use the `falcon://sensor-usage/weekly/fql-guide` resource when you need to use the `filter` parameter.
This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_sensor_usage` tool.
"""
# Prepare parameters for GetSensorUsageWeekly
params = prepare_api_parameters(
{
"filter": filter,
}
)
# Define the operation name
operation = "GetSensorUsageWeekly"
logger.debug("Searching sensor usage with params: %s", params)
# Make the API request
response = self.client.command(operation, parameters=params)
# Use handle_api_response to get the results
results = handle_api_response(
response,
operation=operation,
error_message="Failed to search sensor usage",
default_result=[],
)
# If handle_api_response returns an error dict instead of a list,
# it means there was an error, so we return it wrapped in a list
if self._is_error(results):
return [results]
return results
```
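A minimal usage sketch of the module above, assuming the package is installed. The `MagicMock` client is a hypothetical stand-in for a configured `falcon_mcp.client.FalconClient`, and the mocked response mirrors the shape used in the unit tests:

```python
from unittest.mock import MagicMock

from falcon_mcp.modules.sensor_usage import SensorUsageModule

# Hypothetical stand-in for a configured FalconClient.
client = MagicMock()
client.command.return_value = {
    "status_code": 200,
    "body": {"resources": [{"date": "2025-08-02", "workstations": 42.75}]},
}

module = SensorUsageModule(client)
# Combined FQL filter per the sensor usage guide: AND conditions are joined with '+'.
results = module.search_sensor_usage(filter="event_date:'2025-08-02'+period:'28'")
print(results[0]["date"], results[0]["workstations"])  # 2025-08-02 42.75
```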
--------------------------------------------------------------------------------
/falcon_mcp/modules/base.py:
--------------------------------------------------------------------------------
```python
"""
Base module for Falcon MCP Server
This module provides the base class for all Falcon MCP server modules.
"""
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List
from mcp import Resource
from mcp.server import FastMCP
from falcon_mcp.client import FalconClient
from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.logging import get_logger
from falcon_mcp.common.utils import prepare_api_parameters
logger = get_logger(__name__)
class BaseModule(ABC):
"""Base class for all Falcon MCP server modules."""
def __init__(self, client: FalconClient):
"""Initialize the module.
Args:
client: Falcon API client
"""
self.client = client
self.tools = [] # List to track registered tools
self.resources = [] # List to track registered resources
@abstractmethod
def register_tools(self, server: FastMCP) -> None:
"""Register tools with the MCP server.
Args:
server: MCP server instance
"""
def register_resources(self, server: FastMCP) -> None:
"""Register resources with the MCP Server.
Args:
server: MCP server instance
"""
def _add_tool(self, server: FastMCP, method: Callable, name: str) -> None:
"""Add a tool to the MCP server and track it.
Args:
server: MCP server instance
method: Method to register
name: Tool name
"""
prefixed_name = f"falcon_{name}"
server.add_tool(method, name=prefixed_name)
self.tools.append(prefixed_name)
logger.debug("Added tool: %s", prefixed_name)
def _add_resource(self, server: FastMCP, resource: Resource) -> None:
"""Add a resource to the MCP server and track it.
Args:
server: MCP server instance
resource: Resource object
"""
server.add_resource(resource=resource)
resource_uri = resource.uri
self.resources.append(resource_uri)
logger.debug("Added resource: %s", resource_uri)
def _base_get_by_ids(
self,
operation: str,
ids: List[str],
id_key: str = "ids",
**additional_params,
) -> List[Dict[str, Any]] | Dict[str, Any]:
"""Helper method for API operations that retrieve entities by IDs.
Args:
operation: The API operation name
ids: List of entity IDs
id_key: The key name for IDs in the request body (default: "ids")
**additional_params: Additional parameters to include in the request body
Returns:
List of entity details or error dict
"""
# Build the request body with dynamic ID key and additional parameters
body_params = {id_key: ids}
body_params.update(additional_params)
body = prepare_api_parameters(body_params)
# Make the API request
response = self.client.command(operation, body=body)
# Handle the response
return handle_api_response(
response,
operation=operation,
error_message="Failed to perform operation",
default_result=[],
)
def _is_error(self, response: Any) -> bool:
return isinstance(response, dict) and "error" in response
```
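For orientation, a hedged sketch of how a concrete module might build on these helpers; `ExampleModule`, `get_example_details`, and the `GetExampleDetails` operation are illustrative names, not part of the codebase:

```python
from typing import Any, Dict, List

from mcp.server import FastMCP

from falcon_mcp.modules.base import BaseModule


class ExampleModule(BaseModule):
    """Illustrative module built on the BaseModule helpers."""

    def register_tools(self, server: FastMCP) -> None:
        # Registered under the "falcon_" prefix, i.e. exposed as "falcon_get_example_details".
        self._add_tool(
            server=server,
            method=self.get_example_details,
            name="get_example_details",
        )

    def get_example_details(self, ids: List[str]) -> List[Dict[str, Any]] | Dict[str, Any]:
        # The base helper builds the request body, makes the API call, and normalizes errors.
        return self._base_get_by_ids(operation="GetExampleDetails", ids=ids)
```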
--------------------------------------------------------------------------------
/tests/test_registry.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the module registry.
"""
import unittest
from unittest.mock import MagicMock
from falcon_mcp import registry
from falcon_mcp.modules.base import BaseModule
class TestRegistry(unittest.TestCase):
"""Test cases for the module registry."""
def setUp(self):
"""Set up test fixtures before each test method."""
# Clear the AVAILABLE_MODULES dictionary before each test
registry.AVAILABLE_MODULES.clear()
def tearDown(self):
"""Clean up after each test method."""
# Restore the original AVAILABLE_MODULES dictionary
registry.AVAILABLE_MODULES.clear()
# Re-discover modules to restore the original state
registry.discover_modules()
def test_discover_modules(self):
"""Test that discover_modules correctly populates AVAILABLE_MODULES."""
# Call discover_modules
registry.discover_modules()
# Verify that AVAILABLE_MODULES is not empty
self.assertGreater(len(registry.AVAILABLE_MODULES), 0)
# Verify that all registered modules are subclasses of BaseModule
for module_class in registry.AVAILABLE_MODULES.values():
self.assertTrue(issubclass(module_class, BaseModule))
def test_get_module_names(self):
"""Test that get_module_names returns the correct list of module names."""
# Manually populate AVAILABLE_MODULES with some test modules
registry.AVAILABLE_MODULES = {
"test1": MagicMock(),
"test2": MagicMock(),
"test3": MagicMock(),
}
# Call get_module_names
module_names = registry.get_module_names()
# Verify that the returned list contains all the expected module names
self.assertEqual(set(module_names), {"test1", "test2", "test3"})
self.assertEqual(len(module_names), 3)
def test_get_module_names_lazy_discovery(self):
"""Test that get_module_names performs lazy discovery when no modules are registered."""
# Ensure AVAILABLE_MODULES is empty
registry.AVAILABLE_MODULES.clear()
# Call get_module_names (should trigger lazy discovery)
module_names = registry.get_module_names()
# Verify that modules were discovered (should not be empty)
self.assertGreater(len(module_names), 0)
# Verify that the expected modules are discovered
expected_modules = ["detections", "incidents", "intel"]
for module_name in expected_modules:
self.assertIn(module_name, module_names)
def test_actual_modules_discovery(self):
"""Test that actual modules in the project are discovered correctly."""
# Clear the AVAILABLE_MODULES dictionary
registry.AVAILABLE_MODULES.clear()
# Call discover_modules
registry.discover_modules()
# Get the list of expected module names based on the project structure
# This assumes that the project has modules like 'incidents', 'intel'
expected_modules = ["incidents", "intel"]
# Verify that all expected modules are discovered
for module_name in expected_modules:
self.assertIn(module_name, registry.AVAILABLE_MODULES)
# Verify that all discovered modules are subclasses of BaseModule
for module_class in registry.AVAILABLE_MODULES.values():
self.assertTrue(issubclass(module_class, BaseModule))
if __name__ == "__main__":
unittest.main()
```
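A quick sketch of the registry behavior these tests exercise; the printed module names are examples of what discovery would surface given the modules in `falcon_mcp/modules/`:

```python
from falcon_mcp import registry

# Populates AVAILABLE_MODULES by scanning falcon_mcp.modules for BaseModule subclasses.
registry.discover_modules()

# get_module_names() also triggers discovery lazily if nothing is registered yet.
print(sorted(registry.get_module_names()))
# e.g. ['cloud', 'detections', 'discover', 'hosts', 'idp', 'incidents', ...]
```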
--------------------------------------------------------------------------------
/.github/workflows/docker-build-push.yml:
--------------------------------------------------------------------------------
```yaml
name: Docker Build & Push
on:
push:
branches: [main]
release:
types: [published]
permissions:
contents: read
packages: write
jobs:
docker-build-push:
runs-on: ubuntu-latest
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
with:
egress-policy: audit
- name: Checkout code
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435
- name: Log in to Quay.io
uses: docker/login-action@184bdaa0721073962dff0199f1fb9940f07167d1
with:
registry: quay.io
username: ${{ secrets.QUAY_USERNAME }}
password: ${{ secrets.QUAY_PASSWORD }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@c1e51972afc2121e065aed6d45c65596fe445f3f
with:
images: quay.io/crowdstrike/falcon-mcp
tags: |
type=raw,value=latest,enable=${{ github.event_name == 'push' }}
type=semver,pattern={{version}},enable=${{ github.event_name == 'release' }}
flavor: |
latest=${{ github.event_name == 'push' }}
labels: |
org.opencontainers.image.title=Falcon MCP Server
org.opencontainers.image.description=Model Context Protocol server for CrowdStrike Falcon
org.opencontainers.image.vendor=CrowdStrike
org.opencontainers.image.licenses=MIT
org.opencontainers.image.source=https://github.com/CrowdStrike/falcon-mcp
org.opencontainers.image.documentation=https://github.com/CrowdStrike/falcon-mcp/blob/main/README.md
- name: Build and push Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83
with:
context: .
platforms: linux/amd64,linux/arm64
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
- name: Generate image summary
run: |
# Get generated tags and extract the actual tag for pull command
TAGS="${{ steps.meta.outputs.tags }}"
FULL_TAG=$(echo "$TAGS" | head -n1)
TAG_ONLY=$(echo "$FULL_TAG" | sed 's/.*://')
if [ "${{ github.event_name }}" = "push" ]; then
EVENT_TYPE="Main Branch Push"
else
EVENT_TYPE="Release"
fi
echo "## 🐳 Docker Image Published" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Event:** $EVENT_TYPE" >> $GITHUB_STEP_SUMMARY
echo "**Registry:** quay.io/crowdstrike/falcon-mcp" >> $GITHUB_STEP_SUMMARY
echo "**Tags:**" >> $GITHUB_STEP_SUMMARY
echo '```' >> $GITHUB_STEP_SUMMARY
echo "$TAGS" >> $GITHUB_STEP_SUMMARY
echo '```' >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Platforms:** linux/amd64, linux/arm64" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Pull Command:**" >> $GITHUB_STEP_SUMMARY
echo '```bash' >> $GITHUB_STEP_SUMMARY
echo "docker pull quay.io/crowdstrike/falcon-mcp:$TAG_ONLY" >> $GITHUB_STEP_SUMMARY
echo '```' >> $GITHUB_STEP_SUMMARY
```
--------------------------------------------------------------------------------
/falcon_mcp/common/errors.py:
--------------------------------------------------------------------------------
```python
"""
Error handling utilities for Falcon MCP Server
This module provides error handling utilities for the Falcon MCP server.
"""
from typing import Any, Dict, Optional
from .api_scopes import get_required_scopes
from .logging import get_logger
logger = get_logger(__name__)
# Common error codes and their meanings
ERROR_CODE_DESCRIPTIONS = {
403: "Permission denied. The API credentials don't have the required access.",
401: "Authentication failed. The API credentials are invalid or expired.",
404: "Resource not found. The requested resource does not exist.",
429: "Rate limit exceeded. Too many requests in a short period.",
500: "Server error. An unexpected error occurred on the server.",
503: "Service unavailable. The service is temporarily unavailable.",
}
class FalconError(Exception):
"""Base exception for all Falcon MCP server errors."""
class AuthenticationError(FalconError):
"""Raised when authentication with the Falcon API fails."""
class APIError(FalconError):
"""Raised when a Falcon API request fails."""
def __init__(
self,
message: str,
status_code: Optional[int] = None,
body: Optional[Dict[str, Any]] = None,
operation: Optional[str] = None,
):
self.status_code = status_code
self.body = body
self.operation = operation
super().__init__(message)
def is_success_response(response: Dict[str, Any]) -> bool:
"""Check if an API response indicates success.
Args:
response: The API response dictionary
Returns:
bool: True if the response indicates success (status code 200)
"""
return response.get("status_code") == 200
def _format_error_response(
message: str,
details: Optional[Dict[str, Any]] = None,
operation: Optional[str] = None,
) -> Dict[str, Any]:
"""Format an error as a standardized response.
Args:
message: The error message
details: Additional error details
operation: The API operation that failed (used for permission errors)
Returns:
Dict[str, Any]: Formatted error response
"""
response = {"error": message}
# Add details if provided
if details:
response["details"] = details
# Special handling for permission errors (403)
if details.get("status_code") == 403 and operation:
required_scopes = get_required_scopes(operation)
if required_scopes:
response["required_scopes"] = required_scopes
scopes_list = ", ".join(required_scopes)
response["resolution"] = (
f"This operation requires the following API scopes: {scopes_list}. "
"Please ensure your API client has been granted these scopes in the "
"CrowdStrike Falcon console."
)
# Log the error
logger.error("Error: %s", message)
return response
def handle_api_response(
response: Dict[str, Any],
operation: str,
error_message: str = "API request failed",
default_result: Any = None,
) -> Dict[str, Any] | Any:
"""Handle an API response, returning either the result or an error.
Args:
response: The API response dictionary
operation: The API operation that was performed
error_message: The error message to use if the request failed
default_result: The default result to return if the response is empty
Returns:
Dict[str, Any]|Any: The result or an error response
"""
status_code = response.get("status_code")
if status_code != 200:
# Get a more descriptive error message based on status code
status_message = ERROR_CODE_DESCRIPTIONS.get(
status_code, f"Request failed with status code {status_code}"
)
# For permission errors, add more context
if status_code == 403:
required_scopes = get_required_scopes(operation)
if required_scopes:
status_message += f" Required scopes: {', '.join(required_scopes)}"
# Log the error
logger.error("Error: %s: %s", error_message, status_message)
return _format_error_response(
f"{error_message}: {status_message}", details=response, operation=operation
)
# Extract resources from the response body
resources = response.get("body", {}).get("resources", [])
if not resources and default_result is not None:
return default_result
return resources
```
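A minimal sketch of the error-handling contract, assuming the package is installed; the operation name and response body are illustrative, and the `required_scopes`/`resolution` fields appear only when the operation has known scopes:

```python
from falcon_mcp.common.errors import handle_api_response, is_success_response

# Illustrative 403 response body.
response = {"status_code": 403, "body": {"errors": [{"message": "access denied"}]}}

print(is_success_response(response))  # False

result = handle_api_response(
    response,
    operation="GetSensorUsageWeekly",
    error_message="Failed to search sensor usage",
    default_result=[],
)
# Non-200 responses come back as an error dict, e.g.
# {"error": "Failed to search sensor usage: Permission denied. ...", "details": {...}}
print(result["error"])
```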
--------------------------------------------------------------------------------
/tests/modules/test_cloud.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the Cloud module.
"""
import unittest
from falcon_mcp.modules.cloud import CloudModule
from tests.modules.utils.test_modules import TestModules
class TestCloudModule(TestModules):
"""Test cases for the Cloud module."""
def setUp(self):
"""Set up test fixtures."""
self.setup_module(CloudModule)
def test_register_tools(self):
"""Test registering tools with the server."""
expected_tools = [
"falcon_search_kubernetes_containers",
"falcon_count_kubernetes_containers",
"falcon_search_images_vulnerabilities",
]
self.assert_tools_registered(expected_tools)
def test_register_resources(self):
"""Test registering resources with the server."""
expected_resources = [
"falcon_kubernetes_containers_fql_filter_guide",
"falcon_images_vulnerabilities_fql_filter_guide",
]
self.assert_resources_registered(expected_resources)
def test_search_kubernetes_containers(self):
"""Test searching for kubernetes containers."""
mock_response = {
"status_code": 200,
"body": {"resources": ["container_1", "container_2"]},
}
self.mock_client.command.return_value = mock_response
result = self.module.search_kubernetes_containers(
filter="cloud_name:'AWS'", limit=1
)
self.assertEqual(self.mock_client.command.call_count, 1)
first_call = self.mock_client.command.call_args_list[0]
self.assertEqual(first_call[0][0], "ReadContainerCombined")
self.assertEqual(first_call[1]["parameters"]["filter"], "cloud_name:'AWS'")
self.assertEqual(first_call[1]["parameters"]["limit"], 1)
self.assertEqual(result, ["container_1", "container_2"])
def test_search_kubernetes_containers_errors(self):
"""Test searching for kubernetes containers with API error."""
mock_response = {
"status_code": 400,
"body": {"errors": [{"message": "Invalid filter"}]},
}
self.mock_client.command.return_value = mock_response
result = self.module.search_kubernetes_containers(filter="invalid_filter")
self.assertIsInstance(result, dict)
self.assertIn("error", result)
self.assertIn("details", result)
def test_count_kubernetes_containers(self):
"""Test count for kubernetes containers."""
mock_response = {"status_code": 200, "body": {"resources": [500]}}
self.mock_client.command.return_value = mock_response
result = self.module.count_kubernetes_containers(filter="cloud_region:'us-1'")
self.assertEqual(self.mock_client.command.call_count, 1)
first_call = self.mock_client.command.call_args_list[0]
self.assertEqual(first_call[0][0], "ReadContainerCount")
self.assertEqual(first_call[1]["parameters"]["filter"], "cloud_region:'us-1'")
self.assertEqual(result, [500])
def test_count_kubernetes_containers_errors(self):
"""Test count for kubernetes containers with API error."""
mock_response = {
"status_code": 500,
"body": {"errors": [{"message": "internal error"}]},
}
self.mock_client.command.return_value = mock_response
result = self.module.count_kubernetes_containers(filter="invalid_filter")
self.assertIsInstance(result, dict)
self.assertIn("error", result)
self.assertIn("details", result)
def test_search_images_vulnerabilities(self):
"""Test search for images vulnerabilities."""
mock_response = {"status_code": 200, "body": {"resources": ["cve_id_1"]}}
self.mock_client.command.return_value = mock_response
result = self.module.search_images_vulnerabilities(
filter="cvss_score:>5", limit=1
)
self.assertEqual(self.mock_client.command.call_count, 1)
first_call = self.mock_client.command.call_args_list[0]
self.assertEqual(first_call[0][0], "ReadCombinedVulnerabilities")
self.assertEqual(first_call[1]["parameters"]["filter"], "cvss_score:>5")
self.assertEqual(first_call[1]["parameters"]["limit"], 1)
self.assertEqual(result, ["cve_id_1"])
def test_search_images_vulnerabilities_errors(self):
"""Test search for images vulnerabilities with API error."""
mock_response = {
"status_code": 400,
"body": {"errors": [{"message": "invalid sort"}]},
}
self.mock_client.command.return_value = mock_response
result = self.module.search_images_vulnerabilities(sort="1|1")
self.assertIsInstance(result, dict)
self.assertIn("error", result)
self.assertIn("details", result)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/docs/e2e_testing.md:
--------------------------------------------------------------------------------
```markdown
# End-to-End Testing Guide
This document provides guidance on running and understanding the end-to-end tests for the Falcon MCP Server.
## Configuration
The E2E tests can be configured using environment variables or a `.env` file. For development and testing, copy the development example file:
```bash
cp .env.dev.example .env
```
Then configure the E2E testing variables:
### LLM Configuration
```bash
# API key for OpenAI or compatible API
OPENAI_API_KEY=your-api-key
# Optional: Custom base URL for LLM API (for VPN-only or custom endpoints)
OPENAI_BASE_URL=https://your-custom-llm-endpoint.com/v1
# Optional: Comma-separated list of models to test against
MODELS_TO_TEST=example-model-1,example-model-2
```
If not specified, the tests will use the default models defined in `tests/e2e/utils/base_e2e_test.py`.
## Running E2E Tests
End-to-end tests are marked with the `@pytest.mark.e2e` decorator and require the `--run-e2e` flag to run:
```bash
# Run all E2E tests
pytest --run-e2e tests/e2e/
# Run a specific E2E test
pytest --run-e2e tests/e2e/test_mcp_server.py::TestFalconMCPServerE2E::test_get_top_3_high_severity_detections
```
> [!IMPORTANT]
> When running E2E tests with verbose output, the `-s` flag is **required** to see any meaningful output.
> This is because pytest normally captures stdout/stderr, and our tests output information via print statements.
> Without the `-s` flag, you won't see any of the detailed output, even with `-v` or `-vv` flags.
## Verbose Output
The E2E tests support different levels of verbosity, but **all require the `-s` flag** to display detailed output:
### Standard Output (No Verbosity)
By default, tests run with minimal output and the agent runs silently:
```bash
pytest --run-e2e -s tests/e2e/
```
### Verbose Output
To see more detailed output, including basic agent debug information, use both `-v` and `-s` flags:
```bash
pytest --run-e2e -v -s tests/e2e/
```
With this level of verbosity, you'll see:
- Test execution progress
- Basic agent operations
- Tool calls and responses
- Test success/failure information
### Extra Verbose Output
For even more detailed output, including all agent events and detailed debugging information:
```bash
pytest --run-e2e -vv -s tests/e2e/
```
This level shows everything from the verbose level plus:
- Detailed agent thought processes
- Step-by-step execution flow
- Complete prompt and response content
- Detailed tool execution information
> [!NOTE]
> The `-s` flag disables pytest's output capture, allowing all print statements to be displayed.
> Without this flag, you won't see any of the detailed output from the tests.
>
> The verbosity level (`-v`, `-vv`) controls both test output verbosity AND agent debug output.
> Higher verbosity levels are extremely useful when diagnosing test failures or unexpected agent behavior.
## Test Retry Logic
The E2E tests use a retry mechanism to handle the non-deterministic nature of LLM responses. Each test is run multiple times against different models, and the test passes if a certain percentage of runs succeed.
The retry configuration can be found at the top of `tests/e2e/utils/base_e2e_test.py`:
```python
# Default models to test against
DEFAULT_MODLES_TO_TEST = ["gpt-4.1-mini", "gpt-4o-mini"]
# Default number of times to run each test
DEFAULT_RUNS_PER_TEST = 2
# Default success threshold for passing a test
DEFAULT_SUCCESS_TRESHOLD = 0.7
```
This means each test will run 2 times for each model and the test will pass if at least 70% of the runs succeed.
Each of these can be overridden by using the appropriate environment variable:
- MODELS_TO_TEST
- RUNS_PER_TEST
- SUCCESS_THRESHOLD
For example:
```bash
# Test with specific models
MODELS_TO_TEST=example-model-1,example-model-2 pytest --run-e2e -s tests/e2e/
```
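The run count and success threshold can be overridden in the same way (the values below are illustrative):
```bash
# Run each test 3 times per model and require every run to pass
RUNS_PER_TEST=3 SUCCESS_THRESHOLD=1.0 pytest --run-e2e -s tests/e2e/
```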
## Troubleshooting
### Not Seeing Any Output?
If you're running tests with `-v` but not seeing any detailed output, make sure you've included the `-s` flag:
```bash
# CORRECT: Will show detailed output
pytest --run-e2e -v -s tests/e2e/
# INCORRECT: Will not show detailed output
pytest --run-e2e -v tests/e2e/
```
### Diagnosing Test Failures
If a test is failing, try running it with full debug output (`-v -s` or `-vv -s` flags) to see what's happening. Look for:
1. Connection issues with the MCP server
2. Unexpected LLM responses
3. Assertion failures in the test logic
4. Agent debugging information (enabled with verbosity)
The verbose output will show you the exact prompts, responses, and tool calls, which can help diagnose the issue. With higher verbosity levels, you'll also see detailed agent debugging information that can help identify why the agent isn't behaving as expected.
### Using Custom LLM Endpoints
If you need to use a custom LLM endpoint (e.g., for VPN-only accessible models), set the `OPENAI_BASE_URL` environment variable:
```bash
# Use a custom LLM endpoint
OPENAI_BASE_URL=https://your-custom-llm-endpoint.com/v1 pytest --run-e2e -s tests/e2e/
```
This is particularly useful when testing with models that are only accessible through specific endpoints or when using a proxy server.
```
--------------------------------------------------------------------------------
/tests/modules/test_incidents.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the Incidents module.
"""
import unittest
from falcon_mcp.modules.incidents import IncidentsModule
from tests.modules.utils.test_modules import TestModules
class TestIncidentsModule(TestModules):
"""Test cases for the Incidents module."""
def setUp(self):
"""Set up test fixtures."""
self.setup_module(IncidentsModule)
def test_register_tools(self):
"""Test registering tools with the server."""
expected_tools = [
"falcon_show_crowd_score",
"falcon_get_incident_details",
"falcon_search_incidents",
"falcon_get_behavior_details",
"falcon_search_behaviors",
]
self.assert_tools_registered(expected_tools)
def test_register_resources(self):
"""Test registering resources with the server."""
expected_resources = [
"falcon_show_crowd_score_fql_guide",
"falcon_search_incidents_fql_guide",
"falcon_search_behaviors_fql_guide",
]
self.assert_resources_registered(expected_resources)
def test_crowd_score(self):
"""Test querying CrowdScore with successful response."""
# Setup mock response with sample scores
mock_response = {
"status_code": 200,
"body": {
"resources": [
{"id": "score1", "score": 50, "adjusted_score": 60},
{"id": "score2", "score": 70, "adjusted_score": 80},
]
},
}
self.mock_client.command.return_value = mock_response
# Call crowd_score with test parameters
result = self.module.show_crowd_score(
filter="test filter",
limit=100,
offset=0,
sort="modified_timestamp.desc",
)
# Verify client command was called correctly
self.mock_client.command.assert_called_once_with(
"CrowdScore",
parameters={
"filter": "test filter",
"limit": 100,
"offset": 0,
"sort": "modified_timestamp.desc",
},
)
# Verify result contains expected values
self.assertEqual(result["average_score"], 60) # (50 + 70) / 2
self.assertEqual(result["average_adjusted_score"], 70) # (60 + 80) / 2
self.assertEqual(len(result["scores"]), 2)
self.assertEqual(result["scores"][0]["id"], "score1")
self.assertEqual(result["scores"][1]["id"], "score2")
def test_crowd_score_empty_response(self):
"""Test querying CrowdScore with empty response."""
# Setup mock response with empty resources
mock_response = {"status_code": 200, "body": {"resources": []}}
self.mock_client.command.return_value = mock_response
# Call crowd_score
result = self.module.show_crowd_score()
# Verify client command was called with the correct operation
self.assertEqual(self.mock_client.command.call_count, 1)
call_args = self.mock_client.command.call_args
self.assertEqual(call_args[0][0], "CrowdScore")
# Verify result contains expected default values
self.assertEqual(result["average_score"], 0)
self.assertEqual(result["average_adjusted_score"], 0)
self.assertEqual(result["scores"], [])
def test_crowd_score_error(self):
"""Test querying CrowdScore with API error."""
# Setup mock response with error
mock_response = {
"status_code": 400,
"body": {"errors": [{"message": "Invalid query"}]},
}
self.mock_client.command.return_value = mock_response
# Call crowd_score
result = self.module.show_crowd_score(filter="invalid query")
# Verify result contains error
self.assertIn("error", result)
self.assertIn("details", result)
# Check that the error message starts with the expected prefix
self.assertTrue(result["error"].startswith("Failed to perform operation"))
def test_crowd_score_with_default_parameters_and_rounding(self):
"""Test querying CrowdScore with default parameters and rounding"""
# Setup mock response
mock_response = {
"status_code": 200,
"body": {
"resources": [
{"id": "score1", "score": 30, "adjusted_score": 40},
{"id": "score1", "score": 31, "adjusted_score": 41},
]
},
}
self.mock_client.command.return_value = mock_response
# Call crowd_score with no parameters (using defaults)
result = self.module.show_crowd_score()
# Verify client command was called with the correct operation
self.assertEqual(self.mock_client.command.call_count, 1)
call_args = self.mock_client.command.call_args
self.assertEqual(call_args[0][0], "CrowdScore")
# Verify result
self.assertEqual(result["average_score"], 30)
self.assertEqual(result["average_adjusted_score"], 40)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/tests/e2e/modules/test_serverless.py:
--------------------------------------------------------------------------------
```python
"""
E2E tests for the Serverless module.
"""
import unittest
import pytest
from tests.e2e.utils.base_e2e_test import BaseE2ETest, ensure_dict
@pytest.mark.e2e
class TestServerlessModuleE2E(BaseE2ETest):
"""
End-to-end test suite for the Falcon MCP Server Serverless Module.
"""
def test_search_serverless_vulnerabilities(self):
"""Verify the agent can search for high severity vulnerabilities in serverless environment"""
async def test_logic():
response = {
"status_code": 200,
"body": {
"runs": [
{
"tool": {
"driver": {
"name": "CrowdStrike",
"informationUri": "https://www.crowdstrike.com/",
"rules": [
{
"id": "CVE-2023-45678",
"name": "PythonPackageVulnerability",
"shortDescription": {"text": "Security vulnerability in package xyz"},
"fullDescription": {"text": "A critical vulnerability was found in package xyz that could lead to remote code execution"},
"help": {"text": "Package: xyz\nInstalled Version: 1.2.3\nVulnerability: CVE-2023-45678\nSeverity: HIGH\nRemediation: [Upgrade to version 2.0.0]"},
"properties": {
"severity": "HIGH",
"cvssBaseScore": 8.5,
"remediations": ["Upgrade to version 2.0.0"],
"cloudProvider": "AWS",
"region": "us-west-2",
"functionName": "sample-lambda-function"
}
}
]
}
}
}
]
},
}
fixtures = [
{
"operation": "GetCombinedVulnerabilitiesSARIF",
"validator": lambda kwargs: "severity:'HIGH'+cloud_provider:'aws'" in kwargs.get("parameters", {}).get("filter", ""),
"response": response,
},
{
"operation": "GetCombinedVulnerabilitiesSARIF",
"validator": lambda kwargs: "cloud_provider:'aws'+severity:'HIGH'" in kwargs.get("parameters", {}).get("filter", ""),
"response": response,
}
]
self._mock_api_instance.command.side_effect = (
self._create_mock_api_side_effect(fixtures)
)
prompt = "get vulnerabilities in my serverless environment with severity high only and part of AWS"
return await self._run_agent_stream(prompt)
def assertions(tools, result):
tool_names_called = [tool["input"]["tool_name"] for tool in tools]
self.assertIn("falcon_serverless_vulnerabilities_fql_guide", tool_names_called)
self.assertIn("falcon_search_serverless_vulnerabilities", tool_names_called)
# Find the search_serverless_vulnerabilities tool call
search_tool_call = None
for tool in tools:
if tool["input"]["tool_name"] == "falcon_search_serverless_vulnerabilities":
search_tool_call = tool
break
self.assertIsNotNone(search_tool_call, "Expected falcon_search_serverless_vulnerabilities tool to be called")
# Verify the tool input contains the filter parameter with proper FQL syntax
tool_input = ensure_dict(search_tool_call["input"]["tool_input"])
self.assertIn("filter", tool_input, "Tool input should contain a 'filter' parameter")
self.assertIn("severity:'HIGH'", tool_input.get("filter", ""), "Filter should contain severity:'HIGH' in FQL syntax")
# Verify API call parameters
self.assertGreaterEqual(
self._mock_api_instance.command.call_count,
1,
"Expected at least 1 API call",
)
api_call_args = self._mock_api_instance.command.call_args_list[0]
self.assertEqual(api_call_args[0][0], "GetCombinedVulnerabilitiesSARIF")
api_call_params = api_call_args[1].get("parameters", {})
self.assertIn("severity:'HIGH'", api_call_params.get("filter", ""))
self.assertIn("cloud_provider:'aws'", api_call_params.get("filter", ""))
self.run_test_with_retries(
"test_search_serverless_vulnerabilities", test_logic, assertions
)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/tests/modules/test_spotlight.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the Spotlight module.
"""
import unittest
from falcon_mcp.modules.spotlight import SpotlightModule
from tests.modules.utils.test_modules import TestModules
class TestSpotlightModule(TestModules):
"""Test cases for the Spotlight module."""
def setUp(self):
"""Set up test fixtures."""
self.setup_module(SpotlightModule)
def test_register_tools(self):
"""Test registering tools with the server."""
expected_tools = [
"falcon_search_vulnerabilities",
]
self.assert_tools_registered(expected_tools)
def test_register_resources(self):
"""Test registering resources with the server."""
expected_resources = [
"falcon_search_vulnerabilities_fql_guide",
]
self.assert_resources_registered(expected_resources)
def test_search_vulnerabilities_success(self):
"""Test searching vulnerabilities with successful response."""
# Setup mock response with sample vulnerability data
mock_response = {
"status_code": 200,
"body": {
"resources": [
{
"cve_id": "CVE-2023-12345",
"status": "open",
"severity": "HIGH",
"cvss_base_score": 8.5,
"created_timestamp": "2023-08-01T12:00:00Z",
"updated_timestamp": "2023-08-02T14:30:00Z",
"host_info": {
"hostname": "test-server",
"os_version": "Ubuntu 22.04"
}
}
]
},
}
self.mock_client.command.return_value = mock_response
# Call search_vulnerabilities with test parameters
result = self.module.search_vulnerabilities(filter="status:'open'")
# Verify client command was called correctly
self.assertEqual(self.mock_client.command.call_count, 1)
call_args = self.mock_client.command.call_args
self.assertEqual(call_args[0][0], "combinedQueryVulnerabilities")
# Check that the parameters dictionary contains the expected filter
params = call_args[1]["parameters"]
self.assertEqual(params["filter"], "status:'open'")
# Verify result contains expected values
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["cve_id"], "CVE-2023-12345")
self.assertEqual(result[0]["severity"], "HIGH")
self.assertEqual(result[0]["status"], "open")
self.assertEqual(result[0]["cvss_base_score"], 8.5)
def test_search_vulnerabilities_no_filter(self):
"""Test searching vulnerabilities with no filter parameter."""
# Setup mock response with sample vulnerability data
mock_response = {
"status_code": 200,
"body": {
"resources": [
{
"cve_id": "CVE-2023-12345",
"status": "open",
"severity": "HIGH"
}
]
},
}
self.mock_client.command.return_value = mock_response
# Call search_vulnerabilities with no filter
result = self.module.search_vulnerabilities()
# Verify client command was called with the correct operation
self.assertEqual(self.mock_client.command.call_count, 1)
call_args = self.mock_client.command.call_args
self.assertEqual(call_args[0][0], "combinedQueryVulnerabilities")
# Verify result contains expected values
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["cve_id"], "CVE-2023-12345")
def test_search_vulnerabilities_empty_response(self):
"""Test searching vulnerabilities with empty response."""
# Setup mock response with empty resources
mock_response = {"status_code": 200, "body": {"resources": []}}
self.mock_client.command.return_value = mock_response
# Call search_vulnerabilities
result = self.module.search_vulnerabilities(filter="status:'closed'")
# Verify client command was called with the correct operation
self.assertEqual(self.mock_client.command.call_count, 1)
call_args = self.mock_client.command.call_args
self.assertEqual(call_args[0][0], "combinedQueryVulnerabilities")
# Verify result is an empty list
self.assertEqual(result, [])
def test_search_vulnerabilities_error(self):
"""Test searching vulnerabilities with API error."""
# Setup mock response with error
mock_response = {
"status_code": 400,
"body": {"errors": [{"message": "Invalid query"}]},
}
self.mock_client.command.return_value = mock_response
# Call search_vulnerabilities
results = self.module.search_vulnerabilities(filter="invalid query")
result = results[0]
# Verify result contains error
self.assertIn("error", result)
self.assertIn("details", result)
# Check that the error message starts with the expected prefix
self.assertTrue(result["error"].startswith("Failed to search vulnerabilities"))
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/tests/modules/test_sensor_usage.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the Sensor Usage module.
"""
import unittest
from falcon_mcp.modules.sensor_usage import SensorUsageModule
from tests.modules.utils.test_modules import TestModules
class TestSensorUsageModule(TestModules):
"""Test cases for the Sensor Usage module."""
def setUp(self):
"""Set up test fixtures."""
self.setup_module(SensorUsageModule)
def test_register_tools(self):
"""Test registering tools with the server."""
expected_tools = [
"falcon_search_sensor_usage",
]
self.assert_tools_registered(expected_tools)
def test_register_resources(self):
"""Test registering resources with the server."""
expected_resources = [
"falcon_search_sensor_usage_fql_guide",
]
self.assert_resources_registered(expected_resources)
def test_search_sensor_usage_success(self):
"""Test searching sensor usage with successful response."""
# Setup mock response with sample sensor usage data
mock_response = {
"status_code": 200,
"body": {
"resources": [
{
"containers": 42.5,
"public_cloud_with_containers": 42,
"public_cloud_without_containers": 42.75,
"servers_with_containers": 42.25,
"servers_without_containers": 42.75,
"workstations": 42.75,
"mobile": 42.75,
"lumos": 42.25,
"chrome_os": 0,
"date": "2025-08-02"
}
]
},
}
self.mock_client.command.return_value = mock_response
# Call search_sensor_usage with test parameters
result = self.module.search_sensor_usage(filter="event_date:'2025-08-02'")
# Verify client command was called correctly
self.mock_client.command.assert_called_once_with(
"GetSensorUsageWeekly",
parameters={
"filter": "event_date:'2025-08-02'",
},
)
# Verify result contains expected values
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["date"], "2025-08-02")
self.assertEqual(result[0]["containers"], 42.5)
self.assertEqual(result[0]["workstations"], 42.75)
self.assertEqual(result[0]["mobile"], 42.75)
def test_search_sensor_usage_no_filter(self):
"""Test searching sensor usage with no filter parameter."""
# Setup mock response with sample sensor usage data
mock_response = {
"status_code": 200,
"body": {
"resources": [
{
"containers": 42.5,
"public_cloud_with_containers": 42,
"public_cloud_without_containers": 42.75,
"servers_with_containers": 42.25,
"servers_without_containers": 42.75,
"workstations": 42.75,
"mobile": 42.75,
"lumos": 42.25,
"chrome_os": 0,
"date": "2025-08-02"
}
]
},
}
self.mock_client.command.return_value = mock_response
# Call search_sensor_usage with no filter
result = self.module.search_sensor_usage()
# Verify client command was called with the correct operation
self.assertEqual(self.mock_client.command.call_count, 1)
call_args = self.mock_client.command.call_args
self.assertEqual(call_args[0][0], "GetSensorUsageWeekly")
# Verify result contains expected values
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["date"], "2025-08-02")
def test_search_sensor_usage_empty_response(self):
"""Test searching sensor usage with empty response."""
# Setup mock response with empty resources
mock_response = {"status_code": 200, "body": {"resources": []}}
self.mock_client.command.return_value = mock_response
# Call search_sensor_usage
result = self.module.search_sensor_usage(filter="event_date:'2025-08-02'")
# Verify client command was called with the correct operation
self.assertEqual(self.mock_client.command.call_count, 1)
call_args = self.mock_client.command.call_args
self.assertEqual(call_args[0][0], "GetSensorUsageWeekly")
# Verify result is an empty list
self.assertEqual(result, [])
def test_search_sensor_usage_error(self):
"""Test searching sensor usage with API error."""
# Setup mock response with error
mock_response = {
"status_code": 400,
"body": {"errors": [{"message": "Invalid query"}]},
}
self.mock_client.command.return_value = mock_response
# Call search_sensor_usage
results = self.module.search_sensor_usage(filter="invalid query")
result = results[0]
# Verify result contains error
self.assertIn("error", result)
self.assertIn("details", result)
# Check that the error message starts with the expected prefix
self.assertTrue(result["error"].startswith("Failed to search sensor usage"))
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/falcon_mcp/modules/serverless.py:
--------------------------------------------------------------------------------
```python
"""
Serverless Vulnerabilities module for Falcon MCP Server
This module provides tools for accessing and managing CrowdStrike Falcon Serverless Vulnerabilities.
"""
from textwrap import dedent
from typing import Any, Dict, List
from mcp.server import FastMCP
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl, Field
from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.logging import get_logger
from falcon_mcp.common.utils import prepare_api_parameters
from falcon_mcp.modules.base import BaseModule
from falcon_mcp.resources.serverless import SERVERLESS_VULNERABILITIES_FQL_DOCUMENTATION
logger = get_logger(__name__)
class ServerlessModule(BaseModule):
"""Module for accessing and managing CrowdStrike Falcon Serverless Vulnerabilities."""
def register_tools(self, server: FastMCP) -> None:
"""Register tools with the MCP server.
Args:
server: MCP server instance
"""
# Register tools
self._add_tool(
server=server,
method=self.search_serverless_vulnerabilities,
name="search_serverless_vulnerabilities",
)
def register_resources(self, server: FastMCP) -> None:
"""Register resources with the MCP server.
Args:
server: MCP server instance
"""
serverless_vulnerabilities_fql_resource = TextResource(
uri=AnyUrl("falcon://serverless/vulnerabilities/fql-guide"),
name="falcon_serverless_vulnerabilities_fql_guide",
description="Contains the guide for the `filter` param of the `falcon_search_serverless_vulnerabilities` tool.",
text=SERVERLESS_VULNERABILITIES_FQL_DOCUMENTATION,
)
self._add_resource(
server,
serverless_vulnerabilities_fql_resource,
)
def search_serverless_vulnerabilities(
self,
filter: str = Field(
description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://serverless/vulnerabilities/fql-guide` resource when building this filter parameter.",
examples={"cloud_provider:'aws'", "severity:'HIGH'"},
),
limit: int | None = Field(
default=10,
ge=1,
description="The upper-bound on the number of records to retrieve. (Default: 10)",
),
offset: int | None = Field(
default=0,
description="The offset from where to begin.",
),
sort: str | None = Field(
default=None,
description=dedent("""
Sort serverless vulnerabilities using FQL syntax.
Supported sorting fields:
• application_name: Name of the application
• application_name_version: Version of the application
• cid: Customer ID
• cloud_account_id: Cloud account ID
• cloud_account_name: Cloud account name
• cloud_provider: Cloud provider
• cve_id: CVE ID
• cvss_base_score: CVSS base score
• exprt_rating: ExPRT rating
• first_seen_timestamp: When the vulnerability was first seen
• function_resource_id: Function resource ID
• is_supported: Whether the function is supported
• layer: Layer where the vulnerability was found
• region: Cloud region
• runtime: Runtime environment
• severity: Severity level
• timestamp: When the vulnerability was last updated
• type: Type of vulnerability
Format: 'field'
Examples: 'severity', 'cloud_provider', 'first_seen_timestamp'
""").strip(),
examples={
"severity",
"cloud_provider",
"first_seen_timestamp",
},
),
) -> List[Dict[str, Any]]:
"""Search for vulnerabilities in your serverless functions across all cloud service providers.
This endpoint provides security information in SARIF format, including:
- CVE IDs for identified vulnerabilities
- Severity levels
- Vulnerability descriptions
- Additional relevant details
IMPORTANT: You must use the `falcon://serverless/vulnerabilities/fql-guide` resource when you need to use the `filter` parameter.
This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_serverless_vulnerabilities` tool.
"""
# Prepare parameters for GetCombinedVulnerabilitiesSARIF
params = prepare_api_parameters(
{
"filter": filter,
"limit": limit,
"offset": offset,
"sort": sort,
}
)
# Define the operation name
operation = "GetCombinedVulnerabilitiesSARIF"
logger.debug("Searching serverless vulnerabilities with params: %s", params)
# Make the API request
response = self.client.command(operation, parameters=params)
# Use handle_api_response to get vulnerability data
vulnerabilities = handle_api_response(
response,
operation=operation,
error_message="Failed to search serverless vulnerabilities",
)
# If handle_api_response returns an error dict instead of a list,
# it means there was an error, so we return it wrapped in a list
if self._is_error(vulnerabilities):
return [vulnerabilities]
return vulnerabilities.get("runs") or []
```
--------------------------------------------------------------------------------
/scripts/generate_e2e_report.py:
--------------------------------------------------------------------------------
```python
"""
Generate a static HTML report from test result data
"""
import json
import re
import sys
from html import escape
def generate_static_report(
data,
template_path="scripts/test_results_viewer.html",
output_path="static_test_report.html",
):
"""
Generates a static HTML report from test result data.
Args:
data (list): A list of test result dictionaries.
template_path (str): The path to the HTML template file to extract styles from.
output_path (str): The path to write the final static HTML file.
"""
try:
with open(template_path, "r", encoding="utf-8") as f:
html_template = f.read()
style_content = re.search(
r"<style>(.*?)</style>", html_template, re.DOTALL
).group(1)
except (FileNotFoundError, AttributeError):
print(
f"Warning: Could not read styles from {template_path}. Using default styles."
)
style_content = "body { font-family: sans-serif; } /* Basic fallback styles */"
# --- Group and process data ---
total_runs = len(data)
successful_runs = sum(1 for run in data if run.get("status") == "success")
success_rate = (successful_runs / total_runs * 100) if total_runs > 0 else 0
# Group first by module, then by test name
grouped_by_module = {}
for run in data:
module_name = run.get("module_name", "Unknown Module")
test_name = run.get("test_name", "Unnamed Test")
if module_name not in grouped_by_module:
grouped_by_module[module_name] = {}
if test_name not in grouped_by_module[module_name]:
grouped_by_module[module_name][test_name] = []
grouped_by_module[module_name][test_name].append(run)
# Further group by model within each test
for module_name, tests in grouped_by_module.items():
for test_name, runs in tests.items():
grouped_by_model = {}
for run in runs:
model_name = run.get("model_name", "Unnamed Model")
grouped_by_model.setdefault(model_name, []).append(run)
grouped_by_module[module_name][test_name] = grouped_by_model
# --- Build HTML body content ---
body_content = f"""
<h1>MCP E2E Static Test Report</h1>
<div id="summary">
<h2>Summary</h2>
<p>Total Tests Run: <span>{total_runs}</span></p>
<p>Success Rate: <span>{success_rate:.2f}%</span></p>
</div>
<div id="results-container">
"""
for module_name, tests in sorted(grouped_by_module.items()):
body_content += f'<div class="module-group"><h2>{escape(module_name)}</h2>'
for test_name, models in sorted(tests.items()):
body_content += f'<div class="test-group"><h3>{escape(test_name)}</h3>'
for model_name, runs in sorted(models.items()):
body_content += (
f'<div class="model-group"><h4>{escape(model_name)}</h4>'
)
body_content += '<div class="run-grid">'
for run in sorted(runs, key=lambda x: x.get("run_number", 0)):
status_class = escape(run.get("status", "unknown"))
run_html = f"""
<div class="test-run {status_class}">
<h5>Run {run.get("run_number", "#")} - {status_class.upper()}</h5>
"""
if status_class == "failure" and run.get("failure_reason"):
reason = escape(run["failure_reason"])
run_html += f'<p><strong>Failure Reason:</strong></p><pre class="failure-reason"><code>{reason}</code></pre>'
agent_result = escape(
run.get("agent_result", "No result") or "No result"
)
run_html += f"""
<details>
<summary>Agent Result</summary>
<div class="agent-result"><pre><code>{agent_result}</code></pre></div>
</details>
"""
if run.get("tools_used"):
tools_json = escape(json.dumps(run["tools_used"], indent=2))
run_html += f"""
<details>
<summary>Tools Used ({len(run["tools_used"])})</summary>
<div class="tools-content"><pre><code>{tools_json}</code></pre></div>
</details>
"""
else:
run_html += "<p>No tools were used.</p>"
run_html += "</div>"
body_content += run_html
body_content += "</div></div>"
body_content += "</div>"
body_content += "</div>"
body_content += "</div>"
# --- Assemble the final HTML ---
full_html = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Static Test Results</title>
<style>{style_content}</style>
</head>
<body>
{body_content}
</body>
</html>
"""
with open(output_path, "w", encoding="utf-8") as f:
f.write(full_html)
print(f"Successfully generated static report: {output_path}")
if __name__ == "__main__":
test_results_path = sys.argv[1] if len(sys.argv) > 1 else "test_results.json"
try:
with open(test_results_path, "r", encoding="utf-8") as f:
test_data = json.load(f)
generate_static_report(test_data)
    except FileNotFoundError:
        print(f"Error: {test_results_path} not found. Please run the tests first.")
    except json.JSONDecodeError:
        print(
            f"Error: Could not parse {test_results_path}. The file might be corrupted or empty."
        )
```
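For reference, the generator reads each record's `module_name`, `test_name`, `model_name`, `run_number`, `status`, `failure_reason`, `agent_result`, and `tools_used` keys. A minimal, assumption-based sketch of a results file it could render (all concrete values are invented):

```python
# Hypothetical test_results.json content; field values are illustrative only.
import json

sample_results = [
    {
        "module_name": "hosts",            # assumed module under test
        "test_name": "search for hosts",   # assumed test name
        "model_name": "example-model",     # assumed model identifier
        "run_number": 1,
        "status": "success",               # the script counts "success" runs
        "agent_result": "Found 3 hosts",
        "tools_used": [{"tool": "falcon_search_hosts"}],  # assumed entry shape
    },
    {
        "module_name": "hosts",
        "test_name": "search for hosts",
        "model_name": "example-model",
        "run_number": 2,
        "status": "failure",
        "failure_reason": "expected tool was not called",
        "agent_result": None,
        "tools_used": [],
    },
]

with open("test_results.json", "w", encoding="utf-8") as f:
    json.dump(sample_results, f, indent=2)

# The report can then be generated with:
#   python scripts/generate_e2e_report.py test_results.json
```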
--------------------------------------------------------------------------------
/tests/common/test_errors.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the error handling utilities.
"""
import unittest
from unittest.mock import patch
from falcon_mcp.common.api_scopes import API_SCOPE_REQUIREMENTS, get_required_scopes
from falcon_mcp.common.errors import (
APIError,
AuthenticationError,
FalconError,
_format_error_response,
handle_api_response,
is_success_response,
)
class TestErrorClasses(unittest.TestCase):
"""Test cases for the error classes."""
def test_falcon_error(self):
"""Test FalconError class."""
error = FalconError("Test error")
self.assertEqual(str(error), "Test error")
def test_authentication_error(self):
"""Test AuthenticationError class."""
error = AuthenticationError("Authentication failed")
self.assertEqual(str(error), "Authentication failed")
self.assertIsInstance(error, FalconError)
def test_api_error(self):
"""Test APIError class."""
error = APIError(
"API request failed",
status_code=403,
body={"errors": [{"message": "Access denied"}]},
operation="TestOperation",
)
self.assertEqual(str(error), "API request failed")
self.assertEqual(error.status_code, 403)
self.assertEqual(error.body, {"errors": [{"message": "Access denied"}]})
self.assertEqual(error.operation, "TestOperation")
self.assertIsInstance(error, FalconError)
class TestErrorUtils(unittest.TestCase):
"""Test cases for the error utility functions."""
def test_is_success_response(self):
"""Test is_success_response function."""
# Success response
self.assertTrue(is_success_response({"status_code": 200}))
# Error responses
self.assertFalse(is_success_response({"status_code": 400}))
self.assertFalse(is_success_response({"status_code": 403}))
self.assertFalse(is_success_response({"status_code": 500}))
self.assertFalse(is_success_response({})) # Missing status_code
def test_get_required_scopes(self):
"""Test get_required_scopes function."""
# Known operation
self.assertEqual(get_required_scopes("GetQueriesAlertsV2"), ["Alerts:read"])
# Unknown operation
self.assertEqual(get_required_scopes("UnknownOperation"), [])
@patch("falcon_mcp.common.errors.logger")
def test_format_error_response(self, mock_logger):
"""Test format_error_response function."""
# Basic error
response = _format_error_response("Test error")
self.assertEqual(response, {"error": "Test error"})
mock_logger.error.assert_called_with("Error: %s", "Test error")
# Error with details
details = {"status_code": 400, "body": {"errors": [{"message": "Bad request"}]}}
response = _format_error_response("Test error", details=details)
self.assertEqual(response["error"], "Test error")
self.assertEqual(response["details"], details)
# Permission error with operation
details = {
"status_code": 403,
"body": {"errors": [{"message": "Access denied"}]},
}
response = _format_error_response(
"Permission denied", details=details, operation="GetQueriesAlertsV2"
)
self.assertEqual(response["error"], "Permission denied")
self.assertEqual(response["details"], details)
self.assertEqual(response["required_scopes"], ["Alerts:read"])
self.assertIn("resolution", response)
self.assertIn("Alerts:read", response["resolution"])
def test_handle_api_response_success(self):
"""Test handle_api_response function with success response."""
# Success response with resources
response = {
"status_code": 200,
"body": {"resources": [{"id": "test", "name": "Test Resource"}]},
}
result = handle_api_response(response, "TestOperation")
self.assertEqual(result, [{"id": "test", "name": "Test Resource"}])
# Success response with empty resources
response = {"status_code": 200, "body": {"resources": []}}
result = handle_api_response(response, "TestOperation")
self.assertEqual(result, [])
# Success response with empty resources and default
response = {"status_code": 200, "body": {"resources": []}}
result = handle_api_response(
response, "TestOperation", default_result={"default": True}
)
self.assertEqual(result, {"default": True})
def test_handle_api_response_error(self):
"""Test handle_api_response function with error response."""
# Error response
response = {
"status_code": 400,
"body": {"errors": [{"message": "Bad request"}]},
}
result = handle_api_response(
response,
"TestOperation",
error_message="Test failed",
)
self.assertIn("error", result)
self.assertIn("Test failed", result["error"])
self.assertEqual(result["details"], response)
# Permission error
response = {
"status_code": 403,
"body": {"errors": [{"message": "Access denied"}]},
}
# Add a test operation to API_SCOPE_REQUIREMENTS
original_scopes = API_SCOPE_REQUIREMENTS.copy()
API_SCOPE_REQUIREMENTS["TestOperation"] = ["test:read"]
try:
result = handle_api_response(
response,
"TestOperation",
error_message="Permission denied",
)
self.assertIn("error", result)
self.assertIn("Permission denied", result["error"])
self.assertIn("Required scopes: test:read", result["error"])
self.assertEqual(result["details"], response)
finally:
# Restore original API_SCOPE_REQUIREMENTS
API_SCOPE_REQUIREMENTS.clear()
API_SCOPE_REQUIREMENTS.update(original_scopes)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/falcon_mcp/common/utils.py:
--------------------------------------------------------------------------------
```python
"""
Common utility functions for Falcon MCP Server
This module provides common utility functions for the Falcon MCP server.
"""
import re
from typing import Any, Dict, List, Optional, Tuple
from .errors import _format_error_response, is_success_response
from .logging import get_logger
logger = get_logger(__name__)
def filter_none_values(data: Dict[str, Any]) -> Dict[str, Any]:
"""Remove None values from a dictionary.
Args:
data: Dictionary to filter
Returns:
Dict[str, Any]: Filtered dictionary
"""
return {k: v for k, v in data.items() if v is not None}
def prepare_api_parameters(params: Dict[str, Any]) -> Dict[str, Any]:
"""Prepare parameters for Falcon API requests.
Args:
params: Raw parameters
Returns:
Dict[str, Any]: Prepared parameters
"""
# Remove None values
filtered = filter_none_values(params)
# Handle special parameter formatting if needed
if "filter" in filtered and isinstance(filtered["filter"], dict):
# Convert filter dict to FQL string if needed
pass
return filtered
def extract_resources(
response: Dict[str, Any],
default: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
"""Extract resources from an API response.
Args:
response: API response dictionary
default: Default value if no resources are found
Returns:
List[Dict[str, Any]]: Extracted resources
"""
if not is_success_response(response):
return default if default is not None else []
resources = response.get("body", {}).get("resources", [])
return resources if resources else (default if default is not None else [])
def extract_first_resource(
response: Dict[str, Any],
operation: str,
not_found_error: str = "Resource not found",
) -> Dict[str, Any]:
"""Extract the first resource from an API response.
Args:
response: API response dictionary
operation: The API operation that was performed
not_found_error: Error message if no resources are found
Returns:
Dict[str, Any]: First resource or error response
"""
resources = extract_resources(response)
if not resources:
return _format_error_response(not_found_error, operation=operation)
return resources[0]
def sanitize_input(input_str: str) -> str:
"""Sanitize input string.
Args:
input_str: Input string to sanitize
Returns:
Sanitized string with dangerous characters removed
"""
if not isinstance(input_str, str):
return str(input_str)
# Remove backslashes, quotes, and control characters that could be used for injection
sanitized = re.sub(r'[\\"\'\n\r\t]', "", input_str)
# Additional safety: limit length to prevent excessively long inputs
return sanitized[:255]
def generate_md_table(data: List[Tuple]) -> str:
"""Generate a Markdown table from a list of tuples.
This function creates a compact Markdown table with the provided data.
It's designed to minimize token usage while maintaining readability.
The first row of data is used as the header row.
Args:
data: List of tuples where the first tuple contains the headers
and the remaining tuples contain the table data
Returns:
str: Formatted Markdown table as a string
Raises:
TypeError: If the first row (headers) contains non-string values
TypeError: If there are not at least 2 items (header and a value row)
ValueError: If the header row is empty
ValueError: If a row has more items than headers
"""
if not data or len(data) < 2:
raise TypeError("Need at least 2 items. The header and a value row")
# Extract headers from the first row
headers = data[0]
# Check that the header row is not empty
if len(headers) == 0:
raise ValueError("Header row cannot be empty")
# Check that all headers are strings
for header in headers:
if not isinstance(header, str):
raise TypeError(f"Header values must be strings, got {type(header).__name__}")
# Use the remaining rows as data
rows = data[1:]
# Create the table header, stripping spaces from header values
header_parts = []
for h in headers:
# Strip spaces from header values
header_parts.append(str(h).strip())
header_row = "|" + "|".join(header_parts) + "|"
# Create the separator row with the exact expected format
separator = "|-" * len(headers) + "|"
# Build the table
table = [header_row, separator]
for idx, row in enumerate(rows):
# Check if row has more items than headers
if len(row) > len(headers):
raise ValueError(f"Row {idx+1} has {len(row)} items, which is more than the {len(headers)} headers")
# Convert row values to strings and handle special cases
row_values = []
for i, value in enumerate(row):
if i < len(headers):
if value is None:
row_values.append("")
elif isinstance(value, bool):
row_values.append(str(value).lower())
elif isinstance(value, (int, float)):
row_values.append(str(value))
else:
# Process multi-line text to create a clean, single-line representation
text = str(value)
# Split text into lines, strip whitespace, and filter out empty lines
non_empty_lines = [line.strip() for line in text.split('\n') if line.strip()]
# Join the non-empty lines with a single space
formatted_text = " ".join(non_empty_lines).strip()
row_values.append(formatted_text)
# Pad the row if it's shorter than headers
while len(row_values) < len(headers):
row_values.append("")
# Add the row to the table
table.append("|" + "|".join(row_values) + "|")
return "\n".join(table)
```
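As a quick illustration of `generate_md_table` (the input tuples below are made up): the first tuple supplies the headers, booleans are lowercased, and `None` becomes an empty cell.

```python
# Example usage of generate_md_table with invented data.
from falcon_mcp.common.utils import generate_md_table

table = generate_md_table(
    [
        ("Field", "Type", "Notes"),                   # header row: must be strings
        ("severity", "string", "HIGH, MEDIUM, ..."),
        ("is_supported", True, None),                 # bool -> "true", None -> ""
    ]
)
print(table)
# |Field|Type|Notes|
# |-|-|-|
# |severity|string|HIGH, MEDIUM, ...|
# |is_supported|true||
```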
--------------------------------------------------------------------------------
/falcon_mcp/modules/spotlight.py:
--------------------------------------------------------------------------------
```python
"""
Spotlight module for Falcon MCP Server
This module provides tools for accessing and managing CrowdStrike Falcon Spotlight vulnerabilities.
"""
from textwrap import dedent
from typing import Any, Dict, List
from mcp.server import FastMCP
from mcp.server.fastmcp.resources import TextResource
from pydantic import AnyUrl, Field
from falcon_mcp.common.errors import handle_api_response
from falcon_mcp.common.logging import get_logger
from falcon_mcp.common.utils import prepare_api_parameters
from falcon_mcp.modules.base import BaseModule
from falcon_mcp.resources.spotlight import SEARCH_VULNERABILITIES_FQL_DOCUMENTATION
logger = get_logger(__name__)
class SpotlightModule(BaseModule):
"""Module for accessing and managing CrowdStrike Falcon Spotlight vulnerabilities."""
def register_tools(self, server: FastMCP) -> None:
"""Register tools with the MCP server.
Args:
server: MCP server instance
"""
# Register tools
self._add_tool(
server=server,
method=self.search_vulnerabilities,
name="search_vulnerabilities",
)
def register_resources(self, server: FastMCP) -> None:
"""Register resources with the MCP server.
Args:
server: MCP server instance
"""
search_vulnerabilities_fql_resource = TextResource(
uri=AnyUrl("falcon://spotlight/vulnerabilities/fql-guide"),
name="falcon_search_vulnerabilities_fql_guide",
description="Contains the guide for the `filter` param of the `falcon_search_vulnerabilities` tool.",
text=SEARCH_VULNERABILITIES_FQL_DOCUMENTATION,
)
self._add_resource(
server,
search_vulnerabilities_fql_resource,
)
def search_vulnerabilities(
self,
filter: str | None = Field(
default=None,
description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://spotlight/vulnerabilities/fql-guide` resource when building this filter parameter.",
examples={"status:'open'", "cve.severity:'HIGH'"},
),
limit: int = Field(
default=10,
ge=1,
le=5000,
description="Maximum number of results to return. (Max: 5000, Default: 10)",
),
offset: int | None = Field(
default=None,
description="Starting index of overall result set from which to return results.",
),
sort: str | None = Field(
default=None,
description=dedent("""
Sort vulnerabilities using FQL syntax.
Supported sorting fields:
• created_timestamp: When the vulnerability was found
• closed_timestamp: When the vulnerability was closed
• updated_timestamp: When the vulnerability was last updated
Sort either asc (ascending) or desc (descending).
Format: 'field|direction'
Examples: 'created_timestamp|desc', 'updated_timestamp|desc', 'closed_timestamp|asc'
""").strip(),
examples={
"created_timestamp|desc",
"updated_timestamp|desc",
"closed_timestamp|asc",
},
),
after: str | None = Field(
default=None,
description="A pagination token used with the limit parameter to manage pagination of results. On your first request, don't provide an after token. On subsequent requests, provide the after token from the previous response to continue from that place in the results.",
),
facet: str | None = Field(
default=None,
description=dedent("""
Important: Use only one value!
Select various detail blocks to be returned for each vulnerability.
Supported values:
• host_info: Include host/asset information and context
• remediation: Include remediation and fix information
• cve: Include CVE details, scoring, and metadata
• evaluation_logic: Include vulnerability assessment methodology
Use host_info when you need asset context, remediation for fix information,
cve for detailed vulnerability scoring, and evaluation_logic for assessment details.
Examples: 'host_info', 'cve', 'remediation'
""").strip(),
examples={"host_info", "cve", "remediation", "evaluation_logic"},
),
) -> List[Dict[str, Any]]:
"""Search for vulnerabilities in your CrowdStrike environment.
IMPORTANT: You must use the `falcon://spotlight/vulnerabilities/fql-guide` resource when you need to use the `filter` parameter.
This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_vulnerabilities` tool.
"""
# Prepare parameters for combinedQueryVulnerabilities
params = prepare_api_parameters(
{
"filter": filter,
"limit": limit,
"offset": offset,
"sort": sort,
"after": after,
"facet": facet,
}
)
# Define the operation name
operation = "combinedQueryVulnerabilities"
logger.debug("Searching vulnerabilities with params: %s", params)
# Make the API request
response = self.client.command(operation, parameters=params)
# Use handle_api_response to get vulnerability data
vulnerabilities = handle_api_response(
response,
operation=operation,
error_message="Failed to search vulnerabilities",
default_result=[],
)
# If handle_api_response returns an error dict instead of a list,
# it means there was an error, so we return it wrapped in a list
if self._is_error(vulnerabilities):
return [vulnerabilities]
return vulnerabilities
```
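A sketch of wiring this module into a standalone FastMCP server, assuming `BaseModule` takes the Falcon client as its only constructor argument (in the real server, `falcon_mcp.registry` and `FalconMCPServer` handle this wiring):

```python
# Illustrative wiring only; a MagicMock stands in for an authenticated Falcon client.
from unittest.mock import MagicMock

from mcp.server import FastMCP

from falcon_mcp.modules.spotlight import SpotlightModule

server = FastMCP("falcon-spotlight-demo")  # assumed standalone server name
module = SpotlightModule(MagicMock())      # assumes the client is the sole constructor argument
module.register_tools(server)              # exposes the search_vulnerabilities tool
module.register_resources(server)          # exposes falcon://spotlight/vulnerabilities/fql-guide
```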
--------------------------------------------------------------------------------
/tests/test_streamable_http_transport.py:
--------------------------------------------------------------------------------
```python
"""
Tests for streamable-http transport functionality.
"""
import unittest
from unittest.mock import MagicMock, patch
from falcon_mcp.server import FalconMCPServer
class TestStreamableHttpTransport(unittest.TestCase):
"""Test cases for streamable-http transport."""
@patch("falcon_mcp.server.FalconClient")
@patch("falcon_mcp.server.FastMCP")
@patch("falcon_mcp.server.uvicorn")
def test_streamable_http_transport_initialization(
self,
mock_uvicorn,
mock_fastmcp,
mock_client,
):
"""Test streamable-http transport initialization."""
# Setup mocks
mock_client_instance = MagicMock()
mock_client_instance.authenticate.return_value = True
mock_client.return_value = mock_client_instance
mock_server_instance = MagicMock()
mock_app = MagicMock()
mock_server_instance.streamable_http_app.return_value = mock_app
mock_fastmcp.return_value = mock_server_instance
# Create server
server = FalconMCPServer(debug=True)
# Test streamable-http transport
server.run("streamable-http", host="0.0.0.0", port=8080)
# Verify uvicorn was called with correct parameters
mock_uvicorn.run.assert_called_once_with(
mock_app, host="0.0.0.0", port=8080, log_level="debug"
)
# Verify streamable_http_app was called
mock_server_instance.streamable_http_app.assert_called_once()
@patch("falcon_mcp.server.FalconClient")
@patch("falcon_mcp.server.FastMCP")
@patch("falcon_mcp.server.uvicorn")
def test_streamable_http_default_parameters(
self,
mock_uvicorn,
mock_fastmcp,
mock_client,
):
"""Test streamable-http transport with default parameters."""
# Setup mocks
mock_client_instance = MagicMock()
mock_client_instance.authenticate.return_value = True
mock_client.return_value = mock_client_instance
mock_server_instance = MagicMock()
mock_app = MagicMock()
mock_server_instance.streamable_http_app.return_value = mock_app
mock_fastmcp.return_value = mock_server_instance
# Create server
server = FalconMCPServer(debug=False)
# Test streamable-http transport with defaults
server.run("streamable-http")
# Verify uvicorn was called with default parameters
mock_uvicorn.run.assert_called_once_with(
mock_app,
host="127.0.0.1",
port=8000,
log_level="info",
)
@patch("falcon_mcp.server.FalconClient")
@patch("falcon_mcp.server.FastMCP")
def test_non_streamable_http_transport_unchanged(
self,
mock_fastmcp,
mock_client,
):
"""Test that non-streamable-http transports use the original method."""
# Setup mocks
mock_client_instance = MagicMock()
mock_client_instance.authenticate.return_value = True
mock_client.return_value = mock_client_instance
mock_server_instance = MagicMock()
mock_fastmcp.return_value = mock_server_instance
# Create server
server = FalconMCPServer()
# Test stdio transport (should use original method)
server.run("stdio")
# Verify the original run method was called
mock_server_instance.run.assert_called_once_with("stdio")
# Verify streamable_http_app was NOT called
mock_server_instance.streamable_http_app.assert_not_called()
@patch("falcon_mcp.server.FalconClient")
@patch("falcon_mcp.server.FastMCP")
@patch("falcon_mcp.server.uvicorn")
def test_streamable_http_custom_parameters(
self,
mock_uvicorn,
mock_fastmcp,
mock_client,
):
"""Test streamable-http transport with custom parameters."""
# Setup mocks
mock_client_instance = MagicMock()
mock_client_instance.authenticate.return_value = True
mock_client.return_value = mock_client_instance
mock_server_instance = MagicMock()
mock_app = MagicMock()
mock_server_instance.streamable_http_app.return_value = mock_app
mock_fastmcp.return_value = mock_server_instance
# Create server
server = FalconMCPServer(debug=True)
# Test streamable-http transport with custom parameters
server.run("streamable-http", host="192.168.1.100", port=9000)
# Verify uvicorn was called with custom parameters
mock_uvicorn.run.assert_called_once_with(
mock_app,
host="192.168.1.100",
port=9000,
log_level="debug",
)
@patch("falcon_mcp.server.FalconClient")
@patch("falcon_mcp.server.FastMCP")
@patch("falcon_mcp.server.uvicorn")
def test_streamable_http_logging_levels(
self,
mock_uvicorn,
mock_fastmcp,
mock_client,
):
"""Test streamable-http transport logging level configuration."""
# Setup mocks
mock_client_instance = MagicMock()
mock_client_instance.authenticate.return_value = True
mock_client.return_value = mock_client_instance
mock_server_instance = MagicMock()
mock_app = MagicMock()
mock_server_instance.streamable_http_app.return_value = mock_app
mock_fastmcp.return_value = mock_server_instance
# Test with debug=True
server_debug = FalconMCPServer(debug=True)
server_debug.run("streamable-http")
# Verify debug log level
mock_uvicorn.run.assert_called_with(
mock_app,
host="127.0.0.1",
port=8000,
log_level="debug",
)
# Reset mock
mock_uvicorn.reset_mock()
# Test with debug=False
server_info = FalconMCPServer(debug=False)
server_info.run("streamable-http")
# Verify info log level
mock_uvicorn.run.assert_called_with(
mock_app,
host="127.0.0.1",
port=8000,
log_level="info",
)
if __name__ == "__main__":
unittest.main()
```
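The calls exercised above translate directly into how the server is started programmatically. A minimal sketch based on those calls (valid Falcon API credentials must be configured in the environment, e.g. as shown in `.env.example`; host and port are illustrative):

```python
# Sketch based on the run() calls exercised in the tests above.
from falcon_mcp.server import FalconMCPServer

server = FalconMCPServer(debug=False)
# Serves the FastMCP streamable-http app via uvicorn on the given host/port.
server.run("streamable-http", host="127.0.0.1", port=8000)
```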
--------------------------------------------------------------------------------
/tests/modules/test_base.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the Base module.
"""
import unittest
from falcon_mcp.modules.base import BaseModule
from tests.modules.utils.test_modules import TestModules
class ConcreteBaseModule(BaseModule):
"""Concrete implementation of BaseModule for testing."""
def register_tools(self, server):
"""Implement abstract method."""
class TestBaseModule(TestModules):
"""Test cases for the Base module."""
def setUp(self):
"""Set up test fixtures."""
self.setup_module(ConcreteBaseModule)
def test_is_error_with_error_dict(self):
"""Test _is_error with a dictionary containing an error key."""
response = {"error": "Something went wrong", "details": "Error details"}
result = self.module._is_error(response)
self.assertTrue(result)
def test_is_error_with_non_error_dict(self):
"""Test _is_error with a dictionary not containing an error key."""
response = {"status": "success", "data": "Some data"}
result = self.module._is_error(response)
self.assertFalse(result)
def test_is_error_with_non_dict(self):
"""Test _is_error with a non-dictionary value."""
# Test with a list
response = ["item1", "item2"]
result = self.module._is_error(response)
self.assertFalse(result)
# Test with a string
response = "This is a string response"
result = self.module._is_error(response)
self.assertFalse(result)
# Test with None
response = None
result = self.module._is_error(response)
self.assertFalse(result)
# Test with an integer
response = 42
result = self.module._is_error(response)
self.assertFalse(result)
def test_base_get_by_ids_default_behavior(self):
"""Test _base_get_by_ids with default parameters (backward compatibility)."""
# Setup mock response
mock_response = {
"status_code": 200,
"body": {
"resources": [
{"id": "test1", "name": "Test Item 1"},
{"id": "test2", "name": "Test Item 2"},
]
},
}
self.mock_client.command.return_value = mock_response
# Call _base_get_by_ids with default parameters
result = self.module._base_get_by_ids("TestOperation", ["test1", "test2"])
# Verify client command was called correctly with default "ids" key
self.mock_client.command.assert_called_once_with(
"TestOperation", body={"ids": ["test1", "test2"]}
)
# Verify result
expected_result = [
{"id": "test1", "name": "Test Item 1"},
{"id": "test2", "name": "Test Item 2"},
]
self.assertEqual(result, expected_result)
def test_base_get_by_ids_custom_id_key(self):
"""Test _base_get_by_ids with custom id_key parameter."""
# Setup mock response
mock_response = {
"status_code": 200,
"body": {
"resources": [
{"composite_id": "alert1", "status": "new"},
{"composite_id": "alert2", "status": "closed"},
]
},
}
self.mock_client.command.return_value = mock_response
# Call _base_get_by_ids with custom id_key
result = self.module._base_get_by_ids(
"PostEntitiesAlertsV2", ["alert1", "alert2"], id_key="composite_ids"
)
# Verify client command was called correctly with custom key
self.mock_client.command.assert_called_once_with(
"PostEntitiesAlertsV2", body={"composite_ids": ["alert1", "alert2"]}
)
# Verify result
expected_result = [
{"composite_id": "alert1", "status": "new"},
{"composite_id": "alert2", "status": "closed"},
]
self.assertEqual(result, expected_result)
def test_base_get_by_ids_with_additional_params(self):
"""Test _base_get_by_ids with additional parameters."""
# Setup mock response
mock_response = {
"status_code": 200,
"body": {
"resources": [
{"composite_id": "alert1", "status": "new", "hidden": False}
]
},
}
self.mock_client.command.return_value = mock_response
# Call _base_get_by_ids with additional parameters
result = self.module._base_get_by_ids(
"PostEntitiesAlertsV2",
["alert1"],
id_key="composite_ids",
include_hidden=True,
sort_by="created_timestamp",
)
# Verify client command was called correctly with all parameters
self.mock_client.command.assert_called_once_with(
"PostEntitiesAlertsV2",
body={
"composite_ids": ["alert1"],
"include_hidden": True,
"sort_by": "created_timestamp",
},
)
# Verify result
expected_result = [{"composite_id": "alert1", "status": "new", "hidden": False}]
self.assertEqual(result, expected_result)
def test_base_get_by_ids_error_handling(self):
"""Test _base_get_by_ids error handling."""
# Setup mock error response
mock_response = {
"status_code": 400,
"body": {"errors": [{"message": "Invalid request"}]},
}
self.mock_client.command.return_value = mock_response
# Call _base_get_by_ids
result = self.module._base_get_by_ids("TestOperation", ["invalid_id"])
# Verify error handling - should return error dict
self.assertIn("error", result)
self.assertIn("Failed to perform operation", result["error"])
def test_base_get_by_ids_empty_response(self):
"""Test _base_get_by_ids with empty resources."""
# Setup mock response with empty resources
mock_response = {"status_code": 200, "body": {"resources": []}}
self.mock_client.command.return_value = mock_response
# Call _base_get_by_ids
result = self.module._base_get_by_ids("TestOperation", ["nonexistent"])
# Verify result is empty list
self.assertEqual(result, [])
if __name__ == "__main__":
unittest.main()
```