# Directory Structure
```
├── .gitattributes
├── .github
│ └── FUNDING.yml
├── BloodHound-MCP.py
├── images
│ └── BloodHound-MCP-Banner.png
├── README.md
└── requirements.txt
```
# Files
--------------------------------------------------------------------------------
/.gitattributes:
--------------------------------------------------------------------------------
```
# Auto detect text files and perform LF normalization
* text=auto
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# BloodHound-MCP

## Model Context Protocol (MCP) Server for BloodHound
BloodHound-MCP is a powerful integration that brings the capabilities of Model Context Procotol (MCP) Server to BloodHound, the industry-standard tool for Active Directory security analysis. This integration allows you to analyze BloodHound data using natural language, making complex Active Directory attack path analysis accessible to everyone.
> 🥇 **First-Ever BloodHound AI Integration!**
> This is the first integration that connects BloodHound with AI through MCP, [originally announced here](https://www.linkedin.com/posts/mor-david-cyber_bloodhound-ai-cybersec-activity-7310921541213470721-N390).
## 🔍 What is BloodHound-MCP?
BloodHound-MCP combines the power of:
- **BloodHound**: Industry-standard tool for visualizing and analyzing Active Directory attack paths
- **Model Context Protocol (MCP)**: An open protocol for creating custom AI tools, compatible with various AI models
- **Neo4j**: Graph database used by BloodHound to store AD relationship data
With over 75 specialized tools based on the original BloodHound CE Cypher queries, BloodHound-MCP allows security professionals to:
- Query BloodHound data using natural language
- Discover complex attack paths in Active Directory environments
- Assess Active Directory security posture more efficiently
- Generate detailed security reports for stakeholders
## 📱 Community
Join our Telegram channel for updates, tips, and discussion:
- **Telegram**: [root_sec](https://t.me/root_sec)
## 🌟 Star History
[](https://www.star-history.com/#MorDavid/BloodHound-MCP-AI&Date)
## ✨ Features
- **Natural Language Interface**: Query BloodHound data using plain English
- **Comprehensive Analysis Categories**:
- Domain structure mapping
- Privilege escalation paths
- Kerberos security issues (Kerberoasting, AS-REP Roasting)
- Certificate services vulnerabilities
- Active Directory hygiene assessment
- NTLM relay attack vectors
- Delegation abuse opportunities
- And much more!
## 📋 Prerequisites
- BloodHound 4.x+ with data collected from an Active Directory environment
- Neo4j database with BloodHound data loaded
- Python 3.8 or higher
- MCP Client
## 🔧 Installation
1. Clone this repository:
```bash
git clone https://github.com/your-username/MCP-BloodHound.git
cd MCP-BloodHound
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Configure the MCP Server
```bash
"mcpServers": {
"BloodHound-MCP": {
"command": "python",
"args": [
"<Your_Path>\\BloodHound-MCP.py"
],
"env": {
"BLOODHOUND_URI": "bolt://localhost:7687",
"BLOODHOUND_USERNAME": "neo4j",
"BLOODHOUND_PASSWORD": "bloodhoundcommunityedition"
}
}
}
```
## 🚀 Usage
Example queries you can ask through the MCP:
- "Show me all paths from kerberoastable users to Domain Admins"
- "Find computers where Domain Users have local admin rights"
- "Identify Domain Controllers vulnerable to NTLM relay attacks"
- "Map all Active Directory certificate services vulnerabilities"
- "Generate a comprehensive security report for my domain"
- "Find inactive privileged accounts"
- "Show me attack paths to high-value targets"
## 🔐 Security Considerations
This tool is designed for legitimate security assessment purposes. Always:
- Obtain proper authorization before analyzing any Active Directory environment
- Handle BloodHound data as sensitive information
- Follow responsible disclosure practices for any vulnerabilities discovered
## 📜 License
This project is licensed under the MIT License - see the LICENSE file for details.
## 🙏 Acknowledgments
- The BloodHound team for creating an amazing Active Directory security tool
- The security community for continuously advancing AD security practices
[](https://mseep.ai/app/09d13f50-8965-4ebf-b4bf-d6bb98e8f092)
---
*Note: This is not an official Anthropic product. BloodHound-MCP is a community-driven integration between BloodHound and MCP.*
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
neo4j
python-dotenv
mcp-server>=0.1.0
fastmcp
```
--------------------------------------------------------------------------------
/.github/FUNDING.yml:
--------------------------------------------------------------------------------
```yaml
# These are supported funding model platforms
github: mordavid
patreon: mordavid
open_collective: # Replace with a single Open Collective username
ko_fi: # Replace with a single Ko-fi username
tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel
community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry
liberapay: # Replace with a single Liberapay username
issuehunt: # Replace with a single IssueHunt username
lfx_crowdfunding: # Replace with a single LFX Crowdfunding project-name e.g., cloud-foundry
polar: # Replace with a single Polar username
buy_me_a_coffee: mordavid
thanks_dev: # Replace with a single thanks.dev username
custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2']
```
--------------------------------------------------------------------------------
/BloodHound-MCP.py:
--------------------------------------------------------------------------------
```python
from mcp.server.fastmcp import FastMCP
from dotenv import load_dotenv
from neo4j import GraphDatabase
import os
import logging
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# BloodHound & Neo4j connection details
BLOODHOUND_URI = os.getenv("BLOODHOUND_URI", "bolt://localhost:7687")
BLOODHOUND_USERNAME = os.getenv("BLOODHOUND_USERNAME", "neo4j")
BLOODHOUND_PASSWORD = os.getenv("BLOODHOUND_PASSWORD", "bloodhound")
logger.debug(f"Using Neo4j connection details:")
logger.debug(f"URI: {BLOODHOUND_URI}")
logger.debug(f"User: {BLOODHOUND_USERNAME}")
# Create Neo4j driver with BloodHound CE specific settings
driver = GraphDatabase.driver(
BLOODHOUND_URI,
auth=(BLOODHOUND_USERNAME, BLOODHOUND_PASSWORD),
encrypted=False
)
# Verify connection
def verify_connectivity():
try:
# Try both default and bloodhound databases
databases = ["neo4j", "bloodhound"]
for db in databases:
try:
with driver.session(database=db) as session:
logger.debug(f"Attempting to verify connection to database '{db}'...")
result = session.run("MATCH (n:User) RETURN count(n) as count")
count = result.single()["count"]
logger.info(f"Successfully connected to database '{db}'. Found {count} users.")
return True
except Exception as e:
logger.debug(f"Failed to connect to database '{db}': {str(e)}")
continue
raise Exception("Could not connect to any database")
except Exception as e:
logger.error(f"Failed to connect to Neo4j: {str(e)}")
return False
# Create FastMCP server for BloodHound
mcp = FastMCP("BH-Examples")
@mcp.tool()
async def query_bloodhound(query: str):
databases = ["neo4j", "bloodhound"]
last_error = None
for db in databases:
try:
with driver.session(database=db) as session:
result = session.run(query)
data = [record.data() for record in result]
logger.info(f"Query successful on database '{db}'")
return {"success": True, "data": data}
except Exception as e:
last_error = e
logger.debug(f"Query failed on database '{db}': {str(e)}")
continue
logger.error(f"Query failed on all databases. Last error: {str(last_error)}")
return {"success": False, "error": str(last_error)}
# Domain Information
@mcp.tool()
async def find_all_domain_admins():
query = """
MATCH p = (t:Group)<-[:MemberOf*1..]-(a)
WHERE (a:User or a:Computer) and t.objectid ENDS WITH '-512'
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def map_domain_trusts():
query = """
MATCH p = (:Domain)-[:TrustedBy]->(:Domain)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_tier_zero_locations():
query = """
MATCH p = (t:Base)<-[:Contains*1..]-(:Domain)
WHERE t.highvalue = true
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def map_ou_structure():
query = """
MATCH p = (:Domain)-[:Contains*1..]->(:OU)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
# Dangerous Privileges
@mcp.tool()
async def find_dcsync_privileges():
query = """
MATCH p=(:Base)-[:DCSync|AllExtendedRights|GenericAll]->(:Domain)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_foreign_group_memberships():
query = """
MATCH p=(s:Base)-[:MemberOf]->(t:Group)
WHERE s.domainsid<>t.domainsid
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_domain_users_local_admins():
query = """
MATCH p=(s:Group)-[:AdminTo]->(:Computer)
WHERE s.objectid ENDS WITH '-513'
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_domain_users_laps_readers():
query = """
MATCH p=(s:Group)-[:AllExtendedRights|ReadLAPSPassword]->(:Computer)
WHERE s.objectid ENDS WITH '-513'
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_domain_users_high_value_paths():
query = """
MATCH p=shortestPath((s:Group)-[r*1..]->(t))
WHERE t.highvalue = true AND s.objectid ENDS WITH '-513' AND s<>t
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_domain_users_workstation_rdp():
query = """
MATCH p=(s:Group)-[:CanRDP]->(t:Computer)
WHERE s.objectid ENDS WITH '-513' AND NOT toUpper(t.operatingsystem) CONTAINS 'SERVER'
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_domain_users_server_rdp():
query = """
MATCH p=(s:Group)-[:CanRDP]->(t:Computer)
WHERE s.objectid ENDS WITH '-513' AND toUpper(t.operatingsystem) CONTAINS 'SERVER'
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_domain_users_privileges():
query = """
MATCH p=(s:Group)-[r]->(:Base)
WHERE s.objectid ENDS WITH '-513'
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_domain_admin_non_dc_logons():
query = """
MATCH (s)-[:MemberOf*0..]->(g:Group)
WHERE g.objectid ENDS WITH '-516'
WITH COLLECT(s) AS exclude
MATCH p = (c:Computer)-[:HasSession]->(:User)-[:MemberOf*1..]->(g:Group)
WHERE g.objectid ENDS WITH '-512' AND NOT c IN exclude
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
# Kerberos Interaction
@mcp.tool()
async def find_kerberoastable_tier_zero():
query = """
MATCH (u:User)
WHERE u.hasspn=true
AND u.enabled = true
AND NOT u.objectid ENDS WITH '-502'
AND NOT u.gmsa = true
AND NOT u.msa = true
AND u.highvalue = true
RETURN u
LIMIT 100
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_all_kerberoastable_users():
query = """
MATCH (u:User)
WHERE u.hasspn=true
AND u.enabled = true
AND NOT u.objectid ENDS WITH '-502'
AND NOT u.gmsa = true
AND NOT u.msa = true
RETURN u
LIMIT 100
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_kerberoastable_most_admin():
query = """
MATCH (u:User)
WHERE u.hasspn = true
AND u.enabled = true
AND NOT u.objectid ENDS WITH '-502'
AND NOT u.gmsa = true
AND NOT u.msa = true
MATCH (u)-[:MemberOf|AdminTo*1..]->(c:Computer)
WITH DISTINCT u, COUNT(c) AS adminCount
RETURN u
ORDER BY adminCount DESC
LIMIT 100
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_asreproast_users():
query = """
MATCH (u:User)
WHERE u.dontreqpreauth = true
AND u.enabled = true
RETURN u
LIMIT 100
"""
return await query_bloodhound(query)
# Shortest Paths
@mcp.tool()
async def find_shortest_paths_unconstrained_delegation():
query = """
MATCH p=shortestPath((s)-[r*1..]->(t:Computer))
WHERE t.unconstraineddelegation = true AND s<>t
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_paths_from_kerberoastable_to_da():
query = """
MATCH p=shortestPath((s:User)-[r*1..]->(t:Group))
WHERE s.hasspn=true
AND s.enabled = true
AND NOT s.objectid ENDS WITH '-502'
AND NOT s.gmsa = true
AND NOT s.msa = true
AND t.objectid ENDS WITH '-512'
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_shortest_paths_to_tier_zero():
query = """
MATCH p=shortestPath((s)-[r*1..]->(t))
WHERE t.highvalue = true AND s<>t
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_paths_from_domain_users_to_tier_zero():
query = """
MATCH p=shortestPath((s:Group)-[r*1..]->(t))
WHERE t.highvalue = true AND s.objectid ENDS WITH '-513' AND s<>t
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_shortest_paths_to_domain_admins():
query = """
MATCH p=shortestPath((t:Group)<-[r*1..]-(s:Base))
WHERE t.objectid ENDS WITH '-512' AND s<>t
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_paths_from_owned_objects():
query = """
MATCH p=shortestPath((s:Base)-[r*1..]->(t:Base))
WHERE s.owned = true AND s<>t
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
# Active Directory Certificate Services
@mcp.tool()
async def find_pki_hierarchy():
query = """
MATCH p=()-[:HostsCAService|IssuedSignedBy|EnterpriseCAFor|RootCAFor|TrustedForNTAuth|NTAuthStoreFor*..]->(:Domain)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_public_key_services():
query = """
MATCH p = (c:Container)-[:Contains*..]->(:Base)
WHERE c.distinguishedname starts with 'CN=PUBLIC KEY SERVICES,CN=SERVICES,CN=CONFIGURATION,DC='
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_certificate_enrollment_rights():
query = """
MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_esc1_vulnerable_templates():
query = """
MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
WHERE ct.enrolleesuppliessubject = True
AND ct.authenticationenabled = True
AND ct.requiresmanagerapproval = False
AND (ct.authorizedsignatures = 0 OR ct.schemaversion = 1)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_esc2_vulnerable_templates():
query = """
MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(c:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
WHERE c.requiresmanagerapproval = false
AND (c.effectiveekus = [''] OR '2.5.29.37.0' IN c.effectiveekus)
AND (c.authorizedsignatures = 0 OR c.schemaversion = 1)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_enrollment_agent_templates():
query = """
MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
WHERE '1.3.6.1.4.1.311.20.2.1' IN ct.effectiveekus
OR '2.5.29.37.0' IN ct.effectiveekus
OR SIZE(ct.effectiveekus) = 0
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_dcs_weak_certificate_binding():
query = """
MATCH p = (s:Computer)-[:DCFor]->(:Domain)
WHERE s.strongcertificatebindingenforcementraw = 0 OR s.strongcertificatebindingenforcementraw = 1
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_inactive_tier_zero_principals():
query = """
WITH 60 as inactive_days
MATCH (n:Base)
WHERE n.highvalue = true
AND n.enabled = true
AND n.lastlogontimestamp < (datetime().epochseconds - (inactive_days * 86400))
AND n.lastlogon < (datetime().epochseconds - (inactive_days * 86400))
AND n.whencreated < (datetime().epochseconds - (inactive_days * 86400))
AND NOT n.name STARTS WITH 'AZUREADKERBEROS.'
AND NOT n.objectid ENDS WITH '-500'
AND NOT n.name STARTS WITH 'AZUREADSSOACC.'
RETURN n
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_tier_zero_without_smartcard():
query = """
MATCH (u:User)
WHERE u.highvalue = true
AND u.enabled = true
AND u.smartcardrequired = false
AND NOT u.name STARTS WITH 'MSOL_'
AND NOT u.name STARTS WITH 'PROVAGENTGMSA'
AND NOT u.name STARTS WITH 'ADSYNCMSA_'
RETURN u
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_domains_with_machine_quota():
query = """
MATCH (d:Domain)
WHERE d.machineaccountquota > 0
RETURN d
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_smartcard_dont_expire_domains():
query = """
MATCH (s:Domain)-[:Contains*1..]->(t:Base)
WHERE s.expirepasswordsonsmartcardonlyaccounts = false
AND t.enabled = true
AND t.smartcardrequired = true
RETURN s
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_two_way_forest_trust_delegation():
query = """
MATCH p=(n:Domain)-[r:TrustedBy]->(m:Domain)
WHERE (m)-[:TrustedBy]->(n)
AND r.trusttype = 'Forest'
AND r.tgtdelegationenabled = true
RETURN p
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_unsupported_operating_systems():
query = """
MATCH (c:Computer)
WHERE c.operatingsystem =~ '(?i).*Windows.* (2000|2003|2008|2012|xp|vista|7|8|me|nt).*'
RETURN c
LIMIT 100
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_users_with_no_password_required():
query = """
MATCH (u:User)
WHERE u.passwordnotreqd = true
RETURN u
LIMIT 100
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_users_password_not_rotated():
query = """
WITH 365 as days_since_change
MATCH (u:User)
WHERE u.pwdlastset < (datetime().epochseconds - (days_since_change * 86400))
AND NOT u.pwdlastset IN [-1.0, 0.0]
RETURN u
LIMIT 100
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_nested_tier_zero_groups():
query = """
MATCH p=(t:Group)<-[:MemberOf*..]-(s:Group)
WHERE t.highvalue = true
AND NOT s.objectid ENDS WITH '-512'
AND NOT s.objectid ENDS WITH '-519'
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_disabled_tier_zero_principals():
query = """
MATCH (n:Base)
WHERE n.highvalue = true
AND n.enabled = false
AND NOT n.objectid ENDS WITH '-502'
AND NOT n.objectid ENDS WITH '-500'
RETURN n
LIMIT 100
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_principals_reversible_encryption():
query = """
MATCH (n:Base)
WHERE n.encryptedtextpwdallowed = true
RETURN n
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_principals_des_only_kerberos():
query = """
MATCH (n:Base)
WHERE n.enabled = true
AND n.usedeskeyonly = true
RETURN n
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_principals_weak_kerberos_encryption():
query = """
MATCH (u:Base)
WHERE 'DES-CBC-CRC' IN u.supportedencryptiontypes
OR 'DES-CBC-MD5' IN u.supportedencryptiontypes
OR 'RC4-HMAC-MD5' IN u.supportedencryptiontypes
RETURN u
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_tier_zero_non_expiring_passwords():
query = """
MATCH (u:User)
WHERE u.enabled = true
AND u.pwdneverexpires = true
AND u.highvalue = true
RETURN u
LIMIT 100
"""
return await query_bloodhound(query)
# NTLM Relay Attacks
@mcp.tool()
async def find_ntlm_relay_edges():
query = """
MATCH p = (n:Base)-[:CoerceAndRelayNTLMToLDAP|CoerceAndRelayNTLMToLDAPS|CoerceAndRelayNTLMToADCS|CoerceAndRelayNTLMToSMB]->(:Base)
RETURN p LIMIT 500
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_esc8_vulnerable_cas():
query = """
MATCH (n:EnterpriseCA)
WHERE n.hasvulnerableendpoint=true
RETURN n
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_computers_outbound_ntlm_deny():
query = """
MATCH (c:Computer)
WHERE c.restrictoutboundntlm = True
RETURN c LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_computers_in_protected_users():
query = """
MATCH p = (:Base)-[:MemberOf*1..]->(g:Group)
WHERE g.objectid ENDS WITH "-525"
RETURN p LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_dcs_vulnerable_ntlm_relay():
query = """
MATCH p = (dc:Computer)-[:DCFor]->(:Domain)
WHERE (dc.ldapavailable = True AND dc.ldapsigning = False)
OR (dc.ldapsavailable = True AND dc.ldapsepa = False)
OR (dc.ldapavailable = True AND dc.ldapsavailable = True AND dc.ldapsigning = False and dc.ldapsepa = True)
RETURN p
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_computers_webclient_running():
query = """
MATCH (c:Computer)
WHERE c.webclientrunning = True
RETURN c LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_computers_no_smb_signing():
query = """
MATCH (n:Computer)
WHERE n.smbsigning = False
RETURN n
"""
return await query_bloodhound(query)
# Azure - General
@mcp.tool()
async def find_global_administrators():
query = """
MATCH p = (:AZBase)-[:AZGlobalAdmin*1..]->(:AZTenant)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_high_privileged_role_members():
query = """
MATCH p=(t:AZRole)<-[:AZHasRole|AZMemberOf*1..2]-(:AZBase)
WHERE t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)'
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
# Azure - Shortest Paths
@mcp.tool()
async def find_paths_from_entra_to_tier_zero():
query = """
MATCH p=shortestPath((s:AZUser)-[r*1..]->(t:AZBase))
WHERE t.highvalue = true AND t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)' AND s<>t
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_paths_to_privileged_roles():
query = """
MATCH p=shortestPath((s:AZBase)-[r*1..]->(t:AZRole))
WHERE t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)' AND s<>t
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_paths_from_azure_apps_to_tier_zero():
query = """
MATCH p=shortestPath((s:AZApp)-[r*1..]->(t:AZBase))
WHERE t.highvalue = true AND s<>t
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_paths_to_azure_subscriptions():
query = """
MATCH p=shortestPath((s:AZBase)-[r*1..]->(t:AZSubscription))
WHERE s<>t
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
# Azure - Microsoft Graph
@mcp.tool("sp_app_role_grant")
async def find_service_principals_with_app_role_grant():
query = """
MATCH p=(:AZServicePrincipal)-[:AZMGGrantAppRoles]->(:AZTenant)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool("find_sp_graph_assignments")
async def find_service_principals_with_graph_assignments():
query = """
MATCH p=(:AZServicePrincipal)-[:AZMGAppRoleAssignment_ReadWrite_All|AZMGApplication_ReadWrite_All|AZMGDirectory_ReadWrite_All|AZMGGroupMember_ReadWrite_All|AZMGGroup_ReadWrite_All|AZMGRoleManagement_ReadWrite_Directory|AZMGServicePrincipalEndpoint_ReadWrite_All]->(:AZServicePrincipal)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
# Azure - Hygiene
@mcp.tool()
async def find_foreign_tier_zero_principals():
query = """
MATCH (n:AZServicePrincipal)
WHERE n.highvalue = true
AND NOT toUpper(n.appownerorganizationid) = toUpper(n.tenantid)
AND n.appownerorganizationid CONTAINS '-'
RETURN n
LIMIT 100
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_synced_tier_zero_principals():
query = """
MATCH (ENTRA:AZBase)
MATCH (AD:Base)
WHERE ENTRA.onpremsyncenabled = true
AND ENTRA.onpremid = AD.objectid
AND AD.highvalue = true
RETURN ENTRA
LIMIT 100
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_external_tier_zero_users():
query = """
MATCH (n:AZUser)
WHERE n.highvalue = true
AND n.name CONTAINS '#EXT#@'
RETURN n
LIMIT 100
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_disabled_azure_tier_zero_principals():
query = """
MATCH (n:AZBase)
WHERE n.highvalue = true
AND n.enabled = false
RETURN n
LIMIT 100
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_devices_unsupported_os():
query = """
MATCH (n:AZDevice)
WHERE n.operatingsystem CONTAINS 'WINDOWS'
AND n.operatingsystemversion =~ '(10.0.19044|10.0.22000|10.0.19043|10.0.19042|10.0.19041|10.0.18363|10.0.18362|10.0.17763|10.0.17134|10.0.16299|10.0.15063|10.0.14393|10.0.10586|10.0.10240|6.3.9600|6.2.9200|6.1.7601|6.0.6200|5.1.2600|6.0.6003|5.2.3790|5.0.2195).?.*'
RETURN n
LIMIT 100
"""
return await query_bloodhound(query)
# Azure - Cross Platform Attack Paths
@mcp.tool()
async def find_entra_users_in_domain_admins():
query = """
MATCH p = (:AZUser)-[:SyncedToADUser]->(:User)-[:MemberOf]->(t:Group)
WHERE t.objectid ENDS WITH '-512'
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_onprem_users_owning_entra_objects():
query = """
MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZOwns]->(:AZBase)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_onprem_users_in_entra_groups():
query = """
MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool("templates_no_security_ext")
async def find_templates_no_security_extension():
query = """
MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
WHERE ct.nosecurityextension = true
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool("templates_with_user_san")
async def find_templates_with_user_specified_san():
query = """
MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(eca:EnterpriseCA)
WHERE eca.isuserspecifiessanenabled = True
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool()
async def find_ca_administrators():
query = """
MATCH p = (:Base)-[:ManageCertificates|ManageCA]->(:EnterpriseCA)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool("onprem_users_direct_entra_roles")
async def find_onprem_users_with_direct_entra_roles():
query = """
MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZHasRole]->(:AZRole)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool("onprem_users_group_entra_roles")
async def find_onprem_users_with_group_entra_roles():
query = """
MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup)-[:AZHasRole]->(:AZRole)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool("onprem_users_direct_azure_roles")
async def find_onprem_users_with_direct_azure_roles():
query = """
MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZOwner|AZUserAccessAdministrator|AZGetCertificates|AZGetKeys|AZGetSecrets|AZAvereContributor|AZKeyVaultContributor|AZContributor|AZVMAdminLogin|AZVMContributor|AZAKSContributor|AZAutomationContributor|AZLogicAppContributor|AZWebsiteContributor]->(:AZBase)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
@mcp.tool("onprem_users_group_azure_roles")
async def find_onprem_users_with_group_azure_roles():
query = """
MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup)-[:AZOwner|AZUserAccessAdministrator|AZGetCertificates|AZGetKeys|AZGetSecrets|AZAvereContributor|AZKeyVaultContributor|AZContributor|AZVMAdminLogin|AZVMContributor|AZAKSContributor|AZAutomationContributor|AZLogicAppContributor|AZWebsiteContributor]->(:AZBase)
RETURN p
LIMIT 1000
"""
return await query_bloodhound(query)
if __name__ == "__main__":
if verify_connectivity():
try:
logger.info("Starting MCP server...")
mcp.run(transport="stdio")
finally:
driver.close()
else:
logger.error("Failed to establish Neo4j connection. Please check your credentials and connection settings.")
```