# Directory Structure
```
├── .dockerignore
├── .gitignore
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── readme.md
├── requirements.txt
├── roam_mcp
│ ├── __init__.py
│ ├── api.py
│ ├── cli.py
│ ├── content_parsers.py
│ ├── content.py
│ ├── memory.py
│ ├── search.py
│ ├── server.py
│ └── utils.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------
```
# Version control
.git
.gitignore
# Python cache and build artifacts
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
venv/
ENV/
env/
# IDE specific files
.idea/
.vscode/
*.swp
*.swo
# Docker specific
Dockerfile
.dockerignore
# Local development files
.env
*.log
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# OS-specific
.DS_Store
# Python bytecode and C extensions
__pycache__/
*.py[cod]
*$py.class
*.so
# Build system artifacts
.Python
build/
dist/
downloads/
*.egg-info/
.eggs/
*.egg
MANIFEST
wheels/
share/python-wheels/
sdist/
develop-eggs/
lib/
lib64/
parts/
var/
.installed.cfg
# Packaging tools
*.spec
# Logs and runtime files
*.log
pip-log.txt
pip-delete-this-directory.txt
# Coverage and test artifacts
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.pytest_cache/
.hypothesis/
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# IDEs and editors
.vscode/
.idea/
.spyderproject
.spyproject
.ropeproject
# Jupyter and IPython
.ipynb_checkpoints/
profile_default/
ipython_config.py
# Documentation
docs/_build/
site/
# Type checking
.mypy_cache/
.dmypy.json
.pyre/
.pytype/
# Other project tools
.pybuilder/
target/
.scrapy
.webassets-cache
celerybeat-schedule
celerybeat.pid
# SQLite (dev DBs)
db.sqlite3
db.sqlite3-journal
# Claude Code and MCP configuration
.mcp.json
CLAUDE.md
WARP.md
```
--------------------------------------------------------------------------------
/readme.md:
--------------------------------------------------------------------------------
```markdown
# Roam Research MCP Server
A Model Context Protocol (MCP) server that connects Claude and other AI assistants to your Roam Research graph.
## What This Does
This server acts as a bridge between AI assistants and your Roam Research database. After setup, you can simply ask Claude to work with your Roam data - no coding required.
For example, you can say:
- "Add these meeting notes to today's daily note in Roam"
- "Search my Roam graph for blocks tagged with #ProjectIdeas"
- "Create a new page in Roam called 'Project Planning'"
- "Find all TODO items I created this month"
## Features
### Content Creation
- Create new pages with nested content and headings
- Add blocks to any page with proper hierarchy
- Create structured outlines with customizable nesting
- Import markdown with proper nesting
- Add todo items with automatic TODO status
- Update existing content individually or in batches
- Modify block content with pattern transformations
### Search and Retrieval
- Find pages and blocks by title, text, or tags
- Search for TODO/DONE items with filtering options
- Find recently modified content
- Search block references and explore block hierarchies
- Search by creation or modification dates
- Navigate parent-child relationships in blocks
- Execute custom Datalog queries for advanced needs
### Memory System
- Store information for Claude to remember across conversations
- Recall stored memories with filtering and sorting options
- Tag memories with custom categories
- Access both recent and older memories with flexible retrieval
### URL Content Processing
- Extract and import content from webpages
- Parse and extract text from PDF documents
- Retrieve YouTube video transcripts
- Intelligently detect content type and process accordingly
## Setup Instructions
1. Install Claude Desktop from [https://claude.ai/download](https://claude.ai/download)
2. Edit your Claude Desktop configuration file:
- Mac: `~/Library/Application Support/Claude/claude_desktop_config.json`
- Windows: `%APPDATA%\Claude\claude_desktop_config.json`
3. Add this configuration:
```json
{
  "mcpServers": {
    "roam-helper": {
      "command": "uvx",
      "args": ["git+https://github.com/PhiloSolares/roam-mcp.git"],
      "env": {
        "ROAM_API_TOKEN": "<your_roam_api_token>",
        "ROAM_GRAPH_NAME": "<your_roam_graph_name>"
      }
    }
  }
}
```
4. Get your Roam API token (the value for `ROAM_API_TOKEN` above):
- Go to your Roam Research graph settings
- Navigate to "API tokens"
- Click "+ New API Token"
- Copy the token to your configuration
## How to Use
Once set up, simply chat with Claude and ask it to work with your Roam graph. Claude will use the appropriate MCP commands behind the scenes.
Example conversations:
**Creating Content:**
> You: "Claude, please create a new page in my Roam graph called 'Project Ideas' with a section for mobile app ideas."
**Searching Content:**
> You: "Find all blocks in my Roam graph tagged with #ProjectIdeas that also mention mobile apps."
>
> You: "Show me all the TODO items I created this week."
**Using the Memory System:**
> You: "Remember that I want to use spaced repetition for learning JavaScript."
>
> Later:
> You: "What learning techniques have we discussed for programming?"
**Working with External Content:**
> You: "Extract the main points from this PDF and add them to my Roam graph."
>
> You: "Get the transcript from this YouTube video about productivity."
## Advanced Configuration
By default, memories are stored with the tag `#[[Memories]]`. To use a different tag:
```json
"env": {
"ROAM_API_TOKEN": "your-token",
"ROAM_GRAPH_NAME": "your-graph",
"MEMORIES_TAG": "#[[Claude/Memories]]"
}
```
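## Running Locally
If you prefer to run the server outside of Claude Desktop, the package installs a `roam-mcp` console script (see `pyproject.toml`). A minimal sketch, assuming the package is installed from the repository and credentials are exported as environment variables:
```bash
pip install git+https://github.com/PhiloSolares/roam-mcp.git
export ROAM_API_TOKEN="your_token"
export ROAM_GRAPH_NAME="your_graph"
# stdio is the default transport; use SSE to listen on a port
roam-mcp --transport sse --port 3000
```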
## Docker Support
You can run the Roam MCP server in a Docker container:
### Building the Image
```bash
docker build -t roam-mcp .
```
### Running the Container
Run with environment variables:
```bash
docker run -p 3000:3000 \
  -e ROAM_API_TOKEN="your_api_token" \
  -e ROAM_GRAPH_NAME="your_graph_name" \
  roam-mcp
```
### Using with Claude Desktop
Configure Claude Desktop to use the containerized server:
```json
{
  "mcpServers": {
    "roam-helper": {
      "command": "docker",
      "args": [
        "run", "--rm", "-p", "3000:3000",
        "-e", "ROAM_API_TOKEN=your_token",
        "-e", "ROAM_GRAPH_NAME=your_graph",
        "roam-mcp"
      ],
      "env": {}
    }
  }
}
```
## License
MIT License
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
mcp>=1.3.0
httpx>=0.24.0
pydantic>=2.0.0
youtube-transcript-api>=0.6.0
requests>=2.28.0
python-dotenv>=1.0.0
trafilatura>=1.6.0
unstructured[pdf]>=0.10.0
```
--------------------------------------------------------------------------------
/roam_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Roam Research MCP Server - Python implementation
Connect Claude to your Roam Research database
Enhanced version with improved architecture and features
"""
__version__ = "0.3.0"
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Base image
FROM python:3.11-slim
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV ROAM_API_TOKEN=""
ENV ROAM_GRAPH_NAME=""
ENV MEMORIES_TAG="#[[Memories]]"
# Install system dependencies for PDF processing
RUN apt-get update && \
apt-get install -y --no-install-recommends \
gcc \
poppler-utils \
libmagic1 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
# Create a non-root user
RUN useradd -m appuser
# Create and set working directory
WORKDIR /app
# Copy requirements file for caching
COPY --chown=appuser:appuser requirements.txt ./
# Install Python dependencies
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# Copy the application code
COPY --chown=appuser:appuser . .
# Change to non-root user
USER appuser
# Expose port for SSE transport
EXPOSE 3000
# Command to run the application (can be overridden)
CMD ["python", "-m", "roam_mcp.cli", "--transport", "sse", "--port", "3000"]
```
--------------------------------------------------------------------------------
/roam_mcp/cli.py:
--------------------------------------------------------------------------------
```python
"""Command-line interface for the Roam MCP server."""
import argparse
import sys
from roam_mcp.server import run_server
def main():
"""Entry point for the Roam MCP server CLI."""
parser = argparse.ArgumentParser(description="Roam Research MCP Server")
# Transport options
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
help="Transport method (stdio or sse)"
)
# Server configuration
parser.add_argument(
"--port",
type=int,
default=3000,
help="Port for SSE transport (default: 3000)"
)
# Verbosity options
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Enable verbose logging"
)
# Parse arguments
args = parser.parse_args()
# Run the server with the specified transport
try:
run_server(
transport=args.transport,
port=args.port if args.transport == "sse" else None,
verbose=args.verbose
)
except KeyboardInterrupt:
print("\nServer stopped by user", file=sys.stderr)
sys.exit(0)
except Exception as e:
print(f"Error starting server: {str(e)}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "roam-mcp"
version = "0.2.0"
description = "A Model Context Protocol server for Roam Research integration with AI assistants"
readme = "readme.md"
requires-python = ">=3.9"
license = {text = "MIT"}
authors = [
{name = "Roam MCP Project Contributors"}
]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: End Users/Desktop",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
]
dependencies = [
"mcp>=1.3.0",
"httpx>=0.24.0",
"pydantic>=2.0.0",
"youtube-transcript-api>=0.6.0",
"requests>=2.28.0",
"python-dotenv>=1.0.0",
"trafilatura>=1.6.0",
"unstructured[pdf]>=0.10.0"
]
[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"black>=23.0.0",
"isort>=5.12.0",
"mypy>=1.0.0",
"pylint>=2.17.0"
]
# Note: PDF parsing also requires the poppler-utils system package (installed
# via apt in the Dockerfile); it is not available on PyPI, so it is not listed
# as a Python dependency.
[project.scripts]
roam-mcp = "roam_mcp.cli:main"
[tool.hatch.build.targets.wheel]
packages = ["roam_mcp"]
[tool.black]
line-length = 100
target-version = ["py39"]
[tool.isort]
profile = "black"
line_length = 100
[tool.mypy]
python_version = "3.9"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
[tool.pylint.messages_control]
disable = [
"missing-docstring",
"invalid-name"
]
[project.urls]
"Homepage" = "https://github.com/PhiloSolares/roam-mcp"
"Bug Tracker" = "https://github.com/PhiloSolares/roam-mcp/issues"
```
--------------------------------------------------------------------------------
/roam_mcp/content_parsers.py:
--------------------------------------------------------------------------------
```python
"""External content parsing operations for the Roam MCP server."""
import os
import tempfile
import logging
from typing import Dict, Any, Optional
import httpx
import trafilatura
from unstructured.partition.pdf import partition_pdf
# Set up logging
logger = logging.getLogger("roam-mcp.content_parsers")
async def parse_webpage(url: str) -> Dict[str, Any]:
"""
Parse content from a web page URL.
Args:
url: URL of the webpage to parse
Returns:
Result with parsed content
"""
try:
logger.debug(f"Fetching web page content from: {url}")
downloaded = trafilatura.fetch_url(url)
if not downloaded:
return {
"success": False,
"error": f"Failed to download content from {url}"
}
# Extract main content with document structure preserved
content = trafilatura.extract(
downloaded,
output_format='text',
include_links=False,
include_formatting=True
)
if not content:
return {
"success": False,
"error": f"Failed to extract meaningful content from {url}"
}
        # Get metadata (extract_metadata returns a Document object, not a dict)
        metadata = trafilatura.extract_metadata(downloaded)
        title = metadata.title if metadata and metadata.title else 'Untitled Page'
return {
"success": True,
"content": content,
"title": title,
"url": url
}
except Exception as e:
logger.error(f"Error parsing web page: {str(e)}")
return {
"success": False,
"error": f"Error parsing web page: {str(e)}"
}
async def parse_pdf(url: str) -> Dict[str, Any]:
"""
Parse content from a PDF URL.
Args:
url: URL of the PDF to parse
Returns:
Result with parsed content
"""
try:
logger.debug(f"Fetching PDF content from: {url}")
# Download the PDF to a temporary file
async with httpx.AsyncClient() as client:
response = await client.get(url, follow_redirects=True)
response.raise_for_status()
# Check if it's a PDF based on Content-Type
content_type = response.headers.get('Content-Type', '')
if 'application/pdf' not in content_type.lower():
return {
"success": False,
"error": f"URL does not point to a PDF (Content-Type: {content_type})"
}
# Create a temporary file for the PDF
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as temp_file:
temp_path = temp_file.name
temp_file.write(response.content)
# Extract content using unstructured
try:
elements = partition_pdf(
temp_path,
strategy="hi_res",
extract_images=False,
extract_tables=True
)
# Convert to formatted text while preserving structure
content = "\n\n".join([str(element) for element in elements])
except UnicodeDecodeError:
# Fall back to a simpler strategy if hi_res fails with encoding issues
logger.warning(f"Encountered encoding issues with hi_res strategy, trying fast strategy")
elements = partition_pdf(
temp_path,
strategy="fast",
extract_images=False,
extract_tables=False
)
content = "\n\n".join([str(element) for element in elements])
# Try to extract a title from the filename in the URL
path_parts = url.split('/')
filename = path_parts[-1].split('?')[0] # Remove query parameters
title = os.path.splitext(filename)[0].replace('-', ' ').replace('_', ' ').title()
if not title:
title = "PDF Document"
# Clean up temporary file
os.unlink(temp_path)
return {
"success": True,
"content": content,
"title": title,
"url": url
}
except Exception as e:
logger.error(f"Error parsing PDF: {str(e)}")
# Clean up temporary file if it exists
try:
if 'temp_path' in locals():
os.unlink(temp_path)
        except OSError:
pass
return {
"success": False,
"error": f"Error parsing PDF: {str(e)}"
}
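# Example usage (illustrative sketch, not part of the server runtime; these
# coroutines need an event loop):
#
#     import asyncio
#     result = asyncio.run(parse_webpage("https://example.com"))
#     if result["success"]:
#         print(result["title"], len(result["content"]))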
```
--------------------------------------------------------------------------------
/roam_mcp/memory.py:
--------------------------------------------------------------------------------
```python
"""Memory system operations for the Roam MCP server."""
from typing import Dict, List, Any, Optional, Union
from datetime import datetime
import logging
from roam_mcp.api import (
execute_query,
execute_write_action,
get_session_and_headers,
GRAPH_NAME,
get_daily_page,
add_block_to_page,
MEMORIES_TAG,
ValidationError,
PageNotFoundError,
QueryError
)
from roam_mcp.utils import (
format_roam_date,
resolve_block_references
)
# Set up logging
logger = logging.getLogger("roam-mcp.memory")
def remember(memory: str, categories: Optional[List[str]] = None) -> Dict[str, Any]:
"""
Store a memory with the specified MEMORIES_TAG.
Args:
memory: The memory to store
categories: Optional list of categories to tag the memory with
Returns:
Result with success status
"""
if not memory:
return {
"success": False,
"error": "Memory cannot be empty"
}
session, headers = get_session_and_headers()
try:
# Validate and normalize categories
normalized_categories = []
if categories:
# Ensure all categories are strings
invalid_categories = [cat for cat in categories if not isinstance(cat, str)]
if invalid_categories:
return {
"success": False,
"error": "All categories must be strings"
}
# Normalize category formats
for category in categories:
category = category.strip()
if not category:
continue
# Remove any existing tag syntax
clean_category = category.replace('#', '').replace('[[', '').replace(']]', '')
# Add to normalized list
normalized_categories.append(clean_category)
# Get today's daily page
daily_page_uid = get_daily_page()
# Format memory with tags
formatted_memory = MEMORIES_TAG
# Add the memory text
formatted_memory += f" {memory}"
# Add category tags
for category in normalized_categories:
# Format category as Roam tag
if " " in category or "/" in category:
tag = f"#[[{category}]]"
else:
tag = f"#{category}"
formatted_memory += f" {tag}"
# Create memory block
block_uid = add_block_to_page(daily_page_uid, formatted_memory)
return {
"success": True,
"block_uid": block_uid,
"content": formatted_memory
}
except ValidationError as e:
return {
"success": False,
"error": str(e)
}
except PageNotFoundError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
logger.error(f"Error storing memory: {str(e)}")
return {
"success": False,
"error": f"Error storing memory: {str(e)}"
}
def recall(sort_by: str = "newest", filter_tag: Optional[str] = None) -> Dict[str, Any]:
"""
Recall stored memories, optionally filtered by tag.
Args:
sort_by: Sort order ("newest" or "oldest")
filter_tag: Optional tag to filter memories by
Returns:
        Result dict with success status and the list of memory contents
"""
if sort_by not in ["newest", "oldest"]:
return {
"success": False,
"error": "sort_by must be 'newest' or 'oldest'"
}
session, headers = get_session_and_headers()
# Clean and normalize the MEMORIES_TAG for queries
clean_tag = MEMORIES_TAG.replace('#', '').replace('[[', '').replace(']]', '')
    # Prepare filter tag conditions if needed
    filter_conditions = ""
    if filter_tag:
        # Clean and normalize filter tag
        clean_filter = filter_tag.replace('#', '').replace('[[', '').replace(']]', '')
        # Generate filter tag variants (the same variants apply with or without spaces)
        filter_variants = [f"#{clean_filter}", f"#[[{clean_filter}]]", f"[[{clean_filter}]]"]
        # Build a single (or ...) clause matching any variant
        filter_clauses = ' '.join(
            f'[(clojure.string/includes? ?s "{variant}")]' for variant in filter_variants
        )
        filter_conditions = f"(or {filter_clauses})"
try:
logger.debug(f"Recalling memories with sort_by={sort_by}")
if filter_tag:
logger.debug(f"Filtering by tag: {filter_tag}")
        # Method 1: Search for blocks containing the MEMORIES_TAG across the database
        # Generate tag variants and build a single (or ...) clause matching any of them
        tag_variants = [f"#{clean_tag}", f"#[[{clean_tag}]]", f"[[{clean_tag}]]"]
        tag_clauses = ' '.join(
            f'[(clojure.string/includes? ?s "{variant}")]' for variant in tag_variants
        )
        tag_condition = f"(or {tag_clauses})"
        # Query blocks with tag, appending the filter clause when present
        tag_query = f"""[:find ?uid ?s ?time ?page-title
                         :where
                         [?b :block/string ?s]
                         [?b :block/uid ?uid]
                         [?b :create/time ?time]
                         [?b :block/page ?p]
                         [?p :node/title ?page-title]
                         {tag_condition}
                         {filter_conditions}]"""
tag_results = execute_query(tag_query)
        # Method 2: Also check for a dedicated page with the clean tag name,
        # appending the filter clause when present
        page_query = f"""[:find ?uid ?s ?time
                          :where
                          [?p :node/title "{clean_tag}"]
                          [?b :block/page ?p]
                          [?b :block/string ?s]
                          [?b :block/uid ?uid]
                          [?b :create/time ?time]
                          {filter_conditions}]"""
page_results = execute_query(page_query)
# Process and combine results
memories = []
# Process tag results
for uid, content, time, page_title in tag_results:
# Resolve references
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
memories.append({
"content": resolved_content,
"time": time,
"page_title": page_title,
"block_uid": uid
})
# Process page results
for uid, content, time in page_results:
# Resolve references
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
memories.append({
"content": resolved_content,
"time": time,
"page_title": clean_tag,
"block_uid": uid
})
# Sort by time
memories.sort(key=lambda x: x["time"], reverse=(sort_by == "newest"))
# Clean up content - remove the MEMORIES_TAG
for memory in memories:
content = memory["content"]
for variant in tag_variants:
content = content.replace(variant, "")
memory["content"] = content.strip()
# Remove duplicates while preserving order
seen_contents = set()
unique_memories = []
for memory in memories:
content = memory["content"]
if content and content not in seen_contents:
seen_contents.add(content)
unique_memories.append(memory)
# Return just the content strings
memory_contents = [memory["content"] for memory in unique_memories]
return {
"success": True,
"memories": memory_contents,
"message": f"Found {len(memory_contents)} memories"
}
except QueryError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
logger.error(f"Error recalling memories: {str(e)}")
return {
"success": False,
"error": f"Error recalling memories: {str(e)}"
}
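# Example usage (illustrative sketch; assumes credentials are configured):
#
#     remember("Prefers spaced repetition for learning", categories=["Learning"])
#     result = recall(sort_by="newest", filter_tag="Learning")
#     for item in result.get("memories", []):
#         print(item)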
```
--------------------------------------------------------------------------------
/roam_mcp/utils.py:
--------------------------------------------------------------------------------
```python
"""Utility functions for the Roam MCP server."""
import re
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional, Set, Match, Tuple, Union
import json
import time
import uuid
# Set up logging
logger = logging.getLogger("roam-mcp.utils")
# Date formatting
def format_roam_date(date: Optional[datetime] = None) -> str:
"""
Format a date in Roam's preferred format (e.g., "March 25th, 2025").
Args:
date: The date to format, defaults to today's date
Returns:
A string in Roam's date format
"""
if date is None:
date = datetime.now()
day = date.day
if 11 <= day <= 13:
suffix = "th"
else:
suffix = {1: "st", 2: "nd", 3: "rd"}.get(day % 10, "th")
    # Avoid strftime's platform-specific %-d flag; build the day component manually
    return f"{date.strftime('%B')} {day}{suffix}, {date.year}"
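# For example, format_roam_date(datetime(2025, 3, 25)) returns "March 25th, 2025".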
# Regular expressions for markdown elements
MD_BOLD_PATTERN = r'\*\*(.+?)\*\*'
MD_ITALIC_PATTERN = r'(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)'
MD_ITALIC_UNDERSCORE_PATTERN = r'(?<!_)_(?!_)(.+?)(?<!_)_(?!_)'
MD_HIGHLIGHT_PATTERN = r'==(.+?)=='
MD_LINK_PATTERN = r'\[([^\]]+)\]\(([^)]+)\)'
MD_CODE_BLOCK_PATTERN = r'```([a-zA-Z0-9]*)\s*\n([\s\S]*?)```'
MD_INLINE_CODE_PATTERN = r'`([^`]+)`'
# Table regex patterns
MD_TABLE_PATTERN = r'(?:\|(.+)\|\s*\n\|(?::?-+:?\|)+\s*\n(?:\|(?:.+)\|\s*\n*)+)'
MD_TABLE_ROW_PATTERN = r'\|(.*)\|'
MD_TABLE_HEADER_PATTERN = r'\|(\s*:?-+:?\s*)\|'
MD_TABLE_ALIGNMENT_PATTERN = r'^(:?)-+(:?)$' # For detecting alignment in table headers
# Headings pattern
MD_HEADING_PATTERN = r'^(#{1,6})\s+(.+)$'
# Markdown conversion utilities
def convert_to_roam_markdown(text: str) -> str:
"""
Convert standard markdown to Roam-compatible format.
Args:
text: Standard markdown text
Returns:
Roam-formatted markdown text
"""
# Convert tables first (they may contain other markdown elements)
text = convert_tables(text)
# Handle code blocks (must be done before other inline elements)
text = convert_code_blocks(text)
# Handle double asterisks/underscores (bold)
text = re.sub(MD_BOLD_PATTERN, r'**\1**', text)
# Handle single asterisks/underscores (italic)
text = re.sub(MD_ITALIC_PATTERN, r'__\1__', text) # Single asterisk to double underscore
text = re.sub(MD_ITALIC_UNDERSCORE_PATTERN, r'__\1__', text) # Single underscore to double underscore
# Handle highlights
    text = re.sub(MD_HIGHLIGHT_PATTERN, r'^^\1^^', text)
# Convert tasks
text = re.sub(r'- \[ \]', r'- {{[[TODO]]}}', text)
text = re.sub(r'- \[x\]', r'- {{[[DONE]]}}', text)
# Convert links
text = re.sub(MD_LINK_PATTERN, r'[\1](\2)', text)
# Handle headings (convert to Roam's heading format)
text = convert_headings(text)
# Handle inline code
text = re.sub(MD_INLINE_CODE_PATTERN, r'`\1`', text)
return text
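# Example (sketch): convert_to_roam_markdown("- [ ] read *paper*")
# returns "- {{[[TODO]]}} read __paper__".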
def convert_headings(text: str) -> str:
"""
Convert markdown headings to Roam's heading format.
Args:
text: Markdown text with potential headings
Returns:
Text with headings converted to Roam format
"""
def heading_replacer(match: Match) -> str:
level = len(match.group(1)) # Number of # characters
content = match.group(2).strip()
# For text format, we'll just keep the heading text and let block attributes
# handle the actual heading level in Roam
return content
# Process line by line to avoid matching # in code blocks
lines = text.split('\n')
for i, line in enumerate(lines):
heading_match = re.match(MD_HEADING_PATTERN, line)
if heading_match:
lines[i] = heading_replacer(heading_match)
return '\n'.join(lines)
def convert_code_blocks(text: str) -> str:
"""
Convert markdown code blocks while preserving language and indentation.
Args:
text: Markdown text with potential code blocks
Returns:
Text with code blocks properly formatted
"""
def code_block_replacer(match: Match) -> str:
language = match.group(1).strip()
code_content = match.group(2)
# Preserve language info
language_tag = f"{language}\n" if language else "\n"
# Clean up indentation
lines = code_content.split('\n')
# Find the common indentation level
non_empty_lines = [line for line in lines if line.strip()]
if non_empty_lines:
common_indent = min(len(line) - len(line.lstrip()) for line in non_empty_lines)
# Remove common indentation
code_content = '\n'.join(line[common_indent:] if line.strip() else line for line in lines)
return f"```{language_tag}{code_content}```"
return re.sub(MD_CODE_BLOCK_PATTERN, code_block_replacer, text)
def convert_tables(text: str) -> str:
"""
Convert markdown tables to Roam format.
Args:
text: Markdown text with potential tables
Returns:
Text with tables converted to Roam format
"""
def table_replacer(match: Match) -> str:
table_text = match.group(0)
# Find all rows
rows = re.findall(MD_TABLE_ROW_PATTERN, table_text)
if len(rows) < 2: # Need at least header and separator
return table_text
# First row is header, second is separator, rest are data
header_cells = [cell.strip() for cell in rows[0].split('|') if cell.strip()]
separator_cells = [cell.strip() for cell in rows[1].split('|') if cell.strip()]
# Determine column alignments from separator row
alignments = []
for sep in separator_cells:
alignment_match = re.match(MD_TABLE_ALIGNMENT_PATTERN, sep)
if alignment_match:
left_colon = bool(alignment_match.group(1))
right_colon = bool(alignment_match.group(2))
if left_colon and right_colon:
alignments.append("center")
elif right_colon:
alignments.append("right")
else:
alignments.append("left")
else:
alignments.append("left") # Default alignment
# Generate Roam table format
roam_table = "{{table}}\n"
# Add header row
for i, header in enumerate(header_cells):
indent = " " * (i + 1)
roam_table += f"{indent}- {header}\n"
# Add data rows - start from index 2 to skip header and separator
for row_idx in range(2, len(rows)):
data_cells = [cell.strip() for cell in rows[row_idx].split('|') if cell.strip()]
for i, cell in enumerate(data_cells):
if i < len(header_cells): # Only process cells that have a corresponding header
indent = " " * (i + 1)
roam_table += f"{indent}- {cell}\n"
return roam_table
return re.sub(MD_TABLE_PATTERN, table_replacer, text)
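# Example (sketch): a markdown table such as
#
#     | Name | Role |
#     | ---- | ---- |
#     | Ada  | Eng  |
#
# becomes a "{{table}}" block with one nested bullet per header and cell.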
class MarkdownNode:
"""Class representing a node in the markdown parsing tree."""
def __init__(self, content: str, level: int = 0, heading_level: int = 0):
self.content = content
self.level = level
self.heading_level = heading_level
self.children = []
def add_child(self, node: 'MarkdownNode') -> None:
"""Add a child node to this node."""
self.children.append(node)
def to_dict(self) -> Dict[str, Any]:
"""Convert node to dictionary representation."""
result = {
"text": self.content,
"level": self.level
}
if self.heading_level:
result["heading_level"] = self.heading_level
if self.children:
result["children"] = [child.to_dict() for child in self.children]
return result
def parse_markdown_list(markdown: str) -> List[Dict[str, Any]]:
"""
Parse a markdown list into a hierarchical structure.
Args:
markdown: Markdown text with nested lists
Returns:
List of dictionaries with 'text', 'level', and 'children' keys
"""
# Convert markdown syntax first
markdown = convert_to_roam_markdown(markdown)
lines = markdown.split('\n')
root = MarkdownNode("ROOT", -1) # Root node to hold all top-level items
node_stack = [root]
current_level = -1
in_code_block = False
code_block_content = []
code_block_indent = 0
for line_idx, line in enumerate(lines):
if not line.strip() and not in_code_block:
continue
# Handle code blocks
if "```" in line and not in_code_block:
# Start of code block
in_code_block = True
code_block_content = [line]
# Store the indentation level
code_block_indent = len(line) - len(line.lstrip())
continue
elif "```" in line and in_code_block:
# End of code block - process the entire block
code_block_content.append(line)
# Calculate the level based on indentation
level = code_block_indent // 2
# Join the content with proper line breaks
content = "\n".join(code_block_content)
# Create a node for the code block
node = MarkdownNode(content, level)
# Find the right parent for this node
while len(node_stack) > 1 and node_stack[-1].level >= level:
node_stack.pop()
# Add to parent
node_stack[-1].add_child(node)
# Update stack and level
node_stack.append(node)
current_level = level
# Reset code block state
in_code_block = False
code_block_content = []
continue
elif in_code_block:
# In a code block - just collect the line
code_block_content.append(line)
continue
# Check for heading
heading_match = re.match(MD_HEADING_PATTERN, line)
if heading_match:
level = 0 # Headings are top-level
heading_text = heading_match.group(2).strip()
heading_level = len(heading_match.group(1)) # Number of # characters
# Reset stack for headings
while len(node_stack) > 1:
node_stack.pop()
# Create heading node
node = MarkdownNode(heading_text, level, heading_level)
node_stack[-1].add_child(node)
node_stack.append(node)
current_level = level
continue
# Regular list items
match = re.match(r'^(\s*)[-*+]\s+(.+)$', line)
if match:
indent, content = match.groups()
level = len(indent) // 2 + 1 # Convert indentation to level, starting with 1
            # Pop stack until we find parent level
            while len(node_stack) > 1 and node_stack[-1].level >= level:
                node_stack.pop()
            # Create new node (TODO/DONE markers need no special level handling)
            node = MarkdownNode(content, level)
node_stack[-1].add_child(node)
node_stack.append(node)
current_level = level
else:
# Non-list line - treat as continuation of previous item or as top-level text
content = line.strip()
if content and current_level >= 0 and len(node_stack) > 1:
# Add to the current node's content
node_stack[-1].content += "\n" + content
elif content:
# Create as top-level text
node = MarkdownNode(content, 0)
node_stack[0].add_child(node)
node_stack = [root, node]
current_level = 0
# Convert the tree to the expected dictionary format with proper hierarchy
def build_hierarchy(node):
"""Convert a node and its children to a hierarchical dictionary structure."""
result = {
"text": node.content,
"level": node.level
}
if node.heading_level:
result["heading_level"] = node.heading_level
if node.children:
result["children"] = [build_hierarchy(child) for child in node.children]
return result
# Build result with correct hierarchy
hierarchical_result = []
for child in root.children:
hierarchical_result.append(build_hierarchy(child))
# We'll now convert this to the flattened format for backward compatibility
# while preserving hierarchy information for functions that can use it
flattened_result = []
def flatten_hierarchy(item, parent_level=-1, path=None):
"""Flatten a hierarchical structure while preserving parent-child information."""
if path is None:
path = []
# Get item properties
text = item["text"]
level = item.get("level", parent_level + 1)
heading_level = item.get("heading_level", 0)
# Create the flattened item
flat_item = {
"text": text,
"level": level
}
if heading_level:
flat_item["heading_level"] = heading_level
# Add path information for reconstructing hierarchy
flat_item["_path"] = path.copy()
# Add to results
flattened_result.append(flat_item)
# Process children
children = item.get("children", [])
if children:
for i, child in enumerate(children):
child_path = path + [i]
flatten_hierarchy(child, level, child_path)
# Flatten the hierarchical result
for i, item in enumerate(hierarchical_result):
flatten_hierarchy(item, -1, [i])
# We return the flattened result but with _path information
# for reconstructing hierarchy if needed
return flattened_result
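# Example (sketch): parse_markdown_list("- a\n  - b") yields flattened items like
#
#     [{"text": "a", "level": 1, "_path": [0]},
#      {"text": "b", "level": 2, "_path": [0, 0]}]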
def convert_roam_dates(text: str) -> str:
"""
Convert date references to Roam date format.
Args:
text: Text with potential date references
Returns:
Text with dates in Roam format
"""
# Convert ISO dates (YYYY-MM-DD)
def replace_date(match: Match) -> str:
date_str = match.group(0)
try:
date = datetime.strptime(date_str, "%Y-%m-%d")
return format_roam_date(date)
except ValueError:
return date_str
return re.sub(r'\b\d{4}-\d{2}-\d{2}\b', replace_date, text)
def extract_youtube_video_id(url: str) -> Optional[str]:
"""
Extract the video ID from a YouTube URL.
Args:
url: YouTube URL
Returns:
Video ID or None if not found
"""
patterns = [
r"(?:youtube\.com\/watch\?v=|youtu\.be\/)([a-zA-Z0-9_-]{11})",
r"youtube\.com\/embed\/([a-zA-Z0-9_-]{11})",
r"youtube\.com\/v\/([a-zA-Z0-9_-]{11})",
r"youtube\.com\/user\/[^\/]+\/\?v=([a-zA-Z0-9_-]{11})"
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
return None
def detect_url_type(url: str) -> str:
"""
Detect the type of content a URL points to.
Args:
url: URL to analyze
Returns:
Content type: 'youtube', 'pdf', 'webpage', or 'unknown'
"""
url_lower = url.lower()
# Check for YouTube
youtube_patterns = [
r"(?:youtube\.com\/watch\?v=|youtu\.be\/)",
r"youtube\.com\/embed\/",
r"youtube\.com\/v\/",
r"youtube\.com\/user\/[^\/]+\/\?v="
]
for pattern in youtube_patterns:
if re.search(pattern, url_lower):
return "youtube"
# Check for PDF
if url_lower.endswith('.pdf') or '/pdf/' in url_lower:
return "pdf"
# Default to webpage
return "webpage"
def create_block_action(parent_uid: str, content: str, order: Union[int, str] = "last",
uid: Optional[str] = None, heading: Optional[int] = None) -> Dict[str, Any]:
"""
Create a block action for batch operations.
Args:
parent_uid: UID of the parent block/page
content: Block content
order: Position of the block
uid: Optional UID for the block
heading: Optional heading level (1-3)
Returns:
Block action dictionary
"""
block_data = {
"string": content
}
if uid:
block_data["uid"] = uid
else:
# Generate a unique UID if none provided
block_data["uid"] = str(uuid.uuid4())[:9]
if heading and heading > 0 and heading <= 3:
block_data["heading"] = heading
action = {
"action": "create-block",
"location": {
"parent-uid": parent_uid,
"order": order
},
"block": block_data
}
logger.debug(f"Created block action for parent {parent_uid}: {content[:30]}{'...' if len(content) > 30 else ''}")
return action
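# Example (sketch): create_block_action("abc123def", "Hello", heading=1) returns
#
#     {"action": "create-block",
#      "location": {"parent-uid": "abc123def", "order": "last"},
#      "block": {"string": "Hello", "uid": "<generated>", "heading": 1}}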
def process_nested_content(content: List[Dict], parent_uid: str, session, headers, graph_name: str) -> List[str]:
"""
Recursively process nested content structure and create blocks.
Args:
content: List of content items with potential children
parent_uid: UID of the parent block
session: Active session for API requests
headers: Request headers with authentication
graph_name: Roam graph name
Returns:
List of created block UIDs
"""
from roam_mcp.api import execute_batch_actions # Import here to avoid circular imports
if not content:
return []
# Sort content by level
content = sorted(content, key=lambda x: x.get("level", 0))
# Create batch actions
batch_actions = []
level_parent_map = {0: parent_uid}
# Process items level by level (top-down)
for item in content:
level = item.get("level", 0)
text = item.get("text", "")
heading_level = item.get("heading_level", 0)
# Find parent for this level
parent_level = level - 1
if parent_level < 0:
parent_level = 0
parent_for_item = level_parent_map.get(parent_level, parent_uid)
# Create block action
action = create_block_action(
parent_uid=parent_for_item,
content=text,
order="last",
heading=heading_level
)
batch_actions.append(action)
        # Record this block's pre-generated UID so children can reference it as their parent
        level_parent_map[level] = action["block"]["uid"]
# Execute the batch
result = execute_batch_actions(batch_actions)
return result.get("created_uids", [])
def find_block_uid(session, headers, graph_name: str, block_content: str, max_retries: int = 3) -> Optional[str]:
"""
Search for a block by its content to find its UID with retries.
Args:
session: Active session for API requests
headers: Request headers with authentication
graph_name: Roam graph name
block_content: Content to search for
max_retries: Maximum number of retries
Returns:
Block UID or None if not found
"""
# Escape quotes in content
escaped_content = block_content.replace('"', '\\"')
for attempt in range(max_retries):
search_query = f'''[:find ?uid .
:where [?e :block/string "{escaped_content}"]
[?e :block/uid ?uid]]'''
response = session.post(
f'https://api.roamresearch.com/api/graph/{graph_name}/q',
headers=headers,
json={"query": search_query}
)
if response.status_code == 200 and response.json().get('result'):
block_uid = response.json()['result']
return block_uid
# If not found and not the last attempt, wait and retry
if attempt < max_retries - 1:
            wait_time = 1 * (attempt + 1)  # Linear backoff
logger.debug(f"Block not found, retrying in {wait_time}s (attempt {attempt+1}/{max_retries})")
time.sleep(wait_time)
logger.debug(f"Could not find block UID for content: {block_content[:50]}...")
return None
def find_page_by_title(session, headers, graph_name: str, title: str) -> Optional[str]:
"""
Find a page by title, with case-insensitive matching.
Args:
session: Active session for API requests
headers: Request headers with authentication
graph_name: Roam graph name
title: Page title to search for
Returns:
Page UID or None if not found
"""
    # Clean up the title and escape double quotes for safe query interpolation
    title = title.strip()
    escaped_title = title.replace('"', '\\"')
    # First try direct page lookup (more reliable than case-insensitive queries in Roam)
    query = f'''[:find ?uid .
                 :where [?e :node/title "{escaped_title}"]
                        [?e :block/uid ?uid]]'''
response = session.post(
f'https://api.roamresearch.com/api/graph/{graph_name}/q',
headers=headers,
json={"query": query}
)
if response.status_code == 200 and response.json().get('result'):
return response.json()['result']
# If not found, try checking if it's a UID
if len(title) == 9 and re.match(r'^[a-zA-Z0-9_-]{9}$', title):
# This looks like a UID, check if it's a valid page UID
uid_query = f'''[:find ?title .
:where [?e :block/uid "{title}"]
[?e :node/title ?title]]'''
uid_response = session.post(
f'https://api.roamresearch.com/api/graph/{graph_name}/q',
headers=headers,
json={"query": uid_query}
)
if uid_response.status_code == 200 and uid_response.json().get('result'):
return title
# If still not found, try case-insensitive match by getting all pages
all_pages_query = f'''[:find ?title ?uid
:where [?e :node/title ?title]
[?e :block/uid ?uid]]'''
all_pages_response = session.post(
f'https://api.roamresearch.com/api/graph/{graph_name}/q',
headers=headers,
json={"query": all_pages_query}
)
if all_pages_response.status_code == 200 and all_pages_response.json().get('result'):
for page_title, uid in all_pages_response.json()['result']:
if page_title.lower() == title.lower():
return uid
return None
def resolve_block_references(session, headers, graph_name: str, content: str, max_depth: int = 3, current_depth: int = 0) -> str:
"""
Resolve block references in content recursively.
Args:
session: Active session for API requests
headers: Request headers with authentication
graph_name: Roam graph name
content: Content with potential block references
max_depth: Maximum recursion depth
current_depth: Current recursion depth
Returns:
Content with block references resolved
"""
if current_depth >= max_depth:
return content
# Find all block references
ref_pattern = r'\(\(([a-zA-Z0-9_-]{9})\)\)'
refs = re.findall(ref_pattern, content)
if not refs:
return content
# For each reference, get its content
for ref in refs:
try:
query = f'''[:find ?string .
:where [?b :block/uid "{ref}"]
[?b :block/string ?string]]'''
response = session.post(
f'https://api.roamresearch.com/api/graph/{graph_name}/q',
headers=headers,
json={"query": query}
)
if response.status_code == 200 and response.json().get('result'):
ref_content = response.json()['result']
# Recursively resolve nested references
resolved_ref = resolve_block_references(
session, headers, graph_name,
ref_content, max_depth, current_depth + 1
)
# Replace reference with content
content = content.replace(f"(({ref}))", resolved_ref)
except Exception as e:
logger.warning(f"Failed to resolve reference (({ref})): {str(e)}")
return content
```
--------------------------------------------------------------------------------
/roam_mcp/api.py:
--------------------------------------------------------------------------------
```python
"""Core API functions for interacting with Roam Research."""
import os
import re
import sys
import logging
from typing import Dict, List, Any, Optional, Union, Set, Tuple, Callable
import requests
from datetime import datetime
import json
import time
from functools import wraps
from roam_mcp.utils import (
format_roam_date,
find_block_uid,
find_page_by_title,
process_nested_content,
resolve_block_references
)
# Set up logging
logger = logging.getLogger("roam-mcp.api")
# Get API credentials from environment variables
API_TOKEN = os.environ.get("ROAM_API_TOKEN")
GRAPH_NAME = os.environ.get("ROAM_GRAPH_NAME")
MEMORIES_TAG = os.environ.get("MEMORIES_TAG", "#[[Memories]]")
# Validate API credentials
if not API_TOKEN:
logger.warning("ROAM_API_TOKEN environment variable is not set")
if not GRAPH_NAME:
logger.warning("ROAM_GRAPH_NAME environment variable is not set")
# Enhanced Error Hierarchy
class RoamAPIError(Exception):
"""Base exception for all Roam API errors."""
def __init__(self, message: str, code: Optional[str] = None, details: Optional[Dict] = None, remediation: Optional[str] = None):
self.message = message
self.code = code or "UNKNOWN_ERROR"
self.details = details or {}
self.remediation = remediation
super().__init__(self._format_message())
def _format_message(self) -> str:
msg = f"{self.code}: {self.message}"
if self.details:
msg += f" - Details: {json.dumps(self.details)}"
if self.remediation:
msg += f" - Suggestion: {self.remediation}"
return msg
class AuthenticationError(RoamAPIError):
"""Exception raised for authentication errors."""
def __init__(self, message: str, details: Optional[Dict] = None):
super().__init__(
message=message,
code="AUTH_ERROR",
details=details,
remediation="Check your API token and graph name in environment variables."
)
class PageNotFoundError(RoamAPIError):
"""Exception raised when a page cannot be found."""
def __init__(self, title: str, details: Optional[Dict] = None):
super().__init__(
message=f"Page '{title}' not found",
code="PAGE_NOT_FOUND",
details=details,
remediation="Check the page title for typos or create the page first."
)
class BlockNotFoundError(RoamAPIError):
"""Exception raised when a block cannot be found."""
def __init__(self, uid: str, details: Optional[Dict] = None):
super().__init__(
message=f"Block with UID '{uid}' not found",
code="BLOCK_NOT_FOUND",
details=details,
remediation="Check the block UID for accuracy."
)
class ValidationError(RoamAPIError):
"""Exception raised for input validation errors."""
def __init__(self, message: str, param: str, details: Optional[Dict] = None):
super().__init__(
message=message,
code="VALIDATION_ERROR",
details={"parameter": param, **(details or {})},
remediation="Check the input parameters and correct the formatting."
)
class QueryError(RoamAPIError):
"""Exception raised for query execution errors."""
def __init__(self, message: str, query: str, details: Optional[Dict] = None):
super().__init__(
message=message,
code="QUERY_ERROR",
details={"query": query, **(details or {})},
remediation="Check the query syntax or parameters."
)
class RateLimitError(RoamAPIError):
"""Exception raised when rate limits are exceeded."""
def __init__(self, message: str, details: Optional[Dict] = None):
super().__init__(
message=message,
code="RATE_LIMIT_ERROR",
details=details,
remediation="Retry after a delay or reduce the request frequency."
)
class TransactionError(RoamAPIError):
"""Exception raised for transaction failures."""
def __init__(self, message: str, action_type: str, details: Optional[Dict] = None):
super().__init__(
message=message,
code="TRANSACTION_ERROR",
details={"action_type": action_type, **(details or {})},
remediation="Check the action data or retry the operation."
)
class PreserveAuthSession(requests.Session):
"""Session class that preserves authentication headers during redirects."""
def rebuild_auth(self, prepared_request, response):
"""Preserve the Authorization header on redirects."""
return
# Retry decorator for API calls
def retry_on_error(max_retries=3, base_delay=1, backoff_factor=2, retry_on=(RateLimitError, requests.exceptions.RequestException)):
"""
Decorator to retry API calls with exponential backoff.
Args:
max_retries: Maximum number of retry attempts
base_delay: Initial delay in seconds
backoff_factor: Multiplier for delay on each retry
retry_on: Tuple of exception types to retry on
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
while True:
try:
return func(*args, **kwargs)
except retry_on as e:
retries += 1
if retries > max_retries:
logger.error(f"Maximum retries ({max_retries}) exceeded: {str(e)}")
raise
delay = base_delay * (backoff_factor ** (retries - 1))
logger.warning(f"Retrying after error: {str(e)}. Attempt {retries}/{max_retries} in {delay:.2f}s")
time.sleep(delay)
return wrapper
return decorator
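# Example usage (sketch): retry transient failures with exponential backoff
#
#     @retry_on_error(max_retries=5, base_delay=0.5)
#     def fetch_status():
#         ...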
def validate_credentials():
"""
Validate that required API credentials are set.
Raises:
AuthenticationError: If required credentials are missing
"""
if not API_TOKEN or not GRAPH_NAME:
missing = []
if not API_TOKEN:
missing.append("ROAM_API_TOKEN")
if not GRAPH_NAME:
missing.append("ROAM_GRAPH_NAME")
raise AuthenticationError(
f"Missing required credentials: {', '.join(missing)}",
{"missing": missing}
)
def get_session_and_headers() -> Tuple[requests.Session, Dict[str, str]]:
"""
Create a session with authentication headers.
Returns:
Tuple of (session, headers)
Raises:
AuthenticationError: If required environment variables are missing
"""
validate_credentials()
session = PreserveAuthSession()
headers = {
"Accept": "application/json",
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json",
}
return session, headers
@retry_on_error()
def execute_query(query: str, inputs: Optional[List[Any]] = None) -> Any:
"""
Execute a Datalog query against the Roam graph.
Args:
query: Datalog query string
inputs: Optional list of query inputs
Returns:
Query results
Raises:
QueryError: If the query fails
AuthenticationError: If authentication fails
RateLimitError: If rate limits are exceeded
"""
validate_credentials()
session, headers = get_session_and_headers()
# Prepare query data
data = {
"query": query,
}
if inputs:
data["inputs"] = inputs
# Log query (without inputs for security)
logger.debug(f"Executing query: {query}")
# Execute query
try:
response = session.post(
f'https://api.roamresearch.com/api/graph/{GRAPH_NAME}/q',
headers=headers,
json=data
)
if response.status_code == 401:
raise AuthenticationError("Authentication failed", {"status_code": response.status_code})
if response.status_code == 429:
raise RateLimitError("Rate limit exceeded", {"status_code": response.status_code})
response.raise_for_status()
result = response.json().get('result')
# Log result size
if isinstance(result, list):
logger.debug(f"Query returned {len(result)} results")
return result
    except requests.RequestException as e:
        error_msg = f"Query failed: {str(e)}"
        error_details = {}
        # Note: a Response with a 4xx/5xx status is falsy, so compare against None explicitly
        if e.response is not None:
            error_details["status_code"] = e.response.status_code
            try:
                error_details["response"] = e.response.json()
            except ValueError:
                error_details["response_text"] = e.response.text[:500]
            # Classify error based on status code
            if e.response.status_code == 401:
                raise AuthenticationError("Authentication failed", error_details) from e
            elif e.response.status_code == 429:
                raise RateLimitError("Rate limit exceeded", error_details) from e
        logger.error(error_msg, extra={"details": error_details})
        raise QueryError(error_msg, query, error_details) from e
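# Example usage (sketch): find a page UID by exact title
#
#     uid = execute_query('[:find ?uid . :where [?e :node/title "Books"] [?e :block/uid ?uid]]')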
@retry_on_error()
def execute_write_action(action_data: Union[Dict[str, Any], List[Dict[str, Any]]]) -> Dict[str, Any]:
"""
Execute a write action or a batch of actions on the Roam graph.
Args:
action_data: The action data to write or a list of actions for batch operation
Returns:
Response data
Raises:
TransactionError: If the write action fails
AuthenticationError: If authentication fails
RateLimitError: If rate limits are exceeded
"""
validate_credentials()
session, headers = get_session_and_headers()
# Check if it's a batch operation or single action
is_batch = isinstance(action_data, list)
# If it's a batch operation, wrap it in a batch container
if is_batch:
# Log batch size
logger.debug(f"Executing batch write action with {len(action_data)} operations")
# Group operations by type for debugging
action_types = {}
for action in action_data:
action_type = action.get("action", "unknown")
if action_type in action_types:
action_types[action_type] += 1
else:
action_types[action_type] = 1
logger.debug(f"Batch operation types: {action_types}")
# Prepare batch action
batch_data = {
"action": "batch-actions",
"actions": action_data
}
action_type = "batch-actions"
operation_data = batch_data
else:
# Log action type
action_type = action_data.get("action", "unknown")
logger.debug(f"Executing write action: {action_type}")
operation_data = action_data
# Debug log the operation data
logger.debug(f"Sending data: {json.dumps(operation_data)[:100]}...")
# Execute action
try:
response = session.post(
f'https://api.roamresearch.com/api/graph/{GRAPH_NAME}/write',
headers=headers,
json=operation_data # Use json parameter for proper JSON encoding
)
logger.debug(f"Status code: {response.status_code}")
logger.debug(f"Response headers: {dict(response.headers)}")
if response.status_code == 401:
raise AuthenticationError("Authentication failed", {"status_code": response.status_code})
if response.status_code == 429:
raise RateLimitError("Rate limit exceeded", {"status_code": response.status_code})
# Special handling for empty responses
if response.status_code == 200 and not response.text:
logger.debug("Received empty response with status 200 (success)")
return {"success": True}
response.raise_for_status()
# Try to parse JSON response
try:
result = response.json()
logger.debug(f"Response: {json.dumps(result)[:500]}")
# Success even with error message for batch operations that partly succeed
if "batch-error-message" in result and "num-actions-successfully-transacted-before-failure" in result:
num_success = result.get("num-actions-successfully-transacted-before-failure", 0)
logger.debug(f"Batch partially succeeded with {num_success} actions before failure")
return result
return result
except json.JSONDecodeError:
# Some successful operations return empty responses
if 200 <= response.status_code < 300:
logger.debug("Success with non-JSON response")
return {"success": True}
else:
logger.debug(f"Failed to parse response as JSON: {response.text[:500]}")
raise TransactionError(
f"Failed to parse response as JSON",
action_type,
{"response_text": response.text[:500]}
)
    except requests.RequestException as e:
        error_details = {}
        # Note: a Response with a 4xx/5xx status is falsy, so compare against None explicitly
        if e.response is not None:
            error_details["status_code"] = e.response.status_code
            try:
                error_details["response"] = e.response.json()
            except ValueError:
                error_details["response_text"] = e.response.text[:500]
            # Classify error based on status code
            if e.response.status_code == 401:
                raise AuthenticationError("Authentication failed", error_details) from e
            elif e.response.status_code == 429:
                raise RateLimitError("Rate limit exceeded", error_details) from e
        error_msg = f"Write action failed: {str(e)}"
        logger.error(error_msg, extra={"details": error_details})
        raise TransactionError(error_msg, action_type, error_details) from e
def execute_batch_actions(actions: List[Dict[str, Any]], chunk_size: int = 50) -> Dict[str, Any]:
"""
Execute a batch of actions, optionally chunking into multiple requests.
Args:
actions: List of actions to execute
chunk_size: Maximum number of actions per request
Returns:
Combined results of all batch operations
Raises:
TransactionError: If any batch fails
"""
if not actions:
return {"success": True, "created_uids": []}
# Single batch if under chunk size
if len(actions) <= chunk_size:
result = execute_write_action(actions)
# Check for tempids-to-uids mapping in response
if "tempids-to-uids" in result:
return {"success": True, "created_uids": list(result["tempids-to-uids"].values())}
elif "successful" in result and result["successful"]:
return {"success": True, "created_uids": []}
else:
return result
# Split into chunks for larger batches
chunks = [actions[i:i + chunk_size] for i in range(0, len(actions), chunk_size)]
logger.debug(f"Splitting batch operation into {len(chunks)} chunks of max {chunk_size} actions")
# Track results across chunks
combined_results = {
"created_uids": [],
"success": True
}
# Track temporary and real UIDs for parent-child relationships
temp_uid_map = {}
# Execute each chunk
for i, chunk in enumerate(chunks):
logger.debug(f"Executing batch chunk {i+1}/{len(chunks)} with {len(chunk)} actions")
# Update parent UIDs with real UIDs from previous chunks
if i > 0 and temp_uid_map:
for action in chunk:
if action["action"] == "create-block":
parent_uid = action["location"]["parent-uid"]
if parent_uid.startswith("temp_") and parent_uid in temp_uid_map:
action["location"]["parent-uid"] = temp_uid_map[parent_uid]
result = execute_write_action(chunk)
# Collect UIDs from this chunk
created_uids = []
if "tempids-to-uids" in result:
created_uids = list(result["tempids-to-uids"].values())
if created_uids:
# Map temp UIDs to real UIDs for next chunks
if i < len(chunks) - 1:
for j, uid in enumerate(created_uids):
temp_key = f"temp_{i}_{j}"
temp_uid_map[temp_key] = uid
combined_results["created_uids"].extend(created_uids)
# Add delay between batches to ensure ordering
if i < len(chunks) - 1:
time.sleep(0.5)
return combined_results
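# Example usage (sketch): create two sibling blocks under a page in one request
#
#     from roam_mcp.utils import create_block_action
#     actions = [create_block_action(page_uid, "First"), create_block_action(page_uid, "Second")]
#     result = execute_batch_actions(actions)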
def find_or_create_page(title: str) -> str:
"""
Find a page by title or create it if it doesn't exist.
Args:
title: Page title
Returns:
Page UID
Raises:
TransactionError: If page creation fails
ValidationError: If title is invalid
AuthenticationError: If authentication fails
"""
validate_credentials()
session, headers = get_session_and_headers()
# Validate title
if not title or not isinstance(title, str):
raise ValidationError("Page title must be a non-empty string", "title")
title = title.strip()
if not title:
raise ValidationError("Page title cannot be empty or just whitespace", "title")
# Try to find the page first
logger.debug(f"Looking for page: {title}")
query = f'''[:find ?uid .
:where [?e :node/title "{title}"]
[?e :block/uid ?uid]]'''
page_uid = execute_query(query)
if page_uid:
logger.debug(f"Found existing page: {title} (UID: {page_uid})")
return page_uid
# Create the page if it doesn't exist
logger.debug(f"Creating new page: {title}")
action_data = {
"action": "create-page",
"page": {"title": title}
}
try:
response = execute_write_action(action_data)
if response.get("success", False):
# Wait a moment for the page to be created
time.sleep(1)
# Try to find the page again
page_uid = execute_query(query)
if page_uid:
logger.debug(f"Created page: {title} (UID: {page_uid})")
return page_uid
# If still not found, try one more time with a longer delay
time.sleep(2)
page_uid = execute_query(query)
if page_uid:
logger.debug(f"Found newly created page: {title} (UID: {page_uid})")
return page_uid
# If we get here, something went wrong
error_msg = f"Failed to create page: {title}"
logger.error(error_msg)
raise TransactionError(error_msg, "create-page", {"title": title, "response": response})
except TransactionError:
# Rethrow existing TransactionError
raise
except Exception as e:
error_msg = f"Failed to create page: {title}"
logger.error(error_msg)
raise TransactionError(error_msg, "create-page", {"title": title, "error": str(e)}) from e
def get_daily_page() -> str:
"""
Get or create today's daily page.
Returns:
Daily page UID
Raises:
TransactionError: If page creation fails
"""
today = datetime.now()
date_str = format_roam_date(today)
logger.debug(f"Getting daily page for: {date_str}")
return find_or_create_page(date_str)
def add_block_to_page(page_uid: str, content: str, order: Union[int, str] = "last") -> Optional[str]:
"""
Add a block to a page.
Args:
page_uid: Parent page UID
content: Block content
order: Position ("first", "last", or integer index)
Returns:
New block UID or None if creation failed
Raises:
BlockNotFoundError: If page does not exist
ValidationError: If parameters are invalid
TransactionError: If block creation fails
"""
# Validate parameters
if not page_uid:
raise ValidationError("Parent page UID is required", "page_uid")
if not content:
raise ValidationError("Block content cannot be empty", "content")
# Generate a unique block UID
import uuid
block_uid = str(uuid.uuid4())[:9]
action_data = {
"action": "create-block",
"location": {
"parent-uid": page_uid,
"order": order
},
"block": {
"string": content,
"uid": block_uid
}
}
logger.debug(f"Adding block to page {page_uid}")
try:
result = execute_write_action(action_data)
if result.get("success", False):
# Add a brief delay to ensure the block is created
time.sleep(1)
# Verify the block exists
session, headers = get_session_and_headers()
found_uid = find_block_uid(session, headers, GRAPH_NAME, content)
if found_uid:
logger.debug(f"Created block with UID: {found_uid}")
return found_uid
# If we couldn't find the UID by content, return the one we generated
logger.debug(f"Block created but couldn't verify, returning generated UID: {block_uid}")
return block_uid
else:
logger.error(f"Failed to create block: {result.get('error', 'Unknown error')}")
return None
except Exception as e:
if isinstance(e, (BlockNotFoundError, ValidationError, TransactionError)):
raise
error_msg = f"Failed to create block: {str(e)}"
logger.error(error_msg)
raise TransactionError(error_msg, "create-block", {"page_uid": page_uid}) from e
def update_block(block_uid: str, content: str) -> bool:
"""
Update a block's content.
Args:
block_uid: Block UID
content: New content
Returns:
Success flag
Raises:
BlockNotFoundError: If block does not exist
ValidationError: If parameters are invalid
TransactionError: If block update fails
"""
# Validate parameters
if not block_uid:
raise ValidationError("Block UID is required", "block_uid")
if content is None:
raise ValidationError("Block content cannot be None", "content")
action_data = {
"action": "update-block",
"block": {
"uid": block_uid,
"string": content
}
}
logger.debug(f"Updating block: {block_uid}")
try:
execute_write_action(action_data)
return True
except Exception as e:
if isinstance(e, (BlockNotFoundError, ValidationError, TransactionError)):
raise
error_msg = f"Failed to update block: {str(e)}"
logger.error(error_msg)
raise TransactionError(error_msg, "update-block", {"block_uid": block_uid}) from e
def transform_block(block_uid: str, find_pattern: str, replace_with: str, global_replace: bool = True) -> str:
"""
Transform a block's content using regex pattern replacement.
Args:
block_uid: Block UID
find_pattern: Regex pattern to find
replace_with: Text to replace with
global_replace: Whether to replace all occurrences
Returns:
Updated content
Raises:
BlockNotFoundError: If block does not exist
ValidationError: If parameters are invalid
QueryError: If block retrieval fails
TransactionError: If block update fails
"""
# Validate parameters
if not block_uid:
raise ValidationError("Block UID is required", "block_uid")
if not find_pattern:
raise ValidationError("Find pattern cannot be empty", "find_pattern")
# First get the current content
query = f'''[:find ?string .
:where [?b :block/uid "{block_uid}"]
[?b :block/string ?string]]'''
logger.debug(f"Getting content for block: {block_uid}")
try:
current_content = execute_query(query)
        if current_content is None:  # an empty string is still valid block content
raise BlockNotFoundError(block_uid)
# Apply transformation
logger.debug(f"Transforming block {block_uid} with pattern: {find_pattern}")
flags = re.MULTILINE
count = 0 if global_replace else 1
try:
new_content = re.sub(find_pattern, replace_with, current_content, count=count, flags=flags)
except re.error as e:
raise ValidationError(f"Invalid regex pattern: {str(e)}", "find_pattern", {"pattern": find_pattern})
# Update the block
update_block(block_uid, new_content)
return new_content
except (BlockNotFoundError, ValidationError, QueryError, TransactionError):
# Rethrow existing errors
raise
except Exception as e:
error_msg = f"Failed to transform block: {str(e)}"
logger.error(error_msg)
raise TransactionError(error_msg, "transform-block", {"block_uid": block_uid}) from e
def batch_update_blocks(updates: List[Dict[str, Any]], chunk_size: int = 50) -> List[Dict[str, Any]]:
"""
Update multiple blocks in a single operation.
Args:
updates: List of update operations
chunk_size: Maximum number of actions per batch
Returns:
List of results
Raises:
ValidationError: If updates are not valid
"""
if not isinstance(updates, list):
raise ValidationError("Updates must be a list", "updates")
if not updates:
return []
session, headers = get_session_and_headers()
results = []
batch_actions = []
logger.debug(f"Batch updating {len(updates)} blocks")
# Validate each update and prepare batch actions
for i, update in enumerate(updates):
try:
block_uid = update.get("block_uid")
if not block_uid:
results.append({"success": False, "error": "Missing block_uid"})
continue
# Check block exists
query = f'''[:find ?string .
:where [?b :block/uid "{block_uid}"]
[?b :block/string ?string]]'''
current_content = execute_query(query)
            if current_content is None:  # an empty string is still valid content
results.append({
"success": False,
"block_uid": block_uid,
"error": f"Block with UID {block_uid} not found"
})
continue
# Handle direct content update
if "content" in update:
batch_actions.append({
"action": "update-block",
"block": {
"uid": block_uid,
"string": update["content"]
}
})
results.append({
"success": True,
"block_uid": block_uid,
"content": update["content"]
})
# Handle pattern transformation
elif "transform" in update:
transform = update["transform"]
try:
find_pattern = transform["find"]
replace_with = transform["replace"]
global_replace = transform.get("global", True)
# Apply transformation
flags = re.MULTILINE
count = 0 if global_replace else 1
new_content = re.sub(find_pattern, replace_with, current_content, count=count, flags=flags)
batch_actions.append({
"action": "update-block",
"block": {
"uid": block_uid,
"string": new_content
}
})
results.append({
"success": True,
"block_uid": block_uid,
"content": new_content
})
except re.error as e:
results.append({
"success": False,
"block_uid": block_uid,
"error": f"Invalid regex pattern: {str(e)}"
})
except KeyError as e:
results.append({
"success": False,
"block_uid": block_uid,
"error": f"Missing required transform key: {str(e)}"
})
else:
results.append({
"success": False,
"block_uid": block_uid,
"error": "Neither content nor transform provided"
})
except Exception as e:
logger.error(f"Error preparing update for block {update.get('block_uid', 'unknown')}: {str(e)}")
results.append({
"success": False,
"block_uid": update.get("block_uid", "unknown"),
"error": str(e)
})
# Execute batch updates if we have any valid actions
if batch_actions:
try:
execute_batch_actions(batch_actions, chunk_size)
except Exception as e:
logger.error(f"Error executing batch update: {str(e)}")
# Mark all previously successful results as failed
for result in results:
if result.get("success"):
result["success"] = False
result["error"] = f"Batch update failed: {str(e)}"
# Log success rate
successful = sum(1 for r in results if r.get("success"))
logger.debug(f"Batch update completed: {successful}/{len(updates)} successful")
return results
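# Illustrative payload (placeholder UIDs): one direct rewrite and one regex
# transformation in a single batch.
#
#   results = batch_update_blocks([
#       {"block_uid": "abcdefghi", "content": "New text"},
#       {"block_uid": "jklmnopqr",
#        "transform": {"find": "TODO", "replace": "DONE", "global": True}},
#   ])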
def get_page_content(title: str, resolve_refs: bool = True, max_depth: int = 5) -> str:
"""
Get the content of a page with optional block reference resolution.
Args:
title: Page title
resolve_refs: Whether to resolve block references
max_depth: Maximum depth of nested blocks to retrieve (default: 5)
Returns:
Page content as markdown
Raises:
PageNotFoundError: If page retrieval fails
QueryError: If query execution fails
"""
session, headers = get_session_and_headers()
# First find the page UID
logger.debug(f"Getting content for page: {title}")
page_uid = find_page_by_title(session, headers, GRAPH_NAME, title)
if not page_uid:
raise PageNotFoundError(title)
# Build block hierarchy iteratively
block_map = {}
top_level_blocks = []
# Query to get immediate children of a parent (page or block)
def get_children(parent_uid: str, depth: int = 0) -> None:
if depth >= max_depth:
return
query = f"""[:find ?uid ?string ?order
:where
[?parent :block/uid "{parent_uid}"]
[?parent :block/children ?child]
[?child :block/uid ?uid]
[?child :block/string ?string]
[?child :block/order ?order]]"""
try:
results = execute_query(query)
if not results:
return
for uid, content, order in results:
# Resolve references if requested
if resolve_refs:
content = resolve_block_references(session, headers, GRAPH_NAME, content)
# Create block object
block = {
"uid": uid,
"content": content,
"order": order,
"children": []
}
block_map[uid] = block
# Add to top-level or parent's children
if parent_uid == page_uid:
top_level_blocks.append(block)
elif parent_uid in block_map:
block_map[parent_uid]["children"].append(block)
# Recursively fetch children
get_children(uid, depth + 1)
except QueryError as e:
logger.warning(f"Failed to fetch children for {parent_uid}: {str(e)}")
raise
try:
# Start with the page's top-level blocks
get_children(page_uid)
if not top_level_blocks:
logger.debug(f"No content found on page: {title}")
return f"# {title}\n\nNo content found on this page."
# Sort blocks by order
def sort_blocks(blocks):
blocks.sort(key=lambda b: b["order"])
for block in blocks:
sort_blocks(block["children"])
sort_blocks(top_level_blocks)
# Convert to markdown
markdown = f"# {title}\n\n"
def blocks_to_md(blocks, level=0):
result = ""
for block in blocks:
indent = " " * level
result += f"{indent}- {block['content']}\n"
if block["children"]:
result += blocks_to_md(block["children"], level + 1)
return result
markdown += blocks_to_md(top_level_blocks)
logger.debug(f"Retrieved page content for: {title}")
return markdown
except QueryError:
# Rethrow existing QueryError
raise
except Exception as e:
error_msg = f"Failed to get page content: {str(e)}"
logger.error(error_msg)
raise QueryError(error_msg, "Iterative child fetch", {"page_title": title, "page_uid": page_uid}) from e
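# Illustrative output shape for get_page_content("Project Planning"),
# where "Project Planning" is a placeholder title:
#
#   # Project Planning
#
#   - Top-level block
#     - Nested child block
#
# Block references are resolved inline when resolve_refs is True.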
```
--------------------------------------------------------------------------------
/roam_mcp/server.py:
--------------------------------------------------------------------------------
```python
"""Core server module for Roam MCP server."""
import os
import sys
import logging
import traceback
from typing import Dict, List, Any, Optional, Union
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled
from mcp.server.fastmcp import FastMCP
from datetime import datetime
# Import operations
from roam_mcp.api import (
API_TOKEN,
GRAPH_NAME,
MEMORIES_TAG,
get_page_content,
ValidationError,
QueryError,
PageNotFoundError,
BlockNotFoundError,
TransactionError,
AuthenticationError,
RateLimitError
)
from roam_mcp.search import (
search_by_text,
search_by_tag,
search_by_status,
search_block_refs,
search_hierarchy,
search_by_date,
find_pages_modified_today,
execute_datomic_query
)
from roam_mcp.content import (
create_page,
create_block,
create_outline,
import_markdown,
add_todos,
update_content,
update_multiple_contents
)
from roam_mcp.memory import (
remember,
recall
)
from roam_mcp.utils import (
extract_youtube_video_id,
detect_url_type
)
from roam_mcp.content_parsers import parse_webpage, parse_pdf
# Initialize FastMCP server
mcp = FastMCP("roam-research")
# Configure logging
logger = logging.getLogger("roam-mcp")
def setup_logging(verbose=False):
"""Configure logging with appropriate level of detail."""
log_level = logging.DEBUG if verbose else logging.INFO
# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
# Clear any existing handlers
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Add console handler
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setLevel(log_level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
def validate_environment():
"""Validate that required environment variables are set."""
if not API_TOKEN or not GRAPH_NAME:
missing = []
if not API_TOKEN:
missing.append("ROAM_API_TOKEN")
if not GRAPH_NAME:
missing.append("ROAM_GRAPH_NAME")
error_msg = f"""
Missing required environment variables: {', '.join(missing)}
Please configure these variables either:
1. In your MCP settings file:
- For Claude: ~/Library/Application Support/Claude/claude_desktop_config.json
- For Cline: ~/Library/Application Support/Code/User/globalStorage/saoudrizwan.claude-dev/settings/cline_mcp_settings.json
Example configuration:
{{
"mcpServers": {{
"roam-helper": {{
"command": "uvx",
"args": ["git+https://github.com/PhiloSolares/roam-mcp.git"],
"env": {{
"ROAM_API_TOKEN": "your-api-token",
"ROAM_GRAPH_NAME": "your-graph-name"
}}
}}
}}
}}
2. Or in a .env file in the roam-mcp directory:
ROAM_API_TOKEN=your-api-token
ROAM_GRAPH_NAME=your-graph-name
"""
logger.error(error_msg)
return False
return True
def format_error_response(error: Exception) -> str:
"""Format an error for user-friendly display."""
if isinstance(error, ValidationError):
return f"Validation error: {str(error)}"
elif isinstance(error, PageNotFoundError):
return f"Page not found: {str(error)}"
elif isinstance(error, BlockNotFoundError):
return f"Block not found: {str(error)}"
elif isinstance(error, QueryError):
return f"Query error: {str(error)}"
elif isinstance(error, TransactionError):
return f"Transaction error: {str(error)}"
elif isinstance(error, AuthenticationError):
return f"Authentication error: {str(error)}"
elif isinstance(error, RateLimitError):
return f"Rate limit exceeded: {str(error)}"
else:
return f"Error: {str(error)}"
@mcp.tool()
async def search_roam(search_terms: List[str]) -> str:
"""Search Roam database for content containing the specified terms.
Args:
search_terms: List of keywords to search for
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not search_terms:
return "Please provide at least one search term"
all_results = []
for term in search_terms:
result = search_by_text(term)
if result["success"]:
all_results.extend(result["matches"])
# Limit to 3000 words
word_count = 0
max_word_count = 3000
filtered_results = []
for match in all_results:
content = match["content"]
block_word_count = len(content.split())
if word_count + block_word_count <= max_word_count:
filtered_results.append(f"Page: {match.get('page_title', 'Unknown')}\n{content}")
word_count += block_word_count
else:
break
if not filtered_results:
return f"No results found for terms: {', '.join(search_terms)}"
return "\n\n".join(filtered_results)
except Exception as e:
logger.error(f"Error searching Roam: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_fetch_page_by_title(title: str) -> str:
"""Retrieve complete page contents by exact title, including all nested blocks and resolved block references.
Args:
title: Title of the page
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not title:
return "Error: title is required"
content = get_page_content(title)
return content
except Exception as e:
logger.error(f"Error fetching page: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_create_page(title: str, content: Optional[List[Dict[str, Any]]] = None) -> str:
"""Create a new page in Roam Research with optional content using explicit nesting levels.
Args:
title: Title of the new page
content: Initial content for the page as an array of blocks with explicit nesting levels.
Each block must have a 'text' field with the content as a string.
Example:
[
{"text": "Heading", "level": 0},
{"text": "Bullet point", "level": 1},
{"text": "Another point", "level": 1, "children": [
{"text": "Nested point", "level": 2}
]}
]
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not title:
return "Error: title is required"
result = create_page(title, content)
if result["success"]:
return f"Page created successfully: {result['page_url']}"
else:
return f"Error creating page: {result.get('error', 'Unknown error')}"
except Exception as e:
logger.error(f"Error creating page: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_create_block(content: str, page_uid: Optional[str] = None, title: Optional[str] = None) -> str:
"""Add a new block to an existing Roam page. If no page specified, adds to today's daily note.
Args:
content: Content of the block
page_uid: Optional: UID of the page to add block to
title: Optional: Title of the page to add block to
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not content:
return "Error: content is required"
result = create_block(content, page_uid, title)
if result["success"]:
block_uid = result.get("block_uid", "unknown")
parent_uid = result.get("parent_uid", "unknown")
return f"Block created successfully with UID: {block_uid} under parent: {parent_uid}"
else:
return f"Error creating block: {result.get('error', 'Unknown error')}"
except Exception as e:
logger.error(f"Error creating block: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_create_outline(outline: List[Dict[str, Any]], page_title_uid: Optional[str] = None, block_text_uid: Optional[str] = None) -> str:
"""Add a structured outline to an existing page or block with customizable nesting levels.
Args:
outline: Array of outline items with block text and explicit nesting level
page_title_uid: Title or UID of the page. Leave blank to use the default daily page
block_text_uid: A title heading for the outline or the UID of the block under which content will be nested
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not outline:
return "Error: outline is required and cannot be empty"
result = create_outline(outline, page_title_uid, block_text_uid)
if result["success"]:
created_count = len(result.get("created_uids", []))
page_uid = result.get("page_uid", "unknown")
parent_uid = result.get("parent_uid", "unknown")
return f"Outline created successfully with {created_count} blocks on page {page_uid} under parent {parent_uid}"
else:
return f"Error creating outline: {result.get('error', 'Unknown error')}"
except Exception as e:
logger.error(f"Error creating outline: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_import_markdown(content: str, page_uid: Optional[str] = None, page_title: Optional[str] = None,
parent_uid: Optional[str] = None, parent_string: Optional[str] = None,
order: str = "last") -> str:
"""Import nested markdown content into Roam under a specific block.
Args:
content: Nested markdown content to import
page_uid: Optional: UID of the page containing the parent block
page_title: Optional: Title of the page containing the parent block
parent_uid: Optional: UID of the parent block to add content under
parent_string: Optional: Exact string content of the parent block to add content under
order: Optional: Where to add the content under the parent ("first" or "last")
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not content:
return "Error: content is required and cannot be empty"
result = import_markdown(content, page_uid, page_title, parent_uid, parent_string, order)
if result["success"]:
created_count = len(result.get("created_uids", []))
page_uid = result.get("page_uid", "unknown")
parent_uid = result.get("parent_uid", "unknown")
return f"Markdown imported successfully with {created_count} blocks on page {page_uid} under parent {parent_uid}"
else:
return f"Error importing markdown: {result.get('error', 'Unknown error')}"
except Exception as e:
logger.error(f"Error importing markdown: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_add_todo(todos: List[str]) -> str:
"""Add a list of todo items as individual blocks to today's daily page in Roam.
Args:
todos: List of todo items to add
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not todos:
return "Error: todos list cannot be empty"
result = add_todos(todos)
if result["success"]:
return f"Added {len(todos)} todo items to today's daily page"
else:
return f"Error adding todos: {result.get('error', 'Unknown error')}"
except Exception as e:
logger.error(f"Error adding todos: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_search_for_tag(primary_tag: str, page_title_uid: Optional[str] = None, near_tag: Optional[str] = None) -> str:
"""Search for blocks containing a specific tag and optionally filter by blocks that also contain another tag nearby.
Args:
primary_tag: The main tag to search for (without the [[ ]] brackets)
page_title_uid: Optional: Title or UID of the page to search in
near_tag: Optional: Another tag to filter results by - will only return blocks where both tags appear
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not primary_tag:
return "Error: primary_tag is required"
result = search_by_tag(primary_tag, page_title_uid, near_tag)
if result["success"]:
# Format the results
formatted = f"{result['message']}\n\n"
for match in result["matches"]:
page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
formatted += f"- {match['content']}{page_info}\n"
return formatted
else:
return f"Error searching for tag: {result.get('message', 'Unknown error')}"
except Exception as e:
logger.error(f"Error searching for tag: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_search_by_status(status: str, page_title_uid: Optional[str] = None,
include: Optional[str] = None, exclude: Optional[str] = None) -> str:
"""Search for blocks with a specific status (TODO/DONE) across all pages or within a specific page.
Args:
status: Status to search for (TODO or DONE)
page_title_uid: Optional: Title or UID of the page to search in
include: Optional: Comma-separated list of terms to filter results by inclusion
exclude: Optional: Comma-separated list of terms to filter results by exclusion
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not status or status not in ["TODO", "DONE"]:
return "Error: status must be either 'TODO' or 'DONE'"
result = search_by_status(status, page_title_uid, include, exclude)
if result["success"]:
# Format the results
formatted = f"{result['message']}\n\n"
for match in result["matches"]:
page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
formatted += f"- {match['content']}{page_info}\n"
return formatted
else:
return f"Error searching by status: {result.get('message', 'Unknown error')}"
except Exception as e:
logger.error(f"Error searching by status: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_search_block_refs(block_uid: Optional[str] = None, page_title_uid: Optional[str] = None) -> str:
"""Search for block references within a page or across the entire graph.
Args:
block_uid: Optional: UID of the block to find references to
page_title_uid: Optional: Title or UID of the page to search in
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
result = search_block_refs(block_uid, page_title_uid)
if result["success"]:
# Format the results
formatted = f"{result['message']}\n\n"
for match in result["matches"]:
page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
formatted += f"- {match['content']}{page_info}\n"
return formatted
else:
return f"Error searching block references: {result.get('message', 'Unknown error')}"
except Exception as e:
logger.error(f"Error searching block references: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_search_hierarchy(parent_uid: Optional[str] = None, child_uid: Optional[str] = None,
page_title_uid: Optional[str] = None, max_depth: int = 1) -> str:
"""Search for parent or child blocks in the block hierarchy.
Args:
parent_uid: Optional: UID of the block to find children of
child_uid: Optional: UID of the block to find parents of
page_title_uid: Optional: Title or UID of the page to search in
max_depth: Optional: How many levels deep to search (default: 1)
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not parent_uid and not child_uid:
return "Error: Either parent_uid or child_uid must be provided"
result = search_hierarchy(parent_uid, child_uid, page_title_uid, max_depth)
if result["success"]:
# Format the results
formatted = f"{result['message']}\n\n"
for match in result["matches"]:
page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
depth_info = f" (depth: {match['depth']})"
formatted += f"- {match['content']}{page_info}{depth_info}\n"
return formatted
else:
return f"Error searching hierarchy: {result.get('message', 'Unknown error')}"
except Exception as e:
logger.error(f"Error searching hierarchy: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_find_pages_modified_today(max_num_pages: int = 50) -> str:
"""Find pages that have been modified today (since midnight).
Args:
max_num_pages: Max number of pages to retrieve (default: 50)
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if max_num_pages < 1:
return "Error: max_num_pages must be at least 1"
result = find_pages_modified_today(max_num_pages)
if result["success"]:
# Format the results
formatted = f"{result['message']}\n\n"
for page in result["pages"]:
formatted += f"- {page}\n"
return formatted
else:
return f"Error finding modified pages: {result.get('message', 'Unknown error')}"
except Exception as e:
logger.error(f"Error finding modified pages: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_search_by_text(text: str, page_title_uid: Optional[str] = None) -> str:
"""Search for blocks containing specific text across all pages or within a specific page.
Args:
text: The text to search for
page_title_uid: Optional: Title or UID of the page to search in
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not text:
return "Error: text is required"
result = search_by_text(text, page_title_uid)
if result["success"]:
# Format the results
formatted = f"{result['message']}\n\n"
for match in result["matches"]:
page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
formatted += f"- {match['content']}{page_info}\n"
return formatted
else:
return f"Error searching by text: {result.get('message', 'Unknown error')}"
except Exception as e:
logger.error(f"Error searching by text: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_update_block(block_uid: str, content: Optional[str] = None,
transform_pattern: Optional[Dict[str, Any]] = None) -> str:
"""Update a single block identified by its UID.
Args:
block_uid: UID of the block to update
content: New content for the block
transform_pattern: Pattern to transform the current content
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not block_uid:
return "Error: block_uid is required"
if not content and not transform_pattern:
return "Error: Either content or transform_pattern must be provided"
result = update_content(block_uid, content, transform_pattern)
if result["success"]:
return f"Block updated successfully: {result['content']}"
else:
return f"Error updating block: {result.get('error', 'Unknown error')}"
except Exception as e:
logger.error(f"Error updating block: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_update_multiple_blocks(updates: List[Dict[str, Any]]) -> str:
"""Efficiently update multiple blocks in a single batch operation.
Args:
updates: Array of block updates to perform
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not updates or not isinstance(updates, list):
return "Error: updates must be a non-empty list"
result = update_multiple_contents(updates)
if result["success"]:
successful = sum(1 for r in result["results"] if r.get("success"))
return f"Updated {successful}/{len(updates)} blocks successfully"
else:
return f"Error updating blocks: {result.get('error', 'Unknown error')}"
except Exception as e:
logger.error(f"Error updating blocks: {str(e)}", exc_info=True)
return format_error_response(e)
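# Illustrative `updates` payload (placeholder UIDs): each entry carries
# either a direct "content" rewrite or a "transform" regex pattern.
#
#   [
#       {"block_uid": "abcdefghi", "content": "Rewritten text"},
#       {"block_uid": "jklmnopqr",
#        "transform": {"find": "v1", "replace": "v2", "global": True}},
#   ]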
@mcp.tool()
async def roam_search_by_date(start_date: str, end_date: Optional[str] = None,
type_filter: str = "created", scope: str = "blocks",
include_content: bool = True) -> str:
"""Search for blocks or pages based on creation or modification dates.
Args:
start_date: Start date in ISO format (YYYY-MM-DD)
end_date: Optional: End date in ISO format (YYYY-MM-DD)
type_filter: Whether to search by "created", "modified", or "both"
scope: Whether to search "blocks", "pages", or "both"
include_content: Whether to include the content of matching blocks/pages
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not start_date:
return "Error: start_date is required"
if type_filter not in ["created", "modified", "both"]:
return "Error: type_filter must be 'created', 'modified', or 'both'"
if scope not in ["blocks", "pages", "both"]:
return "Error: scope must be 'blocks', 'pages', or 'both'"
result = search_by_date(start_date, end_date, type_filter, scope, include_content)
if result["success"]:
# Format the results
formatted = f"{result['message']}\n\n"
for match in result["matches"]:
date_info = datetime.fromtimestamp(match["time"] / 1000).strftime("%Y-%m-%d %H:%M:%S")
if match["type"] == "block":
page_info = f" (in page: {match.get('page_title', 'Unknown')})"
content_info = f": {match.get('content', '')}" if include_content else ""
formatted += f"- Block {match['uid']} {date_info}{page_info}{content_info}\n"
else: # page
title_info = f" (title: {match.get('title', 'Unknown')})"
content_info = f": {match.get('content', '')}" if include_content else ""
formatted += f"- Page {match['uid']} {date_info}{title_info}{content_info}\n"
return formatted
else:
return f"Error searching by date: {result.get('message', 'Unknown error')}"
except Exception as e:
logger.error(f"Error searching by date: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_remember(memory: str, categories: Optional[List[str]] = None) -> str:
"""Add a memory or piece of information to remember, stored on the daily page with tag.
Args:
memory: The memory detail or information to remember
categories: Optional categories to tag the memory with
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not memory:
return "Error: memory is required"
result = remember(memory, categories)
if result["success"]:
return f"Memory stored successfully: {result['content']}"
else:
return f"Error storing memory: {result.get('error', 'Unknown error')}"
except Exception as e:
logger.error(f"Error storing memory: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_recall(sort_by: str = "newest", filter_tag: Optional[str] = None) -> str:
"""Retrieve stored memories, optionally filtered by tag and sorted by creation date.
Args:
sort_by: Sort order for memories based on creation date
filter_tag: Include only memories with a specific filter tag
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if sort_by not in ["newest", "oldest"]:
return "Error: sort_by must be 'newest' or 'oldest'"
result = recall(sort_by, filter_tag)
if result["success"]:
# Format the results
formatted = f"{result['message']}\n\n"
for memory in result["memories"]:
formatted += f"- {memory}\n"
return formatted
else:
return f"Error recalling memories: {result.get('error', 'Unknown error')}"
except Exception as e:
logger.error(f"Error recalling memories: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def roam_datomic_query(query: str, inputs: Optional[List[Any]] = None) -> str:
"""Execute a custom Datomic query on the Roam graph beyond the available search tools.
Args:
query: The Datomic query to execute (in Datalog syntax)
inputs: Optional array of input parameters for the query
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
if not query:
return "Error: query is required"
result = execute_datomic_query(query, inputs)
if result["success"]:
# Format the results
formatted = f"{result['message']}\n\n"
for match in result["matches"]:
formatted += f"- {match['content']}\n"
return formatted
else:
return f"Error executing query: {result.get('message', 'Unknown error')}"
except Exception as e:
logger.error(f"Error executing query: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.tool()
async def get_youtube_transcript(url: str) -> str:
"""Fetch and return the transcript of a YouTube video.
Args:
url: URL of the YouTube video
"""
video_id = extract_youtube_video_id(url)
if not video_id:
return "Invalid YouTube URL. Unable to extract video ID."
try:
# Define the prioritized list of language codes
languages = [
'en', 'en-US', 'en-GB', 'de', 'es', 'hi', 'zh', 'ar', 'bn', 'pt',
'ru', 'ja', 'pa'
]
# Attempt to retrieve the available transcripts
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
# Try to find a transcript in the prioritized languages
for language in languages:
try:
transcript = transcript_list.find_transcript([language])
                # Prefer manually created transcripts; skip auto-generated ones on this pass
if transcript.is_generated:
continue
text = " ".join([line["text"] for line in transcript.fetch()])
return text
except Exception:
continue
# If no suitable transcript is found in the specified languages, try to fetch a generated transcript
try:
            generated_transcript = transcript_list.find_generated_transcript(languages)
            text = " ".join([line["text"] for line in generated_transcript.fetch()])
return text
except Exception:
return "No suitable transcript found for this video."
except TranscriptsDisabled:
return "Transcripts are disabled for this video."
except Exception as e:
logger.error(f"Error fetching YouTube transcript: {str(e)}", exc_info=True)
return f"An error occurred while fetching the transcript: {str(e)}"
@mcp.tool()
async def fetch_webpage_content(url: str) -> str:
"""Fetch and extract the main content from a web page.
Args:
url: URL of the web page to fetch
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
logger.debug(f"Fetching webpage content: {url}")
result = await parse_webpage(url)
if result["success"]:
return f"# {result['title']}\n\nSource: {url}\n\n{result['content']}"
else:
return f"Error fetching webpage: {result.get('error', 'Unknown error')}"
except Exception as e:
logger.error(f"Error in fetch_webpage_content: {str(e)}", exc_info=True)
return f"Error fetching webpage: {str(e)}"
@mcp.tool()
async def fetch_pdf_content(url: str) -> str:
"""Fetch and extract the content from a PDF file.
Args:
url: URL of the PDF file to fetch
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
logger.debug(f"Fetching PDF content: {url}")
result = await parse_pdf(url)
if result["success"]:
return f"# {result['title']}\n\nSource: {url}\n\n{result['content']}"
else:
return f"Error fetching PDF: {result.get('error', 'Unknown error')}"
except Exception as e:
logger.error(f"Error in fetch_pdf_content: {str(e)}", exc_info=True)
return f"Error fetching PDF: {str(e)}"
@mcp.tool()
async def parse_url(url: str) -> str:
"""Intelligently parse content from a URL - supports webpages, PDFs, and YouTube videos.
Args:
url: URL to parse
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
# Detect URL type
url_type = detect_url_type(url)
if url_type == "youtube":
# Use existing YouTube transcript function
return await get_youtube_transcript(url)
elif url_type == "pdf":
return await fetch_pdf_content(url)
else: # webpage or unknown
return await fetch_webpage_content(url)
except Exception as e:
logger.error(f"Error parsing URL: {str(e)}", exc_info=True)
return f"Error parsing URL: {str(e)}"
@mcp.tool()
async def get_roam_graph_info() -> str:
"""Get information about your Roam Research graph.
"""
if not validate_environment():
return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
try:
# Get page count
query = """[:find (count ?p)
:where [?p :node/title]]"""
result = execute_datomic_query(query)
if result["success"] and result["matches"]:
page_count = result["matches"][0]["content"]
else:
page_count = "Unknown"
# Get block count
query = """[:find (count ?b)
:where [?b :block/string]]"""
result = execute_datomic_query(query)
if result["success"] and result["matches"]:
block_count = result["matches"][0]["content"]
else:
block_count = "Unknown"
# Format the output
memory_tag = MEMORIES_TAG if MEMORIES_TAG else "Not set (using default #[[Memories]])"
formatted_info = f"""
Graph Name: {GRAPH_NAME}
Pages: {page_count}
Blocks: {block_count}
API Access: Enabled
Memory Tag: {memory_tag}
"""
return formatted_info
except Exception as e:
logger.error(f"Error retrieving graph information: {str(e)}", exc_info=True)
return format_error_response(e)
@mcp.prompt()
async def summarize_page(page_title: str) -> dict:
"""
Create a prompt to summarize a page in Roam Research.
Args:
page_title: Title of the page to summarize
"""
if not validate_environment():
return {
"messages": [{
"role": "user",
"content": "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
}]
}
try:
content = get_page_content(page_title)
return {
"messages": [{
"role": "user",
"content": f"Please provide a concise summary of the following page content from my Roam Research database:\n\n{content}"
}]
}
except Exception as e:
logger.error(f"Error creating summary prompt: {str(e)}", exc_info=True)
return {
"messages": [{
"role": "user",
"content": f"I wanted to summarize my Roam page titled '{page_title}', but there was an error retrieving the content: {format_error_response(e)}. Can you help me troubleshoot this issue with my Roam Research integration?"
}]
}
def run_server(transport="stdio", port=None, verbose=False):
"""Run the MCP server with the specified transport."""
# Configure logging based on verbosity
setup_logging(verbose)
logger.info("Server starting...")
# Validate environment variables
valid_env = validate_environment()
if valid_env:
logger.info(f"API token and graph name are set")
logger.info(f"MEMORIES_TAG is set to: {MEMORIES_TAG}")
else:
logger.warning("Missing required environment variables")
# Run the server
try:
if transport == "stdio":
logger.info("Starting server with stdio transport")
mcp.run(transport="stdio")
        elif transport == "sse":
            if not port:
                port = 3000
            logger.info(f"Starting server with SSE transport on port {port}")
            # FastMCP reads the port from its settings; run() itself only
            # accepts the transport name
            mcp.settings.port = port
            mcp.run(transport="sse")
else:
logger.error(f"Unsupported transport: {transport}")
sys.exit(1)
except KeyboardInterrupt:
logger.info("Server stopped by user")
except Exception as e:
logger.error(f"Error running server: {str(e)}")
traceback.print_exc()
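# Illustrative invocations (assuming the environment variables are set):
#
#   run_server(transport="stdio", verbose=True)   # for stdio MCP clients
#   run_server(transport="sse", port=3000)        # for HTTP/SSE clients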
```
--------------------------------------------------------------------------------
/roam_mcp/content.py:
--------------------------------------------------------------------------------
```python
"""Content operations for the Roam MCP server (pages, blocks, and outlines)."""
from typing import Dict, List, Any, Optional, Union
from datetime import datetime
import re
import logging
import uuid
import time
import json
from roam_mcp.api import (
execute_query,
execute_write_action,
execute_batch_actions,
get_session_and_headers,
GRAPH_NAME,
find_or_create_page,
get_daily_page,
add_block_to_page,
update_block,
batch_update_blocks,
find_page_by_title,
ValidationError,
BlockNotFoundError,
PageNotFoundError,
TransactionError
)
from roam_mcp.utils import (
format_roam_date,
convert_to_roam_markdown,
parse_markdown_list,
process_nested_content,
find_block_uid,
create_block_action
)
# Set up logging
logger = logging.getLogger("roam-mcp.content")
def process_hierarchical_content(parent_uid: str, content_data: List[Dict[str, Any]], order: str = "last") -> Dict[str, Any]:
"""
Process hierarchical content with proper parent-child relationships.
This is a standardized utility function used across different content creation methods.
Args:
parent_uid: UID of the parent block/page
content_data: List of content items with text, level, and optional children/heading_level attributes
order: Where to add content ("first" or "last")
Returns:
Dictionary with success status and created block UIDs
"""
if not content_data:
return {
"success": True,
"created_uids": []
}
# First, validate the hierarchical structure
def validate_item(item, path="root"):
errors = []
# Check required fields
if not item.get("text") and not item.get("string"):
errors.append(f"Item at {path} is missing required 'text' field")
# Ensure level is valid
level = item.get("level")
if level is not None and not isinstance(level, int):
errors.append(f"Item at {path} has invalid 'level', must be an integer")
# Validate heading level
heading_level = item.get("heading_level", 0)
if heading_level and (not isinstance(heading_level, int) or heading_level < 0 or heading_level > 3):
errors.append(f"Item at {path} has invalid 'heading_level', must be an integer between 0 and 3")
# Validate children recursively
children = item.get("children", [])
if not isinstance(children, list):
errors.append(f"Item at {path} has invalid 'children', must be a list")
else:
for i, child in enumerate(children):
child_path = f"{path}.children[{i}]"
child_errors = validate_item(child, child_path)
errors.extend(child_errors)
return errors
# Validate all items
all_errors = []
for i, item in enumerate(content_data):
item_path = f"item[{i}]"
errors = validate_item(item, item_path)
all_errors.extend(errors)
if all_errors:
return {
"success": False,
"error": f"Invalid content structure: {'; '.join(all_errors)}"
}
# Process hierarchical content with proper nesting
session, headers = get_session_and_headers()
all_created_uids = []
# Define a recursive function to process items
def process_item(item, parent_uid, level_to_uid, current_level):
created_uids = []
# Get item properties
text = item.get("text", item.get("string", ""))
# Strip leading dash characters that might cause double bullets
text = re.sub(r'^-\s+', '', text)
level = item.get("level", current_level)
heading_level = item.get("heading_level", 0)
# Find the appropriate parent for this level
parent_level = level - 1
if parent_level < -1:
parent_level = -1
effective_parent = level_to_uid.get(parent_level, parent_uid)
# Create block with a unique UID
block_uid = str(uuid.uuid4())[:9]
action_data = {
"action": "create-block",
"location": {
"parent-uid": effective_parent,
"order": order if level == 0 else "last"
},
"block": {
"string": text,
"uid": block_uid
}
}
# Add heading level if specified
if heading_level and heading_level > 0 and heading_level <= 3:
action_data["block"]["heading"] = heading_level
# Execute the action
result = execute_write_action(action_data)
if result.get("success", False):
created_uids.append(block_uid)
level_to_uid[level] = block_uid
logger.debug(f"Created block at level {level} with UID: {block_uid}")
# Process children if any
children = item.get("children", [])
if children:
for child in children:
# Process each child with this block as parent
child_result = process_item(child, block_uid, level_to_uid, level + 1)
created_uids.extend(child_result)
# Add a brief delay for API stability
time.sleep(0.3)
else:
logger.error(f"Failed to create block: {result.get('error', 'Unknown error')}")
return created_uids
try:
# Process each top-level item
level_to_uid = {-1: parent_uid} # Start with parent as level -1
for item in content_data:
item_uids = process_item(item, parent_uid, level_to_uid, 0)
all_created_uids.extend(item_uids)
return {
"success": True,
"created_uids": all_created_uids
}
except Exception as e:
error_msg = f"Failed to process hierarchical content: {str(e)}"
logger.error(error_msg)
return {
"success": False,
"error": error_msg,
"created_uids": all_created_uids # Return any UIDs created before failure
}
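# Illustrative content_data shape ("page-uid-123" is a placeholder UID);
# levels are explicit, with children one level deeper than their parent:
#
#   process_hierarchical_content("page-uid-123", [
#       {"text": "Heading", "level": 0, "heading_level": 2},
#       {"text": "Bullet point", "level": 1},
#       {"text": "Another point", "level": 1, "children": [
#           {"text": "Nested point", "level": 2},
#       ]},
#   ])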
def create_nested_blocks(parent_uid: str, blocks_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Create nested blocks with proper parent-child relationships.
Args:
parent_uid: UID of the parent block/page
blocks_data: List of block data (text, level, children)
Returns:
Dictionary with success status and created block UIDs
"""
    # Kept for backward compatibility; delegates to the standardized hierarchical content processor
return process_hierarchical_content(parent_uid, blocks_data)
def create_page(title: str, content: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
"""
Create a new page in Roam Research with optional nested content.
Args:
title: Title for the new page
content: Optional content as a list of dicts with 'text', optional 'level', and optional 'children'
Each item should have:
- 'text' or 'string': Content text
- 'level': Nesting level (optional, defaults to parent_level + 1)
- 'heading_level': Heading level 1-3 (optional)
- 'children': List of child items (optional)
Returns:
Result with page UID and created block UIDs
"""
if not title:
return {
"success": False,
"error": "Title is required"
}
session, headers = get_session_and_headers()
try:
# Create the page
page_uid = find_or_create_page(title)
# Add content if provided
if content:
# Use the standardized hierarchical content processor
result = process_hierarchical_content(page_uid, content)
if result["success"]:
return {
"success": True,
"uid": page_uid,
"created_uids": result.get("created_uids", []),
"page_url": f"https://roamresearch.com/#/app/{GRAPH_NAME}/page/{page_uid}"
}
else:
return {
"success": False,
"error": result.get("error", "Failed to create content"),
"uid": page_uid,
"page_url": f"https://roamresearch.com/#/app/{GRAPH_NAME}/page/{page_uid}"
}
return {
"success": True,
"uid": page_uid,
"page_url": f"https://roamresearch.com/#/app/{GRAPH_NAME}/page/{page_uid}"
}
except ValidationError as e:
return {
"success": False,
"error": str(e)
}
except TransactionError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
logger.error(f"Error creating page: {str(e)}")
return {
"success": False,
"error": f"Error creating page: {str(e)}"
}
def create_block(content: str, page_uid: Optional[str] = None, page_title: Optional[str] = None) -> Dict[str, Any]:
"""
Create a new block in Roam Research.
Args:
content: Block content - can be single-line text or multi-line content
that will be parsed into a hierarchical structure
page_uid: Optional page UID
page_title: Optional page title
Returns:
Result with block UID
"""
if not content:
return {
"success": False,
"error": "Content is required"
}
session, headers = get_session_and_headers()
try:
# Determine target page
target_page_uid = None
if page_uid:
# Use provided page UID
target_page_uid = page_uid
elif page_title:
# Find or create page by title
target_page_uid = find_or_create_page(page_title)
else:
# Use today's daily page
target_page_uid = get_daily_page()
# Handle multi-line content
if "\n" in content:
# Parse as nested structure
markdown_content = convert_to_roam_markdown(content)
parsed_content = parse_markdown_list(markdown_content)
# Check if there's any content
if not parsed_content:
return {
"success": False,
"error": "Failed to parse content"
}
# Build hierarchical structure
            def build_hierarchy_from_parsed(items):
                # Walk items in document order: sorting by level first would
                # attach every child to the last parent seen at the level
                # above, not the parent that actually precedes it.
                if not items:
                    return []
                min_level = min(item.get("level", 0) for item in items)
                hierarchical_items = []
                # Track the most recent open parent at each level
                current_parents = {}
                for item in items:
                    level = item.get("level", 0)
                    if level == min_level:
                        # Root level items
                        hierarchical_items.append(item)
                    else:
                        # Find the nearest open parent above this level
                        parent_level = level - 1
                        while parent_level >= min_level and parent_level not in current_parents:
                            parent_level -= 1
                        if parent_level >= min_level:
                            current_parents[parent_level].setdefault("children", []).append(item)
                        else:
                            # No parent found, add as root
                            hierarchical_items.append(item)
                    # This item becomes the open parent at its level; close
                    # any deeper levels left over from earlier branches
                    current_parents = {l: p for l, p in current_parents.items() if l < level}
                    current_parents[level] = item
                return hierarchical_items
# Build hierarchical structure
hierarchical_content = build_hierarchy_from_parsed(parsed_content)
# Process using the standardized hierarchical content processor
result = process_hierarchical_content(target_page_uid, hierarchical_content)
if result["success"]:
return {
"success": True,
"block_uid": result["created_uids"][0] if result["created_uids"] else None,
"parent_uid": target_page_uid,
"created_uids": result["created_uids"]
}
else:
return {
"success": False,
"error": result.get("error", "Failed to create hierarchical blocks"),
"parent_uid": target_page_uid
}
else:
# Create a simple block with explicit UID
block_uid = str(uuid.uuid4())[:9]
action_data = {
"action": "create-block",
"location": {
"parent-uid": target_page_uid,
"order": "last"
},
"block": {
"string": content,
"uid": block_uid
}
}
result = execute_write_action(action_data)
if result.get("success", False):
# Verify the block exists after a brief delay
time.sleep(0.5)
found_uid = find_block_uid(session, headers, GRAPH_NAME, content)
return {
"success": True,
"block_uid": found_uid or block_uid,
"parent_uid": target_page_uid
}
else:
return {
"success": False,
"error": "Failed to create block"
}
except ValidationError as e:
return {
"success": False,
"error": str(e)
}
except PageNotFoundError as e:
return {
"success": False,
"error": str(e)
}
except BlockNotFoundError as e:
return {
"success": False,
"error": str(e)
}
except TransactionError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
logger.error(f"Error creating block: {str(e)}")
return {
"success": False,
"error": f"Error creating block: {str(e)}"
}
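# Illustrative usage ("Inbox" is a placeholder page title): a multi-line
# string is parsed into nested blocks, while a single line becomes one
# block on today's daily page.
#
#   create_block("Parent line\n  - Child line", page_title="Inbox")
#   create_block("Quick capture")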
def create_outline(outline: List[Dict[str, Any]], page_title_uid: Optional[str] = None, block_text_uid: Optional[str] = None) -> Dict[str, Any]:
"""
Create a structured outline in Roam Research.
Args:
outline: List of outline items with text and level
Each item should have:
- 'text': Content text (required)
- 'level': Nesting level (required)
- 'heading_level': Heading level 1-3 (optional)
page_title_uid: Optional page title or UID
block_text_uid: Optional block text or UID to add outline under
Returns:
Result with created block UIDs
"""
# Validate outline
if not outline:
return {
"success": False,
"error": "Outline cannot be empty"
}
# Check for valid levels
invalid_items = [item for item in outline if not item.get("text") or not isinstance(item.get("level"), int)]
if invalid_items:
return {
"success": False,
"error": "All outline items must have text and a valid level"
}
session, headers = get_session_and_headers()
try:
# Determine target page
target_page_uid = None
if page_title_uid:
# Find page by title or UID
page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
if page_uid:
target_page_uid = page_uid
else:
# Create new page if not found
target_page_uid = find_or_create_page(page_title_uid)
else:
# Use today's daily page
target_page_uid = get_daily_page()
# Determine parent block
parent_uid = target_page_uid
if block_text_uid:
# Check if it's a valid block UID (9 characters)
            if re.match(r'^[a-zA-Z0-9_-]{9}$', block_text_uid):
# Verify block exists
query = f'''[:find ?uid
:where [?b :block/uid "{block_text_uid}"]
[?b :block/uid ?uid]]'''
result = execute_query(query)
if result:
parent_uid = block_text_uid
else:
return {
"success": False,
"error": f"Block with UID {block_text_uid} not found"
}
else:
# Create a header block with the given text
action_data = {
"action": "create-block",
"location": {
"parent-uid": target_page_uid,
"order": "last"
},
"block": {
"string": block_text_uid,
"uid": str(uuid.uuid4())[:9]
}
}
execute_write_action(action_data)
time.sleep(0.5) # Add delay to ensure block is created
header_uid = find_block_uid(session, headers, GRAPH_NAME, block_text_uid)
if not header_uid:
return {
"success": False,
"error": f"Failed to create header block with text: {block_text_uid}"
}
parent_uid = header_uid
# Build hierarchical structure from flat outline items
        def build_outline_hierarchy(items):
            # Walk items in document order: sorting by level first would
            # attach children to the last parent seen at the level above,
            # not the parent that actually precedes them in the outline.
            if not items:
                return []
            min_level = min(item.get("level", 0) for item in items)
            hierarchical_items = []
            # Track the most recent open parent at each level
            level_parents = {}
            for item in items:
                level = item.get("level", 0)
                # If this is a root-level item, add it to the result directly
                if level == min_level:
                    hierarchical_items.append(item)
                else:
                    # Find the nearest open parent above this level
                    parent_level = level - 1
                    while parent_level >= min_level and parent_level not in level_parents:
                        parent_level -= 1
                    # If we found a parent, add this item as its child
                    if parent_level >= min_level:
                        level_parents[parent_level].setdefault("children", []).append(item)
                    else:
                        # If no parent found, add it as a root item
                        hierarchical_items.append(item)
                # This item becomes the open parent at its level; drop
                # stale deeper parents from earlier branches
                level_parents = {l: p for l, p in level_parents.items() if l < level}
                level_parents[level] = item
            return hierarchical_items
# Build hierarchical structure from outline
hierarchical_outline = build_outline_hierarchy(outline)
# Use the standardized hierarchical content processor
result = process_hierarchical_content(parent_uid, hierarchical_outline)
if result["success"]:
return {
"success": True,
"page_uid": target_page_uid,
"parent_uid": parent_uid,
"created_uids": result.get("created_uids", [])
}
else:
return {
"success": False,
"error": result.get("error", "Failed to create outline"),
"page_uid": target_page_uid,
"parent_uid": parent_uid
}
except ValidationError as e:
return {
"success": False,
"error": str(e)
}
except PageNotFoundError as e:
return {
"success": False,
"error": str(e)
}
except BlockNotFoundError as e:
return {
"success": False,
"error": str(e)
}
except TransactionError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
logger.error(f"Error creating outline: {str(e)}")
return {
"success": False,
"error": f"Error creating outline: {str(e)}"
}
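# Illustrative usage (not part of the module), showing how a flat outline
# with "level" values becomes nested under a header block; the page and
# header names here are placeholders:
#
#   create_outline(
#       outline=[
#           {"text": "Launch plan", "level": 1},
#           {"text": "Milestones", "level": 2},
#           {"text": "Risks", "level": 2},
#       ],
#       page_title_uid="Project Planning",
#       block_text_uid="Q3 outline",
#   )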
def import_markdown(content: str, page_uid: Optional[str] = None, page_title: Optional[str] = None,
parent_uid: Optional[str] = None, parent_string: Optional[str] = None,
order: str = "last") -> Dict[str, Any]:
"""
Import markdown content into Roam Research.
Args:
content: Markdown content to import
page_uid: Optional page UID
page_title: Optional page title
parent_uid: Optional parent block UID
parent_string: Optional parent block text
order: Position ("first" or "last")
Returns:
Result with created block UIDs
"""
if not content:
return {
"success": False,
"error": "Content cannot be empty"
}
if order not in ["first", "last"]:
return {
"success": False,
"error": "Order must be 'first' or 'last'"
}
session, headers = get_session_and_headers()
try:
# Determine target page
target_page_uid = None
if page_uid:
# Use provided page UID
target_page_uid = page_uid
elif page_title:
# Find or create page by title
target_page_uid = find_or_create_page(page_title)
else:
# Use today's daily page
target_page_uid = get_daily_page()
# Determine parent block
parent_block_uid = target_page_uid
if parent_uid:
# Verify block exists
query = f'''[:find ?uid .
:where [?b :block/uid "{parent_uid}"]
[?b :block/uid ?uid]]'''
result = execute_query(query)
if result:
parent_block_uid = parent_uid
else:
return {
"success": False,
"error": f"Block with UID {parent_uid} not found"
}
elif parent_string:
# Find block by string
found_uid = find_block_uid(session, headers, GRAPH_NAME, parent_string)
if found_uid:
parent_block_uid = found_uid
else:
# Create parent block if it doesn't exist
block_uid = str(uuid.uuid4())[:9]
action_data = {
"action": "create-block",
"location": {
"parent-uid": target_page_uid,
"order": "last"
},
"block": {
"string": parent_string,
"uid": block_uid
}
}
execute_write_action(action_data)
time.sleep(1) # Wait for block to be created
found_uid = find_block_uid(session, headers, GRAPH_NAME, parent_string)
if found_uid:
parent_block_uid = found_uid
else:
parent_block_uid = block_uid
logger.debug(f"Created parent block with UID: {block_uid}")
# Convert markdown to Roam format
roam_markdown = convert_to_roam_markdown(content)
# Parse markdown into hierarchical structure
parsed_content = parse_markdown_list(roam_markdown)
if not parsed_content:
return {
"success": False,
"error": "Failed to parse markdown content"
}
# Build a proper hierarchical structure from the parsed markdown
        def build_hierarchy(items):
            if not items:
                return []
            root_items = []
            # Stack of (level, item) pairs tracking the current ancestry path
            stack = []
            for item in items:
                level = item.get("level", 0)
                # Pop back to the nearest preceding item at a shallower level
                while stack and stack[-1][0] >= level:
                    stack.pop()
                if stack:
                    # Attach as a child of the item on top of the stack
                    stack[-1][1].setdefault("children", []).append(item)
                else:
                    # Nothing shallower precedes this item, so it is a root
                    root_items.append(item)
                stack.append((level, item))
            return root_items
# Build a hierarchical structure that preserves parent-child relationships
hierarchical_content = build_hierarchy(parsed_content)
# Process the hierarchical content using the standardized utility
result = process_hierarchical_content(parent_block_uid, hierarchical_content, order)
if result["success"]:
return {
"success": True,
"page_uid": target_page_uid,
"parent_uid": parent_block_uid,
"created_uids": result.get("created_uids", [])
}
else:
return {
"success": False,
"error": result.get("error", "Failed to import markdown"),
"page_uid": target_page_uid,
"parent_uid": parent_block_uid
}
except ValidationError as e:
return {
"success": False,
"error": str(e)
}
except PageNotFoundError as e:
return {
"success": False,
"error": str(e)
}
except BlockNotFoundError as e:
return {
"success": False,
"error": str(e)
}
except TransactionError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
logger.error(f"Error importing markdown: {str(e)}")
return {
"success": False,
"error": f"Error importing markdown: {str(e)}"
}
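# Illustrative usage (not part of the module): importing a nested markdown
# list under a "Meeting notes" parent block on today's daily page; the
# content and parent text are placeholders:
#
#   import_markdown(
#       content="- Agenda\n  - Budget review\n  - Hiring\n- Action items",
#       parent_string="Meeting notes",
#       order="last",
#   )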
def add_todos(todos: List[str]) -> Dict[str, Any]:
"""
Add todo items to today's daily page.
Args:
todos: List of todo items
Returns:
Result with success status
"""
if not todos:
return {
"success": False,
"error": "Todo list cannot be empty"
}
if not all(isinstance(todo, str) for todo in todos):
return {
"success": False,
"error": "All todo items must be strings"
}
session, headers = get_session_and_headers()
try:
# Get today's daily page
daily_page_uid = get_daily_page()
# Create batch actions for todos
actions = []
todo_uids = []
for i, todo in enumerate(todos):
# Format with TODO syntax
todo_content = f"{{{{[[TODO]]}}}} {todo}"
# Generate UID
block_uid = str(uuid.uuid4())[:9]
todo_uids.append(block_uid)
# Create action
action = {
"action": "create-block",
"location": {
"parent-uid": daily_page_uid,
"order": "last"
},
"block": {
"string": todo_content,
"uid": block_uid
}
}
actions.append(action)
# Execute batch actions
result = execute_write_action(actions)
if result.get("success", False) or "created_uids" in result:
return {
"success": True,
"created_uids": result.get("created_uids", todo_uids),
"page_uid": daily_page_uid
}
else:
return {
"success": False,
"error": "Failed to create todo items"
}
except ValidationError as e:
return {
"success": False,
"error": str(e)
}
except PageNotFoundError as e:
return {
"success": False,
"error": str(e)
}
except TransactionError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
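# Illustrative usage (not part of the module): each string is wrapped in
# {{[[TODO]]}} syntax and appended to today's daily page:
#
#   add_todos(["Email the draft", "Book flights"])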
def update_content(block_uid: str, content: Optional[str] = None, transform_pattern: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Update a block's content or transform it using a pattern.
Args:
block_uid: Block UID
content: New content
transform_pattern: Pattern for transformation
Returns:
Result with updated content
"""
if not block_uid:
return {
"success": False,
"error": "Block UID is required"
}
    if content is None and not transform_pattern:
        return {
            "success": False,
            "error": "Either content or transform_pattern must be provided"
        }
try:
# Get current content if doing a transformation
if transform_pattern:
# Validate transform pattern
if not isinstance(transform_pattern, dict):
return {
"success": False,
"error": "Transform pattern must be an object"
}
if "find" not in transform_pattern or "replace" not in transform_pattern:
return {
"success": False,
"error": "Transform pattern must include 'find' and 'replace' properties"
}
query = f'''[:find ?string .
:where [?b :block/uid "{block_uid}"]
[?b :block/string ?string]]'''
current_content = execute_query(query)
if not current_content:
return {
"success": False,
"error": f"Block with UID {block_uid} not found"
}
# Apply transformation
find = transform_pattern["find"]
replace = transform_pattern["replace"]
global_replace = transform_pattern.get("global", True)
try:
flags = re.MULTILINE
count = 0 if global_replace else 1
new_content = re.sub(find, replace, current_content, count=count, flags=flags)
# Update block
update_block(block_uid, new_content)
return {
"success": True,
"content": new_content
}
except re.error as e:
return {
"success": False,
"error": f"Invalid regex pattern: {str(e)}"
}
else:
# Direct content update
update_block(block_uid, content)
return {
"success": True,
"content": content
}
except ValidationError as e:
return {
"success": False,
"error": str(e)
}
except BlockNotFoundError as e:
return {
"success": False,
"error": str(e)
}
except TransactionError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
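# Illustrative usage (not part of the module): a regex transform that flips
# a TODO marker to DONE; "abc123XYZ" is a placeholder 9-character block UID:
#
#   update_content(
#       "abc123XYZ",
#       transform_pattern={
#           "find": r"\{\{\[\[TODO\]\]\}\}",
#           "replace": "{{[[DONE]]}}",
#           "global": False,
#       },
#   )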
def update_multiple_contents(updates: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Update multiple blocks in a single operation.
Args:
updates: List of update operations
Returns:
Results of updates
"""
if not updates or not isinstance(updates, list):
return {
"success": False,
"error": "Updates must be a non-empty list"
}
try:
# Validate each update
for i, update in enumerate(updates):
if "block_uid" not in update:
return {
"success": False,
"error": f"Update at index {i} is missing required 'block_uid' property"
}
if "content" not in update and "transform" not in update:
return {
"success": False,
"error": f"Update at index {i} must include either 'content' or 'transform'"
}
if "transform" in update:
transform = update["transform"]
if not isinstance(transform, dict):
return {
"success": False,
"error": f"Transform at index {i} must be an object"
}
if "find" not in transform or "replace" not in transform:
return {
"success": False,
"error": f"Transform at index {i} must include 'find' and 'replace' properties"
}
# Batch update blocks in chunks of 50
CHUNK_SIZE = 50
results = batch_update_blocks(updates, CHUNK_SIZE)
# Count successful updates
successful = sum(1 for result in results if result.get("success"))
return {
"success": successful == len(updates),
"results": results,
"message": f"Updated {successful}/{len(updates)} blocks successfully"
}
except ValidationError as e:
return {
"success": False,
"error": str(e)
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
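# Illustrative usage (not part of the module): mixing a direct rewrite and a
# regex transform in one batch; the UIDs are placeholders:
#
#   update_multiple_contents([
#       {"block_uid": "abc123XYZ", "content": "Rewritten text"},
#       {"block_uid": "def456UVW",
#        "transform": {"find": "draft", "replace": "final"}},
#   ])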
"""
Create nested blocks with proper parent-child relationships.
Args:
parent_uid: UID of the parent block/page
blocks_data: List of block data (text, level, children)
Returns:
Dictionary with success status and created block UIDs
"""
# For backward compatibility, now uses the standardized hierarchical content processor
return process_hierarchical_content(parent_uid, blocks_data)
```
--------------------------------------------------------------------------------
/roam_mcp/search.py:
--------------------------------------------------------------------------------
```python
"""Search operations for the Roam MCP server."""
from typing import Dict, List, Any, Optional, Union, Set
from datetime import datetime, timedelta
import re
import logging
from roam_mcp.api import (
execute_query,
get_session_and_headers,
GRAPH_NAME,
find_page_by_title,
ValidationError,
QueryError,
PageNotFoundError,
BlockNotFoundError
)
from roam_mcp.utils import (
format_roam_date,
resolve_block_references
)
# Set up logging
logger = logging.getLogger("roam-mcp.search")
def validate_search_params(text: Optional[str] = None, tag: Optional[str] = None,
status: Optional[str] = None, page_title_uid: Optional[str] = None):
"""
Validate common search parameters.
Args:
text: Optional text to search for
tag: Optional tag to search for
status: Optional status to search for
page_title_uid: Optional page title or UID
Raises:
ValidationError: If parameters are invalid
"""
if status and status not in ["TODO", "DONE"]:
raise ValidationError("Status must be 'TODO' or 'DONE'", "status")
def search_by_text(text: str, page_title_uid: Optional[str] = None, case_sensitive: bool = True) -> Dict[str, Any]:
"""
Search for blocks containing specific text.
Args:
text: Text to search for
page_title_uid: Optional page title or UID to scope the search
case_sensitive: Whether to perform case-sensitive search
Returns:
Search results
"""
if not text:
return {
"success": False,
"matches": [],
"message": "Search text cannot be empty"
}
session, headers = get_session_and_headers()
# Prepare the query
if case_sensitive:
text_condition = f'(clojure.string/includes? ?s "{text}")'
else:
text_condition = f'(clojure.string/includes? (clojure.string/lower-case ?s) "{text.lower()}")'
try:
if page_title_uid:
# Try to find the page UID if a title was provided
page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
if not page_uid:
return {
"success": False,
"matches": [],
"message": f"Page '{page_title_uid}' not found"
}
query = f"""[:find ?uid ?s ?order
:where
[?p :block/uid "{page_uid}"]
[?b :block/page ?p]
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/order ?order]
[{text_condition}]]"""
else:
query = f"""[:find ?uid ?s ?page-title
:where
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/page ?p]
[?p :node/title ?page-title]
[{text_condition}]]"""
# Execute the query
logger.debug(f"Executing text search for: {text}")
results = execute_query(query)
# Process the results
matches = []
if page_title_uid:
# For page-specific search, results are [uid, content, order]
for uid, content, order in results:
# Resolve references if present
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
matches.append({
"block_uid": uid,
"content": resolved_content,
"page_title": page_title_uid
})
else:
# For global search, results are [uid, content, page_title]
for uid, content, page_title in results:
# Resolve references if present
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
matches.append({
"block_uid": uid,
"content": resolved_content,
"page_title": page_title
})
return {
"success": True,
"matches": matches,
"message": f"Found {len(matches)} block(s) containing \"{text}\""
}
except PageNotFoundError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except QueryError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except Exception as e:
logger.error(f"Error searching by text: {str(e)}")
return {
"success": False,
"matches": [],
"message": f"Error searching by text: {str(e)}"
}
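# Illustrative usage (not part of the module): a case-insensitive search
# scoped to a single page; the page title is a placeholder:
#
#   search_by_text("quarterly report", page_title_uid="Finance", case_sensitive=False)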
def search_by_tag(tag: str, page_title_uid: Optional[str] = None, near_tag: Optional[str] = None) -> Dict[str, Any]:
"""
Search for blocks containing a specific tag.
Args:
tag: Tag to search for (without # or [[ ]])
page_title_uid: Optional page title or UID to scope the search
near_tag: Optional second tag that must appear in the same block
Returns:
Search results
"""
if not tag:
return {
"success": False,
"matches": [],
"message": "Tag cannot be empty"
}
session, headers = get_session_and_headers()
# Format the tag for searching
# Remove any existing formatting
clean_tag = tag.replace('#', '').replace('[[', '').replace(']]', '')
tag_variants = [f"#{clean_tag}", f"#[[{clean_tag}]]", f"[[{clean_tag}]]"]
# Build tag conditions
tag_conditions = []
for variant in tag_variants:
tag_conditions.append(f'(clojure.string/includes? ?s "{variant}")')
tag_condition = f"(or {' '.join(tag_conditions)})"
# Add near_tag condition if provided
if near_tag:
clean_near_tag = near_tag.replace('#', '').replace('[[', '').replace(']]', '')
near_tag_variants = [f"#{clean_near_tag}", f"#[[{clean_near_tag}]]", f"[[{clean_near_tag}]]"]
near_tag_conditions = []
for variant in near_tag_variants:
near_tag_conditions.append(f'(clojure.string/includes? ?s "{variant}")')
near_tag_condition = f"(or {' '.join(near_tag_conditions)})"
combined_condition = f"(and {tag_condition} {near_tag_condition})"
else:
combined_condition = tag_condition
try:
# Build query based on whether we're searching in a specific page
if page_title_uid:
# Try to find the page UID if a title was provided
page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
if not page_uid:
return {
"success": False,
"matches": [],
"message": f"Page '{page_title_uid}' not found"
}
query = f"""[:find ?uid ?s
:where
[?p :block/uid "{page_uid}"]
[?b :block/page ?p]
[?b :block/string ?s]
[?b :block/uid ?uid]
[{combined_condition}]]"""
else:
query = f"""[:find ?uid ?s ?page-title
:where
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/page ?p]
[?p :node/title ?page-title]
[{combined_condition}]]"""
# Execute the query
logger.debug(f"Executing tag search for: {tag}")
if near_tag:
logger.debug(f"With near tag: {near_tag}")
results = execute_query(query)
# Process the results
matches = []
if page_title_uid:
# For page-specific search, results are [uid, content]
for uid, content in results:
# Resolve references if present
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
matches.append({
"block_uid": uid,
"content": resolved_content,
"page_title": page_title_uid
})
else:
# For global search, results are [uid, content, page_title]
for uid, content, page_title in results:
# Resolve references if present
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
matches.append({
"block_uid": uid,
"content": resolved_content,
"page_title": page_title
})
# Build message
message = f"Found {len(matches)} block(s) with tag #{clean_tag}"
if near_tag:
message += f" near #{clean_near_tag}"
return {
"success": True,
"matches": matches,
"message": message
}
except PageNotFoundError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except QueryError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except Exception as e:
logger.error(f"Error searching by tag: {str(e)}")
return {
"success": False,
"matches": [],
"message": f"Error searching by tag: {str(e)}"
}
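# Illustrative usage (not part of the module): the variants #ProjectIdeas,
# #[[ProjectIdeas]] and [[ProjectIdeas]] all match, and near_tag requires a
# second tag in the same block:
#
#   search_by_tag("ProjectIdeas", near_tag="Q3")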
def search_by_status(status: str, page_title_uid: Optional[str] = None, include: Optional[str] = None, exclude: Optional[str] = None) -> Dict[str, Any]:
"""
Search for blocks with a specific status (TODO/DONE).
Args:
status: Status to search for ("TODO" or "DONE")
page_title_uid: Optional page title or UID to scope the search
include: Optional comma-separated keywords to include
exclude: Optional comma-separated keywords to exclude
Returns:
Search results
"""
if status not in ["TODO", "DONE"]:
return {
"success": False,
"matches": [],
"message": "Status must be either 'TODO' or 'DONE'"
}
session, headers = get_session_and_headers()
# Status pattern
status_pattern = f"{{{{[[{status}]]}}}}"
try:
# Build query based on whether we're searching in a specific page
if page_title_uid:
# Try to find the page UID if a title was provided
page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
if not page_uid:
return {
"success": False,
"matches": [],
"message": f"Page '{page_title_uid}' not found"
}
query = f"""[:find ?uid ?s
:where
[?p :block/uid "{page_uid}"]
[?b :block/page ?p]
[?b :block/string ?s]
[?b :block/uid ?uid]
[(clojure.string/includes? ?s "{status_pattern}")]]"""
else:
query = f"""[:find ?uid ?s ?page-title
:where
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/page ?p]
[?p :node/title ?page-title]
[(clojure.string/includes? ?s "{status_pattern}")]]"""
# Execute the query
logger.debug(f"Executing status search for: {status}")
results = execute_query(query)
# Process the results
matches = []
if page_title_uid:
# For page-specific search, results are [uid, content]
for uid, content in results:
# Resolve references if present
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
# Apply include/exclude filters
if include:
include_terms = [term.strip().lower() for term in include.split(',')]
if not any(term in resolved_content.lower() for term in include_terms):
continue
if exclude:
exclude_terms = [term.strip().lower() for term in exclude.split(',')]
if any(term in resolved_content.lower() for term in exclude_terms):
continue
matches.append({
"block_uid": uid,
"content": resolved_content,
"page_title": page_title_uid
})
else:
# For global search, results are [uid, content, page_title]
for uid, content, page_title in results:
# Resolve references if present
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
# Apply include/exclude filters
if include:
include_terms = [term.strip().lower() for term in include.split(',')]
if not any(term in resolved_content.lower() for term in include_terms):
continue
if exclude:
exclude_terms = [term.strip().lower() for term in exclude.split(',')]
if any(term in resolved_content.lower() for term in exclude_terms):
continue
matches.append({
"block_uid": uid,
"content": resolved_content,
"page_title": page_title
})
# Build message
message = f"Found {len(matches)} block(s) with status {status}"
if include:
message += f" including '{include}'"
if exclude:
message += f" excluding '{exclude}'"
return {
"success": True,
"matches": matches,
"message": message
}
except PageNotFoundError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except QueryError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except Exception as e:
logger.error(f"Error searching by status: {str(e)}")
return {
"success": False,
"matches": [],
"message": f"Error searching by status: {str(e)}"
}
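# Illustrative usage (not part of the module): open TODOs mentioning
# "budget" but not "archived":
#
#   search_by_status("TODO", include="budget", exclude="archived")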
def search_block_refs(block_uid: Optional[str] = None, page_title_uid: Optional[str] = None) -> Dict[str, Any]:
"""
Search for block references.
Args:
block_uid: Optional UID of the block to find references to
page_title_uid: Optional page title or UID to scope the search
Returns:
Search results
"""
session, headers = get_session_and_headers()
    # Determine what kind of search we're doing
    if block_uid:
        # Look for explicit ((uid)) references to one specific block
        block_ref_pattern = f"(({block_uid}))"
        description = f"referencing block (({block_uid}))"
    else:
        # Match any ((...)) reference; the regex is embedded in the query itself
        description = "containing block references"
try:
# Build query based on whether we're searching in a specific page
if page_title_uid:
# Try to find the page UID if a title was provided
page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
if not page_uid:
return {
"success": False,
"matches": [],
"message": f"Page '{page_title_uid}' not found"
}
if block_uid:
query = f"""[:find ?uid ?s
:where
[?p :block/uid "{page_uid}"]
[?b :block/page ?p]
[?b :block/string ?s]
[?b :block/uid ?uid]
[(clojure.string/includes? ?s "{block_ref_pattern}")]]"""
else:
query = f"""[:find ?uid ?s
:where
[?p :block/uid "{page_uid}"]
[?b :block/page ?p]
[?b :block/string ?s]
[?b :block/uid ?uid]
[(re-find #"\\(\\([^)]+\\)\\)" ?s)]]"""
else:
if block_uid:
query = f"""[:find ?uid ?s ?page-title
:where
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/page ?p]
[?p :node/title ?page-title]
[(clojure.string/includes? ?s "{block_ref_pattern}")]]"""
else:
query = f"""[:find ?uid ?s ?page-title
:where
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/page ?p]
[?p :node/title ?page-title]
[(re-find #"\\(\\([^)]+\\)\\)" ?s)]]"""
# Execute the query
logger.debug(f"Executing block reference search")
results = execute_query(query)
# Process the results
matches = []
if page_title_uid:
# For page-specific search, results are [uid, content]
for uid, content in results:
# Resolve references if present
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
matches.append({
"block_uid": uid,
"content": resolved_content,
"page_title": page_title_uid
})
else:
# For global search, results are [uid, content, page_title]
for uid, content, page_title in results:
# Resolve references if present
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
matches.append({
"block_uid": uid,
"content": resolved_content,
"page_title": page_title
})
return {
"success": True,
"matches": matches,
"message": f"Found {len(matches)} block(s) {description}"
}
except PageNotFoundError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except QueryError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except Exception as e:
logger.error(f"Error searching block references: {str(e)}")
return {
"success": False,
"matches": [],
"message": f"Error searching block references: {str(e)}"
}
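# Illustrative usage (not part of the module): find every block embedding a
# ((...)) reference to one specific block; the UID is a placeholder:
#
#   search_block_refs(block_uid="abc123XYZ")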
def search_hierarchy(parent_uid: Optional[str] = None, child_uid: Optional[str] = None,
page_title_uid: Optional[str] = None, max_depth: int = 1) -> Dict[str, Any]:
"""
Search for parents or children in the block hierarchy.
Args:
parent_uid: Optional UID of the block to find children of
child_uid: Optional UID of the block to find parents of
page_title_uid: Optional page title or UID to scope the search
max_depth: Maximum depth to search
Returns:
Search results
"""
if not parent_uid and not child_uid:
return {
"success": False,
"matches": [],
"message": "Either parent_uid or child_uid must be provided"
}
if max_depth < 1:
return {
"success": False,
"matches": [],
"message": "max_depth must be at least 1"
}
if max_depth > 10:
max_depth = 10
logger.warning("max_depth limited to 10")
session, headers = get_session_and_headers()
# Define ancestor rule
ancestor_rule = """[
[(ancestor ?child ?parent ?depth)
[?parent :block/children ?child]
[(identity 1) ?depth]]
[(ancestor ?child ?parent ?depth)
[?mid :block/children ?child]
(ancestor ?mid ?parent ?prev-depth)
[(+ ?prev-depth 1) ?depth]]
]"""
try:
# Determine search type and build query
if parent_uid:
# Searching for children
if page_title_uid:
# Try to find the page UID if a title was provided
page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
if not page_uid:
return {
"success": False,
"matches": [],
"message": f"Page '{page_title_uid}' not found"
}
query = f"""[:find ?uid ?s ?depth
:in $ % ?parent-uid ?max-depth
:where
[?parent :block/uid ?parent-uid]
[?p :block/uid "{page_uid}"]
(ancestor ?b ?parent ?depth)
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/page ?p]
[(<= ?depth ?max-depth)]]"""
inputs = [ancestor_rule, parent_uid, max_depth]
else:
query = f"""[:find ?uid ?s ?page-title ?depth
:in $ % ?parent-uid ?max-depth
:where
[?parent :block/uid ?parent-uid]
(ancestor ?b ?parent ?depth)
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/page ?p]
[?p :node/title ?page-title]
[(<= ?depth ?max-depth)]]"""
inputs = [ancestor_rule, parent_uid, max_depth]
description = f"descendants of block {parent_uid}"
else:
# Searching for parents
if page_title_uid:
# Try to find the page UID if a title was provided
page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
if not page_uid:
return {
"success": False,
"matches": [],
"message": f"Page '{page_title_uid}' not found"
}
query = f"""[:find ?uid ?s ?depth
:in $ % ?child-uid ?max-depth
:where
[?child :block/uid ?child-uid]
[?p :block/uid "{page_uid}"]
(ancestor ?child ?b ?depth)
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/page ?p]
[(<= ?depth ?max-depth)]]"""
inputs = [ancestor_rule, child_uid, max_depth]
else:
query = f"""[:find ?uid ?s ?page-title ?depth
:in $ % ?child-uid ?max-depth
:where
[?child :block/uid ?child-uid]
(ancestor ?child ?b ?depth)
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/page ?p]
[?p :node/title ?page-title]
[(<= ?depth ?max-depth)]]"""
inputs = [ancestor_rule, child_uid, max_depth]
description = f"ancestors of block {child_uid}"
# Execute the query
logger.debug(f"Executing hierarchy search with max_depth: {max_depth}")
results = execute_query(query, inputs)
# Process the results
matches = []
if page_title_uid:
# For page-specific search, results are [uid, content, depth]
for uid, content, depth in results:
# Resolve references if present
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
matches.append({
"block_uid": uid,
"content": resolved_content,
"depth": depth,
"page_title": page_title_uid
})
else:
# For global search, results are [uid, content, page_title, depth]
for uid, content, page_title, depth in results:
# Resolve references if present
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
matches.append({
"block_uid": uid,
"content": resolved_content,
"depth": depth,
"page_title": page_title
})
return {
"success": True,
"matches": matches,
"message": f"Found {len(matches)} block(s) as {description}"
}
except PageNotFoundError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except BlockNotFoundError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except QueryError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except Exception as e:
logger.error(f"Error searching hierarchy: {str(e)}")
return {
"success": False,
"matches": [],
"message": f"Error searching hierarchy: {str(e)}"
}
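# Illustrative usage (not part of the module): fetch up to two levels of
# descendants of a block; the UID is a placeholder:
#
#   search_hierarchy(parent_uid="abc123XYZ", max_depth=2)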
def search_by_date(start_date: str, end_date: Optional[str] = None,
type_filter: str = "created", scope: str = "blocks",
include_content: bool = True) -> Dict[str, Any]:
"""
Search for blocks or pages based on creation or modification dates.
Args:
start_date: Start date in ISO format (YYYY-MM-DD)
end_date: Optional end date in ISO format (YYYY-MM-DD)
type_filter: Whether to search by "created", "modified", or "both"
scope: Whether to search "blocks", "pages", or "both"
include_content: Whether to include block/page content
Returns:
Search results
"""
# Validate inputs
if type_filter not in ["created", "modified", "both"]:
return {
"success": False,
"matches": [],
"message": "Type must be 'created', 'modified', or 'both'"
}
if scope not in ["blocks", "pages", "both"]:
return {
"success": False,
"matches": [],
"message": "Scope must be 'blocks', 'pages', or 'both'"
}
# Parse dates
try:
start_timestamp = int(datetime.strptime(start_date, "%Y-%m-%d").timestamp() * 1000)
if end_date:
# Set end_date to end of day
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
end_dt = end_dt.replace(hour=23, minute=59, second=59)
end_timestamp = int(end_dt.timestamp() * 1000)
else:
# Default to now if no end date
end_timestamp = int(datetime.now().timestamp() * 1000)
except ValueError:
return {
"success": False,
"matches": [],
"message": "Invalid date format. Dates should be in YYYY-MM-DD format."
}
session, headers = get_session_and_headers()
# Track matches across all queries to handle sorting
all_matches = []
logger.debug(f"Executing date search: {start_date} to {end_date or 'now'}")
try:
# Build and execute queries based on scope and type
# Block queries for creation time
if scope in ["blocks", "both"] and type_filter in ["created", "both"]:
logger.debug("Searching blocks by creation time")
query = f"""[:find ?uid ?s ?page-title ?time
:where
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/page ?p]
[?p :node/title ?page-title]
[?b :create/time ?time]
[(>= ?time {start_timestamp})]
[(<= ?time {end_timestamp})]]
:limit 1000"""
block_created_results = execute_query(query)
for uid, content, page_title, time in block_created_results:
match_data = {
"uid": uid,
"type": "block",
"time": time,
"time_type": "created",
"page_title": page_title
}
if include_content:
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
match_data["content"] = resolved_content
all_matches.append(match_data)
# Block queries for modification time
if scope in ["blocks", "both"] and type_filter in ["modified", "both"]:
logger.debug("Searching blocks by modification time")
query = f"""[:find ?uid ?s ?page-title ?time
:where
[?b :block/string ?s]
[?b :block/uid ?uid]
[?b :block/page ?p]
[?p :node/title ?page-title]
[?b :edit/time ?time]
[(>= ?time {start_timestamp})]
[(<= ?time {end_timestamp})]]
:limit 1000"""
block_modified_results = execute_query(query)
for uid, content, page_title, time in block_modified_results:
match_data = {
"uid": uid,
"type": "block",
"time": time,
"time_type": "modified",
"page_title": page_title
}
if include_content:
resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
match_data["content"] = resolved_content
all_matches.append(match_data)
# Page queries for creation time
if scope in ["pages", "both"] and type_filter in ["created", "both"]:
logger.debug("Searching pages by creation time")
query = f"""[:find ?uid ?title ?time
:where
[?p :node/title ?title]
[?p :block/uid ?uid]
[?p :create/time ?time]
[(>= ?time {start_timestamp})]
[(<= ?time {end_timestamp})]]
:limit 500"""
page_created_results = execute_query(query)
for uid, title, time in page_created_results:
match_data = {
"uid": uid,
"type": "page",
"time": time,
"time_type": "created",
"title": title
}
if include_content:
# Get a sample of page content (first few blocks)
sample_query = f"""[:find ?s
:where
[?p :block/uid "{uid}"]
[?b :block/page ?p]
[?b :block/string ?s]
[?b :block/order ?o]
[(< ?o 3)]]
:limit 3"""
page_blocks = execute_query(sample_query)
                    if page_blocks:
                        # The sample query caps block order below 3, so at
                        # most three blocks come back
                        page_sample = "\n".join(row[0] for row in page_blocks)
                        match_data["content"] = f"# {title}\n{page_sample}"
                    else:
                        match_data["content"] = f"# {title}\n(No content)"
all_matches.append(match_data)
# Page queries for modification time
if scope in ["pages", "both"] and type_filter in ["modified", "both"]:
logger.debug("Searching pages by modification time")
query = f"""[:find ?uid ?title ?time
:where
[?p :node/title ?title]
[?p :block/uid ?uid]
[?p :edit/time ?time]
[(>= ?time {start_timestamp})]
[(<= ?time {end_timestamp})]]
:limit 500"""
page_modified_results = execute_query(query)
for uid, title, time in page_modified_results:
match_data = {
"uid": uid,
"type": "page",
"time": time,
"time_type": "modified",
"title": title
}
if include_content:
# Get a sample of page content (first few blocks)
sample_query = f"""[:find ?s
:where
[?p :block/uid "{uid}"]
[?b :block/page ?p]
[?b :block/string ?s]
[?b :block/order ?o]
[(< ?o 3)]]
:limit 3"""
page_blocks = execute_query(sample_query)
                    if page_blocks:
                        # The sample query caps block order below 3, so at
                        # most three blocks come back
                        page_sample = "\n".join(row[0] for row in page_blocks)
                        match_data["content"] = f"# {title}\n{page_sample}"
                    else:
                        match_data["content"] = f"# {title}\n(No content)"
all_matches.append(match_data)
# Sort by time (newest first)
all_matches.sort(key=lambda x: x["time"], reverse=True)
# Deduplicate by UID and time_type
seen = set()
unique_matches = []
for match in all_matches:
key = (match["uid"], match["time_type"])
if key not in seen:
seen.add(key)
unique_matches.append(match)
return {
"success": True,
"matches": unique_matches,
"message": f"Found {len(unique_matches)} matches for the given date range and criteria"
}
except QueryError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except Exception as e:
logger.error(f"Error searching by date: {str(e)}")
return {
"success": False,
"matches": [],
"message": f"Error searching by date: {str(e)}"
}
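# Illustrative usage (not part of the module): everything created or edited
# during the first week of March 2024:
#
#   search_by_date("2024-03-01", end_date="2024-03-07", type_filter="both", scope="both")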
def find_pages_modified_today(max_num_pages: int = 50) -> Dict[str, Any]:
"""
Find pages that have been modified today.
Args:
max_num_pages: Maximum number of pages to return
Returns:
List of modified pages
"""
if max_num_pages < 1:
return {
"success": False,
"pages": [],
"message": "max_num_pages must be at least 1"
}
# Define ancestor rule
ancestor_rule = """[
[(ancestor ?b ?a)
[?a :block/children ?b]]
[(ancestor ?b ?a)
[?parent :block/children ?b]
(ancestor ?parent ?a)]
]"""
# Get start of today
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
start_timestamp = int(today.timestamp() * 1000)
try:
# Query for pages modified today
logger.debug(f"Finding pages modified today (since {today.isoformat()})")
query = f"""[:find ?title
:in $ ?start_timestamp %
:where
[?page :node/title ?title]
(ancestor ?block ?page)
[?block :edit/time ?time]
[(> ?time ?start_timestamp)]]
:limit {max_num_pages}"""
results = execute_query(query, [start_timestamp, ancestor_rule])
        # Extract unique page titles (each result row is a single-element tuple)
        unique_pages = list({row[0] for row in results})[:max_num_pages]
return {
"success": True,
"pages": unique_pages,
"message": f"Found {len(unique_pages)} page(s) modified today"
}
except QueryError as e:
return {
"success": False,
"pages": [],
"message": str(e)
}
except Exception as e:
logger.error(f"Error finding pages modified today: {str(e)}")
return {
"success": False,
"pages": [],
"message": f"Error finding pages modified today: {str(e)}"
}
def execute_datomic_query(query: str, inputs: Optional[List[Any]] = None) -> Dict[str, Any]:
"""
Execute a custom Datomic query.
Args:
query: The Datomic query
inputs: Optional list of inputs
Returns:
Query results
"""
if not query:
return {
"success": False,
"matches": [],
"message": "Query cannot be empty"
}
try:
# Validate query format (basic check)
if not query.strip().startswith('[:find'):
logger.warning("Query doesn't start with [:find, may not be valid Datalog syntax")
logger.debug(f"Executing custom Datomic query")
results = execute_query(query, inputs or [])
# Format results for display
formatted_results = []
for result in results:
if isinstance(result, (list, tuple)):
formatted_result = " | ".join(str(item) for item in result)
else:
formatted_result = str(result)
formatted_results.append({
"content": formatted_result,
"block_uid": "",
"page_title": ""
})
return {
"success": True,
"matches": formatted_results,
"message": f"Query executed successfully. Found {len(formatted_results)} results."
}
except QueryError as e:
return {
"success": False,
"matches": [],
"message": str(e)
}
except Exception as e:
logger.error(f"Error executing datomic query: {str(e)}")
return {
"success": False,
"matches": [],
"message": f"Failed to execute query: {str(e)}"
}
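# Illustrative usage (not part of the module): count all pages in the graph
# with a raw Datalog query:
#
#   execute_datomic_query("[:find (count ?p) :where [?p :node/title]]")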
```