#
tokens: 8437/50000 5/5 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitattributes
├── .github
│   └── FUNDING.yml
├── BloodHound-MCP.py
├── images
│   └── BloodHound-MCP-Banner.png
├── README.md
└── requirements.txt
```

# Files

--------------------------------------------------------------------------------
/.gitattributes:
--------------------------------------------------------------------------------

```
# Auto detect text files and perform LF normalization
* text=auto

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# BloodHound-MCP

![BloodHound-MCP](/images/BloodHound-MCP-Banner.png)

## Model Context Protocol (MCP) Server for BloodHound

BloodHound-MCP is a powerful integration that brings the capabilities of Model Context Procotol (MCP) Server to BloodHound, the industry-standard tool for Active Directory security analysis. This integration allows you to analyze BloodHound data using natural language, making complex Active Directory attack path analysis accessible to everyone.

> 🥇 **First-Ever BloodHound AI Integration!**  
> This is the first integration that connects BloodHound with AI through MCP, [originally announced here](https://www.linkedin.com/posts/mor-david-cyber_bloodhound-ai-cybersec-activity-7310921541213470721-N390).

## 🔍 What is BloodHound-MCP?

BloodHound-MCP combines the power of:
- **BloodHound**: Industry-standard tool for visualizing and analyzing Active Directory attack paths
- **Model Context Protocol (MCP)**: An open protocol for creating custom AI tools, compatible with various AI models
- **Neo4j**: Graph database used by BloodHound to store AD relationship data

With over 75 specialized tools based on the original BloodHound CE Cypher queries, BloodHound-MCP allows security professionals to:
- Query BloodHound data using natural language
- Discover complex attack paths in Active Directory environments
- Assess Active Directory security posture more efficiently
- Generate detailed security reports for stakeholders

## 📱 Community

Join our Telegram channel for updates, tips, and discussion:
- **Telegram**: [root_sec](https://t.me/root_sec)

## 🌟 Star History

[![Star History Chart](https://api.star-history.com/svg?repos=MorDavid/BloodHound-MCP-AI&type=Date)](https://www.star-history.com/#MorDavid/BloodHound-MCP-AI&Date)

## ✨ Features

- **Natural Language Interface**: Query BloodHound data using plain English
- **Comprehensive Analysis Categories**:
  - Domain structure mapping
  - Privilege escalation paths
  - Kerberos security issues (Kerberoasting, AS-REP Roasting)
  - Certificate services vulnerabilities
  - Active Directory hygiene assessment
  - NTLM relay attack vectors
  - Delegation abuse opportunities
  - And much more!

## 📋 Prerequisites

- BloodHound 4.x+ with data collected from an Active Directory environment
- Neo4j database with BloodHound data loaded
- Python 3.8 or higher
- MCP Client

## 🔧 Installation

1. Clone this repository:
   ```bash
   git clone https://github.com/your-username/MCP-BloodHound.git
   cd MCP-BloodHound
   ```

2. Install dependencies:
   ```bash
   pip install -r requirements.txt
   ```
3. Configure the MCP Server
    ```bash
    "mcpServers": {
        "BloodHound-MCP": {
            "command": "python",
            "args": [
                "<Your_Path>\\BloodHound-MCP.py"
            ],
            "env": {
                "BLOODHOUND_URI": "bolt://localhost:7687",
                "BLOODHOUND_USERNAME": "neo4j",
                "BLOODHOUND_PASSWORD": "bloodhoundcommunityedition"
            }
        }
    }
   ```
## 🚀 Usage

Example queries you can ask through the MCP:

- "Show me all paths from kerberoastable users to Domain Admins"
- "Find computers where Domain Users have local admin rights"
- "Identify Domain Controllers vulnerable to NTLM relay attacks"
- "Map all Active Directory certificate services vulnerabilities"
- "Generate a comprehensive security report for my domain"
- "Find inactive privileged accounts"
- "Show me attack paths to high-value targets"

## 🔐 Security Considerations

This tool is designed for legitimate security assessment purposes. Always:
- Obtain proper authorization before analyzing any Active Directory environment
- Handle BloodHound data as sensitive information
- Follow responsible disclosure practices for any vulnerabilities discovered

## 📜 License

This project is licensed under the MIT License - see the LICENSE file for details.

## 🙏 Acknowledgments

- The BloodHound team for creating an amazing Active Directory security tool
- The security community for continuously advancing AD security practices

[![Verified on MseeP](https://mseep.ai/badge.svg)](https://mseep.ai/app/09d13f50-8965-4ebf-b4bf-d6bb98e8f092)

---

*Note: This is not an official Anthropic product. BloodHound-MCP is a community-driven integration between BloodHound and MCP.* 

```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
neo4j
python-dotenv
mcp-server>=0.1.0
fastmcp
```

--------------------------------------------------------------------------------
/.github/FUNDING.yml:
--------------------------------------------------------------------------------

```yaml
# These are supported funding model platforms

github: mordavid
patreon: mordavid
open_collective: # Replace with a single Open Collective username
ko_fi: # Replace with a single Ko-fi username
tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel
community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry
liberapay: # Replace with a single Liberapay username
issuehunt: # Replace with a single IssueHunt username
lfx_crowdfunding: # Replace with a single LFX Crowdfunding project-name e.g., cloud-foundry
polar: # Replace with a single Polar username
buy_me_a_coffee: mordavid
thanks_dev: # Replace with a single thanks.dev username
custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2']

```

--------------------------------------------------------------------------------
/BloodHound-MCP.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP
from dotenv import load_dotenv
from neo4j import GraphDatabase
import os
import logging

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# BloodHound & Neo4j connection details
BLOODHOUND_URI = os.getenv("BLOODHOUND_URI", "bolt://localhost:7687")
BLOODHOUND_USERNAME = os.getenv("BLOODHOUND_USERNAME", "neo4j")
BLOODHOUND_PASSWORD = os.getenv("BLOODHOUND_PASSWORD", "bloodhound")

logger.debug(f"Using Neo4j connection details:")
logger.debug(f"URI: {BLOODHOUND_URI}")
logger.debug(f"User: {BLOODHOUND_USERNAME}")

# Create Neo4j driver with BloodHound CE specific settings
driver = GraphDatabase.driver(
    BLOODHOUND_URI,
    auth=(BLOODHOUND_USERNAME, BLOODHOUND_PASSWORD),
    encrypted=False
)

# Verify connection
def verify_connectivity():
    try:
        # Try both default and bloodhound databases
        databases = ["neo4j", "bloodhound"]
        for db in databases:
            try:
                with driver.session(database=db) as session:
                    logger.debug(f"Attempting to verify connection to database '{db}'...")
                    result = session.run("MATCH (n:User) RETURN count(n) as count")
                    count = result.single()["count"]
                    logger.info(f"Successfully connected to database '{db}'. Found {count} users.")
                    return True
            except Exception as e:
                logger.debug(f"Failed to connect to database '{db}': {str(e)}")
                continue
        raise Exception("Could not connect to any database")
    except Exception as e:
        logger.error(f"Failed to connect to Neo4j: {str(e)}")
        return False

# Create FastMCP server for BloodHound
mcp = FastMCP("BH-Examples")

@mcp.tool()
async def query_bloodhound(query: str):
    databases = ["neo4j", "bloodhound"]
    last_error = None
    
    for db in databases:
        try:
            with driver.session(database=db) as session:
                result = session.run(query)
                data = [record.data() for record in result]
                logger.info(f"Query successful on database '{db}'")
                return {"success": True, "data": data}
        except Exception as e:
            last_error = e
            logger.debug(f"Query failed on database '{db}': {str(e)}")
            continue
    
    logger.error(f"Query failed on all databases. Last error: {str(last_error)}")
    return {"success": False, "error": str(last_error)}

# Domain Information
@mcp.tool()
async def find_all_domain_admins():
    query = """
    MATCH p = (t:Group)<-[:MemberOf*1..]-(a)
    WHERE (a:User or a:Computer) and t.objectid ENDS WITH '-512'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def map_domain_trusts():
    query = """
    MATCH p = (:Domain)-[:TrustedBy]->(:Domain)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_tier_zero_locations():
    query = """
    MATCH p = (t:Base)<-[:Contains*1..]-(:Domain)
    WHERE t.highvalue = true
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def map_ou_structure():
    query = """
    MATCH p = (:Domain)-[:Contains*1..]->(:OU)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

# Dangerous Privileges
@mcp.tool()
async def find_dcsync_privileges():
    query = """
    MATCH p=(:Base)-[:DCSync|AllExtendedRights|GenericAll]->(:Domain)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_foreign_group_memberships():
    query = """
    MATCH p=(s:Base)-[:MemberOf]->(t:Group)
    WHERE s.domainsid<>t.domainsid
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_domain_users_local_admins():
    query = """
    MATCH p=(s:Group)-[:AdminTo]->(:Computer)
    WHERE s.objectid ENDS WITH '-513'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_domain_users_laps_readers():
    query = """
    MATCH p=(s:Group)-[:AllExtendedRights|ReadLAPSPassword]->(:Computer)
    WHERE s.objectid ENDS WITH '-513'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_domain_users_high_value_paths():
    query = """
    MATCH p=shortestPath((s:Group)-[r*1..]->(t))
    WHERE t.highvalue = true AND s.objectid ENDS WITH '-513' AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_domain_users_workstation_rdp():
    query = """
    MATCH p=(s:Group)-[:CanRDP]->(t:Computer)
    WHERE s.objectid ENDS WITH '-513' AND NOT toUpper(t.operatingsystem) CONTAINS 'SERVER'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_domain_users_server_rdp():
    query = """
    MATCH p=(s:Group)-[:CanRDP]->(t:Computer)
    WHERE s.objectid ENDS WITH '-513' AND toUpper(t.operatingsystem) CONTAINS 'SERVER'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_domain_users_privileges():
    query = """
    MATCH p=(s:Group)-[r]->(:Base)
    WHERE s.objectid ENDS WITH '-513'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_domain_admin_non_dc_logons():
    query = """
    MATCH (s)-[:MemberOf*0..]->(g:Group)
    WHERE g.objectid ENDS WITH '-516'
    WITH COLLECT(s) AS exclude
    MATCH p = (c:Computer)-[:HasSession]->(:User)-[:MemberOf*1..]->(g:Group)
    WHERE g.objectid ENDS WITH '-512' AND NOT c IN exclude
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

# Kerberos Interaction
@mcp.tool()
async def find_kerberoastable_tier_zero():
    query = """
    MATCH (u:User)
    WHERE u.hasspn=true
    AND u.enabled = true
    AND NOT u.objectid ENDS WITH '-502'
    AND NOT u.gmsa = true
    AND NOT u.msa = true
    AND u.highvalue = true
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_all_kerberoastable_users():
    query = """
    MATCH (u:User)
    WHERE u.hasspn=true
    AND u.enabled = true
    AND NOT u.objectid ENDS WITH '-502'
    AND NOT u.gmsa = true
    AND NOT u.msa = true
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_kerberoastable_most_admin():
    query = """
    MATCH (u:User)
    WHERE u.hasspn = true
      AND u.enabled = true
      AND NOT u.objectid ENDS WITH '-502'
      AND NOT u.gmsa = true
      AND NOT u.msa = true
    MATCH (u)-[:MemberOf|AdminTo*1..]->(c:Computer)
    WITH DISTINCT u, COUNT(c) AS adminCount
    RETURN u
    ORDER BY adminCount DESC
    LIMIT 100
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_asreproast_users():
    query = """
    MATCH (u:User)
    WHERE u.dontreqpreauth = true
    AND u.enabled = true
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)

# Shortest Paths
@mcp.tool()
async def find_shortest_paths_unconstrained_delegation():
    query = """
    MATCH p=shortestPath((s)-[r*1..]->(t:Computer))
    WHERE t.unconstraineddelegation = true AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_paths_from_kerberoastable_to_da():
    query = """
    MATCH p=shortestPath((s:User)-[r*1..]->(t:Group))
    WHERE s.hasspn=true
    AND s.enabled = true
    AND NOT s.objectid ENDS WITH '-502'
    AND NOT s.gmsa = true
    AND NOT s.msa = true
    AND t.objectid ENDS WITH '-512'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_shortest_paths_to_tier_zero():
    query = """
    MATCH p=shortestPath((s)-[r*1..]->(t))
    WHERE t.highvalue = true AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_paths_from_domain_users_to_tier_zero():
    query = """
    MATCH p=shortestPath((s:Group)-[r*1..]->(t))
    WHERE t.highvalue = true AND s.objectid ENDS WITH '-513' AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_shortest_paths_to_domain_admins():
    query = """
    MATCH p=shortestPath((t:Group)<-[r*1..]-(s:Base))
    WHERE t.objectid ENDS WITH '-512' AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_paths_from_owned_objects():
    query = """
    MATCH p=shortestPath((s:Base)-[r*1..]->(t:Base))
    WHERE s.owned = true AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

# Active Directory Certificate Services
@mcp.tool()
async def find_pki_hierarchy():
    query = """
    MATCH p=()-[:HostsCAService|IssuedSignedBy|EnterpriseCAFor|RootCAFor|TrustedForNTAuth|NTAuthStoreFor*..]->(:Domain)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_public_key_services():
    query = """
    MATCH p = (c:Container)-[:Contains*..]->(:Base)
    WHERE c.distinguishedname starts with 'CN=PUBLIC KEY SERVICES,CN=SERVICES,CN=CONFIGURATION,DC='
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_certificate_enrollment_rights():
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_esc1_vulnerable_templates():
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
    WHERE ct.enrolleesuppliessubject = True
    AND ct.authenticationenabled = True
    AND ct.requiresmanagerapproval = False
    AND (ct.authorizedsignatures = 0 OR ct.schemaversion = 1)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_esc2_vulnerable_templates():
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(c:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
    WHERE c.requiresmanagerapproval = false
    AND (c.effectiveekus = [''] OR '2.5.29.37.0' IN c.effectiveekus)
    AND (c.authorizedsignatures = 0 OR c.schemaversion = 1)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_enrollment_agent_templates():
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
    WHERE '1.3.6.1.4.1.311.20.2.1' IN ct.effectiveekus
    OR '2.5.29.37.0' IN ct.effectiveekus
    OR SIZE(ct.effectiveekus) = 0
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_dcs_weak_certificate_binding():
    query = """
    MATCH p = (s:Computer)-[:DCFor]->(:Domain)
    WHERE s.strongcertificatebindingenforcementraw = 0 OR s.strongcertificatebindingenforcementraw = 1
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_inactive_tier_zero_principals():
    query = """
    WITH 60 as inactive_days
    MATCH (n:Base)
    WHERE n.highvalue = true
    AND n.enabled = true
    AND n.lastlogontimestamp < (datetime().epochseconds - (inactive_days * 86400))
    AND n.lastlogon < (datetime().epochseconds - (inactive_days * 86400))
    AND n.whencreated < (datetime().epochseconds - (inactive_days * 86400))
    AND NOT n.name STARTS WITH 'AZUREADKERBEROS.'
    AND NOT n.objectid ENDS WITH '-500'
    AND NOT n.name STARTS WITH 'AZUREADSSOACC.'
    RETURN n
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_tier_zero_without_smartcard():
    query = """
    MATCH (u:User)
    WHERE u.highvalue = true
    AND u.enabled = true
    AND u.smartcardrequired = false
    AND NOT u.name STARTS WITH 'MSOL_'
    AND NOT u.name STARTS WITH 'PROVAGENTGMSA'
    AND NOT u.name STARTS WITH 'ADSYNCMSA_'
    RETURN u
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_domains_with_machine_quota():
    query = """
    MATCH (d:Domain)
    WHERE d.machineaccountquota > 0
    RETURN d
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_smartcard_dont_expire_domains():
    query = """
    MATCH (s:Domain)-[:Contains*1..]->(t:Base)
    WHERE s.expirepasswordsonsmartcardonlyaccounts = false
    AND t.enabled = true
    AND t.smartcardrequired = true
    RETURN s
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_two_way_forest_trust_delegation():
    query = """
    MATCH p=(n:Domain)-[r:TrustedBy]->(m:Domain)
    WHERE (m)-[:TrustedBy]->(n)
    AND r.trusttype = 'Forest'
    AND r.tgtdelegationenabled = true
    RETURN p
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_unsupported_operating_systems():
    query = """
    MATCH (c:Computer)
    WHERE c.operatingsystem =~ '(?i).*Windows.* (2000|2003|2008|2012|xp|vista|7|8|me|nt).*'
    RETURN c
    LIMIT 100
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_users_with_no_password_required():
    query = """
    MATCH (u:User)
    WHERE u.passwordnotreqd = true
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_users_password_not_rotated():
    query = """
    WITH 365 as days_since_change
    MATCH (u:User)
    WHERE u.pwdlastset < (datetime().epochseconds - (days_since_change * 86400))
    AND NOT u.pwdlastset IN [-1.0, 0.0]
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_nested_tier_zero_groups():
    query = """
    MATCH p=(t:Group)<-[:MemberOf*..]-(s:Group)
    WHERE t.highvalue = true
    AND NOT s.objectid ENDS WITH '-512'
    AND NOT s.objectid ENDS WITH '-519'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_disabled_tier_zero_principals():
    query = """
    MATCH (n:Base)
    WHERE n.highvalue = true
    AND n.enabled = false
    AND NOT n.objectid ENDS WITH '-502'
    AND NOT n.objectid ENDS WITH '-500'
    RETURN n
    LIMIT 100
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_principals_reversible_encryption():
    query = """
    MATCH (n:Base)
    WHERE n.encryptedtextpwdallowed = true
    RETURN n
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_principals_des_only_kerberos():
    query = """
    MATCH (n:Base)
    WHERE n.enabled = true
    AND n.usedeskeyonly = true
    RETURN n
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_principals_weak_kerberos_encryption():
    query = """
    MATCH (u:Base)
    WHERE 'DES-CBC-CRC' IN u.supportedencryptiontypes
    OR 'DES-CBC-MD5' IN u.supportedencryptiontypes
    OR 'RC4-HMAC-MD5' IN u.supportedencryptiontypes
    RETURN u
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_tier_zero_non_expiring_passwords():
    query = """
    MATCH (u:User)
    WHERE u.enabled = true
    AND u.pwdneverexpires = true
    AND u.highvalue = true
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)

# NTLM Relay Attacks
@mcp.tool()
async def find_ntlm_relay_edges():
    query = """
    MATCH p = (n:Base)-[:CoerceAndRelayNTLMToLDAP|CoerceAndRelayNTLMToLDAPS|CoerceAndRelayNTLMToADCS|CoerceAndRelayNTLMToSMB]->(:Base)
    RETURN p LIMIT 500
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_esc8_vulnerable_cas():
    query = """
    MATCH (n:EnterpriseCA)
    WHERE n.hasvulnerableendpoint=true
    RETURN n
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_computers_outbound_ntlm_deny():
    query = """
    MATCH (c:Computer)
    WHERE c.restrictoutboundntlm = True
    RETURN c LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_computers_in_protected_users():
    query = """
    MATCH p = (:Base)-[:MemberOf*1..]->(g:Group)
    WHERE g.objectid ENDS WITH "-525"
    RETURN p LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_dcs_vulnerable_ntlm_relay():
    query = """
    MATCH p = (dc:Computer)-[:DCFor]->(:Domain)
    WHERE (dc.ldapavailable = True AND dc.ldapsigning = False)
    OR (dc.ldapsavailable = True AND dc.ldapsepa = False)
    OR (dc.ldapavailable = True AND dc.ldapsavailable = True AND dc.ldapsigning = False and dc.ldapsepa = True)
    RETURN p
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_computers_webclient_running():
    query = """
    MATCH (c:Computer)
    WHERE c.webclientrunning = True
    RETURN c LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_computers_no_smb_signing():
    query = """
    MATCH (n:Computer)
    WHERE n.smbsigning = False
    RETURN n
    """
    return await query_bloodhound(query)

# Azure - General
@mcp.tool()
async def find_global_administrators():
    query = """
    MATCH p = (:AZBase)-[:AZGlobalAdmin*1..]->(:AZTenant)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_high_privileged_role_members():
    query = """
    MATCH p=(t:AZRole)<-[:AZHasRole|AZMemberOf*1..2]-(:AZBase)
    WHERE t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

# Azure - Shortest Paths
@mcp.tool()
async def find_paths_from_entra_to_tier_zero():
    query = """
    MATCH p=shortestPath((s:AZUser)-[r*1..]->(t:AZBase))
    WHERE t.highvalue = true AND t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)' AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_paths_to_privileged_roles():
    query = """
    MATCH p=shortestPath((s:AZBase)-[r*1..]->(t:AZRole))
    WHERE t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)' AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_paths_from_azure_apps_to_tier_zero():
    query = """
    MATCH p=shortestPath((s:AZApp)-[r*1..]->(t:AZBase))
    WHERE t.highvalue = true AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_paths_to_azure_subscriptions():
    query = """
    MATCH p=shortestPath((s:AZBase)-[r*1..]->(t:AZSubscription))
    WHERE s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

# Azure - Microsoft Graph
@mcp.tool("sp_app_role_grant")
async def find_service_principals_with_app_role_grant():
    query = """
    MATCH p=(:AZServicePrincipal)-[:AZMGGrantAppRoles]->(:AZTenant)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool("find_sp_graph_assignments")
async def find_service_principals_with_graph_assignments():
    query = """
    MATCH p=(:AZServicePrincipal)-[:AZMGAppRoleAssignment_ReadWrite_All|AZMGApplication_ReadWrite_All|AZMGDirectory_ReadWrite_All|AZMGGroupMember_ReadWrite_All|AZMGGroup_ReadWrite_All|AZMGRoleManagement_ReadWrite_Directory|AZMGServicePrincipalEndpoint_ReadWrite_All]->(:AZServicePrincipal)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

# Azure - Hygiene
@mcp.tool()
async def find_foreign_tier_zero_principals():
    query = """
    MATCH (n:AZServicePrincipal)
    WHERE n.highvalue = true
    AND NOT toUpper(n.appownerorganizationid) = toUpper(n.tenantid)
    AND n.appownerorganizationid CONTAINS '-'
    RETURN n
    LIMIT 100
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_synced_tier_zero_principals():
    query = """
    MATCH (ENTRA:AZBase)
    MATCH (AD:Base)
    WHERE ENTRA.onpremsyncenabled = true
    AND ENTRA.onpremid = AD.objectid
    AND AD.highvalue = true
    RETURN ENTRA
    LIMIT 100
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_external_tier_zero_users():
    query = """
    MATCH (n:AZUser)
    WHERE n.highvalue = true
    AND n.name CONTAINS '#EXT#@'
    RETURN n
    LIMIT 100
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_disabled_azure_tier_zero_principals():
    query = """
    MATCH (n:AZBase)
    WHERE n.highvalue = true
    AND n.enabled = false
    RETURN n
    LIMIT 100
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_devices_unsupported_os():
    query = """
    MATCH (n:AZDevice)
    WHERE n.operatingsystem CONTAINS 'WINDOWS'
    AND n.operatingsystemversion =~ '(10.0.19044|10.0.22000|10.0.19043|10.0.19042|10.0.19041|10.0.18363|10.0.18362|10.0.17763|10.0.17134|10.0.16299|10.0.15063|10.0.14393|10.0.10586|10.0.10240|6.3.9600|6.2.9200|6.1.7601|6.0.6200|5.1.2600|6.0.6003|5.2.3790|5.0.2195).?.*'
    RETURN n
    LIMIT 100
    """
    return await query_bloodhound(query)

# Azure - Cross Platform Attack Paths
@mcp.tool()
async def find_entra_users_in_domain_admins():
    query = """
    MATCH p = (:AZUser)-[:SyncedToADUser]->(:User)-[:MemberOf]->(t:Group)
    WHERE t.objectid ENDS WITH '-512'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_onprem_users_owning_entra_objects():
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZOwns]->(:AZBase)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_onprem_users_in_entra_groups():
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool("templates_no_security_ext")
async def find_templates_no_security_extension():
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
    WHERE ct.nosecurityextension = true
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool("templates_with_user_san")
async def find_templates_with_user_specified_san():
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(eca:EnterpriseCA)
    WHERE eca.isuserspecifiessanenabled = True
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool()
async def find_ca_administrators():
    query = """
    MATCH p = (:Base)-[:ManageCertificates|ManageCA]->(:EnterpriseCA)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool("onprem_users_direct_entra_roles")
async def find_onprem_users_with_direct_entra_roles():
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZHasRole]->(:AZRole)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool("onprem_users_group_entra_roles")
async def find_onprem_users_with_group_entra_roles():
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup)-[:AZHasRole]->(:AZRole)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool("onprem_users_direct_azure_roles")
async def find_onprem_users_with_direct_azure_roles():
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZOwner|AZUserAccessAdministrator|AZGetCertificates|AZGetKeys|AZGetSecrets|AZAvereContributor|AZKeyVaultContributor|AZContributor|AZVMAdminLogin|AZVMContributor|AZAKSContributor|AZAutomationContributor|AZLogicAppContributor|AZWebsiteContributor]->(:AZBase)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

@mcp.tool("onprem_users_group_azure_roles")
async def find_onprem_users_with_group_azure_roles():
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup)-[:AZOwner|AZUserAccessAdministrator|AZGetCertificates|AZGetKeys|AZGetSecrets|AZAvereContributor|AZKeyVaultContributor|AZContributor|AZVMAdminLogin|AZVMContributor|AZAKSContributor|AZAutomationContributor|AZLogicAppContributor|AZWebsiteContributor]->(:AZBase)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)

if __name__ == "__main__":
    if verify_connectivity():
        try:
            logger.info("Starting MCP server...")
            mcp.run(transport="stdio")
        finally:
            driver.close()
    else:
        logger.error("Failed to establish Neo4j connection. Please check your credentials and connection settings.") 
```