# Directory Structure

```
├── .dockerignore
├── .gitignore
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── readme.md
├── requirements.txt
├── roam_mcp
│   ├── __init__.py
│   ├── api.py
│   ├── cli.py
│   ├── content_parsers.py
│   ├── content.py
│   ├── memory.py
│   ├── search.py
│   ├── server.py
│   └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------

```
# Version control
.git
.gitignore

# Python cache files
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg

# Virtual environments
venv/
ENV/
env/

# IDE specific files
.idea/
.vscode/
*.swp
*.swo

# Docker specific
Dockerfile
.dockerignore

# Local development files
.env
*.log
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# OS-specific
.DS_Store

# Python bytecode and C extensions
__pycache__/
*.py[cod]
*$py.class
*.so

# Build system artifacts
.Python
build/
dist/
downloads/
*.egg-info/
.eggs/
*.egg
MANIFEST
wheels/
share/python-wheels/
sdist/
develop-eggs/
lib/
lib64/
parts/
var/
.installed.cfg

# Packaging tools
*.spec

# Logs and runtime files
*.log
pip-log.txt
pip-delete-this-directory.txt

# Coverage and test artifacts
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.pytest_cache/
.hypothesis/

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# IDEs and editors
.vscode/
.idea/
.spyderproject
.spyproject
.ropeproject

# Jupyter and IPython
.ipynb_checkpoints/
profile_default/
ipython_config.py

# Documentation
docs/_build/
site/

# Type checking
.mypy_cache/
.dmypy.json
.pyre/
.pytype/

# Other project tools
.pybuilder/
target/
scrapy/
.webassets-cache
celerybeat-schedule
celerybeat.pid

# SQLite (dev DBs)
db.sqlite3
db.sqlite3-journal

# Claude Code and MCP configuration
.mcp.json
CLAUDE.md
WARP.md
```

--------------------------------------------------------------------------------
/readme.md:
--------------------------------------------------------------------------------

```markdown
# Roam Research MCP Server

A Model Context Protocol (MCP) server that connects Claude and other AI assistants to your Roam Research graph.

## What This Does

This server acts as a bridge between AI assistants and your Roam Research database. After setup, you can ask Claude to work with your Roam data in plain language, with no coding required.

For example, you can say:
- "Add these meeting notes to today's daily note in Roam"
- "Search my Roam graph for blocks tagged with #ProjectIdeas"
- "Create a new page in Roam called 'Project Planning'"
- "Find all TODO items I created this month"

## Features

### Content Creation
- Create new pages with nested content and headings
- Add blocks to any page with proper hierarchy
- Create structured outlines with customizable nesting
- Import markdown with proper nesting
- Add todo items with automatic TODO status
- Update existing content individually or in batches
- Modify block content with pattern transformations

### Search and Retrieval
- Find pages and blocks by title, text, or tags
- Search for TODO/DONE items with filtering options
- Find recently modified content
- Search block references and explore block hierarchies
- Search by creation or modification dates
- Navigate parent-child relationships in blocks
- Execute custom Datalog queries for advanced needs

### Memory System
- Store information for Claude to remember across conversations
- Recall stored memories with filtering and sorting options
- Tag memories with custom categories
- Access both recent and older memories with flexible retrieval

### URL Content Processing
- Extract and import content from webpages
- Parse and extract text from PDF documents
- Retrieve YouTube video transcripts
- Intelligently detect content type and process accordingly

## Setup Instructions

1. Install Claude Desktop from [https://claude.ai/download](https://claude.ai/download)

2. Edit your Claude Desktop configuration file:
   - Mac: `~/Library/Application Support/Claude/claude_desktop_config.json`
   - Windows: `%APPDATA%\Claude\claude_desktop_config.json`

3. Add this configuration:

```json
{
  "mcpServers": {
    "roam-helper": {
      "command": "uvx",
      "args": ["git+https://github.com/PhiloSolares/roam-mcp.git"],
      "env": {
        "ROAM_API_TOKEN": "<your_roam_api_token>",
        "ROAM_GRAPH_NAME": "<your_roam_graph_name>"
      }
    }
  }
}
```

4. Get your Roam API token:
   - Go to your Roam Research graph settings
   - Navigate to "API tokens"
   - Click "+ New API Token"
   - Copy the token to your configuration

## How to Use

Once set up, simply chat with Claude and ask it to work with your Roam graph. Claude will use the appropriate MCP commands behind the scenes.

Example conversations:

**Creating Content:**
> You: "Claude, please create a new page in my Roam graph called 'Project Ideas' with a section for mobile app ideas."

**Searching Content:**
> You: "Find all blocks in my Roam graph tagged with #ProjectIdeas that also mention mobile apps."
>
> You: "Show me all the TODO items I created this week."

**Using the Memory System:**
> You: "Remember that I want to use spaced repetition for learning JavaScript."
>
> Later:
> You: "What learning techniques have we discussed for programming?"

**Working with External Content:**
> You: "Extract the main points from this PDF and add them to my Roam graph."
>
> You: "Get the transcript from this YouTube video about productivity."

## Advanced Configuration

By default, memories are stored with the tag `#[[Memories]]`. To use a different tag:

```json
"env": {
  "ROAM_API_TOKEN": "your-token",
  "ROAM_GRAPH_NAME": "your-graph",
  "MEMORIES_TAG": "#[[Claude/Memories]]"
}
```

## Docker Support

You can run the Roam MCP server in a Docker container:

### Building the Image

```bash
docker build -t roam-mcp .
```

### Running the Container

Run with environment variables:

```bash
docker run -p 3000:3000 \
  -e ROAM_API_TOKEN="your_api_token" \
  -e ROAM_GRAPH_NAME="your_graph_name" \
  roam-mcp
```

### Using with Claude Desktop

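Configure Claude Desktop to use the containerized server. Claude Desktop talks to the servers it launches over stdio, so keep stdin open with `-i` and override the image's default SSE command:

```json
{
  "mcpServers": {
    "roam-helper": {
      "command": "docker",
      "args": ["run", "--rm", "-i",
               "-e", "ROAM_API_TOKEN=your_token",
               "-e", "ROAM_GRAPH_NAME=your_graph",
               "roam-mcp",
               "python", "-m", "roam_mcp.cli", "--transport", "stdio"],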
      "env": {}
    }
  }
}
```

## License

MIT License
```
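
The Setup Instructions in the README edit `claude_desktop_config.json` by hand. As a minimal sketch, the same `roam-helper` entry could be merged into an existing configuration with a short script. The helper name `add_roam_server` and the sample `other-server` entry are assumptions for illustration and are not part of the repository.

```python
import json
from typing import Any, Dict


def add_roam_server(config: Dict[str, Any], token: str, graph: str) -> Dict[str, Any]:
    """Return a copy of a Claude Desktop config with the roam-helper entry added.

    Hypothetical helper: it only mirrors the JSON shown in the README.
    """
    updated = dict(config)
    # Copy the server map so entries that already exist are preserved
    servers = dict(updated.get("mcpServers", {}))
    servers["roam-helper"] = {
        "command": "uvx",
        "args": ["git+https://github.com/PhiloSolares/roam-mcp.git"],
        "env": {"ROAM_API_TOKEN": token, "ROAM_GRAPH_NAME": graph},
    }
    updated["mcpServers"] = servers
    return updated


if __name__ == "__main__":
    # Illustrative existing config; a real one would be loaded from disk
    existing = {"mcpServers": {"other-server": {"command": "node", "args": ["server.js"]}}}
    merged = add_roam_server(existing, "<your_roam_api_token>", "<your_roam_graph_name>")
    print(json.dumps(merged, indent=2))
```

Returning a copy rather than mutating the input keeps the original configuration available if the write to disk fails.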

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
mcp>=1.3.0
httpx>=0.24.0
pydantic>=2.0.0
youtube-transcript-api>=0.6.0
requests>=2.28.0
python-dotenv>=1.0.0
trafilatura>=1.6.0
unstructured[pdf]>=0.10.0
```

--------------------------------------------------------------------------------
/roam_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Roam Research MCP Server - Python implementation
Connect Claude to your Roam Research database

Enhanced version with improved architecture and features
"""

__version__ = "0.3.0"
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Base image
FROM python:3.11-slim

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV ROAM_API_TOKEN=""
ENV ROAM_GRAPH_NAME=""
ENV MEMORIES_TAG="#[[Memories]]"

# Install system dependencies for PDF processing
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        gcc \
        poppler-utils \
        libmagic1 \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# Create a non-root user
RUN useradd -m appuser

# Create and set working directory
WORKDIR /app

# Copy requirements file for caching
COPY --chown=appuser:appuser requirements.txt ./

# Install Python dependencies
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY --chown=appuser:appuser . .

# Change to non-root user
USER appuser

# Expose port for SSE transport
EXPOSE 3000

# Command to run the application (can be overridden)
CMD ["python", "-m", "roam_mcp.cli", "--transport", "sse", "--port", "3000"]
```

--------------------------------------------------------------------------------
/roam_mcp/cli.py:
--------------------------------------------------------------------------------

```python
"""Command-line interface for the Roam MCP server."""

import argparse
import sys
from roam_mcp.server import run_server

def main():
    """Entry point for the Roam MCP server CLI."""
    parser = argparse.ArgumentParser(description="Roam Research MCP Server")
    
    # Transport options
    parser.add_argument(
        "--transport",
        choices=["stdio", "sse"],
        default="stdio",
        help="Transport method (stdio or sse)"
    )
    
    # Server configuration
    parser.add_argument(
        "--port",
        type=int,
        default=3000,
        help="Port for SSE transport (default: 3000)"
    )
    
    # Verbosity options
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable verbose logging"
    )

    # Parse arguments
    args = parser.parse_args()

    # Run the server with the specified transport
    try:
        run_server(
            transport=args.transport,
            port=args.port if args.transport == "sse" else None,
            verbose=args.verbose
        )
    except KeyboardInterrupt:
        print("\nServer stopped by user", file=sys.stderr)
        sys.exit(0)
    except Exception as e:
        print(f"Error starting server: {str(e)}", file=sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()
```
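
The CLI above forwards the port to `run_server` only when the SSE transport is selected. A minimal sketch of that rule follows, using a standalone parser that mirrors the same flags. `build_parser` and `resolve_settings` are hypothetical helpers introduced here, and `run_server` is deliberately not called.

```python
import argparse
from typing import List, Optional, Tuple


def build_parser() -> argparse.ArgumentParser:
    """Mirror the flags defined in roam_mcp/cli.py (illustrative copy)."""
    parser = argparse.ArgumentParser(description="Roam Research MCP Server")
    parser.add_argument("--transport", choices=["stdio", "sse"], default="stdio")
    parser.add_argument("--port", type=int, default=3000)
    parser.add_argument("-v", "--verbose", action="store_true")
    return parser


def resolve_settings(argv: List[str]) -> Tuple[str, Optional[int], bool]:
    """Return (transport, port, verbose); the port is dropped for stdio."""
    args = build_parser().parse_args(argv)
    port = args.port if args.transport == "sse" else None
    return args.transport, port, args.verbose


if __name__ == "__main__":
    print(resolve_settings([]))
    print(resolve_settings(["--transport", "sse", "--port", "8080", "-v"]))
```

Passing `--port` together with the default stdio transport is accepted but has no effect, which matches the behavior of the original `main()`.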

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "roam-mcp"
version = "0.2.0"
description = "A Model Context Protocol server for Roam Research integration with AI assistants"
readme = "readme.md"
requires-python = ">=3.9"
license = {text = "MIT"}
authors = [
    {name = "Roam MCP Project Contributors"}
]
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: End Users/Desktop",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.9",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
]
dependencies = [
    "mcp>=1.3.0",
    "httpx>=0.24.0",
    "pydantic>=2.0.0",
    "youtube-transcript-api>=0.6.0",
    "requests>=2.28.0",
    "python-dotenv>=1.0.0",
    "trafilatura>=1.6.0",
    "unstructured[pdf]>=0.10.0"
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0.0",
    "black>=23.0.0",
    "isort>=5.12.0",
    "mypy>=1.0.0",
    "pylint>=2.17.0"
]
pdf = [
    "poppler-utils>=23.01.0"
]

[project.scripts]
roam-mcp = "roam_mcp.cli:main"

[tool.hatch.build.targets.wheel]
packages = ["roam_mcp"]

[tool.black]
line-length = 100
target-version = ["py39"]

[tool.isort]
profile = "black"
line_length = 100

[tool.mypy]
python_version = "3.9"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true

[tool.pylint.messages_control]
disable = [
    "missing-docstring",
    "invalid-name"
]

[project.urls]
"Homepage" = "https://github.com/PhiloSolares/roam-mcp"
"Bug Tracker" = "https://github.com/PhiloSolares/roam-mcp/issues"
```

--------------------------------------------------------------------------------
/roam_mcp/content_parsers.py:
--------------------------------------------------------------------------------

```python
"""External content parsing operations for the Roam MCP server."""

import os
import tempfile
import logging
from typing import Dict, Any, Optional
import httpx
import trafilatura
from unstructured.partition.pdf import partition_pdf

# Set up logging
logger = logging.getLogger("roam-mcp.content_parsers")

async def parse_webpage(url: str) -> Dict[str, Any]:
    """
    Parse content from a web page URL.
    
    Args:
        url: URL of the webpage to parse
        
    Returns:
        Result with parsed content
    """
    try:
        logger.debug(f"Fetching web page content from: {url}")
        downloaded = trafilatura.fetch_url(url)
        
        if not downloaded:
            return {
                "success": False,
                "error": f"Failed to download content from {url}"
            }
        
        # Extract main content with document structure preserved
        content = trafilatura.extract(
            downloaded,
            output_format='txt',
            include_links=False,
            include_formatting=True
        )
        
        if not content:
            return {
                "success": False,
                "error": f"Failed to extract meaningful content from {url}"
            }
        
        # Get metadata (extract_metadata returns an object, or None on failure)
        metadata = trafilatura.extract_metadata(downloaded)
        title = getattr(metadata, 'title', None) or 'Untitled Page'
        
        return {
            "success": True,
            "content": content,
            "title": title,
            "url": url
        }
    except Exception as e:
        logger.error(f"Error parsing web page: {str(e)}")
        return {
            "success": False,
            "error": f"Error parsing web page: {str(e)}"
        }

async def parse_pdf(url: str) -> Dict[str, Any]:
    """
    Parse content from a PDF URL.
    
    Args:
        url: URL of the PDF to parse
        
    Returns:
        Result with parsed content
    """
    try:
        logger.debug(f"Fetching PDF content from: {url}")
        
        # Download the PDF to a temporary file
        async with httpx.AsyncClient() as client:
            response = await client.get(url, follow_redirects=True)
            response.raise_for_status()
            
            # Check if it's a PDF based on Content-Type
            content_type = response.headers.get('Content-Type', '')
            if 'application/pdf' not in content_type.lower():
                return {
                    "success": False,
                    "error": f"URL does not point to a PDF (Content-Type: {content_type})"
                }
            
            # Create a temporary file for the PDF
            with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as temp_file:
                temp_path = temp_file.name
                temp_file.write(response.content)
        
        # Extract content using unstructured
        try:
            elements = partition_pdf(
                temp_path,
                strategy="hi_res",
                extract_images=False,
                extract_tables=True
            )
            
            # Convert to formatted text while preserving structure
            content = "\n\n".join([str(element) for element in elements])
        except UnicodeDecodeError:
            # Fall back to a simpler strategy if hi_res fails with encoding issues
            logger.warning("Encountered encoding issues with hi_res strategy, trying fast strategy")
            elements = partition_pdf(
                temp_path,
                strategy="fast",
                extract_images=False,
                extract_tables=False
            )
            content = "\n\n".join([str(element) for element in elements])
        
        # Try to extract a title from the filename in the URL
        path_parts = url.split('/')
        filename = path_parts[-1].split('?')[0]  # Remove query parameters
        title = os.path.splitext(filename)[0].replace('-', ' ').replace('_', ' ').title()
        if not title:
            title = "PDF Document"
        
        # Clean up temporary file
        os.unlink(temp_path)
        
        return {
            "success": True,
            "content": content,
            "title": title,
            "url": url
        }
    except Exception as e:
        logger.error(f"Error parsing PDF: {str(e)}")
        # Clean up temporary file if it exists
        try:
            if 'temp_path' in locals():
                os.unlink(temp_path)
        except OSError:
            pass
            
        return {
            "success": False,
            "error": f"Error parsing PDF: {str(e)}"
        }
```
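
`parse_pdf` derives a title from the last path segment of the URL. The sketch below shows the same idea in isolation. It is not the repository's code: it swaps the manual `split('/')` and `split('?')` for `urllib.parse`, which also discards fragments and decodes percent-escapes, and `title_from_url` is a hypothetical name.

```python
import os
from urllib.parse import unquote, urlparse


def title_from_url(url: str, default: str = "PDF Document") -> str:
    """Derive a human-readable title from the filename in a URL."""
    # urlparse separates the path from the query string and fragment
    path = urlparse(url).path
    filename = unquote(path.rsplit("/", 1)[-1])
    stem = os.path.splitext(filename)[0]
    title = stem.replace("-", " ").replace("_", " ").strip().title()
    # Fall back to a generic title when the URL has no usable filename
    return title or default


if __name__ == "__main__":
    print(title_from_url("https://example.com/docs/annual-report_2024.pdf?download=1"))
    print(title_from_url("https://example.com/"))
```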

--------------------------------------------------------------------------------
/roam_mcp/memory.py:
--------------------------------------------------------------------------------

```python
"""Memory system operations for the Roam MCP server."""

from typing import Dict, List, Any, Optional, Union
from datetime import datetime
import logging

from roam_mcp.api import (
    execute_query,
    execute_write_action,
    get_session_and_headers,
    GRAPH_NAME,
    get_daily_page,
    add_block_to_page,
    MEMORIES_TAG,
    ValidationError,
    PageNotFoundError,
    QueryError
)
from roam_mcp.utils import (
    format_roam_date,
    resolve_block_references
)

# Set up logging
logger = logging.getLogger("roam-mcp.memory")


def remember(memory: str, categories: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Store a memory with the specified MEMORIES_TAG.
    
    Args:
        memory: The memory to store
        categories: Optional list of categories to tag the memory with
        
    Returns:
        Result with success status
    """
    if not memory:
        return {
            "success": False,
            "error": "Memory cannot be empty"
        }
    
    session, headers = get_session_and_headers()
    
    try:
        # Validate and normalize categories
        normalized_categories = []
        if categories:
            # Ensure all categories are strings
            invalid_categories = [cat for cat in categories if not isinstance(cat, str)]
            if invalid_categories:
                return {
                    "success": False,
                    "error": "All categories must be strings"
                }
            
            # Normalize category formats
            for category in categories:
                category = category.strip()
                if not category:
                    continue
                
                # Remove any existing tag syntax
                clean_category = category.replace('#', '').replace('[[', '').replace(']]', '')
                
                # Add to normalized list
                normalized_categories.append(clean_category)
        
        # Get today's daily page
        daily_page_uid = get_daily_page()
        
        # Format memory with tags
        formatted_memory = MEMORIES_TAG
        
        # Add the memory text
        formatted_memory += f" {memory}"
        
        # Add category tags
        for category in normalized_categories:
            # Format category as Roam tag
            if " " in category or "/" in category:
                tag = f"#[[{category}]]"
            else:
                tag = f"#{category}"
            
            formatted_memory += f" {tag}"
        
        # Create memory block
        block_uid = add_block_to_page(daily_page_uid, formatted_memory)
        
        return {
            "success": True,
            "block_uid": block_uid,
            "content": formatted_memory
        }
    except ValidationError as e:
        return {
            "success": False,
            "error": str(e)
        }
    except PageNotFoundError as e:
        return {
            "success": False,
            "error": str(e)
        }
    except Exception as e:
        logger.error(f"Error storing memory: {str(e)}")
        return {
            "success": False,
            "error": f"Error storing memory: {str(e)}"
        }


def recall(sort_by: str = "newest", filter_tag: Optional[str] = None) -> Dict[str, Any]:
    """
    Recall stored memories, optionally filtered by tag.
    
    Args:
        sort_by: Sort order ("newest" or "oldest")
        filter_tag: Optional tag to filter memories by
        
    Returns:
        List of memory contents
    """
    if sort_by not in ["newest", "oldest"]:
        return {
            "success": False,
            "error": "sort_by must be 'newest' or 'oldest'"
        }
    
    session, headers = get_session_and_headers()
    
    # Clean and normalize the MEMORIES_TAG for queries
    clean_tag = MEMORIES_TAG.replace('#', '').replace('[[', '').replace(']]', '')
    
    # A tag can appear in a block as #tag, #[[tag]], or [[tag]]
    def tag_variants_for(tag: str) -> List[str]:
        return [f"#{tag}", f"#[[{tag}]]", f"[[{tag}]]"]

    def includes_any(variants: List[str]) -> str:
        # Each predicate is wrapped in [...] so it is a valid clause inside (or ...)
        predicates = " ".join(
            f'[(clojure.string/includes? ?s "{variant}")]' for variant in variants
        )
        return f"(or {predicates})"

    # Prepare the optional filter clause
    filter_clause = ""
    if filter_tag:
        # Clean and normalize filter tag
        clean_filter = filter_tag.replace('#', '').replace('[[', '').replace(']]', '')
        filter_clause = includes_any(tag_variants_for(clean_filter))
    
    try:
        logger.debug(f"Recalling memories with sort_by={sort_by}")
        if filter_tag:
            logger.debug(f"Filtering by tag: {filter_tag}")
        
        # Method 1: Search for blocks containing the MEMORIES_TAG across the database
        tag_variants = tag_variants_for(clean_tag)
        tag_clause = includes_any(tag_variants)
        
        # Clauses under :where are implicitly ANDed, so the filter is appended as-is
        tag_query = f"""[:find ?uid ?s ?time ?page-title
                      :where
                      [?b :block/string ?s]
                      [?b :block/uid ?uid]
                      [?b :create/time ?time]
                      [?b :block/page ?p]
                      [?p :node/title ?page-title]
                      {tag_clause}
                      {filter_clause}]"""
        
        tag_results = execute_query(tag_query)
        
        # Method 2: Also check for dedicated page with the clean tag name
        page_query = f"""[:find ?uid ?s ?time
                      :where
                      [?p :node/title "{clean_tag}"]
                      [?b :block/page ?p]
                      [?b :block/string ?s]
                      [?b :block/uid ?uid]
                      [?b :create/time ?time]
                      {filter_clause}]"""
        
        page_results = execute_query(page_query)
        
        # Process and combine results
        memories = []
        
        # Process tag results
        for uid, content, time, page_title in tag_results:
            # Resolve references
            resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
            
            memories.append({
                "content": resolved_content,
                "time": time,
                "page_title": page_title,
                "block_uid": uid
            })
        
        # Process page results
        for uid, content, time in page_results:
            # Resolve references
            resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
            
            memories.append({
                "content": resolved_content,
                "time": time,
                "page_title": clean_tag,
                "block_uid": uid
            })
        
        # Sort by time
        memories.sort(key=lambda x: x["time"], reverse=(sort_by == "newest"))
        
        # Clean up content - remove the MEMORIES_TAG
        for memory in memories:
            content = memory["content"]
            for variant in tag_variants:
                content = content.replace(variant, "")
            memory["content"] = content.strip()
        
        # Remove duplicates while preserving order
        seen_contents = set()
        unique_memories = []
        
        for memory in memories:
            content = memory["content"]
            if content and content not in seen_contents:
                seen_contents.add(content)
                unique_memories.append(memory)
        
        # Return just the content strings
        memory_contents = [memory["content"] for memory in unique_memories]
        
        return {
            "success": True,
            "memories": memory_contents,
            "message": f"Found {len(memory_contents)} memories"
        }
    except QueryError as e:
        return {
            "success": False,
            "error": str(e)
        }
    except Exception as e:
        logger.error(f"Error recalling memories: {str(e)}")
        return {
            "success": False,
            "error": f"Error recalling memories: {str(e)}"
        }
```
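
`remember` builds the stored block by prefixing the memories tag, appending the text, and then appending each category as a Roam tag. A minimal standalone sketch of that formatting step follows, with no Roam API calls. The helper names are hypothetical, and unlike the original this version skips a category that becomes empty after cleaning.

```python
from typing import List, Optional


def normalize_category(category: str) -> str:
    """Strip whitespace and any existing #, [[ or ]] tag syntax."""
    return category.strip().replace("#", "").replace("[[", "").replace("]]", "")


def as_roam_tag(name: str) -> str:
    """Names containing a space or slash need the #[[...]] form."""
    if " " in name or "/" in name:
        return f"#[[{name}]]"
    return f"#{name}"


def format_memory(
    memory: str,
    categories: Optional[List[str]] = None,
    memories_tag: str = "#[[Memories]]",
) -> str:
    """Return the block text: memories tag, memory text, then category tags."""
    parts = [memories_tag, memory]
    for category in categories or []:
        name = normalize_category(category)
        if name:  # assumption: drop categories that are empty after cleaning
            parts.append(as_roam_tag(name))
    return " ".join(parts)


if __name__ == "__main__":
    print(format_memory("Use spaced repetition", ["Learning", "project ideas"]))
```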

--------------------------------------------------------------------------------
/roam_mcp/utils.py:
--------------------------------------------------------------------------------

```python
"""Utility functions for the Roam MCP server."""

import re
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional, Set, Match, Tuple, Union
import json
import time
import uuid

# Set up logging
logger = logging.getLogger("roam-mcp.utils")

# Date formatting
def format_roam_date(date: Optional[datetime] = None) -> str:
    """
    Format a date in Roam's preferred format (e.g., "March 25th, 2025").
    
    Args:
        date: The date to format, defaults to today's date
        
    Returns:
        A string in Roam's date format
    """
    if date is None:
        date = datetime.now()
    
    day = date.day
    if 11 <= day <= 13:
        suffix = "th"
    else:
        suffix = {1: "st", 2: "nd", 3: "rd"}.get(day % 10, "th")
    
    # Avoid the "%-d" directive, which is platform-specific and fails on Windows
    return f"{date.strftime('%B')} {day}{suffix}, {date.year}"


# Regular expressions for markdown elements
MD_BOLD_PATTERN = r'\*\*(.+?)\*\*'
MD_ITALIC_PATTERN = r'(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)'
MD_ITALIC_UNDERSCORE_PATTERN = r'(?<!_)_(?!_)(.+?)(?<!_)_(?!_)'
MD_HIGHLIGHT_PATTERN = r'==(.+?)=='
MD_LINK_PATTERN = r'\[([^\]]+)\]\(([^)]+)\)'
MD_CODE_BLOCK_PATTERN = r'```([a-zA-Z0-9]*)\s*\n([\s\S]*?)```'
MD_INLINE_CODE_PATTERN = r'`([^`]+)`'

# Table regex patterns
MD_TABLE_PATTERN = r'(?:\|(.+)\|\s*\n\|(?::?-+:?\|)+\s*\n(?:\|(?:.+)\|\s*\n*)+)'
MD_TABLE_ROW_PATTERN = r'\|(.*)\|'
MD_TABLE_HEADER_PATTERN = r'\|(\s*:?-+:?\s*)\|'
MD_TABLE_ALIGNMENT_PATTERN = r'^(:?)-+(:?)$'  # For detecting alignment in table headers

# Headings pattern
MD_HEADING_PATTERN = r'^(#{1,6})\s+(.+)$'


# Markdown conversion utilities
def convert_to_roam_markdown(text: str) -> str:
    """
    Convert standard markdown to Roam-compatible format.
    
    Args:
        text: Standard markdown text
        
    Returns:
        Roam-formatted markdown text
    """
    # Convert tables first (they may contain other markdown elements)
    text = convert_tables(text)
    
    # Handle code blocks (must be done before other inline elements)
    text = convert_code_blocks(text)
    
    # Handle double asterisks/underscores (bold)
    text = re.sub(MD_BOLD_PATTERN, r'**\1**', text)
    
    # Handle single asterisks/underscores (italic)
    text = re.sub(MD_ITALIC_PATTERN, r'__\1__', text)  # Single asterisk to double underscore
    text = re.sub(MD_ITALIC_UNDERSCORE_PATTERN, r'__\1__', text)  # Single underscore to double underscore
    
    # Handle highlights
    text = re.sub(MD_HIGHLIGHT_PATTERN, r'^^\1^^', text)
    
    # Convert tasks
    text = re.sub(r'- \[ \]', r'- {{[[TODO]]}}', text)
    text = re.sub(r'- \[x\]', r'- {{[[DONE]]}}', text)
    
    # Convert links
    text = re.sub(MD_LINK_PATTERN, r'[\1](\2)', text)
    
    # Handle headings (convert to Roam's heading format)
    text = convert_headings(text)
    
    # Handle inline code
    text = re.sub(MD_INLINE_CODE_PATTERN, r'`\1`', text)
    
    return text


def convert_headings(text: str) -> str:
    """
    Convert markdown headings to Roam's heading format.
    
    Args:
        text: Markdown text with potential headings
        
    Returns:
        Text with headings converted to Roam format
    """
    def heading_replacer(match: Match) -> str:
        level = len(match.group(1))  # Number of # characters
        content = match.group(2).strip()
        
        # For text format, we'll just keep the heading text and let block attributes 
        # handle the actual heading level in Roam
        return content
    
    # Process line by line to avoid matching # in code blocks
    lines = text.split('\n')
    for i, line in enumerate(lines):
        heading_match = re.match(MD_HEADING_PATTERN, line)
        if heading_match:
            lines[i] = heading_replacer(heading_match)
    
    return '\n'.join(lines)


def convert_code_blocks(text: str) -> str:
    """
    Convert markdown code blocks while preserving language and indentation.
    
    Args:
        text: Markdown text with potential code blocks
        
    Returns:
        Text with code blocks properly formatted
    """
    def code_block_replacer(match: Match) -> str:
        language = match.group(1).strip()
        code_content = match.group(2)
        
        # Preserve language info
        language_tag = f"{language}\n" if language else "\n"
        
        # Clean up indentation
        lines = code_content.split('\n')
        # Find the common indentation level
        non_empty_lines = [line for line in lines if line.strip()]
        if non_empty_lines:
            common_indent = min(len(line) - len(line.lstrip()) for line in non_empty_lines)
            # Remove common indentation
            code_content = '\n'.join(line[common_indent:] if line.strip() else line for line in lines)
        
        return f"```{language_tag}{code_content}```"
    
    return re.sub(MD_CODE_BLOCK_PATTERN, code_block_replacer, text)


def convert_tables(text: str) -> str:
    """
    Convert markdown tables to Roam format.
    
    Args:
        text: Markdown text with potential tables
        
    Returns:
        Text with tables converted to Roam format
    """
    def table_replacer(match: Match) -> str:
        table_text = match.group(0)
        
        # Find all rows
        rows = re.findall(MD_TABLE_ROW_PATTERN, table_text)
        if len(rows) < 2:  # Need at least header and separator
            return table_text
        
        # First row is header, second is separator, rest are data
        header_cells = [cell.strip() for cell in rows[0].split('|') if cell.strip()]
        separator_cells = [cell.strip() for cell in rows[1].split('|') if cell.strip()]
        
        # Determine column alignments from separator row
        alignments = []
        for sep in separator_cells:
            alignment_match = re.match(MD_TABLE_ALIGNMENT_PATTERN, sep)
            if alignment_match:
                left_colon = bool(alignment_match.group(1))
                right_colon = bool(alignment_match.group(2))
                
                if left_colon and right_colon:
                    alignments.append("center")
                elif right_colon:
                    alignments.append("right")
                else:
                    alignments.append("left")
            else:
                alignments.append("left")  # Default alignment
        
        # Generate Roam table format
        roam_table = "{{table}}\n"
        
        # Add header row
        for i, header in enumerate(header_cells):
            indent = "  " * (i + 1)
            roam_table += f"{indent}- {header}\n"
        
        # Add data rows - start from index 2 to skip header and separator
        for row_idx in range(2, len(rows)):
            data_cells = [cell.strip() for cell in rows[row_idx].split('|') if cell.strip()]
            
            for i, cell in enumerate(data_cells):
                if i < len(header_cells):  # Only process cells that have a corresponding header
                    indent = "  " * (i + 1)
                    roam_table += f"{indent}- {cell}\n"
        
        return roam_table
    
    return re.sub(MD_TABLE_PATTERN, table_replacer, text)


class MarkdownNode:
    """Class representing a node in the markdown parsing tree."""
    def __init__(self, content: str, level: int = 0, heading_level: int = 0):
        self.content = content
        self.level = level
        self.heading_level = heading_level
        self.children = []
    
    def add_child(self, node: 'MarkdownNode') -> None:
        """Add a child node to this node."""
        self.children.append(node)
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert node to dictionary representation."""
        result = {
            "text": self.content,
            "level": self.level
        }
        
        if self.heading_level:
            result["heading_level"] = self.heading_level
            
        if self.children:
            result["children"] = [child.to_dict() for child in self.children]
            
        return result
        

def parse_markdown_list(markdown: str) -> List[Dict[str, Any]]:
    """
    Parse a markdown list into a hierarchical structure.
    
    Args:
        markdown: Markdown text with nested lists
        
    Returns:
        Flattened list of dictionaries with 'text', 'level', and '_path' keys
        ('heading_level' is added for headings); '_path' holds the child indexes from the root
    """
    # Convert markdown syntax first
    markdown = convert_to_roam_markdown(markdown)
    
    lines = markdown.split('\n')
    root = MarkdownNode("ROOT", -1)  # Root node to hold all top-level items
    node_stack = [root]
    current_level = -1
    in_code_block = False
    code_block_content = []
    code_block_indent = 0
    
    for line_idx, line in enumerate(lines):
        if not line.strip() and not in_code_block:
            continue
            
        # Handle code blocks
        if "```" in line and not in_code_block:
            # Start of code block
            in_code_block = True
            code_block_content = [line]
            # Store the indentation level
            code_block_indent = len(line) - len(line.lstrip())
            continue
        elif "```" in line and in_code_block:
            # End of code block - process the entire block
            code_block_content.append(line)
            
            # Calculate the level based on indentation
            level = code_block_indent // 2
            
            # Join the content with proper line breaks
            content = "\n".join(code_block_content)
            
            # Create a node for the code block
            node = MarkdownNode(content, level)
            
            # Find the right parent for this node
            while len(node_stack) > 1 and node_stack[-1].level >= level:
                node_stack.pop()
                
            # Add to parent
            node_stack[-1].add_child(node)
            
            # Update stack and level
            node_stack.append(node)
            current_level = level
            
            # Reset code block state
            in_code_block = False
            code_block_content = []
            continue
        elif in_code_block:
            # In a code block - just collect the line
            code_block_content.append(line)
            continue
            
        # Check for heading
        heading_match = re.match(MD_HEADING_PATTERN, line)
        if heading_match:
            level = 0  # Headings are top-level
            heading_text = heading_match.group(2).strip()
            heading_level = len(heading_match.group(1))  # Number of # characters
            
            # Reset stack for headings
            while len(node_stack) > 1:
                node_stack.pop()
                
            # Create heading node
            node = MarkdownNode(heading_text, level, heading_level)
            node_stack[-1].add_child(node)
            node_stack.append(node)
            current_level = level
            continue
            
        # Regular list items
        match = re.match(r'^(\s*)[-*+]\s+(.+)$', line)
        if match:
            indent, content = match.groups()
            level = len(indent) // 2 + 1  # Convert indentation to level, starting with 1
            
            # TODO/DONE items are nested exactly like ordinary list items
            level_to_append = level
            
            # Pop stack until we find parent level
            while len(node_stack) > 1 and node_stack[-1].level >= level:
                node_stack.pop()
                
            # Create new node
            node = MarkdownNode(content, level_to_append)
            node_stack[-1].add_child(node)
            node_stack.append(node)
            current_level = level
        else:
            # Non-list line - treat as continuation of previous item or as top-level text
            content = line.strip()
            if content and current_level >= 0 and len(node_stack) > 1:
                # Add to the current node's content
                node_stack[-1].content += "\n" + content
            elif content:
                # Create as top-level text
                node = MarkdownNode(content, 0)
                node_stack[0].add_child(node)
                node_stack = [root, node]
                current_level = 0
    
    # Convert the tree to the expected dictionary format with proper hierarchy
    def build_hierarchy(node):
        """Convert a node and its children to a hierarchical dictionary structure."""
        result = {
            "text": node.content,
            "level": node.level
        }
        
        if node.heading_level:
            result["heading_level"] = node.heading_level
            
        if node.children:
            result["children"] = [build_hierarchy(child) for child in node.children]
            
        return result
    
    # Build result with correct hierarchy
    hierarchical_result = []
    for child in root.children:
        hierarchical_result.append(build_hierarchy(child))
    
    # We'll now convert this to the flattened format for backward compatibility
    # while preserving hierarchy information for functions that can use it
    flattened_result = []
    
    def flatten_hierarchy(item, parent_level=-1, path=None):
        """Flatten a hierarchical structure while preserving parent-child information."""
        if path is None:
            path = []
        
        # Get item properties
        text = item["text"]
        level = item.get("level", parent_level + 1)
        heading_level = item.get("heading_level", 0)
        
        # Create the flattened item
        flat_item = {
            "text": text,
            "level": level
        }
        
        if heading_level:
            flat_item["heading_level"] = heading_level
            
        # Add path information for reconstructing hierarchy
        flat_item["_path"] = path.copy()
        
        # Add to results
        flattened_result.append(flat_item)
        
        # Process children
        children = item.get("children", [])
        if children:
            for i, child in enumerate(children):
                child_path = path + [i]
                flatten_hierarchy(child, level, child_path)
    
    # Flatten the hierarchical result
    for i, item in enumerate(hierarchical_result):
        flatten_hierarchy(item, -1, [i])
    
    # We return the flattened result but with _path information
    # for reconstructing hierarchy if needed
    return flattened_result


def convert_roam_dates(text: str) -> str:
    """
    Convert date references to Roam date format.
    
    Args:
        text: Text with potential date references
        
    Returns:
        Text with dates in Roam format
    """
    # Convert ISO dates (YYYY-MM-DD)
    def replace_date(match: Match) -> str:
        date_str = match.group(0)
        try:
            date = datetime.strptime(date_str, "%Y-%m-%d")
            return format_roam_date(date)
        except ValueError:
            return date_str
    
    return re.sub(r'\b\d{4}-\d{2}-\d{2}\b', replace_date, text)


def extract_youtube_video_id(url: str) -> Optional[str]:
    """
    Extract the video ID from a YouTube URL.
    
    Args:
        url: YouTube URL
        
    Returns:
        Video ID or None if not found
    """
    patterns = [
        r"(?:youtube\.com\/watch\?v=|youtu\.be\/)([a-zA-Z0-9_-]{11})",
        r"youtube\.com\/embed\/([a-zA-Z0-9_-]{11})",
        r"youtube\.com\/v\/([a-zA-Z0-9_-]{11})",
        r"youtube\.com\/user\/[^\/]+\/\?v=([a-zA-Z0-9_-]{11})"
    ]
    
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    
    return None


def detect_url_type(url: str) -> str:
    """
    Detect the type of content a URL points to.
    
    Args:
        url: URL to analyze
        
    Returns:
        Content type: 'youtube', 'pdf', or 'webpage' (the default)
    """
    url_lower = url.lower()
    
    # Check for YouTube
    youtube_patterns = [
        r"(?:youtube\.com\/watch\?v=|youtu\.be\/)",
        r"youtube\.com\/embed\/",
        r"youtube\.com\/v\/",
        r"youtube\.com\/user\/[^\/]+\/\?v="
    ]
    for pattern in youtube_patterns:
        if re.search(pattern, url_lower):
            return "youtube"
    
    # Check for PDF
    if url_lower.endswith('.pdf') or '/pdf/' in url_lower:
        return "pdf"
    
    # Default to webpage
    return "webpage"


def create_block_action(parent_uid: str, content: str, order: Union[int, str] = "last", 
                        uid: Optional[str] = None, heading: Optional[int] = None) -> Dict[str, Any]:
    """
    Create a block action for batch operations.
    
    Args:
        parent_uid: UID of the parent block/page
        content: Block content
        order: Position of the block
        uid: Optional UID for the block
        heading: Optional heading level (1-3)
        
    Returns:
        Block action dictionary
    """
    block_data = {
        "string": content
    }
    
    if uid:
        block_data["uid"] = uid
    else:
        # Generate a unique UID if none provided. Use .hex so that all nine
        # characters are hexadecimal; str(uuid.uuid4())[:9] would end in a hyphen.
        block_data["uid"] = uuid.uuid4().hex[:9]
        
    if heading and 1 <= heading <= 3:
        block_data["heading"] = heading
    
    action = {
        "action": "create-block",
        "location": {
            "parent-uid": parent_uid,
            "order": order
        },
        "block": block_data
    }
    
    logger.debug(f"Created block action for parent {parent_uid}: {content[:30]}{'...' if len(content) > 30 else ''}")
    return action


def process_nested_content(content: List[Dict], parent_uid: str, session, headers, graph_name: str) -> List[str]:
    """
    Recursively process nested content structure and create blocks.
    
    Args:
        content: List of content items with potential children
        parent_uid: UID of the parent block
        session: Active session for API requests
        headers: Request headers with authentication
        graph_name: Roam graph name
        
    Returns:
        List of created block UIDs
    """
    from roam_mcp.api import execute_batch_actions  # Import here to avoid circular imports
    
    if not content:
        return []
    
    # Keep the items in document order. Sorting by level would separate children
    # from their parents and attach every child to the last block one level up.
    batch_actions = []
    level_parent_map = {-1: parent_uid}  # Level -1 stands for the container block/page
    
    for item in content:
        level = max(item.get("level", 0), 0)
        text = item.get("text", "")
        heading_level = item.get("heading_level", 0)
        
        # Parent is the most recent block one level up; fall back to the container
        parent_for_item = level_parent_map.get(level - 1, parent_uid)
        
        # Create block action
        action = create_block_action(
            parent_uid=parent_for_item,
            content=text,
            order="last",
            heading=heading_level
        )
        
        batch_actions.append(action)
        
        # Deeper items that follow nest under this block. Drop stale deeper
        # levels so an item that skips a level cannot pick up an old parent.
        level_parent_map = {lvl: uid for lvl, uid in level_parent_map.items() if lvl < level}
        level_parent_map[level] = action["block"]["uid"]
    
    # Execute the batch
    result = execute_batch_actions(batch_actions)
    return result.get("created_uids", [])


def find_block_uid(session, headers, graph_name: str, block_content: str, max_retries: int = 3) -> Optional[str]:
    """
    Search for a block by its content to find its UID with retries.
    
    Args:
        session: Active session for API requests
        headers: Request headers with authentication
        graph_name: Roam graph name
        block_content: Content to search for
        max_retries: Maximum number of retries
        
    Returns:
        Block UID or None if not found
    """
    # Escape backslashes first, then quotes, so the content stays a valid query string
    escaped_content = block_content.replace('\\', '\\\\').replace('"', '\\"')
    
    for attempt in range(max_retries):
        search_query = f'''[:find ?uid .
                          :where [?e :block/string "{escaped_content}"]
                                 [?e :block/uid ?uid]]'''
        
        response = session.post(
            f'https://api.roamresearch.com/api/graph/{graph_name}/q',
            headers=headers,
            json={"query": search_query}
        )
        
        if response.status_code == 200 and response.json().get('result'):
            block_uid = response.json()['result']
            return block_uid
            
        # If not found and not the last attempt, wait and retry
        if attempt < max_retries - 1:
            wait_time = attempt + 1  # Linear backoff: 1s, 2s, ...
            logger.debug(f"Block not found, retrying in {wait_time}s (attempt {attempt+1}/{max_retries})")
            time.sleep(wait_time)
    
    logger.debug(f"Could not find block UID for content: {block_content[:50]}...")
    return None


def find_page_by_title(session, headers, graph_name: str, title: str) -> Optional[str]:
    """
    Find a page by title, with case-insensitive matching.
    
    Args:
        session: Active session for API requests
        headers: Request headers with authentication
        graph_name: Roam graph name
        title: Page title to search for
        
    Returns:
        Page UID or None if not found
    """
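    # Clean up the title
    title = title.strip()
    # Escape backslashes and quotes so the title cannot break the query string
    escaped_title = title.replace('\\', '\\\\').replace('"', '\\"')
    
    # First try direct page lookup (more reliable than case-insensitive queries in Roam)
    query = f'''[:find ?uid .
                :where [?e :node/title "{escaped_title}"]
                        [?e :block/uid ?uid]]'''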
    
    response = session.post(
        f'https://api.roamresearch.com/api/graph/{graph_name}/q',
        headers=headers,
        json={"query": query}
    )
    
    if response.status_code == 200 and response.json().get('result'):
        return response.json()['result']
    
    # If not found, try checking if it's a UID
    if len(title) == 9 and re.match(r'^[a-zA-Z0-9_-]{9}$', title):
        # This looks like a UID, check if it's a valid page UID
        uid_query = f'''[:find ?title .
                        :where [?e :block/uid "{title}"]
                                [?e :node/title ?title]]'''
        
        uid_response = session.post(
            f'https://api.roamresearch.com/api/graph/{graph_name}/q',
            headers=headers,
            json={"query": uid_query}
        )
        
        if uid_response.status_code == 200 and uid_response.json().get('result'):
            return title
    
    # If still not found, try case-insensitive match by getting all pages
    all_pages_query = '''[:find ?title ?uid
                         :where [?e :node/title ?title]
                                 [?e :block/uid ?uid]]'''
    
    all_pages_response = session.post(
        f'https://api.roamresearch.com/api/graph/{graph_name}/q',
        headers=headers,
        json={"query": all_pages_query}
    )
    
    if all_pages_response.status_code == 200 and all_pages_response.json().get('result'):
        for page_title, uid in all_pages_response.json()['result']:
            if page_title.lower() == title.lower():
                return uid
    
    return None


def resolve_block_references(session, headers, graph_name: str, content: str, max_depth: int = 3, current_depth: int = 0) -> str:
    """
    Resolve block references in content recursively.
    
    Args:
        session: Active session for API requests
        headers: Request headers with authentication
        graph_name: Roam graph name
        content: Content with potential block references
        max_depth: Maximum recursion depth
        current_depth: Current recursion depth
        
    Returns:
        Content with block references resolved
    """
    if current_depth >= max_depth:
        return content
    
    # Find all block references (deduplicated, first-seen order kept, so each
    # UID is queried only once)
    ref_pattern = r'\(\(([a-zA-Z0-9_-]{9})\)\)'
    refs = list(dict.fromkeys(re.findall(ref_pattern, content)))
    
    if not refs:
        return content
    
    # For each reference, get its content
    for ref in refs:
        try:
            query = f'''[:find ?string .
                        :where [?b :block/uid "{ref}"]
                                [?b :block/string ?string]]'''
            
            response = session.post(
                f'https://api.roamresearch.com/api/graph/{graph_name}/q',
                headers=headers,
                json={"query": query}
            )
            
            if response.status_code == 200 and response.json().get('result'):
                ref_content = response.json()['result']
                
                # Recursively resolve nested references
                resolved_ref = resolve_block_references(
                    session, headers, graph_name, 
                    ref_content, max_depth, current_depth + 1
                )
                
                # Replace reference with content
                content = content.replace(f"(({ref}))", resolved_ref)
        except Exception as e:
            logger.warning(f"Failed to resolve reference (({ref})): {str(e)}")
    
    return content
```
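
The stack-based nesting used by `parse_markdown_list` above can be hard to follow inside the full function. The block below is a minimal sketch of just that part, assuming bullet lines only (headings, code fences and continuation lines are ignored). `outline_to_flat` is a hypothetical name for illustration and is not part of `roam_mcp`; the list-item regex and the two-spaces-per-level rule are taken from the function above.

```python
import re
from typing import Any, Dict, List

# Same list-item pattern that parse_markdown_list uses
ITEM = re.compile(r'^(\s*)[-*+]\s+(.+)$')


def outline_to_flat(markdown: str) -> List[Dict[str, Any]]:
    """Flatten an indented bullet list into items carrying 'level' and '_path'."""
    flat: List[Dict[str, Any]] = []
    # The first stack entry is a sentinel root; every entry tracks its own
    # path and how many children it has received so far.
    stack = [{"level": 0, "path": [], "n_children": 0}]
    for line in markdown.split('\n'):
        m = ITEM.match(line)
        if not m:
            continue  # this sketch only handles bullet lines
        indent, text = m.groups()
        level = len(indent) // 2 + 1  # two spaces per step, top level is 1
        # Pop until the top of the stack is a strict ancestor of this item
        while len(stack) > 1 and stack[-1]["level"] >= level:
            stack.pop()
        parent = stack[-1]
        path = parent["path"] + [parent["n_children"]]
        parent["n_children"] += 1
        flat.append({"text": text, "level": level, "_path": path})
        stack.append({"level": level, "path": path, "n_children": 0})
    return flat


items = outline_to_flat("- a\n  - b\n  - c\n- d")
for item in items:
    print(item["_path"], item["level"], item["text"])
```

Each `_path` lists the child indexes from the root down to the item, which is enough to rebuild the tree from the flat list.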

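`find_block_uid` and `find_page_by_title` above build Datalog queries by placing user text inside a double-quoted literal. The following is a small sketch of how that text might be escaped first, assuming backslash-style escapes are accepted in the query string. `escape_datalog_string` and `build_title_query` are hypothetical helpers for illustration, not functions from this repository.

```python
def escape_datalog_string(value: str) -> str:
    """Escape text for use inside a double-quoted literal in a query string."""
    # Backslashes go first; otherwise the backslashes added for the quotes
    # would themselves be doubled by the second replacement.
    return value.replace('\\', '\\\\').replace('"', '\\"')


def build_title_query(title: str) -> str:
    """Build a page-lookup query similar to the one in find_page_by_title."""
    safe_title = escape_datalog_string(title.strip())
    return (
        '[:find ?uid . '
        ':where [?e :node/title "' + safe_title + '"] '
        '[?e :block/uid ?uid]]'
    )


print(build_title_query(' My "Quoted" Page '))
```

Without the escaping step, a title that contains a double quote would end the literal early and leave the query malformed.
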
--------------------------------------------------------------------------------
/roam_mcp/api.py:
--------------------------------------------------------------------------------

```python
"""Core API functions for interacting with Roam Research."""

import os
import re
import sys
import logging
from typing import Dict, List, Any, Optional, Union, Set, Tuple, Callable
import requests
from datetime import datetime
import json
import time
from functools import wraps

from roam_mcp.utils import (
    format_roam_date,
    find_block_uid,
    find_page_by_title,
    process_nested_content,
    resolve_block_references
)

# Set up logging
logger = logging.getLogger("roam-mcp.api")

# Get API credentials from environment variables
API_TOKEN = os.environ.get("ROAM_API_TOKEN")
GRAPH_NAME = os.environ.get("ROAM_GRAPH_NAME")
MEMORIES_TAG = os.environ.get("MEMORIES_TAG", "#[[Memories]]")

# Validate API credentials
if not API_TOKEN:
    logger.warning("ROAM_API_TOKEN environment variable is not set")
    
if not GRAPH_NAME:
    logger.warning("ROAM_GRAPH_NAME environment variable is not set")


# Enhanced Error Hierarchy
class RoamAPIError(Exception):
    """Base exception for all Roam API errors."""
    def __init__(self, message: str, code: Optional[str] = None, details: Optional[Dict] = None, remediation: Optional[str] = None):
        self.message = message
        self.code = code or "UNKNOWN_ERROR"
        self.details = details or {}
        self.remediation = remediation
        super().__init__(self._format_message())
        
    def _format_message(self) -> str:
        msg = f"{self.code}: {self.message}"
        if self.details:
            msg += f" - Details: {json.dumps(self.details)}"
        if self.remediation:
            msg += f" - Suggestion: {self.remediation}"
        return msg


class AuthenticationError(RoamAPIError):
    """Exception raised for authentication errors."""
    def __init__(self, message: str, details: Optional[Dict] = None):
        super().__init__(
            message=message,
            code="AUTH_ERROR",
            details=details,
            remediation="Check your API token and graph name in environment variables."
        )


class PageNotFoundError(RoamAPIError):
    """Exception raised when a page cannot be found."""
    def __init__(self, title: str, details: Optional[Dict] = None):
        super().__init__(
            message=f"Page '{title}' not found",
            code="PAGE_NOT_FOUND",
            details=details,
            remediation="Check the page title for typos or create the page first."
        )


class BlockNotFoundError(RoamAPIError):
    """Exception raised when a block cannot be found."""
    def __init__(self, uid: str, details: Optional[Dict] = None):
        super().__init__(
            message=f"Block with UID '{uid}' not found",
            code="BLOCK_NOT_FOUND",
            details=details,
            remediation="Check the block UID for accuracy."
        )


class ValidationError(RoamAPIError):
    """Exception raised for input validation errors."""
    def __init__(self, message: str, param: str, details: Optional[Dict] = None):
        super().__init__(
            message=message,
            code="VALIDATION_ERROR",
            details={"parameter": param, **(details or {})},
            remediation="Check the input parameters and correct the formatting."
        )


class QueryError(RoamAPIError):
    """Exception raised for query execution errors."""
    def __init__(self, message: str, query: str, details: Optional[Dict] = None):
        super().__init__(
            message=message,
            code="QUERY_ERROR",
            details={"query": query, **(details or {})},
            remediation="Check the query syntax or parameters."
        )


class RateLimitError(RoamAPIError):
    """Exception raised when rate limits are exceeded."""
    def __init__(self, message: str, details: Optional[Dict] = None):
        super().__init__(
            message=message,
            code="RATE_LIMIT_ERROR",
            details=details,
            remediation="Retry after a delay or reduce the request frequency."
        )


class TransactionError(RoamAPIError):
    """Exception raised for transaction failures."""
    def __init__(self, message: str, action_type: str, details: Optional[Dict] = None):
        super().__init__(
            message=message,
            code="TRANSACTION_ERROR",
            details={"action_type": action_type, **(details or {})},
            remediation="Check the action data or retry the operation."
        )


class PreserveAuthSession(requests.Session):
    """Session class that preserves authentication headers during redirects."""
    def rebuild_auth(self, prepared_request, response):
        """Preserve the Authorization header on redirects."""
        return


# Retry decorator for API calls
def retry_on_error(max_retries=3, base_delay=1, backoff_factor=2, retry_on=(RateLimitError, requests.exceptions.RequestException)):
    """
    Decorator to retry API calls with exponential backoff.
    
    Args:
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay in seconds
        backoff_factor: Multiplier for delay on each retry
        retry_on: Tuple of exception types to retry on
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except retry_on as e:
                    retries += 1
                    if retries > max_retries:
                        logger.error(f"Maximum retries ({max_retries}) exceeded: {str(e)}")
                        raise
                    
                    delay = base_delay * (backoff_factor ** (retries - 1))
                    logger.warning(f"Retrying after error: {str(e)}. Attempt {retries}/{max_retries} in {delay:.2f}s")
                    time.sleep(delay)
        return wrapper
    return decorator


def validate_credentials():
    """
    Validate that required API credentials are set.
    
    Raises:
        AuthenticationError: If required credentials are missing
    """
    if not API_TOKEN or not GRAPH_NAME:
        missing = []
        if not API_TOKEN:
            missing.append("ROAM_API_TOKEN")
        if not GRAPH_NAME:
            missing.append("ROAM_GRAPH_NAME")
            
        raise AuthenticationError(
            f"Missing required credentials: {', '.join(missing)}",
            {"missing": missing}
        )


def get_session_and_headers() -> Tuple[requests.Session, Dict[str, str]]:
    """
    Create a session with authentication headers.
    
    Returns:
        Tuple of (session, headers)
    
    Raises:
        AuthenticationError: If required environment variables are missing
    """
    validate_credentials()
    
    session = PreserveAuthSession()
    headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
    }
    
    return session, headers


@retry_on_error()
def execute_query(query: str, inputs: Optional[List[Any]] = None) -> Any:
    """
    Execute a Datalog query against the Roam graph.
    
    Args:
        query: Datalog query string
        inputs: Optional list of query inputs
        
    Returns:
        Query results
        
    Raises:
        QueryError: If the query fails
        AuthenticationError: If authentication fails
        RateLimitError: If rate limits are exceeded
    """
    validate_credentials()
    session, headers = get_session_and_headers()
    
    # Prepare query data
    data = {
        "query": query,
    }
    if inputs:
        data["inputs"] = inputs
    
    # Log query (without inputs for security)
    logger.debug(f"Executing query: {query}")
    
    # Execute query
    try:
        response = session.post(
            f'https://api.roamresearch.com/api/graph/{GRAPH_NAME}/q',
            headers=headers,
            json=data
        )
        
        if response.status_code == 401:
            raise AuthenticationError("Authentication failed", {"status_code": response.status_code})
        
        if response.status_code == 429:
            raise RateLimitError("Rate limit exceeded", {"status_code": response.status_code})
        
        response.raise_for_status()
        result = response.json().get('result')
        
        # Log result size
        if isinstance(result, list):
            logger.debug(f"Query returned {len(result)} results")
            
        return result
    except requests.RequestException as e:
        error_msg = f"Query failed: {str(e)}"
        error_details = {}
        
        # Compare with None: a requests.Response is falsy for 4xx/5xx status
        # codes, so a plain truth test would skip exactly the error responses
        err_response = getattr(e, 'response', None)
        if err_response is not None:
            error_details["status_code"] = err_response.status_code
            try:
                error_details["response"] = err_response.json()
            except ValueError:
                error_details["response_text"] = err_response.text[:500]
        
        # Classify error based on status code if available
        if err_response is not None:
            if err_response.status_code == 401:
                raise AuthenticationError("Authentication failed", error_details) from e
            elif err_response.status_code == 429:
                raise RateLimitError("Rate limit exceeded", error_details) from e
        
        logger.error(error_msg, extra={"details": error_details})
        raise QueryError(error_msg, query, error_details) from e


@retry_on_error()
def execute_write_action(action_data: Union[Dict[str, Any], List[Dict[str, Any]]]) -> Dict[str, Any]:
    """
    Execute a write action or a batch of actions on the Roam graph.
    
    Args:
        action_data: The action data to write or a list of actions for batch operation
        
    Returns:
        Response data
        
    Raises:
        TransactionError: If the write action fails
        AuthenticationError: If authentication fails
        RateLimitError: If rate limits are exceeded
    """
    validate_credentials()
    session, headers = get_session_and_headers()
    
    # Check if it's a batch operation or single action
    is_batch = isinstance(action_data, list)
    
    # If it's a batch operation, wrap it in a batch container
    if is_batch:
        # Log batch size
        logger.debug(f"Executing batch write action with {len(action_data)} operations")
        
        # Group operations by type for debugging
        action_types = {}
        for action in action_data:
            action_type = action.get("action", "unknown")
            if action_type in action_types:
                action_types[action_type] += 1
            else:
                action_types[action_type] = 1
                
        logger.debug(f"Batch operation types: {action_types}")
        
        # Prepare batch action
        batch_data = {
            "action": "batch-actions",
            "actions": action_data
        }
        
        action_type = "batch-actions"
        operation_data = batch_data
    else:
        # Log action type
        action_type = action_data.get("action", "unknown")
        logger.debug(f"Executing write action: {action_type}")
        operation_data = action_data
    
    # Debug log the operation data
    logger.debug(f"Sending data: {json.dumps(operation_data)[:100]}...")
    
    # Execute action
    try:
        response = session.post(
            f'https://api.roamresearch.com/api/graph/{GRAPH_NAME}/write',
            headers=headers,
            json=operation_data  # Use json parameter for proper JSON encoding
        )
        
        logger.debug(f"Status code: {response.status_code}")
        logger.debug(f"Response headers: {dict(response.headers)}")
        
        if response.status_code == 401:
            raise AuthenticationError("Authentication failed", {"status_code": response.status_code})
        
        if response.status_code == 429:
            raise RateLimitError("Rate limit exceeded", {"status_code": response.status_code})
        
        # Special handling for empty responses
        if response.status_code == 200 and not response.text:
            logger.debug("Received empty response with status 200 (success)")
            return {"success": True}
        
        response.raise_for_status()
        
        # Try to parse JSON response
        try:
            result = response.json()
            logger.debug(f"Response: {json.dumps(result)[:500]}")
            
            # A batch that fails partway still returns a parseable body; log how many
            # actions were applied and return the payload so the caller can inspect it
            if "batch-error-message" in result and "num-actions-successfully-transacted-before-failure" in result:
                num_success = result.get("num-actions-successfully-transacted-before-failure", 0)
                logger.debug(f"Batch partially succeeded with {num_success} actions before failure")
            
            return result
        except json.JSONDecodeError:
            # Some successful operations return empty responses
            if 200 <= response.status_code < 300:
                logger.debug("Success with non-JSON response")
                return {"success": True}
            else:
                logger.debug(f"Failed to parse response as JSON: {response.text[:500]}")
                raise TransactionError(
                    "Failed to parse response as JSON",
                    action_type,
                    {"response_text": response.text[:500]}
                )
            
    except requests.RequestException as e:
        error_details = {}
        
        # Compare against None explicitly: a requests.Response is falsy for 4xx/5xx
        # status codes, which are exactly the cases that need to be reported here
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            error_details["status_code"] = error_response.status_code
            try:
                error_details["response"] = error_response.json()
            except ValueError:
                error_details["response_text"] = error_response.text[:500]
            
            # Classify error based on status code
            if error_response.status_code == 401:
                raise AuthenticationError("Authentication failed", error_details) from e
            elif error_response.status_code == 429:
                raise RateLimitError("Rate limit exceeded", error_details) from e
        
        error_msg = f"Write action failed: {str(e)}"
        logger.error(error_msg, extra={"details": error_details})
        raise TransactionError(error_msg, action_type, error_details) from e


def execute_batch_actions(actions: List[Dict[str, Any]], chunk_size: int = 50) -> Dict[str, Any]:
    """
    Execute a batch of actions, optionally chunking into multiple requests.
    
    Args:
        actions: List of actions to execute
        chunk_size: Maximum number of actions per request
        
    Returns:
        Combined results of all batch operations
        
    Raises:
        TransactionError: If any batch fails
    """
    if not actions:
        return {"success": True, "created_uids": []}
    
    # Single batch if under chunk size
    if len(actions) <= chunk_size:
        result = execute_write_action(actions)
        
        # Check for tempids-to-uids mapping in response
        if "tempids-to-uids" in result:
            return {"success": True, "created_uids": list(result["tempids-to-uids"].values())}
        elif "successful" in result and result["successful"]:
            return {"success": True, "created_uids": []}
        else:
            return result
    
    # Split into chunks for larger batches
    chunks = [actions[i:i + chunk_size] for i in range(0, len(actions), chunk_size)]
    logger.debug(f"Splitting batch operation into {len(chunks)} chunks of max {chunk_size} actions")
    
    # Track results across chunks
    combined_results = {
        "created_uids": [],
        "success": True
    }
    
    # Track temporary and real UIDs for parent-child relationships
    temp_uid_map = {}
    
    # Execute each chunk
    for i, chunk in enumerate(chunks):
        logger.debug(f"Executing batch chunk {i+1}/{len(chunks)} with {len(chunk)} actions")
        
        # Update parent UIDs with real UIDs from previous chunks
        if i > 0 and temp_uid_map:
            for action in chunk:
                if action.get("action") == "create-block":
                    # Tolerate actions without a location or with a non-string parent UID
                    location = action.get("location", {})
                    parent_uid = location.get("parent-uid")
                    if isinstance(parent_uid, str) and parent_uid.startswith("temp_") and parent_uid in temp_uid_map:
                        location["parent-uid"] = temp_uid_map[parent_uid]
        
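        result = execute_write_action(chunk)
        
        # A partially failed chunk is returned rather than raised, so record it here
        if "batch-error-message" in result:
            combined_results["success"] = False
            combined_results["error"] = result["batch-error-message"]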
        
        # Collect UIDs from this chunk
        created_uids = []
        if "tempids-to-uids" in result:
            created_uids = list(result["tempids-to-uids"].values())
        
        if created_uids:
            # Map temp UIDs to real UIDs for next chunks
            if i < len(chunks) - 1:
                for j, uid in enumerate(created_uids):
                    temp_key = f"temp_{i}_{j}"
                    temp_uid_map[temp_key] = uid
            
            combined_results["created_uids"].extend(created_uids)
        
        # Add delay between batches to ensure ordering
        if i < len(chunks) - 1:
            time.sleep(0.5)
    
    return combined_results


def find_or_create_page(title: str) -> str:
    """
    Find a page by title or create it if it doesn't exist.
    
    Args:
        title: Page title
        
    Returns:
        Page UID
        
    Raises:
        TransactionError: If page creation fails
        ValidationError: If title is invalid
        AuthenticationError: If authentication fails
    """
    validate_credentials()
    session, headers = get_session_and_headers()
    
    # Validate title
    if not title or not isinstance(title, str):
        raise ValidationError("Page title must be a non-empty string", "title")
    
    title = title.strip()
    if not title:
        raise ValidationError("Page title cannot be empty or just whitespace", "title")
    
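    # Try to find the page first
    logger.debug(f"Looking for page: {title}")
    # Escape backslashes and double quotes so the title cannot break out of the string literal
    escaped_title = title.replace("\\", "\\\\").replace('"', '\\"')
    query = f'''[:find ?uid .
              :where [?e :node/title "{escaped_title}"]
                     [?e :block/uid ?uid]]'''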
    
    page_uid = execute_query(query)
    
    if page_uid:
        logger.debug(f"Found existing page: {title} (UID: {page_uid})")
        return page_uid
    
    # Create the page if it doesn't exist
    logger.debug(f"Creating new page: {title}")
    action_data = {
        "action": "create-page",
        "page": {"title": title}
    }
    
    try:
        response = execute_write_action(action_data)
        
        if response.get("success", False):
            # Wait a moment for the page to be created
            time.sleep(1)
            
            # Try to find the page again
            page_uid = execute_query(query)
            if page_uid:
                logger.debug(f"Created page: {title} (UID: {page_uid})")
                return page_uid
                
            # If still not found, try one more time with a longer delay
            time.sleep(2)
            page_uid = execute_query(query)
            if page_uid:
                logger.debug(f"Found newly created page: {title} (UID: {page_uid})")
                return page_uid
            
        # If we get here, something went wrong
        error_msg = f"Failed to create page: {title}"
        logger.error(error_msg)
        raise TransactionError(error_msg, "create-page", {"title": title, "response": response})
    except (TransactionError, AuthenticationError, RateLimitError):
        # Re-raise errors that are already classified instead of wrapping them
        raise
    except Exception as e:
        error_msg = f"Failed to create page: {title}"
        logger.error(error_msg)
        raise TransactionError(error_msg, "create-page", {"title": title, "error": str(e)}) from e


def get_daily_page() -> str:
    """
    Get or create today's daily page.
    
    Returns:
        Daily page UID
        
    Raises:
        TransactionError: If page creation fails
    """
    today = datetime.now()
    date_str = format_roam_date(today)
    
    logger.debug(f"Getting daily page for: {date_str}")
    return find_or_create_page(date_str)


def add_block_to_page(page_uid: str, content: str, order: Union[int, str] = "last") -> Optional[str]:
    """
    Add a block to a page.
    
    Args:
        page_uid: Parent page UID
        content: Block content
        order: Position ("first", "last", or integer index)
        
    Returns:
        New block UID or None if creation failed
        
    Raises:
        BlockNotFoundError: If page does not exist
        ValidationError: If parameters are invalid
        TransactionError: If block creation fails
    """
    # Validate parameters
    if not page_uid:
        raise ValidationError("Parent page UID is required", "page_uid")
    
    if not content:
        raise ValidationError("Block content cannot be empty", "content")
    
    # Generate a 9-character block UID; use the hex form so the slice never ends in a hyphen
    import uuid
    block_uid = uuid.uuid4().hex[:9]
    
    action_data = {
        "action": "create-block",
        "location": {
            "parent-uid": page_uid,
            "order": order
        },
        "block": {
            "string": content,
            "uid": block_uid
        }
    }
    
    logger.debug(f"Adding block to page {page_uid}")
    try:
        result = execute_write_action(action_data)
        
        if result.get("success", False):
            # Add a brief delay to ensure the block is created
            time.sleep(1)
            
            # Verify the block exists
            session, headers = get_session_and_headers()
            found_uid = find_block_uid(session, headers, GRAPH_NAME, content)
            
            if found_uid:
                logger.debug(f"Created block with UID: {found_uid}")
                return found_uid
            
            # If we couldn't find the UID by content, return the one we generated
            logger.debug(f"Block created but couldn't verify, returning generated UID: {block_uid}")
            return block_uid
        else:
            logger.error(f"Failed to create block: {result.get('error', 'Unknown error')}")
            return None
    except Exception as e:
        if isinstance(e, (BlockNotFoundError, ValidationError, TransactionError)):
            raise
        
        error_msg = f"Failed to create block: {str(e)}"
        logger.error(error_msg)
        raise TransactionError(error_msg, "create-block", {"page_uid": page_uid}) from e


def update_block(block_uid: str, content: str) -> bool:
    """
    Update a block's content.
    
    Args:
        block_uid: Block UID
        content: New content
        
    Returns:
        Success flag
        
    Raises:
        BlockNotFoundError: If block does not exist
        ValidationError: If parameters are invalid
        TransactionError: If block update fails
    """
    # Validate parameters
    if not block_uid:
        raise ValidationError("Block UID is required", "block_uid")
    
    if content is None:
        raise ValidationError("Block content cannot be None", "content")
    
    action_data = {
        "action": "update-block",
        "block": {
            "uid": block_uid,
            "string": content
        }
    }
    
    logger.debug(f"Updating block: {block_uid}")
    try:
        execute_write_action(action_data)
        return True
    except Exception as e:
        if isinstance(e, (BlockNotFoundError, ValidationError, TransactionError)):
            raise
            
        error_msg = f"Failed to update block: {str(e)}"
        logger.error(error_msg)
        raise TransactionError(error_msg, "update-block", {"block_uid": block_uid}) from e


def transform_block(block_uid: str, find_pattern: str, replace_with: str, global_replace: bool = True) -> str:
    """
    Transform a block's content using regex pattern replacement.
    
    Args:
        block_uid: Block UID
        find_pattern: Regex pattern to find
        replace_with: Text to replace with
        global_replace: Whether to replace all occurrences
        
    Returns:
        Updated content
        
    Raises:
        BlockNotFoundError: If block does not exist
        ValidationError: If parameters are invalid
        QueryError: If block retrieval fails
        TransactionError: If block update fails
    """
    # Validate parameters
    if not block_uid:
        raise ValidationError("Block UID is required", "block_uid")
    
    if not find_pattern:
        raise ValidationError("Find pattern cannot be empty", "find_pattern")
    
    # First get the current content
    query = f'''[:find ?string .
                :where [?b :block/uid "{block_uid}"]
                        [?b :block/string ?string]]'''
    
    logger.debug(f"Getting content for block: {block_uid}")
    try:
        current_content = execute_query(query)
        
        if not current_content:
            raise BlockNotFoundError(block_uid)
        
        # Apply transformation
        logger.debug(f"Transforming block {block_uid} with pattern: {find_pattern}")
        flags = re.MULTILINE
        count = 0 if global_replace else 1
        
        try:
            new_content = re.sub(find_pattern, replace_with, current_content, count=count, flags=flags)
        except re.error as e:
            raise ValidationError(f"Invalid regex pattern: {str(e)}", "find_pattern", {"pattern": find_pattern}) from e
        
        # Update the block
        update_block(block_uid, new_content)
        
        return new_content
    except (BlockNotFoundError, ValidationError, QueryError, TransactionError):
        # Rethrow existing errors
        raise
    except Exception as e:
        error_msg = f"Failed to transform block: {str(e)}"
        logger.error(error_msg)
        raise TransactionError(error_msg, "transform-block", {"block_uid": block_uid}) from e


def batch_update_blocks(updates: List[Dict[str, Any]], chunk_size: int = 50) -> List[Dict[str, Any]]:
    """
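    Update multiple blocks, sending the writes as one or more chunked batch requests.
    
    Args:
        updates: List of update operations; each needs "block_uid" plus either
            "content" or a "transform" dict with "find", "replace" and optional "global"
        chunk_size: Maximum number of actions per batch
        
    Returns:
        One result dict per update, in input order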
        
    Raises:
        ValidationError: If updates are not valid
    """
    if not isinstance(updates, list):
        raise ValidationError("Updates must be a list", "updates")
    
    if not updates:
        return []
    
    session, headers = get_session_and_headers()
    results = []
    batch_actions = []
    
    logger.debug(f"Batch updating {len(updates)} blocks")
    
    # Validate each update and prepare batch actions
    for i, update in enumerate(updates):
        try:
            block_uid = update.get("block_uid")
            if not block_uid:
                results.append({"success": False, "error": "Missing block_uid"})
                continue
                
            # Check block exists
            query = f'''[:find ?string .
                        :where [?b :block/uid "{block_uid}"]
                                [?b :block/string ?string]]'''
            
            current_content = execute_query(query)
            if not current_content:
                results.append({
                    "success": False,
                    "block_uid": block_uid,
                    "error": f"Block with UID {block_uid} not found"
                })
                continue
            
            # Handle direct content update
            if "content" in update:
                batch_actions.append({
                    "action": "update-block",
                    "block": {
                        "uid": block_uid,
                        "string": update["content"]
                    }
                })
                
                results.append({
                    "success": True,
                    "block_uid": block_uid,
                    "content": update["content"]
                })
            # Handle pattern transformation
            elif "transform" in update:
                transform = update["transform"]
                
                try:
                    find_pattern = transform["find"]
                    replace_with = transform["replace"]
                    global_replace = transform.get("global", True)
                    
                    # Apply transformation
                    flags = re.MULTILINE
                    count = 0 if global_replace else 1
                    new_content = re.sub(find_pattern, replace_with, current_content, count=count, flags=flags)
                    
                    batch_actions.append({
                        "action": "update-block",
                        "block": {
                            "uid": block_uid,
                            "string": new_content
                        }
                    })
                    
                    results.append({
                        "success": True,
                        "block_uid": block_uid,
                        "content": new_content
                    })
                except re.error as e:
                    results.append({
                        "success": False,
                        "block_uid": block_uid,
                        "error": f"Invalid regex pattern: {str(e)}"
                    })
                except KeyError as e:
                    results.append({
                        "success": False,
                        "block_uid": block_uid,
                        "error": f"Missing required transform key: {str(e)}"
                    })
            else:
                results.append({
                    "success": False,
                    "block_uid": block_uid,
                    "error": "Neither content nor transform provided"
                })
        except Exception as e:
            logger.error(f"Error preparing update for block {update.get('block_uid', 'unknown')}: {str(e)}")
            results.append({
                "success": False,
                "block_uid": update.get("block_uid", "unknown"),
                "error": str(e)
            })
    
    # Execute batch updates if we have any valid actions
    if batch_actions:
        try:
            execute_batch_actions(batch_actions, chunk_size)
        except Exception as e:
            logger.error(f"Error executing batch update: {str(e)}")
            # Mark all previously successful results as failed
            for result in results:
                if result.get("success"):
                    result["success"] = False
                    result["error"] = f"Batch update failed: {str(e)}"
    
    # Log success rate
    successful = sum(1 for r in results if r.get("success"))
    logger.debug(f"Batch update completed: {successful}/{len(updates)} successful")
    
    return results


def get_page_content(title: str, resolve_refs: bool = True, max_depth: int = 5) -> str:
    """
    Get the content of a page with optional block reference resolution.
    
    Args:
        title: Page title
        resolve_refs: Whether to resolve block references
        max_depth: Maximum depth of nested blocks to retrieve (default: 5)
        
    Returns:
        Page content as markdown
        
    Raises:
        PageNotFoundError: If no page with this title exists
        QueryError: If query execution fails
    """
    session, headers = get_session_and_headers()
    
    # First find the page UID
    logger.debug(f"Getting content for page: {title}")
    page_uid = find_page_by_title(session, headers, GRAPH_NAME, title)
    
    if not page_uid:
        raise PageNotFoundError(title)
    
    # Build block hierarchy recursively, one query per parent
    block_map = {}
    top_level_blocks = []
    
    # Fetch the immediate children of a parent (page or block), then recurse into each child
    def get_children(parent_uid: str, depth: int = 0) -> None:
        if depth >= max_depth:
            return
        
        query = f"""[:find ?uid ?string ?order
                    :where
                    [?parent :block/uid "{parent_uid}"]
                    [?parent :block/children ?child]
                    [?child :block/uid ?uid]
                    [?child :block/string ?string]
                    [?child :block/order ?order]]"""
        
        try:
            results = execute_query(query)
            if not results:
                return
            
            for uid, content, order in results:
                # Resolve references if requested
                if resolve_refs:
                    content = resolve_block_references(session, headers, GRAPH_NAME, content)
                
                # Create block object
                block = {
                    "uid": uid,
                    "content": content,
                    "order": order,
                    "children": []
                }
                
                block_map[uid] = block
                
                # Add to top-level or parent's children
                if parent_uid == page_uid:
                    top_level_blocks.append(block)
                elif parent_uid in block_map:
                    block_map[parent_uid]["children"].append(block)
                
                # Recursively fetch children
                get_children(uid, depth + 1)
                
        except QueryError as e:
            logger.warning(f"Failed to fetch children for {parent_uid}: {str(e)}")
            raise
    
    try:
        # Start with the page's top-level blocks
        get_children(page_uid)
        
        if not top_level_blocks:
            logger.debug(f"No content found on page: {title}")
            return f"# {title}\n\nNo content found on this page."
        
        # Sort blocks by order
        def sort_blocks(blocks):
            blocks.sort(key=lambda b: b["order"])
            for block in blocks:
                sort_blocks(block["children"])
        
        sort_blocks(top_level_blocks)
        
        # Convert to markdown
        markdown = f"# {title}\n\n"
        
        def blocks_to_md(blocks, level=0):
            result = ""
            for block in blocks:
                indent = "  " * level
                result += f"{indent}- {block['content']}\n"
                if block["children"]:
                    result += blocks_to_md(block["children"], level + 1)
            return result
        
        markdown += blocks_to_md(top_level_blocks)
        
        logger.debug(f"Retrieved page content for: {title}")
        return markdown
    except QueryError:
        # Rethrow existing QueryError
        raise
    except Exception as e:
        error_msg = f"Failed to get page content: {str(e)}"
        logger.error(error_msg)
        raise QueryError(error_msg, "Iterative child fetch", {"page_title": title, "page_uid": page_uid}) from e
```
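
The chunking in `execute_batch_actions` above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the module's implementation: `chunk_actions`, `run_in_chunks` and `fake_send` are hypothetical names, and `fake_send` stands in for `execute_write_action` so the sketch runs without a Roam graph. Only the slicing pattern and the `tempids-to-uids` key come from the code above.

```python
from typing import Any, Callable, Dict, List

Action = Dict[str, Any]


def chunk_actions(actions: List[Action], chunk_size: int = 50) -> List[List[Action]]:
    """Split actions into consecutive slices of at most chunk_size items."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    return [actions[i:i + chunk_size] for i in range(0, len(actions), chunk_size)]


def run_in_chunks(actions: List[Action],
                  send: Callable[[List[Action]], Dict[str, Any]],
                  chunk_size: int = 50) -> Dict[str, Any]:
    """Send each chunk through `send` and merge the UIDs reported per chunk."""
    created_uids: List[str] = []
    for chunk in chunk_actions(actions, chunk_size):
        result = send(chunk)
        # A chunk that reports no mapping simply contributes no UIDs
        created_uids.extend(result.get("tempids-to-uids", {}).values())
    return {"success": True, "created_uids": created_uids}


def fake_send(chunk: List[Action]) -> Dict[str, Any]:
    # Hypothetical transport: pretends every action created exactly one block
    return {"tempids-to-uids": {f"t{pos}": f"uid-{a['n']}" for pos, a in enumerate(chunk)}}


demo_actions = [{"action": "create-block", "n": n} for n in range(120)]
demo = run_in_chunks(demo_actions, fake_send)
```

With 120 actions and the default `chunk_size` of 50, the slicing yields chunks of 50, 50 and 20 actions, and the merged UID list preserves the input order.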

--------------------------------------------------------------------------------
/roam_mcp/server.py:
--------------------------------------------------------------------------------

```python
"""Core server module for Roam MCP server."""

import os
import sys
import logging
import traceback
from typing import Dict, List, Any, Optional, Union
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled
from mcp.server.fastmcp import FastMCP
from datetime import datetime

# Import operations
from roam_mcp.api import (
    API_TOKEN,
    GRAPH_NAME,
    MEMORIES_TAG,
    get_page_content,
    ValidationError,
    QueryError,
    PageNotFoundError,
    BlockNotFoundError,
    TransactionError,
    AuthenticationError,
    RateLimitError
)
from roam_mcp.search import (
    search_by_text,
    search_by_tag,
    search_by_status,
    search_block_refs,
    search_hierarchy,
    search_by_date,
    find_pages_modified_today,
    execute_datomic_query
)
from roam_mcp.content import (
    create_page,
    create_block,
    create_outline,
    import_markdown,
    add_todos,
    update_content,
    update_multiple_contents
)
from roam_mcp.memory import (
    remember,
    recall
)
from roam_mcp.utils import (
    extract_youtube_video_id,
    detect_url_type
)
from roam_mcp.content_parsers import parse_webpage, parse_pdf

# Initialize FastMCP server
mcp = FastMCP("roam-research")

# Configure logging
logger = logging.getLogger("roam-mcp")


def setup_logging(verbose=False):
    """Configure logging with appropriate level of detail."""
    log_level = logging.DEBUG if verbose else logging.INFO
    
    # Configure root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(log_level)
    
    # Clear any existing handlers
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
    
    # Add console handler
    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setLevel(log_level)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)


def validate_environment():
    """Validate that required environment variables are set."""
    if not API_TOKEN or not GRAPH_NAME:
        missing = []
        if not API_TOKEN:
            missing.append("ROAM_API_TOKEN")
        if not GRAPH_NAME:
            missing.append("ROAM_GRAPH_NAME")
            
        error_msg = f"""
Missing required environment variables: {', '.join(missing)}

Please configure these variables either:
1. In your MCP settings file:
   - For Claude: ~/Library/Application Support/Claude/claude_desktop_config.json
   - For Cline: ~/Library/Application Support/Code/User/globalStorage/saoudrizwan.claude-dev/settings/cline_mcp_settings.json

   Example configuration:
   {{
     "mcpServers": {{
       "roam-helper": {{
         "command": "uvx",
         "args": ["git+https://github.com/PhiloSolares/roam-mcp.git"],
         "env": {{
           "ROAM_API_TOKEN": "your-api-token",
           "ROAM_GRAPH_NAME": "your-graph-name"
         }}
       }}
     }}
   }}

2. Or in a .env file in the roam-mcp directory:
   ROAM_API_TOKEN=your-api-token
   ROAM_GRAPH_NAME=your-graph-name
"""
        logger.error(error_msg)
        return False
    
    return True


def format_error_response(error: Exception) -> str:
    """Format an error for user-friendly display."""
    if isinstance(error, ValidationError):
        return f"Validation error: {str(error)}"
    elif isinstance(error, PageNotFoundError):
        return f"Page not found: {str(error)}"
    elif isinstance(error, BlockNotFoundError):
        return f"Block not found: {str(error)}"
    elif isinstance(error, QueryError):
        return f"Query error: {str(error)}"
    elif isinstance(error, TransactionError):
        return f"Transaction error: {str(error)}"
    elif isinstance(error, AuthenticationError):
        return f"Authentication error: {str(error)}"
    elif isinstance(error, RateLimitError):
        return f"Rate limit exceeded: {str(error)}"
    else:
        return f"Error: {str(error)}"


@mcp.tool()
async def search_roam(search_terms: List[str]) -> str:
    """Search Roam database for content containing the specified terms.

    Args:
        search_terms: List of keywords to search for
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not search_terms:
            return "Please provide at least one search term"
        
        all_results = []
        for term in search_terms:
            result = search_by_text(term)
            if result["success"]:
                all_results.extend(result["matches"])
        
        # Cap the output at 3000 words, stopping at the first block that would exceed the cap
        word_count = 0
        max_word_count = 3000
        filtered_results = []
        
        for match in all_results:
            content = match["content"]
            block_word_count = len(content.split())
            
            if word_count + block_word_count <= max_word_count:
                filtered_results.append(f"Page: {match.get('page_title', 'Unknown')}\n{content}")
                word_count += block_word_count
            else:
                break
        
        if not filtered_results:
            return f"No results found for terms: {', '.join(search_terms)}"
            
        return "\n\n".join(filtered_results)
    except Exception as e:
        logger.error(f"Error searching Roam: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_fetch_page_by_title(title: str) -> str:
    """Retrieve complete page contents by exact title, including all nested blocks and resolved block references.

    Args:
        title: Title of the page
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not title:
            return "Error: title is required"
        
        content = get_page_content(title)
        return content
    except Exception as e:
        logger.error(f"Error fetching page: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_create_page(title: str, content: Optional[List[Dict[str, Any]]] = None) -> str:
    """Create a new page in Roam Research with optional content using explicit nesting levels.

    Args:
        title: Title of the new page
        content: Initial content for the page as an array of blocks with explicit nesting levels.
               Each block must have a 'text' field with the content as a string.
               Example:
               [
                 {"text": "Heading", "level": 0},
                 {"text": "Bullet point", "level": 1},
                 {"text": "Another point", "level": 1, "children": [
                   {"text": "Nested point", "level": 2}
                 ]}
               ]
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not title:
            return "Error: title is required"
        
        result = create_page(title, content)
        if result["success"]:
            return f"Page created successfully: {result['page_url']}"
        else:
            return f"Error creating page: {result.get('error', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error creating page: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_create_block(content: str, page_uid: Optional[str] = None, title: Optional[str] = None) -> str:
    """Add a new block to an existing Roam page. If no page specified, adds to today's daily note.

    Args:
        content: Content of the block
        page_uid: Optional: UID of the page to add block to
        title: Optional: Title of the page to add block to
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not content:
            return "Error: content is required"
        
        result = create_block(content, page_uid, title)
        if result["success"]:
            block_uid = result.get("block_uid", "unknown")
            parent_uid = result.get("parent_uid", "unknown")
            return f"Block created successfully with UID: {block_uid} under parent: {parent_uid}"
        else:
            return f"Error creating block: {result.get('error', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error creating block: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_create_outline(outline: List[Dict[str, Any]], page_title_uid: Optional[str] = None, block_text_uid: Optional[str] = None) -> str:
    """Add a structured outline to an existing page or block with customizable nesting levels.

    Args:
        outline: Array of outline items with block text and explicit nesting level
        page_title_uid: Title or UID of the page. Leave blank to use the default daily page
        block_text_uid: A title heading for the outline or the UID of the block under which content will be nested
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not outline:
            return "Error: outline is required and cannot be empty"
        
        result = create_outline(outline, page_title_uid, block_text_uid)
        if result["success"]:
            created_count = len(result.get("created_uids", []))
            page_uid = result.get("page_uid", "unknown")
            parent_uid = result.get("parent_uid", "unknown")
            return f"Outline created successfully with {created_count} blocks on page {page_uid} under parent {parent_uid}"
        else:
            return f"Error creating outline: {result.get('error', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error creating outline: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_import_markdown(content: str, page_uid: Optional[str] = None, page_title: Optional[str] = None,
                            parent_uid: Optional[str] = None, parent_string: Optional[str] = None, 
                            order: str = "last") -> str:
    """Import nested markdown content into Roam under a specific block.

    Args:
        content: Nested markdown content to import
        page_uid: Optional: UID of the page containing the parent block
        page_title: Optional: Title of the page containing the parent block
        parent_uid: Optional: UID of the parent block to add content under
        parent_string: Optional: Exact string content of the parent block to add content under
        order: Optional: Where to add the content under the parent ("first" or "last")
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not content:
            return "Error: content is required and cannot be empty"
        
        result = import_markdown(content, page_uid, page_title, parent_uid, parent_string, order)
        if result["success"]:
            created_count = len(result.get("created_uids", []))
            page_uid = result.get("page_uid", "unknown")
            parent_uid = result.get("parent_uid", "unknown")
            return f"Markdown imported successfully with {created_count} blocks on page {page_uid} under parent {parent_uid}"
        else:
            return f"Error importing markdown: {result.get('error', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error importing markdown: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_add_todo(todos: List[str]) -> str:
    """Add a list of todo items as individual blocks to today's daily page in Roam.

    Args:
        todos: List of todo items to add
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not todos:
            return "Error: todos list cannot be empty"
        
        result = add_todos(todos)
        if result["success"]:
            return f"Added {len(todos)} todo items to today's daily page"
        else:
            return f"Error adding todos: {result.get('error', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error adding todos: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_search_for_tag(primary_tag: str, page_title_uid: Optional[str] = None, near_tag: Optional[str] = None) -> str:
    """Search for blocks containing a specific tag and optionally filter by blocks that also contain another tag nearby.

    Args:
        primary_tag: The main tag to search for (without the [[ ]] brackets)
        page_title_uid: Optional: Title or UID of the page to search in
        near_tag: Optional: Another tag to filter results by - will only return blocks where both tags appear
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not primary_tag:
            return "Error: primary_tag is required"
        
        result = search_by_tag(primary_tag, page_title_uid, near_tag)
        if result["success"]:
            # Format the results
            formatted = f"{result['message']}\n\n"
            
            for match in result["matches"]:
                page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
                formatted += f"- {match['content']}{page_info}\n"
            
            return formatted
        else:
            return f"Error searching for tag: {result.get('message', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error searching for tag: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_search_by_status(status: str, page_title_uid: Optional[str] = None, 
                              include: Optional[str] = None, exclude: Optional[str] = None) -> str:
    """Search for blocks with a specific status (TODO/DONE) across all pages or within a specific page.

    Args:
        status: Status to search for (TODO or DONE)
        page_title_uid: Optional: Title or UID of the page to search in
        include: Optional: Comma-separated list of terms to filter results by inclusion
        exclude: Optional: Comma-separated list of terms to filter results by exclusion
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not status or status not in ["TODO", "DONE"]:
            return "Error: status must be either 'TODO' or 'DONE'"
        
        result = search_by_status(status, page_title_uid, include, exclude)
        if result["success"]:
            # Format the results
            formatted = f"{result['message']}\n\n"
            
            for match in result["matches"]:
                page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
                formatted += f"- {match['content']}{page_info}\n"
            
            return formatted
        else:
            return f"Error searching by status: {result.get('message', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error searching by status: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_search_block_refs(block_uid: Optional[str] = None, page_title_uid: Optional[str] = None) -> str:
    """Search for block references within a page or across the entire graph.

    Args:
        block_uid: Optional: UID of the block to find references to
        page_title_uid: Optional: Title or UID of the page to search in
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        result = search_block_refs(block_uid, page_title_uid)
        if result["success"]:
            # Format the results
            formatted = f"{result['message']}\n\n"
            
            for match in result["matches"]:
                page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
                formatted += f"- {match['content']}{page_info}\n"
            
            return formatted
        else:
            return f"Error searching block references: {result.get('message', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error searching block references: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_search_hierarchy(parent_uid: Optional[str] = None, child_uid: Optional[str] = None,
                              page_title_uid: Optional[str] = None, max_depth: int = 1) -> str:
    """Search for parent or child blocks in the block hierarchy.

    Args:
        parent_uid: Optional: UID of the block to find children of
        child_uid: Optional: UID of the block to find parents of
        page_title_uid: Optional: Title or UID of the page to search in
        max_depth: Optional: How many levels deep to search (default: 1)
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not parent_uid and not child_uid:
            return "Error: Either parent_uid or child_uid must be provided"
        
        result = search_hierarchy(parent_uid, child_uid, page_title_uid, max_depth)
        if result["success"]:
            # Format the results
            formatted = f"{result['message']}\n\n"
            
            for match in result["matches"]:
                page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
                depth_info = f" (depth: {match['depth']})"
                formatted += f"- {match['content']}{page_info}{depth_info}\n"
            
            return formatted
        else:
            return f"Error searching hierarchy: {result.get('message', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error searching hierarchy: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_find_pages_modified_today(max_num_pages: int = 50) -> str:
    """Find pages that have been modified today (since midnight).

    Args:
        max_num_pages: Max number of pages to retrieve (default: 50)
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if max_num_pages < 1:
            return "Error: max_num_pages must be at least 1"
        
        result = find_pages_modified_today(max_num_pages)
        if result["success"]:
            # Format the results
            formatted = f"{result['message']}\n\n"
            
            for page in result["pages"]:
                formatted += f"- {page}\n"
            
            return formatted
        else:
            return f"Error finding modified pages: {result.get('message', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error finding modified pages: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_search_by_text(text: str, page_title_uid: Optional[str] = None) -> str:
    """Search for blocks containing specific text across all pages or within a specific page.

    Args:
        text: The text to search for
        page_title_uid: Optional: Title or UID of the page to search in
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not text:
            return "Error: text is required"
        
        result = search_by_text(text, page_title_uid)
        if result["success"]:
            # Format the results
            formatted = f"{result['message']}\n\n"
            
            for match in result["matches"]:
                page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
                formatted += f"- {match['content']}{page_info}\n"
            
            return formatted
        else:
            return f"Error searching by text: {result.get('message', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error searching by text: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_update_block(block_uid: str, content: Optional[str] = None, 
                          transform_pattern: Optional[Dict[str, Any]] = None) -> str:
    """Update a single block identified by its UID.

    Args:
        block_uid: UID of the block to update
        content: New content for the block
        transform_pattern: Pattern to transform the current content
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not block_uid:
            return "Error: block_uid is required"
        
        if not content and not transform_pattern:
            return "Error: Either content or transform_pattern must be provided"
        
        result = update_content(block_uid, content, transform_pattern)
        if result["success"]:
            return f"Block updated successfully: {result['content']}"
        else:
            return f"Error updating block: {result.get('error', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error updating block: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_update_multiple_blocks(updates: List[Dict[str, Any]]) -> str:
    """Efficiently update multiple blocks in a single batch operation.

    Args:
        updates: Array of block updates to perform
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not updates or not isinstance(updates, list):
            return "Error: updates must be a non-empty list"
        
        result = update_multiple_contents(updates)
        if result["success"]:
            successful = sum(1 for r in result["results"] if r.get("success"))
            return f"Updated {successful}/{len(updates)} blocks successfully"
        else:
            return f"Error updating blocks: {result.get('error', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error updating blocks: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_search_by_date(start_date: str, end_date: Optional[str] = None,
                            type_filter: str = "created", scope: str = "blocks",
                            include_content: bool = True) -> str:
    """Search for blocks or pages based on creation or modification dates.

    Args:
        start_date: Start date in ISO format (YYYY-MM-DD)
        end_date: Optional: End date in ISO format (YYYY-MM-DD)
        type_filter: Whether to search by "created", "modified", or "both"
        scope: Whether to search "blocks", "pages", or "both"
        include_content: Whether to include the content of matching blocks/pages
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not start_date:
            return "Error: start_date is required"
        
        if type_filter not in ["created", "modified", "both"]:
            return "Error: type_filter must be 'created', 'modified', or 'both'"
        
        if scope not in ["blocks", "pages", "both"]:
            return "Error: scope must be 'blocks', 'pages', or 'both'"
        
        result = search_by_date(start_date, end_date, type_filter, scope, include_content)
        if result["success"]:
            # Format the results
            formatted = f"{result['message']}\n\n"
            
            for match in result["matches"]:
                date_info = datetime.fromtimestamp(match["time"] / 1000).strftime("%Y-%m-%d %H:%M:%S")
                
                if match["type"] == "block":
                    page_info = f" (in page: {match.get('page_title', 'Unknown')})"
                    content_info = f": {match.get('content', '')}" if include_content else ""
                    formatted += f"- Block {match['uid']} {date_info}{page_info}{content_info}\n"
                else:  # page
                    title_info = f" (title: {match.get('title', 'Unknown')})"
                    content_info = f": {match.get('content', '')}" if include_content else ""
                    formatted += f"- Page {match['uid']} {date_info}{title_info}{content_info}\n"
            
            return formatted
        else:
            return f"Error searching by date: {result.get('message', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error searching by date: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_remember(memory: str, categories: Optional[List[str]] = None) -> str:
    """Add a memory or piece of information to remember, stored on the daily page with the memories tag.

    Args:
        memory: The memory detail or information to remember
        categories: Optional categories to tag the memory with
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not memory:
            return "Error: memory is required"
        
        result = remember(memory, categories)
        if result["success"]:
            return f"Memory stored successfully: {result['content']}"
        else:
            return f"Error storing memory: {result.get('error', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error storing memory: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_recall(sort_by: str = "newest", filter_tag: Optional[str] = None) -> str:
    """Retrieve stored memories, optionally filtered by tag and sorted by creation date.

    Args:
        sort_by: Sort order for memories based on creation date ("newest" or "oldest")
        filter_tag: Include only memories with a specific filter tag
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if sort_by not in ["newest", "oldest"]:
            return "Error: sort_by must be 'newest' or 'oldest'"
        
        result = recall(sort_by, filter_tag)
        if result["success"]:
            # Format the results
            formatted = f"{result['message']}\n\n"
            
            for memory in result["memories"]:
                formatted += f"- {memory}\n"
            
            return formatted
        else:
            return f"Error recalling memories: {result.get('error', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error recalling memories: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def roam_datomic_query(query: str, inputs: Optional[List[Any]] = None) -> str:
    """Execute a custom Datomic query on the Roam graph beyond the available search tools.

    Args:
        query: The Datomic query to execute (in Datalog syntax)
        inputs: Optional array of input parameters for the query
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        if not query:
            return "Error: query is required"
        
        result = execute_datomic_query(query, inputs)
        if result["success"]:
            # Format the results
            formatted = f"{result['message']}\n\n"
            
            for match in result["matches"]:
                formatted += f"- {match['content']}\n"
            
            return formatted
        else:
            return f"Error executing query: {result.get('message', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error executing query: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.tool()
async def get_youtube_transcript(url: str) -> str:
    """Fetch and return the transcript of a YouTube video.

    Args:
        url: URL of the YouTube video
    """
    video_id = extract_youtube_video_id(url)
    if not video_id:
        return "Invalid YouTube URL. Unable to extract video ID."

    try:
        # Define the prioritized list of language codes
        languages = [
            'en', 'en-US', 'en-GB', 'de', 'es', 'hi', 'zh', 'ar', 'bn', 'pt',
            'ru', 'ja', 'pa'
        ]

        # Attempt to retrieve the available transcripts
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

        # Try to find a transcript in the prioritized languages
        for language in languages:
            try:
                transcript = transcript_list.find_transcript([language])
                # Check if the transcript is manually created or generated, prefer manually created
                if transcript.is_generated:
                    continue
                text = " ".join([line["text"] for line in transcript.fetch()])
                return text
            except Exception:
                continue

        # If no suitable transcript is found in the specified languages, try to fetch a generated transcript
        try:
            generated_transcript = transcript_list.find_generated_transcript(
                languages)
            text = " ".join(
                [line["text"] for line in generated_transcript.fetch()])
            return text
        except Exception:
            return "No suitable transcript found for this video."

    except TranscriptsDisabled:
        return "Transcripts are disabled for this video."
    except Exception as e:
        logger.error(f"Error fetching YouTube transcript: {str(e)}", exc_info=True)
        return f"An error occurred while fetching the transcript: {str(e)}"


@mcp.tool()
async def fetch_webpage_content(url: str) -> str:
    """Fetch and extract the main content from a web page.

    Args:
        url: URL of the web page to fetch
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        logger.debug(f"Fetching webpage content: {url}")
        result = await parse_webpage(url)
        
        if result["success"]:
            return f"# {result['title']}\n\nSource: {url}\n\n{result['content']}"
        else:
            return f"Error fetching webpage: {result.get('error', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error in fetch_webpage_content: {str(e)}", exc_info=True)
        return f"Error fetching webpage: {str(e)}"


@mcp.tool()
async def fetch_pdf_content(url: str) -> str:
    """Fetch and extract the content from a PDF file.

    Args:
        url: URL of the PDF file to fetch
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        logger.debug(f"Fetching PDF content: {url}")
        result = await parse_pdf(url)
        
        if result["success"]:
            return f"# {result['title']}\n\nSource: {url}\n\n{result['content']}"
        else:
            return f"Error fetching PDF: {result.get('error', 'Unknown error')}"
    except Exception as e:
        logger.error(f"Error in fetch_pdf_content: {str(e)}", exc_info=True)
        return f"Error fetching PDF: {str(e)}"


@mcp.tool()
async def parse_url(url: str) -> str:
    """Intelligently parse content from a URL - supports webpages, PDFs, and YouTube videos.

    Args:
        url: URL to parse
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        # Detect URL type
        url_type = detect_url_type(url)
        
        if url_type == "youtube":
            # Use existing YouTube transcript function
            return await get_youtube_transcript(url)
        elif url_type == "pdf":
            return await fetch_pdf_content(url)
        else:  # webpage or unknown
            return await fetch_webpage_content(url)
    except Exception as e:
        logger.error(f"Error parsing URL: {str(e)}", exc_info=True)
        return f"Error parsing URL: {str(e)}"


@mcp.tool()
async def get_roam_graph_info() -> str:
    """Get information about your Roam Research graph.
    """
    if not validate_environment():
        return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
    
    try:
        # Get page count
        query = """[:find (count ?p)
                    :where [?p :node/title]]"""
        
        result = execute_datomic_query(query)
        
        if result["success"] and result["matches"]:
            page_count = result["matches"][0]["content"]
        else:
            page_count = "Unknown"
        
        # Get block count
        query = """[:find (count ?b)
                    :where [?b :block/string]]"""
        
        result = execute_datomic_query(query)
        
        if result["success"] and result["matches"]:
            block_count = result["matches"][0]["content"]
        else:
            block_count = "Unknown"
        
        # Format the output
        memory_tag = MEMORIES_TAG if MEMORIES_TAG else "Not set (using default #[[Memories]])"
        
        formatted_info = f"""
Graph Name: {GRAPH_NAME}
Pages: {page_count}
Blocks: {block_count}
API Access: Enabled
Memory Tag: {memory_tag}
"""
        
        return formatted_info
    except Exception as e:
        logger.error(f"Error retrieving graph information: {str(e)}", exc_info=True)
        return format_error_response(e)


@mcp.prompt()
async def summarize_page(page_title: str) -> dict:
    """
    Create a prompt to summarize a page in Roam Research.

    Args:
        page_title: Title of the page to summarize
    """
    if not validate_environment():
        return {
            "messages": [{
                "role": "user",
                "content": "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
            }]
        }
    
    try:
        content = get_page_content(page_title)
        
        return {
            "messages": [{
                "role": "user",
                "content": f"Please provide a concise summary of the following page content from my Roam Research database:\n\n{content}"
            }]
        }
    except Exception as e:
        logger.error(f"Error creating summary prompt: {str(e)}", exc_info=True)
        return {
            "messages": [{
                "role": "user",
                "content": f"I wanted to summarize my Roam page titled '{page_title}', but there was an error retrieving the content: {format_error_response(e)}. Can you help me troubleshoot this issue with my Roam Research integration?"
            }]
        }


def run_server(transport="stdio", port=None, verbose=False):
    """Run the MCP server with the specified transport."""
    # Configure logging based on verbosity
    setup_logging(verbose)
    
    logger.info("Server starting...")
    
    # Validate environment variables
    valid_env = validate_environment()
    if valid_env:
        logger.info("API token and graph name are set")
        logger.info(f"MEMORIES_TAG is set to: {MEMORIES_TAG}")
    else:
        logger.warning("Missing required environment variables")
    
    # Run the server
    try:
        if transport == "stdio":
            logger.info("Starting server with stdio transport")
            mcp.run(transport="stdio")
        elif transport == "sse":
            if not port:
                port = 3000
            logger.info(f"Starting server with SSE transport on port {port}")
            mcp.run(transport="sse", port=port)
        else:
            logger.error(f"Unsupported transport: {transport}")
            sys.exit(1)
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Error running server: {str(e)}")
        traceback.print_exc()
```
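
The search tools above (`roam_search_for_tag`, `roam_search_by_status`, `roam_search_block_refs`, `roam_search_by_text`) each build their output with the same loop. Below is a minimal sketch of how that formatting might be factored into one helper. `format_matches` is a hypothetical name that does not exist in `server.py`, and the result shape (`message`, `matches`, `content`, optional `page_title`) is assumed from the code above rather than taken from a documented contract.

```python
from typing import Any, Dict, List


def format_matches(result: Dict[str, Any]) -> str:
    """Render a search result dict as a message followed by a bulleted list.

    Hypothetical helper, not part of server.py. Assumes `result` carries a
    'message' string and a 'matches' list whose items have 'content' and,
    optionally, 'page_title' (the shape used by the search tools above).
    """
    # The message line ends with an extra newline so that joining with "\n"
    # leaves one blank line between the message and the first bullet.
    lines: List[str] = [f"{result['message']}\n"]
    for match in result.get("matches", []):
        page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
        lines.append(f"- {match['content']}{page_info}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Illustrative data only; not real output from a Roam graph.
    example = {
        "message": "Found 2 blocks",
        "matches": [
            {"content": "First block", "page_title": "Projects"},
            {"content": "Second block"},
        ],
    }
    print(format_matches(example))
```

Each of those tools could then return `format_matches(result)` on success, which would keep the output format in one place if it ever changes.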

--------------------------------------------------------------------------------
/roam_mcp/content.py:
--------------------------------------------------------------------------------

```python
"""Content operations for the Roam MCP server (pages, blocks, and outlines)."""

from typing import Dict, List, Any, Optional, Union
from datetime import datetime
import re
import logging
import uuid
import time
import json

from roam_mcp.api import (
    execute_query,
    execute_write_action,
    execute_batch_actions,
    get_session_and_headers,
    GRAPH_NAME,
    find_or_create_page,
    get_daily_page,
    add_block_to_page,
    update_block,
    batch_update_blocks,
    find_page_by_title,
    ValidationError,
    BlockNotFoundError,
    PageNotFoundError,
    TransactionError
)
from roam_mcp.utils import (
    format_roam_date,
    convert_to_roam_markdown,
    parse_markdown_list,
    process_nested_content,
    find_block_uid,
    create_block_action
)

# Set up logging
logger = logging.getLogger("roam-mcp.content")


def process_hierarchical_content(parent_uid: str, content_data: List[Dict[str, Any]], order: str = "last") -> Dict[str, Any]:
    """
    Process hierarchical content with proper parent-child relationships.
    This is a standardized utility function used across different content creation methods.
    
    Args:
        parent_uid: UID of the parent block/page
        content_data: List of content items with text, level, and optional children/heading_level attributes
        order: Where to add content ("first" or "last")
        
    Returns:
        Dictionary with success status and created block UIDs
    """
    if not content_data:
        return {
            "success": True,
            "created_uids": []
        }
    
    # First, validate the hierarchical structure
    def validate_item(item, path="root"):
        errors = []
        # Check required fields
        if not item.get("text") and not item.get("string"):
            errors.append(f"Item at {path} is missing required 'text' field")
        
        # Ensure level is valid
        level = item.get("level")
        if level is not None and not isinstance(level, int):
            errors.append(f"Item at {path} has invalid 'level', must be an integer")
        
        # Validate heading level
        heading_level = item.get("heading_level", 0)
        if heading_level and (not isinstance(heading_level, int) or heading_level < 0 or heading_level > 3):
            errors.append(f"Item at {path} has invalid 'heading_level', must be an integer between 0 and 3")
            
        # Validate children recursively
        children = item.get("children", [])
        if not isinstance(children, list):
            errors.append(f"Item at {path} has invalid 'children', must be a list")
        else:
            for i, child in enumerate(children):
                child_path = f"{path}.children[{i}]"
                child_errors = validate_item(child, child_path)
                errors.extend(child_errors)
                
        return errors
    
    # Validate all items
    all_errors = []
    for i, item in enumerate(content_data):
        item_path = f"item[{i}]"
        errors = validate_item(item, item_path)
        all_errors.extend(errors)
        
    if all_errors:
        return {
            "success": False,
            "error": f"Invalid content structure: {'; '.join(all_errors)}"
        }
    
    # Process hierarchical content with proper nesting
    session, headers = get_session_and_headers()
    all_created_uids = []
    
    # Define a recursive function to process items
    def process_item(item, parent_uid, level_to_uid, current_level):
        created_uids = []
        
        # Get item properties
        text = item.get("text", item.get("string", ""))
        
        # Strip leading dash characters that might cause double bullets
        text = re.sub(r'^-\s+', '', text)
        
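        # An explicit "level": None passes validation, so treat it as unset
        level = item.get("level")
        if level is None:
            level = current_level
        heading_level = item.get("heading_level", 0)
        
        # Find the appropriate parent for this level: the nearest shallower
        # level that currently has a block, falling back to the root parent
        parent_level = max(level - 1, -1)
        while parent_level > -1 and parent_level not in level_to_uid:
            parent_level -= 1
            
        effective_parent = level_to_uid.get(parent_level, parent_uid)
        
        # Create block with a unique UID
        block_uid = str(uuid.uuid4())[:9]
        
        action_data = {
            "action": "create-block",
            "location": {
                "parent-uid": effective_parent,
                "order": order if level == 0 else "last"
            },
            "block": {
                "string": text,
                "uid": block_uid
            }
        }
        
        # Add heading level if specified
        if heading_level and heading_level > 0 and heading_level <= 3:
            action_data["block"]["heading"] = heading_level
            
        # Execute the action
        result = execute_write_action(action_data)
        
        if result.get("success", False):
            created_uids.append(block_uid)
            level_to_uid[level] = block_uid
            # Forget deeper levels left over from earlier branches, so a later
            # item that skips a level is never attached to a stale block
            for stale_level in [lvl for lvl in level_to_uid if lvl > max(level, -1)]:
                del level_to_uid[stale_level]
            logger.debug(f"Created block at level {level} with UID: {block_uid}")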
            
            # Process children if any
            children = item.get("children", [])
            if children:
                for child in children:
                    # Process each child with this block as parent
                    child_result = process_item(child, block_uid, level_to_uid, level + 1)
                    created_uids.extend(child_result)
                    
            # Add a brief delay for API stability
            time.sleep(0.3)
        else:
            logger.error(f"Failed to create block: {result.get('error', 'Unknown error')}")
        
        return created_uids
    
    try:
        # Process each top-level item
        level_to_uid = {-1: parent_uid}  # Start with parent as level -1
        
        for item in content_data:
            item_uids = process_item(item, parent_uid, level_to_uid, 0)
            all_created_uids.extend(item_uids)
            
        return {
            "success": True,
            "created_uids": all_created_uids
        }
    except Exception as e:
        error_msg = f"Failed to process hierarchical content: {str(e)}"
        logger.error(error_msg)
        return {
            "success": False,
            "error": error_msg,
            "created_uids": all_created_uids  # Return any UIDs created before failure
        }


def create_nested_blocks(parent_uid: str, blocks_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Create nested blocks with proper parent-child relationships.
    
    Args:
        parent_uid: UID of the parent block/page
        blocks_data: List of block data (text, level, children)
        
    Returns:
        Dictionary with success status and created block UIDs
    """
    # For backward compatibility, now uses the standardized hierarchical content processor
    return process_hierarchical_content(parent_uid, blocks_data)


def create_page(title: str, content: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
    """
    Create a new page in Roam Research with optional nested content.
    
    Args:
        title: Title for the new page
        content: Optional content as a list of dicts with 'text', optional 'level', and optional 'children'
               Each item should have:
               - 'text' or 'string': Content text
               - 'level': Nesting level (optional, defaults to parent_level + 1)
               - 'heading_level': Heading level 1-3 (optional)
               - 'children': List of child items (optional)
        
    Returns:
        Result with page UID and created block UIDs
    """
    if not title:
        return {
            "success": False,
            "error": "Title is required"
        }
    
    session, headers = get_session_and_headers()
    
    try:
        # Create the page
        page_uid = find_or_create_page(title)
        
        # Add content if provided
        if content:
            # Use the standardized hierarchical content processor
            result = process_hierarchical_content(page_uid, content)
            
            if result["success"]:
                return {
                    "success": True,
                    "uid": page_uid,
                    "created_uids": result.get("created_uids", []),
                    "page_url": f"https://roamresearch.com/#/app/{GRAPH_NAME}/page/{page_uid}"
                }
            else:
                return {
                    "success": False,
                    "error": result.get("error", "Failed to create content"),
                    "uid": page_uid,
                    "page_url": f"https://roamresearch.com/#/app/{GRAPH_NAME}/page/{page_uid}"
                }
        
        return {
            "success": True,
            "uid": page_uid,
            "page_url": f"https://roamresearch.com/#/app/{GRAPH_NAME}/page/{page_uid}"
        }
    except (ValidationError, TransactionError) as e:
        # Known API errors already carry a user-facing message
        return {
            "success": False,
            "error": str(e)
        }
    except Exception as e:
        logger.error(f"Error creating page: {str(e)}")
        return {
            "success": False,
            "error": f"Error creating page: {str(e)}"
        }


def create_block(content: str, page_uid: Optional[str] = None, page_title: Optional[str] = None) -> Dict[str, Any]:
    """
    Create a new block in Roam Research.
    
    Args:
        content: Block content - can be single-line text or multi-line content 
                 that will be parsed into a hierarchical structure
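        page_uid: UID of the target page (takes precedence over page_title)
        page_title: Title of the target page, created if it does not exist.
                    If neither is given, today's daily page is used.
        
    Returns:
        Result with the UID of the first created block ('block_uid'), the
        target page UID ('parent_uid') and, for multi-line content, all
        'created_uids'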
    """
    if not content:
        return {
            "success": False,
            "error": "Content is required"
        }
    
    session, headers = get_session_and_headers()
    
    try:
        # Determine target page
        target_page_uid = None
        
        if page_uid:
            # Use provided page UID
            target_page_uid = page_uid
        elif page_title:
            # Find or create page by title
            target_page_uid = find_or_create_page(page_title)
        else:
            # Use today's daily page
            target_page_uid = get_daily_page()
        
        # Handle multi-line content
        if "\n" in content:
            # Parse as nested structure
            markdown_content = convert_to_roam_markdown(content)
            parsed_content = parse_markdown_list(markdown_content)
            
            # Check if there's any content
            if not parsed_content:
                return {
                    "success": False,
                    "error": "Failed to parse content"
                }
            
            # Nest the flat parsed items by level
            def build_hierarchy_from_parsed(items):
                # Walk the items in document order and attach each one to the
                # nearest preceding item with a smaller level. (Sorting by level
                # first would attach every child to the last item one level up.)
                hierarchical_items = []
                ancestors = []  # Stack of (level, item) from root to current item
                
                for item in items:
                    level = item.get("level", 0)
                    
                    # Pop ancestors that are not shallower than this item
                    while ancestors and ancestors[-1][0] >= level:
                        ancestors.pop()
                    
                    if ancestors:
                        # The nearest shallower item is the parent
                        ancestors[-1][1].setdefault("children", []).append(item)
                    else:
                        # No shallower item precedes this one: it is a root item
                        hierarchical_items.append(item)
                    
                    ancestors.append((level, item))
                
                return hierarchical_items
            
            # Build hierarchical structure
            hierarchical_content = build_hierarchy_from_parsed(parsed_content)
            
            # Process using the standardized hierarchical content processor
            result = process_hierarchical_content(target_page_uid, hierarchical_content)
            
            if result["success"]:
                return {
                    "success": True,
                    "block_uid": result["created_uids"][0] if result["created_uids"] else None,
                    "parent_uid": target_page_uid,
                    "created_uids": result["created_uids"]
                }
            else:
                return {
                    "success": False,
                    "error": result.get("error", "Failed to create hierarchical blocks"),
                    "parent_uid": target_page_uid
                }
        else:
            # Create a simple block with explicit UID
            block_uid = str(uuid.uuid4())[:9]
            
            action_data = {
                "action": "create-block",
                "location": {
                    "parent-uid": target_page_uid,
                    "order": "last"
                },
                "block": {
                    "string": content,
                    "uid": block_uid
                }
            }
            
            result = execute_write_action(action_data)
            if result.get("success", False):
                # Verify the block exists after a brief delay
                time.sleep(0.5)
                found_uid = find_block_uid(session, headers, GRAPH_NAME, content)
                
                return {
                    "success": True,
                    "block_uid": found_uid or block_uid,
                    "parent_uid": target_page_uid
                }
            else:
                return {
                    "success": False,
                    "error": "Failed to create block"
                }
    except (ValidationError, PageNotFoundError, BlockNotFoundError, TransactionError) as e:
        # Known API errors already carry a user-facing message
        return {
            "success": False,
            "error": str(e)
        }
    except Exception as e:
        logger.error(f"Error creating block: {str(e)}")
        return {
            "success": False,
            "error": f"Error creating block: {str(e)}"
        }


def create_outline(outline: List[Dict[str, Any]], page_title_uid: Optional[str] = None, block_text_uid: Optional[str] = None) -> Dict[str, Any]:
    """
    Create a structured outline in Roam Research.
    
    Args:
        outline: List of outline items with text and level
               Each item should have:
                - 'text': Content text (required)
                - 'level': Nesting level (required)
                - 'heading_level': Heading level 1-3 (optional)
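        page_title_uid: Page title or UID of the target page; a page is created
                        if none is found (defaults to today's daily page)
        block_text_uid: 9-character UID of an existing block, or the text of a
                        new header block, under which the outline is added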
        
    Returns:
        Result with created block UIDs
    """
    # Validate outline
    if not outline:
        return {
            "success": False,
            "error": "Outline cannot be empty"
        }
    
    # Check for valid levels
    invalid_items = [item for item in outline if not item.get("text") or not isinstance(item.get("level"), int)]
    if invalid_items:
        return {
            "success": False,
            "error": "All outline items must have text and a valid level"
        }
    
    session, headers = get_session_and_headers()
    
    try:
        # Determine target page
        target_page_uid = None
        
        if page_title_uid:
            # Find page by title or UID
            page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
            
            if page_uid:
                target_page_uid = page_uid
            else:
                # Create new page if not found
                target_page_uid = find_or_create_page(page_title_uid)
        else:
            # Use today's daily page
            target_page_uid = get_daily_page()
        
        # Determine parent block
        parent_uid = target_page_uid
        
        if block_text_uid:
            # Check if it's a valid block UID (9 characters)
            if len(block_text_uid) == 9 and re.match(r'^[a-zA-Z0-9_-]{9}$', block_text_uid):
                # Verify block exists
                query = f'''[:find ?uid
                           :where [?b :block/uid "{block_text_uid}"]
                                  [?b :block/uid ?uid]]'''
                
                result = execute_query(query)
                
                if result:
                    parent_uid = block_text_uid
                else:
                    return {
                        "success": False,
                        "error": f"Block with UID {block_text_uid} not found"
                    }
            else:
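                # Create a header block with the given text
                new_header_uid = str(uuid.uuid4())[:9]
                action_data = {
                    "action": "create-block",
                    "location": {
                        "parent-uid": target_page_uid,
                        "order": "last"
                    },
                    "block": {
                        "string": block_text_uid,
                        "uid": new_header_uid
                    }
                }
                
                # Stop here if the write itself failed, instead of relying on
                # a text lookup that could match an older block
                header_result = execute_write_action(action_data)
                if not header_result.get("success", False):
                    return {
                        "success": False,
                        "error": f"Failed to create header block with text: {block_text_uid}"
                    }
                
                time.sleep(0.5)  # Add delay to ensure block is created
                # Prefer the UID found by text; fall back to the generated one
                header_uid = find_block_uid(session, headers, GRAPH_NAME, block_text_uid)
                parent_uid = header_uid or new_header_uid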
        
        # Build hierarchical structure from flat outline items
        def build_outline_hierarchy(items):
            # Walk the items in outline order and attach each one to the
            # nearest preceding item with a smaller level. (Sorting by level
            # first would attach every child to the last item one level up.)
            hierarchical_items = []
            ancestors = []  # Stack of (level, item) from root to current item
            
            for item in items:
                level = item.get("level", 0)
                
                # Pop ancestors that are not shallower than this item
                while ancestors and ancestors[-1][0] >= level:
                    ancestors.pop()
                
                if ancestors:
                    # The nearest shallower item is the parent
                    ancestors[-1][1].setdefault("children", []).append(item)
                else:
                    # No shallower item precedes this one: it is a root item
                    hierarchical_items.append(item)
                
                ancestors.append((level, item))
            
            return hierarchical_items
        
        # Build hierarchical structure from outline
        hierarchical_outline = build_outline_hierarchy(outline)
        
        # Use the standardized hierarchical content processor
        result = process_hierarchical_content(parent_uid, hierarchical_outline)
        
        if result["success"]:
            return {
                "success": True,
                "page_uid": target_page_uid,
                "parent_uid": parent_uid,
                "created_uids": result.get("created_uids", [])
            }
        else:
            return {
                "success": False,
                "error": result.get("error", "Failed to create outline"),
                "page_uid": target_page_uid,
                "parent_uid": parent_uid
            }
    except (ValidationError, PageNotFoundError, BlockNotFoundError, TransactionError) as e:
        # Known API errors already carry a user-facing message
        return {
            "success": False,
            "error": str(e)
        }
    except Exception as e:
        logger.error(f"Error creating outline: {str(e)}")
        return {
            "success": False,
            "error": f"Error creating outline: {str(e)}"
        }


def import_markdown(content: str, page_uid: Optional[str] = None, page_title: Optional[str] = None,
                   parent_uid: Optional[str] = None, parent_string: Optional[str] = None,
                   order: str = "last") -> Dict[str, Any]:
    """
    Import markdown content into Roam Research.
    
    Args:
        content: Markdown content to import
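        page_uid: UID of the target page (takes precedence over page_title)
        page_title: Title of the target page, created if it does not exist.
                    If neither is given, today's daily page is used.
        parent_uid: UID of an existing block to import under (takes precedence
                    over parent_string)
        parent_string: Text of the block to import under; created on the target
                       page if no block with this text is found
        order: Position of the top-level blocks under the parent ("first" or "last")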
        
    Returns:
        Result with created block UIDs
    """
    if not content:
        return {
            "success": False,
            "error": "Content cannot be empty"
        }
    
    if order not in ["first", "last"]:
        return {
            "success": False,
            "error": "Order must be 'first' or 'last'"
        }
    
    session, headers = get_session_and_headers()
    
    try:
        # Determine target page
        target_page_uid = None
        
        if page_uid:
            # Use provided page UID
            target_page_uid = page_uid
        elif page_title:
            # Find or create page by title
            target_page_uid = find_or_create_page(page_title)
        else:
            # Use today's daily page
            target_page_uid = get_daily_page()
        
        # Determine parent block
        parent_block_uid = target_page_uid
        
        if parent_uid:
            # Verify block exists
            query = f'''[:find ?uid .
                       :where [?b :block/uid "{parent_uid}"]
                              [?b :block/uid ?uid]]'''
            
            result = execute_query(query)
            
            if result:
                parent_block_uid = parent_uid
            else:
                return {
                    "success": False,
                    "error": f"Block with UID {parent_uid} not found"
                }
        elif parent_string:
            # Find block by string
            found_uid = find_block_uid(session, headers, GRAPH_NAME, parent_string)
            
            if found_uid:
                parent_block_uid = found_uid
            else:
                # Create parent block if it doesn't exist
                block_uid = str(uuid.uuid4())[:9]
                
                action_data = {
                    "action": "create-block",
                    "location": {
                        "parent-uid": target_page_uid,
                        "order": "last"
                    },
                    "block": {
                        "string": parent_string,
                        "uid": block_uid
                    }
                }
                
                execute_write_action(action_data)
                time.sleep(1)  # Wait for block to be created
                
                found_uid = find_block_uid(session, headers, GRAPH_NAME, parent_string)
                if found_uid:
                    parent_block_uid = found_uid
                else:
                    parent_block_uid = block_uid
                    logger.debug(f"Created parent block with UID: {block_uid}")
        
        # Convert markdown to Roam format
        roam_markdown = convert_to_roam_markdown(content)
        
        # Parse markdown into hierarchical structure
        parsed_content = parse_markdown_list(roam_markdown)
        
        if not parsed_content:
            return {
                "success": False,
                "error": "Failed to parse markdown content"
            }
        
        # Build a proper hierarchical structure from the parsed markdown
        def build_hierarchy(items):
            # Walk the items in document order and attach each one to the
            # nearest preceding item with a smaller level. A stack avoids
            # items.index(), which compares dicts by value and so confuses
            # items with identical content, and it keeps items that skip a level.
            root_items = []
            ancestors = []  # Stack of (level, item) from root to current item
            
            for item in items:
                level = item.get("level", 0)
                
                # Pop ancestors that are not shallower than this item
                while ancestors and ancestors[-1][0] >= level:
                    ancestors.pop()
                
                if ancestors:
                    # The nearest shallower item is the parent
                    ancestors[-1][1].setdefault("children", []).append(item)
                else:
                    # No shallower item precedes this one: it is a root item
                    root_items.append(item)
                
                ancestors.append((level, item))
            
            return root_items
        
        # Build a hierarchical structure that preserves parent-child relationships
        hierarchical_content = build_hierarchy(parsed_content)
        
        # Process the hierarchical content using the standardized utility
        result = process_hierarchical_content(parent_block_uid, hierarchical_content, order)
        
        if result["success"]:
            return {
                "success": True,
                "page_uid": target_page_uid,
                "parent_uid": parent_block_uid,
                "created_uids": result.get("created_uids", [])
            }
        else:
            return {
                "success": False,
                "error": result.get("error", "Failed to import markdown"),
                "page_uid": target_page_uid,
                "parent_uid": parent_block_uid
            }
    except (ValidationError, PageNotFoundError, BlockNotFoundError, TransactionError) as e:
        # Known API errors already carry a user-facing message
        return {
            "success": False,
            "error": str(e)
        }
    except Exception as e:
        logger.error(f"Error importing markdown: {str(e)}")
        return {
            "success": False,
            "error": f"Error importing markdown: {str(e)}"
        }


def add_todos(todos: List[str]) -> Dict[str, Any]:
    """
    Add todo items to today's daily page.
    
    Args:
        todos: List of todo items
        
    Returns:
        Result with success status
    """
    if not todos:
        return {
            "success": False,
            "error": "Todo list cannot be empty"
        }
    
    if not all(isinstance(todo, str) for todo in todos):
        return {
            "success": False,
            "error": "All todo items must be strings"
        }
    
    session, headers = get_session_and_headers()
    
    try:
        # Get today's daily page
        daily_page_uid = get_daily_page()
        
        # Create batch actions for todos
        actions = []
        todo_uids = []
        
        for i, todo in enumerate(todos):
            # Format with TODO syntax
            todo_content = f"{{{{[[TODO]]}}}} {todo}"
            
            # Generate UID
            block_uid = str(uuid.uuid4())[:9]
            todo_uids.append(block_uid)
            
            # Create action
            action = {
                "action": "create-block",
                "location": {
                    "parent-uid": daily_page_uid,
                    "order": "last"
                },
                "block": {
                    "string": todo_content,
                    "uid": block_uid
                }
            }
            
            actions.append(action)
        
        # Execute batch actions
        result = execute_write_action(actions)
        
        if result.get("success", False) or "created_uids" in result:
            return {
                "success": True,
                "created_uids": result.get("created_uids", todo_uids),
                "page_uid": daily_page_uid
            }
        else:
            return {
                "success": False,
                "error": "Failed to create todo items"
            }
    except (ValidationError, PageNotFoundError, TransactionError) as e:
        # Known API errors already carry a user-facing message
        return {
            "success": False,
            "error": str(e)
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


def update_content(block_uid: str, content: Optional[str] = None, transform_pattern: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Update a block's content or transform it using a pattern.
    
    Args:
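        block_uid: Block UID
        content: New content (used only when transform_pattern is not given)
        transform_pattern: Regex transformation with 'find' and 'replace' keys and an
                           optional 'global' flag (default True: replace every match)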
        
    Returns:
        Result with updated content
    """
    if not block_uid:
        return {
            "success": False,
            "error": "Block UID is required"
        }
    
    if not content and not transform_pattern:
        return {
            "success": False,
            "error": "Either content or transform_pattern must be provided"
        }
    
    try:
        # Get current content if doing a transformation
        if transform_pattern:
            # Validate transform pattern
            if not isinstance(transform_pattern, dict):
                return {
                    "success": False,
                    "error": "Transform pattern must be an object"
                }
            
            if "find" not in transform_pattern or "replace" not in transform_pattern:
                return {
                    "success": False,
                    "error": "Transform pattern must include 'find' and 'replace' properties"
                }
            
            query = f'''[:find ?string .
                        :where [?b :block/uid "{block_uid}"]
                                [?b :block/string ?string]]'''
            
            current_content = execute_query(query)
            
            if not current_content:
                return {
                    "success": False,
                    "error": f"Block with UID {block_uid} not found"
                }
            
            # Apply transformation
            find = transform_pattern["find"]
            replace = transform_pattern["replace"]
            global_replace = transform_pattern.get("global", True)
            
            try:
                flags = re.MULTILINE
                count = 0 if global_replace else 1
                new_content = re.sub(find, replace, current_content, count=count, flags=flags)
                
                # Update block
                update_block(block_uid, new_content)
                
                return {
                    "success": True,
                    "content": new_content
                }
            except re.error as e:
                return {
                    "success": False,
                    "error": f"Invalid regex pattern: {str(e)}"
                }
        else:
            # Direct content update
            update_block(block_uid, content)
            
            return {
                "success": True,
                "content": content
            }
    except Exception as e:
        # ValidationError, BlockNotFoundError and TransactionError are all
        # reported the same way, so a single handler covers them.
        return {
            "success": False,
            "error": str(e)
        }


def update_multiple_contents(updates: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Update multiple blocks in a single operation.
    
    Args:
        updates: List of update operations
        
    Returns:
        Results of updates
    """
    if not updates or not isinstance(updates, list):
        return {
            "success": False,
            "error": "Updates must be a non-empty list"
        }
    
    try:
        # Validate each update
        for i, update in enumerate(updates):
            if "block_uid" not in update:
                return {
                    "success": False,
                    "error": f"Update at index {i} is missing required 'block_uid' property"
                }
            
            if "content" not in update and "transform" not in update:
                return {
                    "success": False,
                    "error": f"Update at index {i} must include either 'content' or 'transform'"
                }
            
            if "transform" in update:
                transform = update["transform"]
                if not isinstance(transform, dict):
                    return {
                        "success": False,
                        "error": f"Transform at index {i} must be an object"
                    }
                
                if "find" not in transform or "replace" not in transform:
                    return {
                        "success": False,
                        "error": f"Transform at index {i} must include 'find' and 'replace' properties"
                    }
        
        # Batch update blocks in chunks of 50
        CHUNK_SIZE = 50
        results = batch_update_blocks(updates, CHUNK_SIZE)
        
        # Count successful updates
        successful = sum(1 for result in results if result.get("success"))
        
        return {
            "success": successful == len(updates),
            "results": results,
            "message": f"Updated {successful}/{len(updates)} blocks successfully"
        }
    except Exception as e:
        # ValidationError is reported the same way as any other failure.
        return {
            "success": False,
            "error": str(e)
        }
    """
    Create nested blocks with proper parent-child relationships.
    
    Args:
        parent_uid: UID of the parent block/page
        blocks_data: List of block data (text, level, children)
        
    Returns:
        Dictionary with success status and created block UIDs
    """
    # For backward compatibility, now uses the standardized hierarchical content processor
    return process_hierarchical_content(parent_uid, blocks_data)
```
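
The `transform_pattern` branch of `update_content` above comes down to a single `re.sub` call whose `count` argument is derived from the optional `global` key. The sketch below isolates that step so its behavior can be checked without a Roam graph. The helper name `apply_transform` and the sample text are hypothetical and are not part of `roam_mcp`.

```python
import re
from typing import Any, Dict


def apply_transform(current: str, pattern: Dict[str, Any]) -> str:
    """Hypothetical helper mirroring the find/replace step in update_content."""
    find = pattern["find"]
    replace = pattern["replace"]
    # "global" defaults to True; re.sub treats count=0 as "replace every match"
    count = 0 if pattern.get("global", True) else 1
    # re.MULTILINE lets ^ and $ match at each line boundary inside the block
    return re.sub(find, replace, current, count=count, flags=re.MULTILINE)


text = "TODO buy milk\nTODO call Sam"

# Replace every line-leading TODO
print(apply_transform(text, {"find": r"^TODO", "replace": "DONE"}))

# Replace only the first match
print(apply_transform(text, {"find": r"^TODO", "replace": "DONE", "global": False}))
```

Because `find` is compiled as a regular expression, literal characters such as `(`, `[` or `.` in a search string need escaping (for example with `re.escape`) if a plain-text match is intended.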

--------------------------------------------------------------------------------
/roam_mcp/search.py:
--------------------------------------------------------------------------------

```python
"""Search operations for the Roam MCP server."""

from typing import Dict, List, Any, Optional, Union, Set
from datetime import datetime, timedelta
import re
import logging

from roam_mcp.api import (
    execute_query,
    get_session_and_headers,
    GRAPH_NAME,
    find_page_by_title,
    ValidationError,
    QueryError,
    PageNotFoundError,
    BlockNotFoundError
)
from roam_mcp.utils import (
    format_roam_date,
    resolve_block_references
)

# Set up logging
logger = logging.getLogger("roam-mcp.search")


def validate_search_params(text: Optional[str] = None, tag: Optional[str] = None, 
                          status: Optional[str] = None, page_title_uid: Optional[str] = None):
    """
    Validate common search parameters.
    
    Args:
        text: Optional text to search for
        tag: Optional tag to search for
        status: Optional status to search for
        page_title_uid: Optional page title or UID
        
    Raises:
        ValidationError: If parameters are invalid
    """
    if status and status not in ["TODO", "DONE"]:
        raise ValidationError("Status must be 'TODO' or 'DONE'", "status")


def search_by_text(text: str, page_title_uid: Optional[str] = None, case_sensitive: bool = True) -> Dict[str, Any]:
    """
    Search for blocks containing specific text.
    
    Args:
        text: Text to search for
        page_title_uid: Optional page title or UID to scope the search
        case_sensitive: Whether to perform case-sensitive search
        
    Returns:
        Search results
    """
    if not text:
        return {
            "success": False,
            "matches": [],
            "message": "Search text cannot be empty"
        }
    
    session, headers = get_session_and_headers()
    
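    # Escape backslashes and double quotes so the search text cannot break
    # out of the Datalog string literal it is embedded in
    escaped_text = text.replace('\\', '\\\\').replace('"', '\\"')
    
    # Prepare the query
    if case_sensitive:
        text_condition = f'(clojure.string/includes? ?s "{escaped_text}")'
    else:
        text_condition = f'(clojure.string/includes? (clojure.string/lower-case ?s) "{escaped_text.lower()}")'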
    
    try:
        if page_title_uid:
            # Try to find the page UID if a title was provided
            page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
            
            if not page_uid:
                return {
                    "success": False,
                    "matches": [],
                    "message": f"Page '{page_title_uid}' not found"
                }
                
            query = f"""[:find ?uid ?s ?order
                      :where
                      [?p :block/uid "{page_uid}"]
                      [?b :block/page ?p]
                      [?b :block/string ?s]
                      [?b :block/uid ?uid]
                      [?b :block/order ?order]
                      [{text_condition}]]"""
        else:
            query = f"""[:find ?uid ?s ?page-title
                      :where
                      [?b :block/string ?s]
                      [?b :block/uid ?uid]
                      [?b :block/page ?p]
                      [?p :node/title ?page-title]
                      [{text_condition}]]"""
        
        # Execute the query
        logger.debug(f"Executing text search for: {text}")
        results = execute_query(query)
        
        # Process the results
        matches = []
        if page_title_uid:
            # For page-specific search, results are [uid, content, order]
            for uid, content, order in results:
                # Resolve references if present
                resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                
                matches.append({
                    "block_uid": uid,
                    "content": resolved_content,
                    "page_title": page_title_uid
                })
        else:
            # For global search, results are [uid, content, page_title]
            for uid, content, page_title in results:
                # Resolve references if present
                resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                
                matches.append({
                    "block_uid": uid,
                    "content": resolved_content,
                    "page_title": page_title
                })
        
        return {
            "success": True,
            "matches": matches,
            "message": f"Found {len(matches)} block(s) containing \"{text}\""
        }
    except PageNotFoundError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except QueryError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except Exception as e:
        logger.error(f"Error searching by text: {str(e)}")
        return {
            "success": False,
            "matches": [],
            "message": f"Error searching by text: {str(e)}"
        }


def search_by_tag(tag: str, page_title_uid: Optional[str] = None, near_tag: Optional[str] = None) -> Dict[str, Any]:
    """
    Search for blocks containing a specific tag.
    
    Args:
        tag: Tag to search for (without # or [[ ]])
        page_title_uid: Optional page title or UID to scope the search
        near_tag: Optional second tag that must appear in the same block
        
    Returns:
        Search results
    """
    if not tag:
        return {
            "success": False,
            "matches": [],
            "message": "Tag cannot be empty"
        }
    
    session, headers = get_session_and_headers()
    
    # Format the tag for searching
    # Remove any existing formatting
    clean_tag = tag.replace('#', '').replace('[[', '').replace(']]', '')
    tag_variants = [f"#{clean_tag}", f"#[[{clean_tag}]]", f"[[{clean_tag}]]"]
    
    # Build tag conditions
    tag_conditions = []
    for variant in tag_variants:
        tag_conditions.append(f'(clojure.string/includes? ?s "{variant}")')
    
    tag_condition = f"(or {' '.join(tag_conditions)})"
    
    # Add near_tag condition if provided
    if near_tag:
        clean_near_tag = near_tag.replace('#', '').replace('[[', '').replace(']]', '')
        near_tag_variants = [f"#{clean_near_tag}", f"#[[{clean_near_tag}]]", f"[[{clean_near_tag}]]"]
        
        near_tag_conditions = []
        for variant in near_tag_variants:
            near_tag_conditions.append(f'(clojure.string/includes? ?s "{variant}")')
        
        near_tag_condition = f"(or {' '.join(near_tag_conditions)})"
        combined_condition = f"(and {tag_condition} {near_tag_condition})"
    else:
        combined_condition = tag_condition
    
    try:
        # Build query based on whether we're searching in a specific page
        if page_title_uid:
            # Try to find the page UID if a title was provided
            page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
            
            if not page_uid:
                return {
                    "success": False,
                    "matches": [],
                    "message": f"Page '{page_title_uid}' not found"
                }
                
            query = f"""[:find ?uid ?s
                      :where
                      [?p :block/uid "{page_uid}"]
                      [?b :block/page ?p]
                      [?b :block/string ?s]
                      [?b :block/uid ?uid]
                      [{combined_condition}]]"""
        else:
            query = f"""[:find ?uid ?s ?page-title
                      :where
                      [?b :block/string ?s]
                      [?b :block/uid ?uid]
                      [?b :block/page ?p]
                      [?p :node/title ?page-title]
                      [{combined_condition}]]"""
        
        # Execute the query
        logger.debug(f"Executing tag search for: {tag}")
        if near_tag:
            logger.debug(f"With near tag: {near_tag}")
            
        results = execute_query(query)
        
        # Process the results
        matches = []
        if page_title_uid:
            # For page-specific search, results are [uid, content]
            for uid, content in results:
                # Resolve references if present
                resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                
                matches.append({
                    "block_uid": uid,
                    "content": resolved_content,
                    "page_title": page_title_uid
                })
        else:
            # For global search, results are [uid, content, page_title]
            for uid, content, page_title in results:
                # Resolve references if present
                resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                
                matches.append({
                    "block_uid": uid,
                    "content": resolved_content,
                    "page_title": page_title
                })
        
        # Build message
        message = f"Found {len(matches)} block(s) with tag #{clean_tag}"
        if near_tag:
            message += f" near #{clean_near_tag}"
        
        return {
            "success": True,
            "matches": matches,
            "message": message
        }
    except PageNotFoundError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except QueryError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except Exception as e:
        logger.error(f"Error searching by tag: {str(e)}")
        return {
            "success": False,
            "matches": [],
            "message": f"Error searching by tag: {str(e)}"
        }


def search_by_status(status: str, page_title_uid: Optional[str] = None, include: Optional[str] = None, exclude: Optional[str] = None) -> Dict[str, Any]:
    """
    Search for blocks with a specific status (TODO/DONE).
    
    Args:
        status: Status to search for ("TODO" or "DONE")
        page_title_uid: Optional page title or UID to scope the search
        include: Optional comma-separated keywords to include
        exclude: Optional comma-separated keywords to exclude
        
    Returns:
        Search results
    """
    if status not in ["TODO", "DONE"]:
        return {
            "success": False,
            "matches": [],
            "message": "Status must be either 'TODO' or 'DONE'"
        }
    
    session, headers = get_session_and_headers()
    
    # Status pattern
    status_pattern = f"{{{{[[{status}]]}}}}"
    
    try:
        # Build query based on whether we're searching in a specific page
        if page_title_uid:
            # Try to find the page UID if a title was provided
            page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
            
            if not page_uid:
                return {
                    "success": False,
                    "matches": [],
                    "message": f"Page '{page_title_uid}' not found"
                }
                
            query = f"""[:find ?uid ?s
                      :where
                      [?p :block/uid "{page_uid}"]
                      [?b :block/page ?p]
                      [?b :block/string ?s]
                      [?b :block/uid ?uid]
                      [(clojure.string/includes? ?s "{status_pattern}")]]"""
        else:
            query = f"""[:find ?uid ?s ?page-title
                      :where
                      [?b :block/string ?s]
                      [?b :block/uid ?uid]
                      [?b :block/page ?p]
                      [?p :node/title ?page-title]
                      [(clojure.string/includes? ?s "{status_pattern}")]]"""
        
        # Execute the query
        logger.debug(f"Executing status search for: {status}")
        results = execute_query(query)
        
        # Process the results
        matches = []
        if page_title_uid:
            # For page-specific search, results are [uid, content]
            for uid, content in results:
                # Resolve references if present
                resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                
                # Apply include/exclude filters, ignoring empty terms such as
                # those produced by a trailing comma
                content_lower = resolved_content.lower()
                if include:
                    include_terms = [t.strip().lower() for t in include.split(',') if t.strip()]
                    if include_terms and not any(term in content_lower for term in include_terms):
                        continue
                        
                if exclude:
                    exclude_terms = [t.strip().lower() for t in exclude.split(',') if t.strip()]
                    if any(term in content_lower for term in exclude_terms):
                        continue
                
                matches.append({
                    "block_uid": uid,
                    "content": resolved_content,
                    "page_title": page_title_uid
                })
        else:
            # For global search, results are [uid, content, page_title]
            for uid, content, page_title in results:
                # Resolve references if present
                resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                
                # Apply include/exclude filters, ignoring empty terms such as
                # those produced by a trailing comma
                content_lower = resolved_content.lower()
                if include:
                    include_terms = [t.strip().lower() for t in include.split(',') if t.strip()]
                    if include_terms and not any(term in content_lower for term in include_terms):
                        continue
                        
                if exclude:
                    exclude_terms = [t.strip().lower() for t in exclude.split(',') if t.strip()]
                    if any(term in content_lower for term in exclude_terms):
                        continue
                
                matches.append({
                    "block_uid": uid,
                    "content": resolved_content,
                    "page_title": page_title
                })
        
        # Build message
        message = f"Found {len(matches)} block(s) with status {status}"
        if include:
            message += f" including '{include}'"
        if exclude:
            message += f" excluding '{exclude}'"
        
        return {
            "success": True,
            "matches": matches,
            "message": message
        }
    except PageNotFoundError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except QueryError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except Exception as e:
        logger.error(f"Error searching by status: {str(e)}")
        return {
            "success": False,
            "matches": [],
            "message": f"Error searching by status: {str(e)}"
        }


def search_block_refs(block_uid: Optional[str] = None, page_title_uid: Optional[str] = None) -> Dict[str, Any]:
    """
    Search for block references.
    
    Args:
        block_uid: Optional UID of the block to find references to
        page_title_uid: Optional page title or UID to scope the search
        
    Returns:
        Search results
    """
    session, headers = get_session_and_headers()
    
    # Determine what kind of search we're doing
    if block_uid:
        block_ref_pattern = f"(({block_uid}))"
        description = f"referencing block (({block_uid}))"
    else:
        block_ref_pattern = "\\(\\([^)]+\\)\\)"
        description = "containing block references"
    
    try:
        # Build query based on whether we're searching in a specific page
        if page_title_uid:
            # Try to find the page UID if a title was provided
            page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
            
            if not page_uid:
                return {
                    "success": False,
                    "matches": [],
                    "message": f"Page '{page_title_uid}' not found"
                }
                
            if block_uid:
                query = f"""[:find ?uid ?s
                          :where
                          [?p :block/uid "{page_uid}"]
                          [?b :block/page ?p]
                          [?b :block/string ?s]
                          [?b :block/uid ?uid]
                          [(clojure.string/includes? ?s "{block_ref_pattern}")]]"""
            else:
                query = f"""[:find ?uid ?s
                          :where
                          [?p :block/uid "{page_uid}"]
                          [?b :block/page ?p]
                          [?b :block/string ?s]
                          [?b :block/uid ?uid]
                          [(re-find #"\\(\\([^)]+\\)\\)" ?s)]]"""
        else:
            if block_uid:
                query = f"""[:find ?uid ?s ?page-title
                          :where
                          [?b :block/string ?s]
                          [?b :block/uid ?uid]
                          [?b :block/page ?p]
                          [?p :node/title ?page-title]
                          [(clojure.string/includes? ?s "{block_ref_pattern}")]]"""
            else:
                query = f"""[:find ?uid ?s ?page-title
                          :where
                          [?b :block/string ?s]
                          [?b :block/uid ?uid]
                          [?b :block/page ?p]
                          [?p :node/title ?page-title]
                          [(re-find #"\\(\\([^)]+\\)\\)" ?s)]]"""
        
        # Execute the query
        logger.debug("Executing block reference search")
        results = execute_query(query)
        
        # Process the results
        matches = []
        if page_title_uid:
            # For page-specific search, results are [uid, content]
            for uid, content in results:
                # Resolve references if present
                resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                
                matches.append({
                    "block_uid": uid,
                    "content": resolved_content,
                    "page_title": page_title_uid
                })
        else:
            # For global search, results are [uid, content, page_title]
            for uid, content, page_title in results:
                # Resolve references if present
                resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                
                matches.append({
                    "block_uid": uid,
                    "content": resolved_content,
                    "page_title": page_title
                })
        
        return {
            "success": True,
            "matches": matches,
            "message": f"Found {len(matches)} block(s) {description}"
        }
    except PageNotFoundError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except QueryError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except Exception as e:
        logger.error(f"Error searching block references: {str(e)}")
        return {
            "success": False,
            "matches": [],
            "message": f"Error searching block references: {str(e)}"
        }


def search_hierarchy(parent_uid: Optional[str] = None, child_uid: Optional[str] = None, 
                     page_title_uid: Optional[str] = None, max_depth: int = 1) -> Dict[str, Any]:
    """
    Search for parents or children in the block hierarchy.
    
    Args:
        parent_uid: Optional UID of the block to find children of
        child_uid: Optional UID of the block to find parents of
        page_title_uid: Optional page title or UID to scope the search
        max_depth: Maximum depth to search
        
    Returns:
        Search results
    """
    if not parent_uid and not child_uid:
        return {
            "success": False,
            "matches": [],
            "message": "Either parent_uid or child_uid must be provided"
        }
    
    if max_depth < 1:
        return {
            "success": False,
            "matches": [],
            "message": "max_depth must be at least 1"
        }
        
    if max_depth > 10:
        max_depth = 10
        logger.warning("max_depth limited to 10")
    
    session, headers = get_session_and_headers()
    
    # Define ancestor rule
    ancestor_rule = """[
        [(ancestor ?child ?parent ?depth)
            [?parent :block/children ?child]
            [(identity 1) ?depth]]
        [(ancestor ?child ?parent ?depth)
            [?mid :block/children ?child]
            (ancestor ?mid ?parent ?prev-depth)
            [(+ ?prev-depth 1) ?depth]]
    ]"""
    
    try:
        # Determine search type and build query
        if parent_uid:
            # Searching for children
            if page_title_uid:
                # Try to find the page UID if a title was provided
                page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
                
                if not page_uid:
                    return {
                        "success": False,
                        "matches": [],
                        "message": f"Page '{page_title_uid}' not found"
                    }
                    
                query = f"""[:find ?uid ?s ?depth
                          :in $ % ?parent-uid ?max-depth
                          :where
                          [?parent :block/uid ?parent-uid]
                          [?p :block/uid "{page_uid}"]
                          (ancestor ?b ?parent ?depth)
                          [?b :block/string ?s]
                          [?b :block/uid ?uid]
                          [?b :block/page ?p]
                          [(<= ?depth ?max-depth)]]"""
                inputs = [ancestor_rule, parent_uid, max_depth]
            else:
                query = f"""[:find ?uid ?s ?page-title ?depth
                          :in $ % ?parent-uid ?max-depth
                          :where
                          [?parent :block/uid ?parent-uid]
                          (ancestor ?b ?parent ?depth)
                          [?b :block/string ?s]
                          [?b :block/uid ?uid]
                          [?b :block/page ?p]
                          [?p :node/title ?page-title]
                          [(<= ?depth ?max-depth)]]"""
                inputs = [ancestor_rule, parent_uid, max_depth]
            
            description = f"descendants of block {parent_uid}"
        else:
            # Searching for parents
            if page_title_uid:
                # Try to find the page UID if a title was provided
                page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
                
                if not page_uid:
                    return {
                        "success": False,
                        "matches": [],
                        "message": f"Page '{page_title_uid}' not found"
                    }
                    
                query = f"""[:find ?uid ?s ?depth
                          :in $ % ?child-uid ?max-depth
                          :where
                          [?child :block/uid ?child-uid]
                          [?p :block/uid "{page_uid}"]
                          (ancestor ?child ?b ?depth)
                          [?b :block/string ?s]
                          [?b :block/uid ?uid]
                          [?b :block/page ?p]
                          [(<= ?depth ?max-depth)]]"""
                inputs = [ancestor_rule, child_uid, max_depth]
            else:
                query = f"""[:find ?uid ?s ?page-title ?depth
                          :in $ % ?child-uid ?max-depth
                          :where
                          [?child :block/uid ?child-uid]
                          (ancestor ?child ?b ?depth)
                          [?b :block/string ?s]
                          [?b :block/uid ?uid]
                          [?b :block/page ?p]
                          [?p :node/title ?page-title]
                          [(<= ?depth ?max-depth)]]"""
                inputs = [ancestor_rule, child_uid, max_depth]
            
            description = f"ancestors of block {child_uid}"
        
        # Execute the query
        logger.debug(f"Executing hierarchy search with max_depth: {max_depth}")
        results = execute_query(query, inputs)
        
        # Process the results
        matches = []
        if page_title_uid:
            # For page-specific search, results are [uid, content, depth]
            for uid, content, depth in results:
                # Resolve references if present
                resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                
                matches.append({
                    "block_uid": uid,
                    "content": resolved_content,
                    "depth": depth,
                    "page_title": page_title_uid
                })
        else:
            # For global search, results are [uid, content, page_title, depth]
            for uid, content, page_title, depth in results:
                # Resolve references if present
                resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                
                matches.append({
                    "block_uid": uid,
                    "content": resolved_content,
                    "depth": depth,
                    "page_title": page_title
                })
        
        return {
            "success": True,
            "matches": matches,
            "message": f"Found {len(matches)} block(s) as {description}"
        }
    except PageNotFoundError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except BlockNotFoundError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except QueryError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except Exception as e:
        logger.error(f"Error searching hierarchy: {str(e)}")
        return {
            "success": False,
            "matches": [],
            "message": f"Error searching hierarchy: {str(e)}"
        }


def search_by_date(start_date: str, end_date: Optional[str] = None, 
                   type_filter: str = "created", scope: str = "blocks",
                   include_content: bool = True) -> Dict[str, Any]:
    """
    Search for blocks or pages based on creation or modification dates.
    
    Args:
        start_date: Start date in ISO format (YYYY-MM-DD)
        end_date: Optional end date in ISO format (YYYY-MM-DD)
        type_filter: Whether to search by "created", "modified", or "both"
        scope: Whether to search "blocks", "pages", or "both"
        include_content: Whether to include block/page content
        
    Returns:
        Search results
    """
    # Validate inputs
    if type_filter not in ["created", "modified", "both"]:
        return {
            "success": False,
            "matches": [],
            "message": "Type must be 'created', 'modified', or 'both'"
        }
    
    if scope not in ["blocks", "pages", "both"]:
        return {
            "success": False,
            "matches": [],
            "message": "Scope must be 'blocks', 'pages', or 'both'"
        }
    
    # Parse dates
    try:
        start_timestamp = int(datetime.strptime(start_date, "%Y-%m-%d").timestamp() * 1000)
        
        if end_date:
            # Set end_date to the last instant of that day so the range is inclusive
            end_dt = datetime.strptime(end_date, "%Y-%m-%d")
            end_dt = end_dt.replace(hour=23, minute=59, second=59, microsecond=999999)
            end_timestamp = int(end_dt.timestamp() * 1000)
        else:
            # Default to now if no end date
            end_timestamp = int(datetime.now().timestamp() * 1000)
    except ValueError:
        return {
            "success": False,
            "matches": [],
            "message": "Invalid date format. Dates should be in YYYY-MM-DD format."
        }
    
    session, headers = get_session_and_headers()
    
    # Track matches across all queries to handle sorting
    all_matches = []
    logger.debug(f"Executing date search: {start_date} to {end_date or 'now'}")
    
    try:
        # Build and execute queries based on scope and type
        # Block queries for creation time
        if scope in ["blocks", "both"] and type_filter in ["created", "both"]:
            logger.debug("Searching blocks by creation time")
            query = f"""[:find ?uid ?s ?page-title ?time
                      :where
                      [?b :block/string ?s]
                      [?b :block/uid ?uid]
                      [?b :block/page ?p]
                      [?p :node/title ?page-title]
                      [?b :create/time ?time]
                      [(>= ?time {start_timestamp})]
                      [(<= ?time {end_timestamp})]]
                      :limit 1000"""
            
            block_created_results = execute_query(query)
            
            for uid, content, page_title, time in block_created_results:
                match_data = {
                    "uid": uid,
                    "type": "block",
                    "time": time,
                    "time_type": "created",
                    "page_title": page_title
                }
                
                if include_content:
                    resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                    match_data["content"] = resolved_content
                
                all_matches.append(match_data)
        
        # Block queries for modification time
        if scope in ["blocks", "both"] and type_filter in ["modified", "both"]:
            logger.debug("Searching blocks by modification time")
            query = f"""[:find ?uid ?s ?page-title ?time
                      :where
                      [?b :block/string ?s]
                      [?b :block/uid ?uid]
                      [?b :block/page ?p]
                      [?p :node/title ?page-title]
                      [?b :edit/time ?time]
                      [(>= ?time {start_timestamp})]
                      [(<= ?time {end_timestamp})]]
                      :limit 1000"""
            
            block_modified_results = execute_query(query)
            
            for uid, content, page_title, time in block_modified_results:
                match_data = {
                    "uid": uid,
                    "type": "block",
                    "time": time,
                    "time_type": "modified",
                    "page_title": page_title
                }
                
                if include_content:
                    resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
                    match_data["content"] = resolved_content
                
                all_matches.append(match_data)
        
        # Page queries for creation time
        if scope in ["pages", "both"] and type_filter in ["created", "both"]:
            logger.debug("Searching pages by creation time")
            query = f"""[:find ?uid ?title ?time
                      :where
                      [?p :node/title ?title]
                      [?p :block/uid ?uid]
                      [?p :create/time ?time]
                      [(>= ?time {start_timestamp})]
                      [(<= ?time {end_timestamp})]]
                      :limit 500"""
            
            page_created_results = execute_query(query)
            
            for uid, title, time in page_created_results:
                match_data = {
                    "uid": uid,
                    "type": "page",
                    "time": time,
                    "time_type": "created",
                    "title": title
                }
                
                if include_content:
                    # Get a sample of page content (first few blocks)
                    sample_query = f"""[:find ?s
                                    :where
                                    [?p :block/uid "{uid}"]
                                    [?b :block/page ?p]
                                    [?b :block/string ?s]
                                    [?b :block/order ?o]
                                    [(< ?o 3)]]
                                    :limit 3"""
                    
                    page_blocks = execute_query(sample_query)
                    page_sample = "\n".join([content[0] for content in page_blocks[:3]])
                    
                    if page_blocks:
                        match_data["content"] = f"# {title}\n{page_sample}"
                        if len(page_blocks) > 3:
                            match_data["content"] += "\n..."
                    else:
                        match_data["content"] = f"# {title}\n(No content)"
                
                all_matches.append(match_data)
        
        # Page queries for modification time
        if scope in ["pages", "both"] and type_filter in ["modified", "both"]:
            logger.debug("Searching pages by modification time")
            query = f"""[:find ?uid ?title ?time
                      :where
                      [?p :node/title ?title]
                      [?p :block/uid ?uid]
                      [?p :edit/time ?time]
                      [(>= ?time {start_timestamp})]
                      [(<= ?time {end_timestamp})]]
                      :limit 500"""
            
            page_modified_results = execute_query(query)
            
            for uid, title, time in page_modified_results:
                match_data = {
                    "uid": uid,
                    "type": "page",
                    "time": time,
                    "time_type": "modified",
                    "title": title
                }
                
                if include_content:
                    # Get a sample of page content (first few blocks)
                    sample_query = f"""[:find ?s
                                    :where
                                    [?p :block/uid "{uid}"]
                                    [?b :block/page ?p]
                                    [?b :block/string ?s]
                                    [?b :block/order ?o]
                                    [(< ?o 3)]]
                                    :limit 3"""
                    
                    page_blocks = execute_query(sample_query)
                    page_sample = "\n".join([content[0] for content in page_blocks[:3]])
                    
                    if page_blocks:
                        match_data["content"] = f"# {title}\n{page_sample}"
                        if len(page_blocks) > 3:
                            match_data["content"] += "\n..."
                    else:
                        match_data["content"] = f"# {title}\n(No content)"
                
                all_matches.append(match_data)
        
        # Sort by time (newest first)
        all_matches.sort(key=lambda x: x["time"], reverse=True)
        
        # Deduplicate by UID and time_type
        seen = set()
        unique_matches = []
        
        for match in all_matches:
            key = (match["uid"], match["time_type"])
            if key not in seen:
                seen.add(key)
                unique_matches.append(match)
        
        return {
            "success": True,
            "matches": unique_matches,
            "message": f"Found {len(unique_matches)} matches for the given date range and criteria"
        }
    except QueryError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except Exception as e:
        logger.error(f"Error searching by date: {str(e)}")
        return {
            "success": False,
            "matches": [],
            "message": f"Error searching by date: {str(e)}"
        }


def find_pages_modified_today(max_num_pages: int = 50) -> Dict[str, Any]:
    """
    Find pages containing at least one block edited since local midnight today.
    
    Args:
        max_num_pages: Maximum number of pages to return
        
    Returns:
        Dict with "success", "pages" (unique page titles), and "message"
    """
    if max_num_pages < 1:
        return {
            "success": False,
            "pages": [],
            "message": "max_num_pages must be at least 1"
        }
    
    # Define ancestor rule
    ancestor_rule = """[
        [(ancestor ?b ?a)
          [?a :block/children ?b]]
        [(ancestor ?b ?a)
          [?parent :block/children ?b]
          (ancestor ?parent ?a)]
    ]"""
    
    # Get start of today
    today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    start_timestamp = int(today.timestamp() * 1000)
    
    try:
        # Query for pages modified today
        logger.debug(f"Finding pages modified today (since {today.isoformat()})")
        query = f"""[:find ?title
                    :in $ ?start_timestamp %
                    :where
                    [?page :node/title ?title]
                    (ancestor ?block ?page)
                    [?block :edit/time ?time]
                    [(> ?time ?start_timestamp)]]
                    :limit {max_num_pages}"""
        
        results = execute_query(query, [start_timestamp, ancestor_rule])
        
        # Extract unique page titles, preserving the order returned by the query
        unique_pages = list(dict.fromkeys(title[0] for title in results))[:max_num_pages]
        
        return {
            "success": True,
            "pages": unique_pages,
            "message": f"Found {len(unique_pages)} page(s) modified today"
        }
    except QueryError as e:
        return {
            "success": False,
            "pages": [],
            "message": str(e)
        }
    except Exception as e:
        logger.error(f"Error finding pages modified today: {str(e)}")
        return {
            "success": False,
            "pages": [],
            "message": f"Error finding pages modified today: {str(e)}"
        }


def execute_datomic_query(query: str, inputs: Optional[List[Any]] = None) -> Dict[str, Any]:
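    """
    Execute a custom Datalog (Datomic-style) query against the graph.
    
    Args:
        query: The query string, normally starting with "[:find"
        inputs: Optional list of query inputs (bound via :in)
        
    Returns:
        Dict with "success", "matches" (one entry per result row, with row
        values joined by " | " in "content"), and "message"
    """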
    if not query:
        return {
            "success": False,
            "matches": [],
            "message": "Query cannot be empty"
        }
    
    try:
        # Validate query format (basic check)
        if not query.strip().startswith('[:find'):
            logger.warning("Query doesn't start with [:find, may not be valid Datalog syntax")
        
        logger.debug("Executing custom Datomic query")
        results = execute_query(query, inputs or [])
        
        # Format results for display
        formatted_results = []
        for result in results:
            if isinstance(result, (list, tuple)):
                formatted_result = " | ".join(str(item) for item in result)
            else:
                formatted_result = str(result)
                
            formatted_results.append({
                "content": formatted_result,
                "block_uid": "",
                "page_title": ""
            })
        
        return {
            "success": True,
            "matches": formatted_results,
            "message": f"Query executed successfully. Found {len(formatted_results)} results."
        }
    except QueryError as e:
        return {
            "success": False,
            "matches": [],
            "message": str(e)
        }
    except Exception as e:
        logger.error(f"Error executing datomic query: {str(e)}")
        return {
            "success": False,
            "matches": [],
            "message": f"Failed to execute query: {str(e)}"
        }
```
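
The date handling and result post-processing in `search_by_date` can be exercised without a Roam graph. The following is a minimal standalone sketch, not part of the module: `date_range_to_ms` and `newest_unique` are hypothetical helper names introduced here for illustration. It assumes dates are interpreted in the local timezone (as the naive `datetime` calls above do) and that the end date should cover the whole day, down to the last millisecond.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple


def date_range_to_ms(start_date: str, end_date: Optional[str] = None) -> Tuple[int, int]:
    """Hypothetical helper: convert YYYY-MM-DD bounds to epoch milliseconds.

    Raises ValueError if either date is not in YYYY-MM-DD format.
    """
    # Midnight (local time) of the start day
    start_dt = datetime.strptime(start_date, "%Y-%m-%d")

    if end_date:
        # Assumption: the end bound is inclusive through the end of that day
        end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(
            hour=23, minute=59, second=59, microsecond=999999
        )
    else:
        # No end date given: search up to the current moment
        end_dt = datetime.now()

    return int(start_dt.timestamp() * 1000), int(end_dt.timestamp() * 1000)


def newest_unique(matches: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Hypothetical helper: sort newest first, then keep one match per (uid, time_type)."""
    seen = set()
    unique = []
    for match in sorted(matches, key=lambda m: m["time"], reverse=True):
        key = (match["uid"], match["time_type"])
        if key not in seen:
            seen.add(key)
            unique.append(match)
    return unique


if __name__ == "__main__":
    start_ms, end_ms = date_range_to_ms("2024-01-15", "2024-01-16")
    print(f"Query window: {start_ms} .. {end_ms}")

    sample = [
        {"uid": "abc", "time_type": "created", "time": start_ms + 10},
        {"uid": "abc", "time_type": "created", "time": start_ms + 20},
        {"uid": "abc", "time_type": "modified", "time": start_ms + 20},
    ]
    for match in newest_unique(sample):
        print(match)
```

Because sorting happens before deduplication, the entry kept for each `(uid, time_type)` pair is always the most recent one, and a block that was both created and modified inside the window still appears once per time type.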