# Directory Structure
```
├── .dockerignore
├── .gitignore
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── readme.md
├── requirements.txt
├── roam_mcp
│ ├── __init__.py
│ ├── api.py
│ ├── cli.py
│ ├── content_parsers.py
│ ├── content.py
│ ├── memory.py
│ ├── search.py
│ ├── server.py
│ └── utils.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------
```
1 | # Version control
2 | .git
3 | .gitignore
4 |
5 | # Python cache files
6 | __pycache__/
7 | *.py[cod]
8 | *$py.class
9 | *.so
10 | .Python
11 | env/
12 | build/
13 | develop-eggs/
14 | dist/
15 | downloads/
16 | eggs/
17 | .eggs/
18 | lib/
19 | lib64/
20 | parts/
21 | sdist/
22 | var/
23 | *.egg-info/
24 | .installed.cfg
25 | *.egg
26 |
27 | # Virtual environments
28 | venv/
29 | ENV/
30 | env/
31 |
32 | # IDE specific files
33 | .idea/
34 | .vscode/
35 | *.swp
36 | *.swo
37 |
38 | # Docker specific
39 | Dockerfile
40 | .dockerignore
41 |
42 | # Local development files
43 | .env
44 | *.log
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # OS-specific
2 | .DS_Store
3 |
4 | # Python bytecode and C extensions
5 | __pycache__/
6 | *.py[cod]
7 | *$py.class
8 | *.so
9 |
10 | # Build system artifacts
11 | .Python
12 | build/
13 | dist/
14 | downloads/
15 | *.egg-info/
16 | .eggs/
17 | *.egg
18 | MANIFEST
19 | wheels/
20 | share/python-wheels/
21 | sdist/
22 | develop-eggs/
23 | lib/
24 | lib64/
25 | parts/
26 | var/
27 | .installed.cfg
28 |
29 | # Packaging tools
30 | *.spec
31 |
32 | # Logs and runtime files
33 | *.log
34 | pip-log.txt
35 | pip-delete-this-directory.txt
36 |
37 | # Coverage and test artifacts
38 | htmlcov/
39 | .tox/
40 | .nox/
41 | .coverage
42 | .coverage.*
43 | .cache
44 | nosetests.xml
45 | coverage.xml
46 | *.cover
47 | *.py,cover
48 | .pytest_cache/
49 | .hypothesis/
50 |
51 | # Environments
52 | .env
53 | .venv
54 | env/
55 | venv/
56 | ENV/
57 | env.bak/
58 | venv.bak/
59 |
60 | # IDEs and editors
61 | .vscode/
62 | .idea/
63 | .spyderproject
64 | .spyproject
65 | .ropeproject
66 |
67 | # Jupyter and IPython
68 | .ipynb_checkpoints/
69 | profile_default/
70 | ipython_config.py
71 |
72 | # Documentation
73 | docs/_build/
74 | site/
75 |
76 | # Type checking
77 | .mypy_cache/
78 | .dmypy.json
79 | .pyre/
80 | .pytype/
81 |
82 | # Other project tools
83 | .pybuilder/
84 | target/
85 | scrapy/
86 | .webassets-cache
87 | celerybeat-schedule
88 | celerybeat.pid
89 |
90 | # SQLite (dev DBs)
91 | db.sqlite3
92 | db.sqlite3-journal
93 |
94 | # Claude Code and MCP configuration
95 | .mcp.json
96 | CLAUDE.md
97 | WARP.md
```
--------------------------------------------------------------------------------
/readme.md:
--------------------------------------------------------------------------------
```markdown
1 | # Roam Research MCP Server
2 |
3 | A Model Context Protocol (MCP) server that connects Claude and other AI assistants to your Roam Research graph.
4 |
5 | ## What This Does
6 |
7 | This server acts as a bridge between AI assistants and your Roam Research database. After setup, you can simply ask Claude to work with your Roam data - no coding required.
8 |
9 | For example, you can say:
10 | - "Add these meeting notes to today's daily note in Roam"
11 | - "Search my Roam graph for blocks tagged with #ProjectIdeas"
12 | - "Create a new page in Roam called 'Project Planning'"
13 | - "Find all TODO items I created this month"
14 |
15 | ## Features
16 |
17 | ### Content Creation
18 | - Create new pages with nested content and headings
19 | - Add blocks to any page with proper hierarchy
20 | - Create structured outlines with customizable nesting
21 | - Import markdown with proper nesting
22 | - Add todo items with automatic TODO status
23 | - Update existing content individually or in batches
24 | - Modify block content with pattern transformations
25 |
26 | ### Search and Retrieval
27 | - Find pages and blocks by title, text, or tags
28 | - Search for TODO/DONE items with filtering options
29 | - Find recently modified content
30 | - Search block references and explore block hierarchies
31 | - Search by creation or modification dates
32 | - Navigate parent-child relationships in blocks
33 | - Execute custom Datalog queries for advanced needs
34 |
35 | ### Memory System
36 | - Store information for Claude to remember across conversations
37 | - Recall stored memories with filtering and sorting options
38 | - Tag memories with custom categories
39 | - Access both recent and older memories with flexible retrieval
40 |
41 | ### URL Content Processing
42 | - Extract and import content from webpages
43 | - Parse and extract text from PDF documents
44 | - Retrieve YouTube video transcripts
45 | - Intelligently detect content type and process accordingly
46 |
47 | ## Setup Instructions
48 |
49 | 1. Install Claude Desktop from [https://claude.ai/download](https://claude.ai/download)
50 |
51 | 2. Edit your Claude Desktop configuration file:
52 | - Mac: `~/Library/Application Support/Claude/claude_desktop_config.json`
53 | - Windows: `%APPDATA%\Claude\claude_desktop_config.json`
54 |
55 | 3. Add this configuration:
56 |
57 | ```json
58 | {
59 | "mcpServers": {
60 | "roam-helper": {
61 | "command": "uvx",
62 | "args": ["git+https://github.com/PhiloSolares/roam-mcp.git"],
63 | "env": {
64 | "ROAM_API_TOKEN": "<your_roam_api_token>",
65 | "ROAM_GRAPH_NAME": "<your_roam_graph_name>"
66 | }
67 | }
68 | }
69 | }
70 | ```
71 |
72 | 4. Get your Roam API token:
73 | - Go to your Roam Research graph settings
74 | - Navigate to "API tokens"
75 | - Click "+ New API Token"
76 | - Copy the token to your configuration
77 |
78 | ## How to Use
79 |
80 | Once set up, simply chat with Claude and ask it to work with your Roam graph. Claude will use the appropriate MCP commands behind the scenes.
81 |
82 | Example conversations:
83 |
84 | **Creating Content:**
85 | > You: "Claude, please create a new page in my Roam graph called 'Project Ideas' with a section for mobile app ideas."
86 |
87 | **Searching Content:**
88 | > You: "Find all blocks in my Roam graph tagged with #ProjectIdeas that also mention mobile apps."
89 | >
90 | > You: "Show me all the TODO items I created this week."
91 |
92 | **Using the Memory System:**
93 | > You: "Remember that I want to use spaced repetition for learning JavaScript."
94 | >
95 | > Later:
96 | > You: "What learning techniques have we discussed for programming?"
97 |
98 | **Working with External Content:**
99 | > You: "Extract the main points from this PDF and add them to my Roam graph."
100 | >
101 | > You: "Get the transcript from this YouTube video about productivity."
102 |
103 | ## Advanced Configuration
104 |
105 | By default, memories are stored with the tag `#[[Memories]]`. To use a different tag:
106 |
107 | ```json
108 | "env": {
109 | "ROAM_API_TOKEN": "your-token",
110 | "ROAM_GRAPH_NAME": "your-graph",
111 | "MEMORIES_TAG": "#[[Claude/Memories]]"
112 | }
113 | ```
114 |
115 | ## Docker Support
116 |
117 | You can run the Roam MCP server in a Docker container:
118 |
119 | ### Building the Image
120 |
121 | ```bash
122 | docker build -t roam-mcp .
123 | ```
124 |
125 | ### Running the Container
126 |
127 | Run with environment variables:
128 |
129 | ```bash
130 | docker run -p 3000:3000 \
131 | -e ROAM_API_TOKEN="your_api_token" \
132 | -e ROAM_GRAPH_NAME="your_graph_name" \
133 | roam-mcp
134 | ```
135 |
136 | ### Using with Claude Desktop
137 |
138 | Configure Claude Desktop to use the containerized server:
139 |
140 | ```json
141 | {
142 | "mcpServers": {
143 | "roam-helper": {
144 | "command": "docker",
145 | "args": ["run", "--rm", "-p", "3000:3000",
146 | "-e", "ROAM_API_TOKEN=your_token",
147 | "-e", "ROAM_GRAPH_NAME=your_graph",
148 | "roam-mcp"],
149 | "env": {}
150 | }
151 | }
152 | }
153 | ```
154 |
155 | ## License
156 |
157 | MIT License
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
1 | mcp>=1.3.0
2 | httpx>=0.24.0
3 | pydantic>=2.0.0
4 | youtube-transcript-api>=0.6.0
5 | requests>=2.28.0
6 | python-dotenv>=1.0.0
7 | trafilatura>=1.6.0
8 | unstructured[pdf]>=0.10.0
```
--------------------------------------------------------------------------------
/roam_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Roam Research MCP Server - Python implementation
3 | Connect Claude to your Roam Research database
4 |
5 | Enhanced version with improved architecture and features
6 | """
7 |
8 | __version__ = "0.3.0"
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
1 | # Base image
2 | FROM python:3.11-slim
3 |
4 | # Set environment variables
5 | ENV PYTHONDONTWRITEBYTECODE=1
6 | ENV PYTHONUNBUFFERED=1
7 | ENV ROAM_API_TOKEN=""
8 | ENV ROAM_GRAPH_NAME=""
9 | ENV MEMORIES_TAG="#[[Memories]]"
10 |
11 | # Install system dependencies for PDF processing
12 | RUN apt-get update && \
13 | apt-get install -y --no-install-recommends \
14 | gcc \
15 | poppler-utils \
16 | libmagic1 \
17 | && apt-get clean \
18 | && rm -rf /var/lib/apt/lists/*
19 |
20 | # Create a non-root user
21 | RUN useradd -m appuser
22 |
23 | # Create and set working directory
24 | WORKDIR /app
25 |
26 | # Copy requirements file for caching
27 | COPY --chown=appuser:appuser requirements.txt ./
28 |
29 | # Install Python dependencies
30 | RUN pip install --no-cache-dir --upgrade pip && \
31 | pip install --no-cache-dir -r requirements.txt
32 |
33 | # Copy the application code
34 | COPY --chown=appuser:appuser . .
35 |
36 | # Change to non-root user
37 | USER appuser
38 |
39 | # Expose port for SSE transport
40 | EXPOSE 3000
41 |
42 | # Command to run the application (can be overridden)
43 | CMD ["python", "-m", "roam_mcp.cli", "--transport", "sse", "--port", "3000"]
```
--------------------------------------------------------------------------------
/roam_mcp/cli.py:
--------------------------------------------------------------------------------
```python
1 | """Command-line interface for the Roam MCP server."""
2 |
3 | import argparse
4 | import sys
5 | from roam_mcp.server import run_server
6 |
7 | def main():
8 | """Entry point for the Roam MCP server CLI."""
9 | parser = argparse.ArgumentParser(description="Roam Research MCP Server")
10 |
11 | # Transport options
12 | parser.add_argument(
13 | "--transport",
14 | choices=["stdio", "sse"],
15 | default="stdio",
16 | help="Transport method (stdio or sse)"
17 | )
18 |
19 | # Server configuration
20 | parser.add_argument(
21 | "--port",
22 | type=int,
23 | default=3000,
24 | help="Port for SSE transport (default: 3000)"
25 | )
26 |
27 | # Verbosity options
28 | parser.add_argument(
29 | "-v", "--verbose",
30 | action="store_true",
31 | help="Enable verbose logging"
32 | )
33 |
34 | # Parse arguments
35 | args = parser.parse_args()
36 |
37 | # Run the server with the specified transport
38 | try:
39 | run_server(
40 | transport=args.transport,
41 | port=args.port if args.transport == "sse" else None,
42 | verbose=args.verbose
43 | )
44 | except KeyboardInterrupt:
45 | print("\nServer stopped by user", file=sys.stderr)
46 | sys.exit(0)
47 | except Exception as e:
48 | print(f"Error starting server: {str(e)}", file=sys.stderr)
49 | sys.exit(1)
50 |
51 | if __name__ == "__main__":
52 | main()
```
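The CLI above wires three flags into `run_server`. As a quick sanity check, an equivalent parser can be rebuilt and exercised in isolation (a sketch: it mirrors rather than imports `roam_mcp.cli`, which requires the full package to be installed):

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    # Mirrors the argument definitions in roam_mcp/cli.py
    parser = argparse.ArgumentParser(description="Roam Research MCP Server")
    parser.add_argument("--transport", choices=["stdio", "sse"], default="stdio")
    parser.add_argument("--port", type=int, default=3000)
    parser.add_argument("-v", "--verbose", action="store_true")
    return parser

args = build_parser().parse_args(["--transport", "sse", "--port", "8080", "-v"])
print(args.transport, args.port, args.verbose)  # sse 8080 True
```

With no flags, the defaults match the Dockerfile's `CMD` only after the explicit `--transport sse --port 3000` overrides; on its own the server falls back to stdio transport.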
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [build-system]
2 | requires = ["hatchling"]
3 | build-backend = "hatchling.build"
4 |
5 | [project]
6 | name = "roam-mcp"
7 | version = "0.3.0"
8 | description = "A Model Context Protocol server for Roam Research integration with AI assistants"
9 | readme = "readme.md"
10 | requires-python = ">=3.9"
11 | license = {text = "MIT"}
12 | authors = [
13 | {name = "Roam MCP Project Contributors"}
14 | ]
15 | classifiers = [
16 | "Development Status :: 4 - Beta",
17 | "Intended Audience :: End Users/Desktop",
18 | "License :: OSI Approved :: MIT License",
19 | "Programming Language :: Python :: 3",
20 | "Programming Language :: Python :: 3.9",
21 | "Programming Language :: Python :: 3.10",
22 | "Programming Language :: Python :: 3.11",
23 | ]
24 | dependencies = [
25 | "mcp>=1.3.0",
26 | "httpx>=0.24.0",
27 | "pydantic>=2.0.0",
28 | "youtube-transcript-api>=0.6.0",
29 | "requests>=2.28.0",
30 | "python-dotenv>=1.0.0",
31 | "trafilatura>=1.6.0",
32 | "unstructured[pdf]>=0.10.0"
33 | ]
34 |
35 | [project.optional-dependencies]
36 | dev = [
37 | "pytest>=7.0.0",
38 | "black>=23.0.0",
39 | "isort>=5.12.0",
40 | "mypy>=1.0.0",
41 | "pylint>=2.17.0"
42 | ]
43 | # Note: poppler-utils is a system package (installed via apt in the
44 | # Dockerfile), not a PyPI distribution, so it cannot be listed as a pip
45 | # extra; PDF support is provided by unstructured[pdf] above.
46 |
47 | [project.scripts]
48 | roam-mcp = "roam_mcp.cli:main"
49 |
50 | [tool.hatch.build.targets.wheel]
51 | packages = ["roam_mcp"]
52 |
53 | [tool.black]
54 | line-length = 100
55 | target-version = ["py39"]
56 |
57 | [tool.isort]
58 | profile = "black"
59 | line_length = 100
60 |
61 | [tool.mypy]
62 | python_version = "3.9"
63 | warn_return_any = true
64 | warn_unused_configs = true
65 | disallow_untyped_defs = true
66 | disallow_incomplete_defs = true
67 |
68 | [tool.pylint.messages_control]
69 | disable = [
70 | "missing-docstring",
71 | "invalid-name"
72 | ]
73 |
74 | [project.urls]
75 | "Homepage" = "https://github.com/PhiloSolares/roam-mcp"
76 | "Bug Tracker" = "https://github.com/PhiloSolares/roam-mcp/issues"
```
--------------------------------------------------------------------------------
/roam_mcp/content_parsers.py:
--------------------------------------------------------------------------------
```python
1 | """External content parsing operations for the Roam MCP server."""
2 |
3 | import os
4 | import tempfile
5 | import logging
6 | from typing import Dict, Any, Optional
7 | import httpx
8 | import trafilatura
9 | from unstructured.partition.pdf import partition_pdf
10 |
11 | # Set up logging
12 | logger = logging.getLogger("roam-mcp.content_parsers")
13 |
14 | async def parse_webpage(url: str) -> Dict[str, Any]:
15 | """
16 | Parse content from a web page URL.
17 |
18 | Args:
19 | url: URL of the webpage to parse
20 |
21 | Returns:
22 | Result with parsed content
23 | """
24 | try:
25 | logger.debug(f"Fetching web page content from: {url}")
26 | downloaded = trafilatura.fetch_url(url)
27 |
28 | if not downloaded:
29 | return {
30 | "success": False,
31 | "error": f"Failed to download content from {url}"
32 | }
33 |
34 | # Extract main content with document structure preserved
35 | content = trafilatura.extract(
36 | downloaded,
37 | output_format='text',
38 | include_links=False,
39 | include_formatting=True
40 | )
41 |
42 | if not content:
43 | return {
44 | "success": False,
45 | "error": f"Failed to extract meaningful content from {url}"
46 | }
47 |
48 |         # Get metadata (extract_metadata returns a Document object, not a dict)
49 |         metadata = trafilatura.extract_metadata(downloaded)
50 |         title = metadata.title if metadata and metadata.title else 'Untitled Page'
51 |
52 | return {
53 | "success": True,
54 | "content": content,
55 | "title": title,
56 | "url": url
57 | }
58 | except Exception as e:
59 | logger.error(f"Error parsing web page: {str(e)}")
60 | return {
61 | "success": False,
62 | "error": f"Error parsing web page: {str(e)}"
63 | }
64 |
65 | async def parse_pdf(url: str) -> Dict[str, Any]:
66 | """
67 | Parse content from a PDF URL.
68 |
69 | Args:
70 | url: URL of the PDF to parse
71 |
72 | Returns:
73 | Result with parsed content
74 | """
75 | try:
76 | logger.debug(f"Fetching PDF content from: {url}")
77 |
78 | # Download the PDF to a temporary file
79 | async with httpx.AsyncClient() as client:
80 | response = await client.get(url, follow_redirects=True)
81 | response.raise_for_status()
82 |
83 | # Check if it's a PDF based on Content-Type
84 | content_type = response.headers.get('Content-Type', '')
85 | if 'application/pdf' not in content_type.lower():
86 | return {
87 | "success": False,
88 | "error": f"URL does not point to a PDF (Content-Type: {content_type})"
89 | }
90 |
91 | # Create a temporary file for the PDF
92 | with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as temp_file:
93 | temp_path = temp_file.name
94 | temp_file.write(response.content)
95 |
96 | # Extract content using unstructured
97 | try:
98 |             elements = partition_pdf(
99 |                 temp_path,
100 |                 strategy="hi_res",
101 |                 extract_images_in_pdf=False,
102 |                 infer_table_structure=True
103 |             )
104 |
105 | # Convert to formatted text while preserving structure
106 | content = "\n\n".join([str(element) for element in elements])
107 | except UnicodeDecodeError:
108 | # Fall back to a simpler strategy if hi_res fails with encoding issues
109 |                 logger.warning("hi_res strategy hit encoding issues; falling back to fast strategy")
110 |                 elements = partition_pdf(
111 |                     temp_path,
112 |                     strategy="fast",
113 |                     extract_images_in_pdf=False,
114 |                     infer_table_structure=False
115 |                 )
116 | content = "\n\n".join([str(element) for element in elements])
117 |
118 | # Try to extract a title from the filename in the URL
119 | path_parts = url.split('/')
120 | filename = path_parts[-1].split('?')[0] # Remove query parameters
121 | title = os.path.splitext(filename)[0].replace('-', ' ').replace('_', ' ').title()
122 | if not title:
123 | title = "PDF Document"
124 |
125 | # Clean up temporary file
126 | os.unlink(temp_path)
127 |
128 | return {
129 | "success": True,
130 | "content": content,
131 | "title": title,
132 | "url": url
133 | }
134 | except Exception as e:
135 | logger.error(f"Error parsing PDF: {str(e)}")
136 | # Clean up temporary file if it exists
137 |         try:
138 |             if 'temp_path' in locals():
139 |                 os.unlink(temp_path)
140 |         except OSError:
141 |             pass
142 |
143 | return {
144 | "success": False,
145 | "error": f"Error parsing PDF: {str(e)}"
146 | }
```
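The filename-to-title heuristic near the end of `parse_pdf` is pure string handling and easy to verify on its own. A standalone sketch of the same logic (the `title_from_url` helper name is ours, not part of the module):

```python
import os

def title_from_url(url: str, fallback: str = "PDF Document") -> str:
    # Take the last path segment, drop any query string,
    # strip the extension, and prettify separators.
    filename = url.split('/')[-1].split('?')[0]
    title = os.path.splitext(filename)[0].replace('-', ' ').replace('_', ' ').title()
    return title or fallback

print(title_from_url("https://example.com/docs/annual-report_2024.pdf?dl=1"))
# Annual Report 2024
```

URLs that end in a slash yield an empty title, which is why the fallback branch exists.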
--------------------------------------------------------------------------------
/roam_mcp/memory.py:
--------------------------------------------------------------------------------
```python
1 | """Memory system operations for the Roam MCP server."""
2 |
3 | from typing import Dict, List, Any, Optional, Union
4 | from datetime import datetime
5 | import logging
6 |
7 | from roam_mcp.api import (
8 | execute_query,
9 | execute_write_action,
10 | get_session_and_headers,
11 | GRAPH_NAME,
12 | get_daily_page,
13 | add_block_to_page,
14 | MEMORIES_TAG,
15 | ValidationError,
16 | PageNotFoundError,
17 | QueryError
18 | )
19 | from roam_mcp.utils import (
20 | format_roam_date,
21 | resolve_block_references
22 | )
23 |
24 | # Set up logging
25 | logger = logging.getLogger("roam-mcp.memory")
26 |
27 |
28 | def remember(memory: str, categories: Optional[List[str]] = None) -> Dict[str, Any]:
29 | """
30 | Store a memory with the specified MEMORIES_TAG.
31 |
32 | Args:
33 | memory: The memory to store
34 | categories: Optional list of categories to tag the memory with
35 |
36 | Returns:
37 | Result with success status
38 | """
39 | if not memory:
40 | return {
41 | "success": False,
42 | "error": "Memory cannot be empty"
43 | }
44 |
45 | session, headers = get_session_and_headers()
46 |
47 | try:
48 | # Validate and normalize categories
49 | normalized_categories = []
50 | if categories:
51 | # Ensure all categories are strings
52 | invalid_categories = [cat for cat in categories if not isinstance(cat, str)]
53 | if invalid_categories:
54 | return {
55 | "success": False,
56 | "error": "All categories must be strings"
57 | }
58 |
59 | # Normalize category formats
60 | for category in categories:
61 | category = category.strip()
62 | if not category:
63 | continue
64 |
65 | # Remove any existing tag syntax
66 | clean_category = category.replace('#', '').replace('[[', '').replace(']]', '')
67 |
68 | # Add to normalized list
69 | normalized_categories.append(clean_category)
70 |
71 | # Get today's daily page
72 | daily_page_uid = get_daily_page()
73 |
74 | # Format memory with tags
75 | formatted_memory = MEMORIES_TAG
76 |
77 | # Add the memory text
78 | formatted_memory += f" {memory}"
79 |
80 | # Add category tags
81 | for category in normalized_categories:
82 | # Format category as Roam tag
83 | if " " in category or "/" in category:
84 | tag = f"#[[{category}]]"
85 | else:
86 | tag = f"#{category}"
87 |
88 | formatted_memory += f" {tag}"
89 |
90 | # Create memory block
91 | block_uid = add_block_to_page(daily_page_uid, formatted_memory)
92 |
93 | return {
94 | "success": True,
95 | "block_uid": block_uid,
96 | "content": formatted_memory
97 | }
98 | except ValidationError as e:
99 | return {
100 | "success": False,
101 | "error": str(e)
102 | }
103 | except PageNotFoundError as e:
104 | return {
105 | "success": False,
106 | "error": str(e)
107 | }
108 | except Exception as e:
109 | logger.error(f"Error storing memory: {str(e)}")
110 | return {
111 | "success": False,
112 | "error": f"Error storing memory: {str(e)}"
113 | }
114 |
115 |
116 | def recall(sort_by: str = "newest", filter_tag: Optional[str] = None) -> Dict[str, Any]:
117 | """
118 | Recall stored memories, optionally filtered by tag.
119 |
120 | Args:
121 | sort_by: Sort order ("newest" or "oldest")
122 | filter_tag: Optional tag to filter memories by
123 |
124 | Returns:
125 |         Result with success status and the list of memory contents
126 | """
127 | if sort_by not in ["newest", "oldest"]:
128 | return {
129 | "success": False,
130 | "error": "sort_by must be 'newest' or 'oldest'"
131 | }
132 |
133 | session, headers = get_session_and_headers()
134 |
135 | # Clean and normalize the MEMORIES_TAG for queries
136 | clean_tag = MEMORIES_TAG.replace('#', '').replace('[[', '').replace(']]', '')
137 |
138 | # Prepare filter tag conditions if needed
139 | filter_conditions = ""
140 | if filter_tag:
141 | # Clean and normalize filter tag
142 | clean_filter = filter_tag.replace('#', '').replace('[[', '').replace(']]', '')
143 |
144 | # Generate filter tag variants
145 |             # Every spelling of the tag is matched, so the space/slash
146 |             # distinction used when writing tags is irrelevant here: one
147 |             # variant list covers both cases.
148 |             filter_variants = [f"#{clean_filter}", f"#[[{clean_filter}]]",
149 |                                f"[[{clean_filter}]]"]
150 |
151 | # Build filter conditions
152 | filter_conditions_list = []
153 | for variant in filter_variants:
154 | filter_conditions_list.append(f'(clojure.string/includes? ?s "{variant}")')
155 |
156 | if filter_conditions_list:
157 | filter_conditions = f" AND (or {' '.join(filter_conditions_list)})"
158 |
159 | try:
160 | logger.debug(f"Recalling memories with sort_by={sort_by}")
161 | if filter_tag:
162 | logger.debug(f"Filtering by tag: {filter_tag}")
163 |
164 | # Method 1: Search for blocks containing the MEMORIES_TAG across the database
165 | # Generate tag variants
166 |         # Both branches of the original space/slash check built the same
167 |         # list, so match every spelling of the tag unconditionally.
168 |         tag_variants = [f"#{clean_tag}",
169 |                         f"#[[{clean_tag}]]",
170 |                         f"[[{clean_tag}]]"]
171 |
172 | # Build tag conditions
173 | tag_conditions = []
174 | for variant in tag_variants:
175 | tag_conditions.append(f'(clojure.string/includes? ?s "{variant}")')
176 |
177 | tag_condition = f"(or {' '.join(tag_conditions)})"
178 |
179 | # Create combined condition with filter if needed
180 | combined_condition = tag_condition
181 | if filter_conditions:
182 | combined_condition = f"(and {tag_condition}{filter_conditions})"
183 |
184 | # Query blocks with tag
185 | tag_query = f"""[:find ?uid ?s ?time ?page-title
186 | :where
187 | [?b :block/string ?s]
188 | [?b :block/uid ?uid]
189 | [?b :create/time ?time]
190 | [?b :block/page ?p]
191 | [?p :node/title ?page-title]
192 | [{combined_condition}]]"""
193 |
194 | tag_results = execute_query(tag_query)
195 |
196 | # Method 2: Also check for dedicated page with the clean tag name
197 | page_query = f"""[:find ?uid ?s ?time
198 | :where
199 | [?p :node/title "{clean_tag}"]
200 | [?b :block/page ?p]
201 | [?b :block/string ?s]
202 | [?b :block/uid ?uid]
203 | [?b :create/time ?time]]"""
204 |
205 | # Add filter if needed
206 | if filter_conditions:
207 | page_query = f"""[:find ?uid ?s ?time
208 | :where
209 | [?p :node/title "{clean_tag}"]
210 | [?b :block/page ?p]
211 | [?b :block/string ?s]
212 | [?b :block/uid ?uid]
213 | [?b :create/time ?time]
214 | [{filter_conditions.replace('AND ', '')}]]"""
215 |
216 | page_results = execute_query(page_query)
217 |
218 | # Process and combine results
219 | memories = []
220 |
221 | # Process tag results
222 | for uid, content, time, page_title in tag_results:
223 | # Resolve references
224 | resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
225 |
226 | memories.append({
227 | "content": resolved_content,
228 | "time": time,
229 | "page_title": page_title,
230 | "block_uid": uid
231 | })
232 |
233 | # Process page results
234 | for uid, content, time in page_results:
235 | # Resolve references
236 | resolved_content = resolve_block_references(session, headers, GRAPH_NAME, content)
237 |
238 | memories.append({
239 | "content": resolved_content,
240 | "time": time,
241 | "page_title": clean_tag,
242 | "block_uid": uid
243 | })
244 |
245 | # Sort by time
246 | memories.sort(key=lambda x: x["time"], reverse=(sort_by == "newest"))
247 |
248 | # Clean up content - remove the MEMORIES_TAG
249 | for memory in memories:
250 | content = memory["content"]
251 | for variant in tag_variants:
252 | content = content.replace(variant, "")
253 | memory["content"] = content.strip()
254 |
255 | # Remove duplicates while preserving order
256 | seen_contents = set()
257 | unique_memories = []
258 |
259 | for memory in memories:
260 | content = memory["content"]
261 | if content and content not in seen_contents:
262 | seen_contents.add(content)
263 | unique_memories.append(memory)
264 |
265 | # Return just the content strings
266 | memory_contents = [memory["content"] for memory in unique_memories]
267 |
268 | return {
269 | "success": True,
270 | "memories": memory_contents,
271 | "message": f"Found {len(memory_contents)} memories"
272 | }
273 | except QueryError as e:
274 | return {
275 | "success": False,
276 | "error": str(e)
277 | }
278 | except Exception as e:
279 | logger.error(f"Error recalling memories: {str(e)}")
280 | return {
281 | "success": False,
282 | "error": f"Error recalling memories: {str(e)}"
283 | }
```
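`remember` formats category tags with a simple rule: the bracketed `#[[...]]` form when a category contains a space or slash, a bare `#tag` otherwise, after stripping any tag syntax the caller already supplied. A self-contained sketch of that rule (`format_category_tag` is a hypothetical helper, not part of the module):

```python
def format_category_tag(category: str) -> str:
    # Strip any existing Roam tag syntax, then re-apply it:
    # multi-word or namespaced categories need the #[[...]] form.
    clean = category.strip().replace('#', '').replace('[[', '').replace(']]', '')
    if not clean:
        return ""
    return f"#[[{clean}]]" if (" " in clean or "/" in clean) else f"#{clean}"

print(format_category_tag("learning"))              # #learning
print(format_category_tag("#[[Claude/Memories]]"))  # #[[Claude/Memories]]
```

The same normalization (strip-then-reformat) is what makes passing an already-tagged category like `#[[Claude/Memories]]` idempotent.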
--------------------------------------------------------------------------------
/roam_mcp/utils.py:
--------------------------------------------------------------------------------
```python
1 | """Utility functions for the Roam MCP server."""
2 |
3 | import re
4 | import logging
5 | from datetime import datetime
6 | from typing import List, Dict, Any, Optional, Set, Match, Tuple, Union
7 | import json
8 | import time
9 | import uuid
10 |
11 | # Set up logging
12 | logger = logging.getLogger("roam-mcp.utils")
13 |
14 | # Date formatting
15 | def format_roam_date(date: Optional[datetime] = None) -> str:
16 | """
17 | Format a date in Roam's preferred format (e.g., "March 25th, 2025").
18 |
19 | Args:
20 | date: The date to format, defaults to today's date
21 |
22 | Returns:
23 | A string in Roam's date format
24 | """
25 |     if date is None:
26 |         date = datetime.now()
27 | 
28 |     day = date.day
29 |     if 11 <= day <= 13:
30 |         suffix = "th"
31 |     else:
32 |         suffix = {1: "st", 2: "nd", 3: "rd"}.get(day % 10, "th")
33 | 
34 |     return f"{date.strftime('%B')} {day}{suffix}, {date.year}"  # avoid %-d, which is not portable to Windows
35 |
36 |
37 | # Regular expressions for markdown elements
38 | MD_BOLD_PATTERN = r'\*\*(.+?)\*\*'
39 | MD_ITALIC_PATTERN = r'(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)'
40 | MD_ITALIC_UNDERSCORE_PATTERN = r'(?<!_)_(?!_)(.+?)(?<!_)_(?!_)'
41 | MD_HIGHLIGHT_PATTERN = r'==(.+?)=='
42 | MD_LINK_PATTERN = r'\[([^\]]+)\]\(([^)]+)\)'
43 | MD_CODE_BLOCK_PATTERN = r'```([a-zA-Z0-9]*)\s*\n([\s\S]*?)```'
44 | MD_INLINE_CODE_PATTERN = r'`([^`]+)`'
45 |
46 | # Table regex patterns
47 | MD_TABLE_PATTERN = r'(?:\|(.+)\|\s*\n\|(?::?-+:?\|)+\s*\n(?:\|(?:.+)\|\s*\n*)+)'
48 | MD_TABLE_ROW_PATTERN = r'\|(.*)\|'
49 | MD_TABLE_HEADER_PATTERN = r'\|(\s*:?-+:?\s*)\|'
50 | MD_TABLE_ALIGNMENT_PATTERN = r'^(:?)-+(:?)$' # For detecting alignment in table headers
51 |
52 | # Headings pattern
53 | MD_HEADING_PATTERN = r'^(#{1,6})\s+(.+)$'
54 |
55 |
56 | # Markdown conversion utilities
57 | def convert_to_roam_markdown(text: str) -> str:
58 | """
59 | Convert standard markdown to Roam-compatible format.
60 |
61 | Args:
62 | text: Standard markdown text
63 |
64 | Returns:
65 | Roam-formatted markdown text
66 | """
67 | # Convert tables first (they may contain other markdown elements)
68 | text = convert_tables(text)
69 |
70 | # Handle code blocks (must be done before other inline elements)
71 | text = convert_code_blocks(text)
72 |
73 | # Handle double asterisks/underscores (bold)
74 | text = re.sub(MD_BOLD_PATTERN, r'**\1**', text)
75 |
76 | # Handle single asterisks/underscores (italic)
77 | text = re.sub(MD_ITALIC_PATTERN, r'__\1__', text) # Single asterisk to double underscore
78 | text = re.sub(MD_ITALIC_UNDERSCORE_PATTERN, r'__\1__', text) # Single underscore to double underscore
79 |
80 |     # Handle highlights (==text== -> ^^text^^); r'^^\\1^^' would emit a literal "\1"
81 |     text = re.sub(MD_HIGHLIGHT_PATTERN, r'^^\1^^', text)
82 |
83 | # Convert tasks
84 | text = re.sub(r'- \[ \]', r'- {{[[TODO]]}}', text)
85 |     text = re.sub(r'- \[[xX]\]', r'- {{[[DONE]]}}', text)
86 |
87 | # Convert links
88 | text = re.sub(MD_LINK_PATTERN, r'[\1](\2)', text)
89 |
90 | # Handle headings (convert to Roam's heading format)
91 | text = convert_headings(text)
92 |
93 | # Handle inline code
94 | text = re.sub(MD_INLINE_CODE_PATTERN, r'`\1`', text)
95 |
96 | return text
97 |
98 |
99 | def convert_headings(text: str) -> str:
100 | """
101 | Convert markdown headings to Roam's heading format.
102 |
103 | Args:
104 | text: Markdown text with potential headings
105 |
106 | Returns:
107 | Text with headings converted to Roam format
108 | """
109 | def heading_replacer(match: Match) -> str:
110 | level = len(match.group(1)) # Number of # characters
111 | content = match.group(2).strip()
112 |
113 | # For text format, we'll just keep the heading text and let block attributes
114 | # handle the actual heading level in Roam
115 | return content
116 |
117 | # Process line by line to avoid matching # in code blocks
118 | lines = text.split('\n')
119 | for i, line in enumerate(lines):
120 | heading_match = re.match(MD_HEADING_PATTERN, line)
121 | if heading_match:
122 | lines[i] = heading_replacer(heading_match)
123 |
124 | return '\n'.join(lines)
125 |
126 |
127 | def convert_code_blocks(text: str) -> str:
128 | """
129 | Convert markdown code blocks while preserving language and indentation.
130 |
131 | Args:
132 | text: Markdown text with potential code blocks
133 |
134 | Returns:
135 | Text with code blocks properly formatted
136 | """
137 | def code_block_replacer(match: Match) -> str:
138 | language = match.group(1).strip()
139 | code_content = match.group(2)
140 |
141 | # Preserve language info
142 | language_tag = f"{language}\n" if language else "\n"
143 |
144 | # Clean up indentation
145 | lines = code_content.split('\n')
146 | # Find the common indentation level
147 | non_empty_lines = [line for line in lines if line.strip()]
148 | if non_empty_lines:
149 | common_indent = min(len(line) - len(line.lstrip()) for line in non_empty_lines)
150 | # Remove common indentation
151 | code_content = '\n'.join(line[common_indent:] if line.strip() else line for line in lines)
152 |
153 | return f"```{language_tag}{code_content}```"
154 |
155 | return re.sub(MD_CODE_BLOCK_PATTERN, code_block_replacer, text)
156 |
157 |
158 | def convert_tables(text: str) -> str:
159 | """
160 | Convert markdown tables to Roam format.
161 |
162 | Args:
163 | text: Markdown text with potential tables
164 |
165 | Returns:
166 | Text with tables converted to Roam format
167 | """
168 | def table_replacer(match: Match) -> str:
169 | table_text = match.group(0)
170 |
171 | # Find all rows
172 | rows = re.findall(MD_TABLE_ROW_PATTERN, table_text)
173 | if len(rows) < 2: # Need at least header and separator
174 | return table_text
175 |
176 | # First row is header, second is separator, rest are data
177 | header_cells = [cell.strip() for cell in rows[0].split('|') if cell.strip()]
178 | separator_cells = [cell.strip() for cell in rows[1].split('|') if cell.strip()]
179 |
180 |         # Determine column alignments from the separator row (informational only; Roam's {{table}} format does not encode alignment)
181 |         alignments = []
182 | for sep in separator_cells:
183 | alignment_match = re.match(MD_TABLE_ALIGNMENT_PATTERN, sep)
184 | if alignment_match:
185 | left_colon = bool(alignment_match.group(1))
186 | right_colon = bool(alignment_match.group(2))
187 |
188 | if left_colon and right_colon:
189 | alignments.append("center")
190 | elif right_colon:
191 | alignments.append("right")
192 | else:
193 | alignments.append("left")
194 | else:
195 | alignments.append("left") # Default alignment
196 |
197 | # Generate Roam table format
198 | roam_table = "{{table}}\n"
199 |
200 | # Add header row
201 | for i, header in enumerate(header_cells):
202 | indent = " " * (i + 1)
203 | roam_table += f"{indent}- {header}\n"
204 |
205 | # Add data rows - start from index 2 to skip header and separator
206 | for row_idx in range(2, len(rows)):
207 | data_cells = [cell.strip() for cell in rows[row_idx].split('|') if cell.strip()]
208 |
209 | for i, cell in enumerate(data_cells):
210 | if i < len(header_cells): # Only process cells that have a corresponding header
211 | indent = " " * (i + 1)
212 | roam_table += f"{indent}- {cell}\n"
213 |
214 | return roam_table
215 |
216 | return re.sub(MD_TABLE_PATTERN, table_replacer, text)
217 |
218 |
219 | class MarkdownNode:
220 | """Class representing a node in the markdown parsing tree."""
221 | def __init__(self, content: str, level: int = 0, heading_level: int = 0):
222 | self.content = content
223 | self.level = level
224 | self.heading_level = heading_level
225 | self.children = []
226 |
227 | def add_child(self, node: 'MarkdownNode') -> None:
228 | """Add a child node to this node."""
229 | self.children.append(node)
230 |
231 | def to_dict(self) -> Dict[str, Any]:
232 | """Convert node to dictionary representation."""
233 | result = {
234 | "text": self.content,
235 | "level": self.level
236 | }
237 |
238 | if self.heading_level:
239 | result["heading_level"] = self.heading_level
240 |
241 | if self.children:
242 | result["children"] = [child.to_dict() for child in self.children]
243 |
244 | return result
245 |
246 |
247 | def parse_markdown_list(markdown: str) -> List[Dict[str, Any]]:
248 | """
249 | Parse a markdown list into a hierarchical structure.
250 |
251 | Args:
252 | markdown: Markdown text with nested lists
253 |
254 | Returns:
255 | List of dictionaries with 'text', 'level', and 'children' keys
256 | """
257 | # Convert markdown syntax first
258 | markdown = convert_to_roam_markdown(markdown)
259 |
260 | lines = markdown.split('\n')
261 | root = MarkdownNode("ROOT", -1) # Root node to hold all top-level items
262 | node_stack = [root]
263 | current_level = -1
264 | in_code_block = False
265 | code_block_content = []
266 | code_block_indent = 0
267 |
268 |     for line in lines:
269 | if not line.strip() and not in_code_block:
270 | continue
271 |
272 | # Handle code blocks
273 | if "```" in line and not in_code_block:
274 | # Start of code block
275 | in_code_block = True
276 | code_block_content = [line]
277 | # Store the indentation level
278 | code_block_indent = len(line) - len(line.lstrip())
279 | continue
280 | elif "```" in line and in_code_block:
281 | # End of code block - process the entire block
282 | code_block_content.append(line)
283 |
284 | # Calculate the level based on indentation
285 | level = code_block_indent // 2
286 |
287 | # Join the content with proper line breaks
288 | content = "\n".join(code_block_content)
289 |
290 | # Create a node for the code block
291 | node = MarkdownNode(content, level)
292 |
293 | # Find the right parent for this node
294 | while len(node_stack) > 1 and node_stack[-1].level >= level:
295 | node_stack.pop()
296 |
297 | # Add to parent
298 | node_stack[-1].add_child(node)
299 |
300 | # Update stack and level
301 | node_stack.append(node)
302 | current_level = level
303 |
304 | # Reset code block state
305 | in_code_block = False
306 | code_block_content = []
307 | continue
308 | elif in_code_block:
309 | # In a code block - just collect the line
310 | code_block_content.append(line)
311 | continue
312 |
313 | # Check for heading
314 | heading_match = re.match(MD_HEADING_PATTERN, line)
315 | if heading_match:
316 | level = 0 # Headings are top-level
317 | heading_text = heading_match.group(2).strip()
318 | heading_level = len(heading_match.group(1)) # Number of # characters
319 |
320 | # Reset stack for headings
321 | while len(node_stack) > 1:
322 | node_stack.pop()
323 |
324 | # Create heading node
325 | node = MarkdownNode(heading_text, level, heading_level)
326 | node_stack[-1].add_child(node)
327 | node_stack.append(node)
328 | current_level = level
329 | continue
330 |
331 | # Regular list items
332 | match = re.match(r'^(\s*)[-*+]\s+(.+)$', line)
333 | if match:
334 | indent, content = match.groups()
335 | level = len(indent) // 2 + 1 # Convert indentation to level, starting with 1
336 |
337 |         # TODO/DONE markers ({{[[TODO]]}} / {{[[DONE]]}}) stay inline in the
338 |         # block content, so the item's level is used as-is
339 |         level_to_append = level
342 |
343 | # Pop stack until we find parent level
344 | while len(node_stack) > 1 and node_stack[-1].level >= level:
345 | node_stack.pop()
346 |
347 | # Create new node
348 | node = MarkdownNode(content, level_to_append)
349 | node_stack[-1].add_child(node)
350 | node_stack.append(node)
351 | current_level = level
352 | else:
353 | # Non-list line - treat as continuation of previous item or as top-level text
354 | content = line.strip()
355 | if content and current_level >= 0 and len(node_stack) > 1:
356 | # Add to the current node's content
357 | node_stack[-1].content += "\n" + content
358 | elif content:
359 | # Create as top-level text
360 | node = MarkdownNode(content, 0)
361 | node_stack[0].add_child(node)
362 | node_stack = [root, node]
363 | current_level = 0
364 |
365 | # Convert the tree to the expected dictionary format with proper hierarchy
366 | def build_hierarchy(node):
367 | """Convert a node and its children to a hierarchical dictionary structure."""
368 | result = {
369 | "text": node.content,
370 | "level": node.level
371 | }
372 |
373 | if node.heading_level:
374 | result["heading_level"] = node.heading_level
375 |
376 | if node.children:
377 | result["children"] = [build_hierarchy(child) for child in node.children]
378 |
379 | return result
380 |
381 | # Build result with correct hierarchy
382 | hierarchical_result = []
383 | for child in root.children:
384 | hierarchical_result.append(build_hierarchy(child))
385 |
386 | # We'll now convert this to the flattened format for backward compatibility
387 | # while preserving hierarchy information for functions that can use it
388 | flattened_result = []
389 |
390 | def flatten_hierarchy(item, parent_level=-1, path=None):
391 | """Flatten a hierarchical structure while preserving parent-child information."""
392 | if path is None:
393 | path = []
394 |
395 | # Get item properties
396 | text = item["text"]
397 | level = item.get("level", parent_level + 1)
398 | heading_level = item.get("heading_level", 0)
399 |
400 | # Create the flattened item
401 | flat_item = {
402 | "text": text,
403 | "level": level
404 | }
405 |
406 | if heading_level:
407 | flat_item["heading_level"] = heading_level
408 |
409 | # Add path information for reconstructing hierarchy
410 | flat_item["_path"] = path.copy()
411 |
412 | # Add to results
413 | flattened_result.append(flat_item)
414 |
415 | # Process children
416 | children = item.get("children", [])
417 | if children:
418 | for i, child in enumerate(children):
419 | child_path = path + [i]
420 | flatten_hierarchy(child, level, child_path)
421 |
422 | # Flatten the hierarchical result
423 | for i, item in enumerate(hierarchical_result):
424 | flatten_hierarchy(item, -1, [i])
425 |
426 | # We return the flattened result but with _path information
427 | # for reconstructing hierarchy if needed
428 | return flattened_result
429 |
430 |
431 | def convert_roam_dates(text: str) -> str:
432 | """
433 | Convert date references to Roam date format.
434 |
435 | Args:
436 | text: Text with potential date references
437 |
438 | Returns:
439 | Text with dates in Roam format
440 | """
441 | # Convert ISO dates (YYYY-MM-DD)
442 | def replace_date(match: Match) -> str:
443 | date_str = match.group(0)
444 | try:
445 | date = datetime.strptime(date_str, "%Y-%m-%d")
446 | return format_roam_date(date)
447 | except ValueError:
448 | return date_str
449 |
450 | return re.sub(r'\b\d{4}-\d{2}-\d{2}\b', replace_date, text)
451 |
452 |
453 | def extract_youtube_video_id(url: str) -> Optional[str]:
454 | """
455 | Extract the video ID from a YouTube URL.
456 |
457 | Args:
458 | url: YouTube URL
459 |
460 | Returns:
461 | Video ID or None if not found
462 | """
463 | patterns = [
464 | r"(?:youtube\.com\/watch\?v=|youtu\.be\/)([a-zA-Z0-9_-]{11})",
465 | r"youtube\.com\/embed\/([a-zA-Z0-9_-]{11})",
466 | r"youtube\.com\/v\/([a-zA-Z0-9_-]{11})",
467 | r"youtube\.com\/user\/[^\/]+\/\?v=([a-zA-Z0-9_-]{11})"
468 | ]
469 |
470 | for pattern in patterns:
471 | match = re.search(pattern, url)
472 | if match:
473 | return match.group(1)
474 |
475 | return None
476 |
477 |
478 | def detect_url_type(url: str) -> str:
479 | """
480 | Detect the type of content a URL points to.
481 |
482 | Args:
483 | url: URL to analyze
484 |
485 | Returns:
486 |         Content type: 'youtube', 'pdf', or 'webpage' (the default)
487 | """
488 | url_lower = url.lower()
489 |
490 | # Check for YouTube
491 | youtube_patterns = [
492 | r"(?:youtube\.com\/watch\?v=|youtu\.be\/)",
493 | r"youtube\.com\/embed\/",
494 | r"youtube\.com\/v\/",
495 | r"youtube\.com\/user\/[^\/]+\/\?v="
496 | ]
497 | for pattern in youtube_patterns:
498 | if re.search(pattern, url_lower):
499 | return "youtube"
500 |
501 | # Check for PDF
502 | if url_lower.endswith('.pdf') or '/pdf/' in url_lower:
503 | return "pdf"
504 |
505 | # Default to webpage
506 | return "webpage"
507 |
508 |
509 | def create_block_action(parent_uid: str, content: str, order: Union[int, str] = "last",
510 | uid: Optional[str] = None, heading: Optional[int] = None) -> Dict[str, Any]:
511 | """
512 | Create a block action for batch operations.
513 |
514 | Args:
515 | parent_uid: UID of the parent block/page
516 | content: Block content
517 | order: Position of the block
518 | uid: Optional UID for the block
519 | heading: Optional heading level (1-3)
520 |
521 | Returns:
522 | Block action dictionary
523 | """
524 | block_data = {
525 | "string": content
526 | }
527 |
528 | if uid:
529 | block_data["uid"] = uid
530 | else:
531 | # Generate a unique UID if none provided
532 | block_data["uid"] = str(uuid.uuid4())[:9]
533 |
534 |     if heading and 1 <= heading <= 3:
535 | block_data["heading"] = heading
536 |
537 | action = {
538 | "action": "create-block",
539 | "location": {
540 | "parent-uid": parent_uid,
541 | "order": order
542 | },
543 | "block": block_data
544 | }
545 |
546 | logger.debug(f"Created block action for parent {parent_uid}: {content[:30]}{'...' if len(content) > 30 else ''}")
547 | return action
548 |
549 |
550 | def process_nested_content(content: List[Dict], parent_uid: str, session, headers, graph_name: str) -> List[str]:
551 | """
552 | Recursively process nested content structure and create blocks.
553 |
554 | Args:
555 | content: List of content items with potential children
556 | parent_uid: UID of the parent block
557 | session: Active session for API requests
558 | headers: Request headers with authentication
559 | graph_name: Roam graph name
560 |
561 | Returns:
562 | List of created block UIDs
563 | """
564 | from roam_mcp.api import execute_batch_actions # Import here to avoid circular imports
565 |
566 | if not content:
567 | return []
568 |
569 |     # Process items in document order; reordering by level would reparent
570 |     # children under the most recent same-level block instead of their own parent
571 |
572 | # Create batch actions
573 | batch_actions = []
574 | level_parent_map = {0: parent_uid}
575 |
576 | # Process items level by level (top-down)
577 | for item in content:
578 | level = item.get("level", 0)
579 | text = item.get("text", "")
580 | heading_level = item.get("heading_level", 0)
581 |
582 | # Find parent for this level
583 | parent_level = level - 1
584 | if parent_level < 0:
585 | parent_level = 0
586 |
587 | parent_for_item = level_parent_map.get(parent_level, parent_uid)
588 |
589 | # Create block action
590 | action = create_block_action(
591 | parent_uid=parent_for_item,
592 | content=text,
593 | order="last",
594 | heading=heading_level
595 | )
596 |
597 | batch_actions.append(action)
598 |
599 |         # Record this block's generated UID so deeper-level children can reference it as their parent
600 |         level_parent_map[level] = action["block"]["uid"]
601 |
602 | # Execute the batch
603 | result = execute_batch_actions(batch_actions)
604 | return result.get("created_uids", [])
605 |
606 |
607 | def find_block_uid(session, headers, graph_name: str, block_content: str, max_retries: int = 3) -> Optional[str]:
608 | """
609 | Search for a block by its content to find its UID with retries.
610 |
611 | Args:
612 | session: Active session for API requests
613 | headers: Request headers with authentication
614 | graph_name: Roam graph name
615 | block_content: Content to search for
616 | max_retries: Maximum number of retries
617 |
618 | Returns:
619 | Block UID or None if not found
620 | """
621 | # Escape quotes in content
622 | escaped_content = block_content.replace('"', '\\"')
623 |
624 | for attempt in range(max_retries):
625 | search_query = f'''[:find ?uid .
626 | :where [?e :block/string "{escaped_content}"]
627 | [?e :block/uid ?uid]]'''
628 |
629 | response = session.post(
630 | f'https://api.roamresearch.com/api/graph/{graph_name}/q',
631 | headers=headers,
632 | json={"query": search_query}
633 | )
634 |
635 | if response.status_code == 200 and response.json().get('result'):
636 | block_uid = response.json()['result']
637 | return block_uid
638 |
639 | # If not found and not the last attempt, wait and retry
640 | if attempt < max_retries - 1:
641 |             wait_time = attempt + 1  # Linear backoff: 1s, 2s, 3s
642 | logger.debug(f"Block not found, retrying in {wait_time}s (attempt {attempt+1}/{max_retries})")
643 | time.sleep(wait_time)
644 |
645 | logger.debug(f"Could not find block UID for content: {block_content[:50]}...")
646 | return None
647 |
648 |
649 | def find_page_by_title(session, headers, graph_name: str, title: str) -> Optional[str]:
650 | """
651 | Find a page by title, with case-insensitive matching.
652 |
653 | Args:
654 | session: Active session for API requests
655 | headers: Request headers with authentication
656 | graph_name: Roam graph name
657 | title: Page title to search for
658 |
659 | Returns:
660 | Page UID or None if not found
661 | """
662 |     # Clean up the title and escape embedded quotes so the Datalog query stays valid
663 |     title = title.strip().replace('"', '\\"')
664 |
665 | # First try direct page lookup (more reliable than case-insensitive queries in Roam)
666 | query = f'''[:find ?uid .
667 | :where [?e :node/title "{title}"]
668 | [?e :block/uid ?uid]]'''
669 |
670 | response = session.post(
671 | f'https://api.roamresearch.com/api/graph/{graph_name}/q',
672 | headers=headers,
673 | json={"query": query}
674 | )
675 |
676 | if response.status_code == 200 and response.json().get('result'):
677 | return response.json()['result']
678 |
679 | # If not found, try checking if it's a UID
680 | if len(title) == 9 and re.match(r'^[a-zA-Z0-9_-]{9}$', title):
681 | # This looks like a UID, check if it's a valid page UID
682 | uid_query = f'''[:find ?title .
683 | :where [?e :block/uid "{title}"]
684 | [?e :node/title ?title]]'''
685 |
686 | uid_response = session.post(
687 | f'https://api.roamresearch.com/api/graph/{graph_name}/q',
688 | headers=headers,
689 | json={"query": uid_query}
690 | )
691 |
692 | if uid_response.status_code == 200 and uid_response.json().get('result'):
693 | return title
694 |
695 | # If still not found, try case-insensitive match by getting all pages
696 | all_pages_query = f'''[:find ?title ?uid
697 | :where [?e :node/title ?title]
698 | [?e :block/uid ?uid]]'''
699 |
700 | all_pages_response = session.post(
701 | f'https://api.roamresearch.com/api/graph/{graph_name}/q',
702 | headers=headers,
703 | json={"query": all_pages_query}
704 | )
705 |
706 | if all_pages_response.status_code == 200 and all_pages_response.json().get('result'):
707 | for page_title, uid in all_pages_response.json()['result']:
708 | if page_title.lower() == title.lower():
709 | return uid
710 |
711 | return None
712 |
713 |
714 | def resolve_block_references(session, headers, graph_name: str, content: str, max_depth: int = 3, current_depth: int = 0) -> str:
715 | """
716 | Resolve block references in content recursively.
717 |
718 | Args:
719 | session: Active session for API requests
720 | headers: Request headers with authentication
721 | graph_name: Roam graph name
722 | content: Content with potential block references
723 | max_depth: Maximum recursion depth
724 | current_depth: Current recursion depth
725 |
726 | Returns:
727 | Content with block references resolved
728 | """
729 | if current_depth >= max_depth:
730 | return content
731 |
732 | # Find all block references
733 | ref_pattern = r'\(\(([a-zA-Z0-9_-]{9})\)\)'
734 | refs = re.findall(ref_pattern, content)
735 |
736 | if not refs:
737 | return content
738 |
739 | # For each reference, get its content
740 | for ref in refs:
741 | try:
742 | query = f'''[:find ?string .
743 | :where [?b :block/uid "{ref}"]
744 | [?b :block/string ?string]]'''
745 |
746 | response = session.post(
747 | f'https://api.roamresearch.com/api/graph/{graph_name}/q',
748 | headers=headers,
749 | json={"query": query}
750 | )
751 |
752 | if response.status_code == 200 and response.json().get('result'):
753 | ref_content = response.json()['result']
754 |
755 | # Recursively resolve nested references
756 | resolved_ref = resolve_block_references(
757 | session, headers, graph_name,
758 | ref_content, max_depth, current_depth + 1
759 | )
760 |
761 | # Replace reference with content
762 | content = content.replace(f"(({ref}))", resolved_ref)
763 | except Exception as e:
764 | logger.warning(f"Failed to resolve reference (({ref})): {str(e)}")
765 |
766 | return content
```
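A standalone sketch of the indentation-to-level mapping used by `parse_markdown_list` above: two spaces of indentation equal one nesting level, and list items start at level 1. The helper name `list_levels` is illustrative only, not part of the module.

```python
import re

def list_levels(markdown: str):
    """Map markdown list items to (level, text) pairs, two spaces per level."""
    items = []
    for line in markdown.split("\n"):
        match = re.match(r'^(\s*)[-*+]\s+(.+)$', line)  # same pattern as the parser
        if match:
            indent, text = match.groups()
            items.append((len(indent) // 2 + 1, text))
    return items

sample = "- top\n  - child\n    - grandchild\n- sibling"
print(list_levels(sample))
# → [(1, 'top'), (2, 'child'), (3, 'grandchild'), (1, 'sibling')]
```

Note that odd indentation rounds down (three spaces still maps to level 2), which matches the `len(indent) // 2 + 1` computation in the parser itself.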
--------------------------------------------------------------------------------
/roam_mcp/api.py:
--------------------------------------------------------------------------------
```python
1 | """Core API functions for interacting with Roam Research."""
2 |
3 | import os
4 | import re
5 | import sys
6 | import logging
7 | from typing import Dict, List, Any, Optional, Union, Set, Tuple, Callable
8 | import requests
9 | from datetime import datetime
10 | import json
11 | import time
12 | from functools import wraps
13 |
14 | from roam_mcp.utils import (
15 | format_roam_date,
16 | find_block_uid,
17 | find_page_by_title,
18 | process_nested_content,
19 | resolve_block_references
20 | )
21 |
22 | # Set up logging
23 | logger = logging.getLogger("roam-mcp.api")
24 |
25 | # Get API credentials from environment variables
26 | API_TOKEN = os.environ.get("ROAM_API_TOKEN")
27 | GRAPH_NAME = os.environ.get("ROAM_GRAPH_NAME")
28 | MEMORIES_TAG = os.environ.get("MEMORIES_TAG", "#[[Memories]]")
29 |
30 | # Validate API credentials
31 | if not API_TOKEN:
32 | logger.warning("ROAM_API_TOKEN environment variable is not set")
33 |
34 | if not GRAPH_NAME:
35 | logger.warning("ROAM_GRAPH_NAME environment variable is not set")
36 |
37 |
38 | # Enhanced Error Hierarchy
39 | class RoamAPIError(Exception):
40 | """Base exception for all Roam API errors."""
41 | def __init__(self, message: str, code: Optional[str] = None, details: Optional[Dict] = None, remediation: Optional[str] = None):
42 | self.message = message
43 | self.code = code or "UNKNOWN_ERROR"
44 | self.details = details or {}
45 | self.remediation = remediation
46 | super().__init__(self._format_message())
47 |
48 | def _format_message(self) -> str:
49 | msg = f"{self.code}: {self.message}"
50 | if self.details:
51 | msg += f" - Details: {json.dumps(self.details)}"
52 | if self.remediation:
53 | msg += f" - Suggestion: {self.remediation}"
54 | return msg
55 |
56 |
57 | class AuthenticationError(RoamAPIError):
58 | """Exception raised for authentication errors."""
59 | def __init__(self, message: str, details: Optional[Dict] = None):
60 | super().__init__(
61 | message=message,
62 | code="AUTH_ERROR",
63 | details=details,
64 | remediation="Check your API token and graph name in environment variables."
65 | )
66 |
67 |
68 | class PageNotFoundError(RoamAPIError):
69 | """Exception raised when a page cannot be found."""
70 | def __init__(self, title: str, details: Optional[Dict] = None):
71 | super().__init__(
72 | message=f"Page '{title}' not found",
73 | code="PAGE_NOT_FOUND",
74 | details=details,
75 | remediation="Check the page title for typos or create the page first."
76 | )
77 |
78 |
79 | class BlockNotFoundError(RoamAPIError):
80 | """Exception raised when a block cannot be found."""
81 | def __init__(self, uid: str, details: Optional[Dict] = None):
82 | super().__init__(
83 | message=f"Block with UID '{uid}' not found",
84 | code="BLOCK_NOT_FOUND",
85 | details=details,
86 | remediation="Check the block UID for accuracy."
87 | )
88 |
89 |
90 | class ValidationError(RoamAPIError):
91 | """Exception raised for input validation errors."""
92 | def __init__(self, message: str, param: str, details: Optional[Dict] = None):
93 | super().__init__(
94 | message=message,
95 | code="VALIDATION_ERROR",
96 | details={"parameter": param, **(details or {})},
97 | remediation="Check the input parameters and correct the formatting."
98 | )
99 |
100 |
101 | class QueryError(RoamAPIError):
102 | """Exception raised for query execution errors."""
103 | def __init__(self, message: str, query: str, details: Optional[Dict] = None):
104 | super().__init__(
105 | message=message,
106 | code="QUERY_ERROR",
107 | details={"query": query, **(details or {})},
108 | remediation="Check the query syntax or parameters."
109 | )
110 |
111 |
112 | class RateLimitError(RoamAPIError):
113 | """Exception raised when rate limits are exceeded."""
114 | def __init__(self, message: str, details: Optional[Dict] = None):
115 | super().__init__(
116 | message=message,
117 | code="RATE_LIMIT_ERROR",
118 | details=details,
119 | remediation="Retry after a delay or reduce the request frequency."
120 | )
121 |
122 |
123 | class TransactionError(RoamAPIError):
124 | """Exception raised for transaction failures."""
125 | def __init__(self, message: str, action_type: str, details: Optional[Dict] = None):
126 | super().__init__(
127 | message=message,
128 | code="TRANSACTION_ERROR",
129 | details={"action_type": action_type, **(details or {})},
130 | remediation="Check the action data or retry the operation."
131 | )
132 |
133 |
134 | class PreserveAuthSession(requests.Session):
135 | """Session class that preserves authentication headers during redirects."""
136 | def rebuild_auth(self, prepared_request, response):
137 | """Preserve the Authorization header on redirects."""
138 | return
139 |
140 |
141 | # Retry decorator for API calls
142 | def retry_on_error(max_retries=3, base_delay=1, backoff_factor=2, retry_on=(RateLimitError, requests.exceptions.RequestException)):
143 | """
144 | Decorator to retry API calls with exponential backoff.
145 |
146 | Args:
147 | max_retries: Maximum number of retry attempts
148 | base_delay: Initial delay in seconds
149 | backoff_factor: Multiplier for delay on each retry
150 | retry_on: Tuple of exception types to retry on
151 | """
152 | def decorator(func):
153 | @wraps(func)
154 | def wrapper(*args, **kwargs):
155 | retries = 0
156 | while True:
157 | try:
158 | return func(*args, **kwargs)
159 | except retry_on as e:
160 | retries += 1
161 | if retries > max_retries:
162 | logger.error(f"Maximum retries ({max_retries}) exceeded: {str(e)}")
163 | raise
164 |
165 | delay = base_delay * (backoff_factor ** (retries - 1))
166 | logger.warning(f"Retrying after error: {str(e)}. Attempt {retries}/{max_retries} in {delay:.2f}s")
167 | time.sleep(delay)
168 | return wrapper
169 | return decorator
170 |
171 |
172 | def validate_credentials():
173 | """
174 | Validate that required API credentials are set.
175 |
176 | Raises:
177 | AuthenticationError: If required credentials are missing
178 | """
179 | if not API_TOKEN or not GRAPH_NAME:
180 | missing = []
181 | if not API_TOKEN:
182 | missing.append("ROAM_API_TOKEN")
183 | if not GRAPH_NAME:
184 | missing.append("ROAM_GRAPH_NAME")
185 |
186 | raise AuthenticationError(
187 | f"Missing required credentials: {', '.join(missing)}",
188 | {"missing": missing}
189 | )
190 |
191 |
192 | def get_session_and_headers() -> Tuple[requests.Session, Dict[str, str]]:
193 | """
194 | Create a session with authentication headers.
195 |
196 | Returns:
197 | Tuple of (session, headers)
198 |
199 | Raises:
200 | AuthenticationError: If required environment variables are missing
201 | """
202 | validate_credentials()
203 |
204 | session = PreserveAuthSession()
205 | headers = {
206 | "Accept": "application/json",
207 | "Authorization": f"Bearer {API_TOKEN}",
208 | "Content-Type": "application/json",
209 | }
210 |
211 | return session, headers
212 |
213 |
214 | @retry_on_error()
215 | def execute_query(query: str, inputs: Optional[List[Any]] = None) -> Any:
216 | """
217 | Execute a Datalog query against the Roam graph.
218 |
219 | Args:
220 | query: Datalog query string
221 | inputs: Optional list of query inputs
222 |
223 | Returns:
224 | Query results
225 |
226 | Raises:
227 | QueryError: If the query fails
228 | AuthenticationError: If authentication fails
229 | RateLimitError: If rate limits are exceeded
230 | """
231 | validate_credentials()
232 | session, headers = get_session_and_headers()
233 |
234 | # Prepare query data
235 | data = {
236 | "query": query,
237 | }
238 | if inputs:
239 | data["inputs"] = inputs
240 |
241 | # Log query (without inputs for security)
242 | logger.debug(f"Executing query: {query}")
243 |
244 | # Execute query
245 | try:
246 | response = session.post(
247 | f'https://api.roamresearch.com/api/graph/{GRAPH_NAME}/q',
248 | headers=headers,
249 | json=data
250 | )
251 |
252 | if response.status_code == 401:
253 | raise AuthenticationError("Authentication failed", {"status_code": response.status_code})
254 |
255 | if response.status_code == 429:
256 | raise RateLimitError("Rate limit exceeded", {"status_code": response.status_code})
257 |
258 | response.raise_for_status()
259 | result = response.json().get('result')
260 |
261 | # Log result size
262 | if isinstance(result, list):
263 | logger.debug(f"Query returned {len(result)} results")
264 |
265 | return result
266 | except requests.RequestException as e:
267 | error_msg = f"Query failed: {str(e)}"
268 | error_details = {}
269 |
270 |         if e.response is not None:  # a Response is falsy on 4xx/5xx, so test for None explicitly
271 | error_details["status_code"] = e.response.status_code
272 | try:
273 | error_details["response"] = e.response.json()
274 |             except ValueError:  # response body is not valid JSON
275 | error_details["response_text"] = e.response.text[:500]
276 |
277 | # Classify error based on status code if available
278 |         if e.response is not None:
279 | if e.response.status_code == 401:
280 | raise AuthenticationError("Authentication failed", error_details) from e
281 | elif e.response.status_code == 429:
282 | raise RateLimitError("Rate limit exceeded", error_details) from e
283 |
284 | logger.error(error_msg, extra={"details": error_details})
285 | raise QueryError(error_msg, query, error_details) from e
286 |
287 |
288 | @retry_on_error()
289 | def execute_write_action(action_data: Union[Dict[str, Any], List[Dict[str, Any]]]) -> Dict[str, Any]:
290 | """
291 | Execute a write action or a batch of actions on the Roam graph.
292 |
293 | Args:
294 | action_data: The action data to write or a list of actions for batch operation
295 |
296 | Returns:
297 | Response data
298 |
299 | Raises:
300 | TransactionError: If the write action fails
301 | AuthenticationError: If authentication fails
302 | RateLimitError: If rate limits are exceeded
303 | """
304 | validate_credentials()
305 | session, headers = get_session_and_headers()
306 |
307 | # Check if it's a batch operation or single action
308 | is_batch = isinstance(action_data, list)
309 |
310 | # If it's a batch operation, wrap it in a batch container
311 | if is_batch:
312 | # Log batch size
313 | logger.debug(f"Executing batch write action with {len(action_data)} operations")
314 |
315 | # Group operations by type for debugging
316 | action_types = {}
317 | for action in action_data:
318 | action_type = action.get("action", "unknown")
319 | if action_type in action_types:
320 | action_types[action_type] += 1
321 | else:
322 | action_types[action_type] = 1
323 |
324 | logger.debug(f"Batch operation types: {action_types}")
325 |
326 | # Prepare batch action
327 | batch_data = {
328 | "action": "batch-actions",
329 | "actions": action_data
330 | }
331 |
332 | action_type = "batch-actions"
333 | operation_data = batch_data
334 | else:
335 | # Log action type
336 | action_type = action_data.get("action", "unknown")
337 | logger.debug(f"Executing write action: {action_type}")
338 | operation_data = action_data
339 |
340 | # Debug log the operation data
341 | logger.debug(f"Sending data: {json.dumps(operation_data)[:100]}...")
342 |
343 | # Execute action
344 | try:
345 | response = session.post(
346 | f'https://api.roamresearch.com/api/graph/{GRAPH_NAME}/write',
347 | headers=headers,
348 | json=operation_data # Use json parameter for proper JSON encoding
349 | )
350 |
351 | logger.debug(f"Status code: {response.status_code}")
352 | logger.debug(f"Response headers: {dict(response.headers)}")
353 |
354 | if response.status_code == 401:
355 | raise AuthenticationError("Authentication failed", {"status_code": response.status_code})
356 |
357 | if response.status_code == 429:
358 | raise RateLimitError("Rate limit exceeded", {"status_code": response.status_code})
359 |
360 | # Special handling for empty responses
361 | if response.status_code == 200 and not response.text:
362 | logger.debug("Received empty response with status 200 (success)")
363 | return {"success": True}
364 |
365 | response.raise_for_status()
366 |
367 | # Try to parse JSON response
368 | try:
369 | result = response.json()
370 | logger.debug(f"Response: {json.dumps(result)[:500]}")
371 |
372 | # Success even with error message for batch operations that partly succeed
373 | if "batch-error-message" in result and "num-actions-successfully-transacted-before-failure" in result:
374 | num_success = result.get("num-actions-successfully-transacted-before-failure", 0)
375 | logger.debug(f"Batch partially succeeded with {num_success} actions before failure")
376 | return result
377 |
378 | return result
379 | except json.JSONDecodeError:
380 | # Some successful operations return empty responses
381 | if 200 <= response.status_code < 300:
382 | logger.debug("Success with non-JSON response")
383 | return {"success": True}
384 | else:
385 | logger.debug(f"Failed to parse response as JSON: {response.text[:500]}")
386 | raise TransactionError(
387 |                     "Failed to parse response as JSON",
388 | action_type,
389 | {"response_text": response.text[:500]}
390 | )
391 |
392 | except requests.RequestException as e:
393 | error_details = {}
394 |
395 |         if e.response is not None:  # a Response is falsy for 4xx/5xx, so compare against None
396 | error_details["status_code"] = e.response.status_code
397 | try:
398 | error_details["response"] = e.response.json()
399 |             except ValueError:  # response body was not valid JSON
400 | error_details["response_text"] = e.response.text[:500]
401 |
402 | # Classify error based on status code if available
403 |         if e.response is not None:
404 | if e.response.status_code == 401:
405 | raise AuthenticationError("Authentication failed", error_details) from e
406 | elif e.response.status_code == 429:
407 | raise RateLimitError("Rate limit exceeded", error_details) from e
408 |
409 | error_msg = f"Write action failed: {str(e)}"
410 | logger.error(error_msg, extra={"details": error_details})
411 | raise TransactionError(error_msg, action_type, error_details) from e
412 |
413 |
414 | def execute_batch_actions(actions: List[Dict[str, Any]], chunk_size: int = 50) -> Dict[str, Any]:
415 | """
416 |     Execute a batch of actions, splitting into multiple requests when the batch exceeds chunk_size.
417 |
418 | Args:
419 | actions: List of actions to execute
420 | chunk_size: Maximum number of actions per request
421 |
422 | Returns:
423 | Combined results of all batch operations
424 |
425 | Raises:
426 | TransactionError: If any batch fails
427 | """
428 | if not actions:
429 | return {"success": True, "created_uids": []}
430 |
431 | # Single batch if under chunk size
432 | if len(actions) <= chunk_size:
433 | result = execute_write_action(actions)
434 |
435 | # Check for tempids-to-uids mapping in response
436 | if "tempids-to-uids" in result:
437 | return {"success": True, "created_uids": list(result["tempids-to-uids"].values())}
438 | elif "successful" in result and result["successful"]:
439 | return {"success": True, "created_uids": []}
440 | else:
441 | return result
442 |
443 | # Split into chunks for larger batches
444 | chunks = [actions[i:i + chunk_size] for i in range(0, len(actions), chunk_size)]
445 | logger.debug(f"Splitting batch operation into {len(chunks)} chunks of max {chunk_size} actions")
446 |
447 | # Track results across chunks
448 | combined_results = {
449 | "created_uids": [],
450 | "success": True
451 | }
452 |
453 | # Track temporary and real UIDs for parent-child relationships
454 | temp_uid_map = {}
455 |
456 | # Execute each chunk
457 | for i, chunk in enumerate(chunks):
458 | logger.debug(f"Executing batch chunk {i+1}/{len(chunks)} with {len(chunk)} actions")
459 |
460 | # Update parent UIDs with real UIDs from previous chunks
461 | if i > 0 and temp_uid_map:
462 | for action in chunk:
463 | if action["action"] == "create-block":
464 | parent_uid = action["location"]["parent-uid"]
465 | if parent_uid.startswith("temp_") and parent_uid in temp_uid_map:
466 | action["location"]["parent-uid"] = temp_uid_map[parent_uid]
467 |
468 | result = execute_write_action(chunk)
469 |
470 | # Collect UIDs from this chunk
471 | created_uids = []
472 | if "tempids-to-uids" in result:
473 | created_uids = list(result["tempids-to-uids"].values())
474 |
475 | if created_uids:
476 | # Map temp UIDs to real UIDs for next chunks
477 | if i < len(chunks) - 1:
478 | for j, uid in enumerate(created_uids):
479 | temp_key = f"temp_{i}_{j}"
480 | temp_uid_map[temp_key] = uid
481 |
482 | combined_results["created_uids"].extend(created_uids)
483 |
484 |         # Pause briefly between chunks so earlier writes commit before dependent chunks run
485 | if i < len(chunks) - 1:
486 | time.sleep(0.5)
487 |
488 | return combined_results
489 |
490 |
491 | def find_or_create_page(title: str) -> str:
492 | """
493 | Find a page by title or create it if it doesn't exist.
494 |
495 | Args:
496 | title: Page title
497 |
498 | Returns:
499 | Page UID
500 |
501 | Raises:
502 | TransactionError: If page creation fails
503 | ValidationError: If title is invalid
504 | AuthenticationError: If authentication fails
505 | """
506 | validate_credentials()
507 | session, headers = get_session_and_headers()
508 |
509 | # Validate title
510 | if not title or not isinstance(title, str):
511 | raise ValidationError("Page title must be a non-empty string", "title")
512 |
513 | title = title.strip()
514 | if not title:
515 | raise ValidationError("Page title cannot be empty or just whitespace", "title")
516 |
517 |     # Try to find the page first (note: title is interpolated raw into the query; an embedded '"' would break it)
518 | logger.debug(f"Looking for page: {title}")
519 | query = f'''[:find ?uid .
520 | :where [?e :node/title "{title}"]
521 | [?e :block/uid ?uid]]'''
522 |
523 | page_uid = execute_query(query)
524 |
525 | if page_uid:
526 | logger.debug(f"Found existing page: {title} (UID: {page_uid})")
527 | return page_uid
528 |
529 | # Create the page if it doesn't exist
530 | logger.debug(f"Creating new page: {title}")
531 | action_data = {
532 | "action": "create-page",
533 | "page": {"title": title}
534 | }
535 |
536 | try:
537 | response = execute_write_action(action_data)
538 |
539 | if response.get("success", False):
540 | # Wait a moment for the page to be created
541 | time.sleep(1)
542 |
543 | # Try to find the page again
544 | page_uid = execute_query(query)
545 | if page_uid:
546 | logger.debug(f"Created page: {title} (UID: {page_uid})")
547 | return page_uid
548 |
549 | # If still not found, try one more time with a longer delay
550 | time.sleep(2)
551 | page_uid = execute_query(query)
552 | if page_uid:
553 | logger.debug(f"Found newly created page: {title} (UID: {page_uid})")
554 | return page_uid
555 |
556 | # If we get here, something went wrong
557 | error_msg = f"Failed to create page: {title}"
558 | logger.error(error_msg)
559 | raise TransactionError(error_msg, "create-page", {"title": title, "response": response})
560 | except TransactionError:
561 | # Rethrow existing TransactionError
562 | raise
563 | except Exception as e:
564 | error_msg = f"Failed to create page: {title}"
565 | logger.error(error_msg)
566 | raise TransactionError(error_msg, "create-page", {"title": title, "error": str(e)}) from e
567 |
568 |
569 | def get_daily_page() -> str:
570 | """
571 | Get or create today's daily page.
572 |
573 | Returns:
574 | Daily page UID
575 |
576 | Raises:
577 | TransactionError: If page creation fails
578 | """
579 | today = datetime.now()
580 | date_str = format_roam_date(today)
581 |
582 | logger.debug(f"Getting daily page for: {date_str}")
583 | return find_or_create_page(date_str)
584 |
585 |
586 | def add_block_to_page(page_uid: str, content: str, order: Union[int, str] = "last") -> Optional[str]:
587 | """
588 | Add a block to a page.
589 |
590 | Args:
591 | page_uid: Parent page UID
592 | content: Block content
593 | order: Position ("first", "last", or integer index)
594 |
595 | Returns:
596 | New block UID or None if creation failed
597 |
598 | Raises:
599 | BlockNotFoundError: If page does not exist
600 | ValidationError: If parameters are invalid
601 | TransactionError: If block creation fails
602 | """
603 | # Validate parameters
604 | if not page_uid:
605 | raise ValidationError("Parent page UID is required", "page_uid")
606 |
607 | if not content:
608 | raise ValidationError("Block content cannot be empty", "content")
609 |
610 | # Generate a unique block UID
611 | import uuid
612 |     block_uid = uuid.uuid4().hex[:9]  # 9 hex chars (slicing str(uuid4()) would always end in "-")
613 |
614 | action_data = {
615 | "action": "create-block",
616 | "location": {
617 | "parent-uid": page_uid,
618 | "order": order
619 | },
620 | "block": {
621 | "string": content,
622 | "uid": block_uid
623 | }
624 | }
625 |
626 | logger.debug(f"Adding block to page {page_uid}")
627 | try:
628 | result = execute_write_action(action_data)
629 |
630 | if result.get("success", False):
631 |             # Brief delay so the write commits before we verify the block exists
632 | time.sleep(1)
633 |
634 | # Verify the block exists
635 | session, headers = get_session_and_headers()
636 | found_uid = find_block_uid(session, headers, GRAPH_NAME, content)
637 |
638 | if found_uid:
639 | logger.debug(f"Created block with UID: {found_uid}")
640 | return found_uid
641 |
642 | # If we couldn't find the UID by content, return the one we generated
643 | logger.debug(f"Block created but couldn't verify, returning generated UID: {block_uid}")
644 | return block_uid
645 | else:
646 | logger.error(f"Failed to create block: {result.get('error', 'Unknown error')}")
647 | return None
648 | except Exception as e:
649 | if isinstance(e, (BlockNotFoundError, ValidationError, TransactionError)):
650 | raise
651 |
652 | error_msg = f"Failed to create block: {str(e)}"
653 | logger.error(error_msg)
654 | raise TransactionError(error_msg, "create-block", {"page_uid": page_uid}) from e
655 |
656 |
657 | def update_block(block_uid: str, content: str) -> bool:
658 | """
659 | Update a block's content.
660 |
661 | Args:
662 | block_uid: Block UID
663 | content: New content
664 |
665 | Returns:
666 | Success flag
667 |
668 | Raises:
669 | BlockNotFoundError: If block does not exist
670 | ValidationError: If parameters are invalid
671 | TransactionError: If block update fails
672 | """
673 | # Validate parameters
674 | if not block_uid:
675 | raise ValidationError("Block UID is required", "block_uid")
676 |
677 | if content is None:
678 | raise ValidationError("Block content cannot be None", "content")
679 |
680 | action_data = {
681 | "action": "update-block",
682 | "block": {
683 | "uid": block_uid,
684 | "string": content
685 | }
686 | }
687 |
688 | logger.debug(f"Updating block: {block_uid}")
689 | try:
690 | execute_write_action(action_data)
691 | return True
692 | except Exception as e:
693 | if isinstance(e, (BlockNotFoundError, ValidationError, TransactionError)):
694 | raise
695 |
696 | error_msg = f"Failed to update block: {str(e)}"
697 | logger.error(error_msg)
698 | raise TransactionError(error_msg, "update-block", {"block_uid": block_uid}) from e
699 |
700 |
701 | def transform_block(block_uid: str, find_pattern: str, replace_with: str, global_replace: bool = True) -> str:
702 | """
703 | Transform a block's content using regex pattern replacement.
704 |
705 | Args:
706 | block_uid: Block UID
707 | find_pattern: Regex pattern to find
708 | replace_with: Text to replace with
709 | global_replace: Whether to replace all occurrences
710 |
711 | Returns:
712 | Updated content
713 |
714 | Raises:
715 | BlockNotFoundError: If block does not exist
716 | ValidationError: If parameters are invalid
717 | QueryError: If block retrieval fails
718 | TransactionError: If block update fails
719 | """
720 | # Validate parameters
721 | if not block_uid:
722 | raise ValidationError("Block UID is required", "block_uid")
723 |
724 | if not find_pattern:
725 | raise ValidationError("Find pattern cannot be empty", "find_pattern")
726 |
727 | # First get the current content
728 | query = f'''[:find ?string .
729 | :where [?b :block/uid "{block_uid}"]
730 | [?b :block/string ?string]]'''
731 |
732 | logger.debug(f"Getting content for block: {block_uid}")
733 | try:
734 | current_content = execute_query(query)
735 |
736 | if not current_content:
737 | raise BlockNotFoundError(block_uid)
738 |
739 | # Apply transformation
740 | logger.debug(f"Transforming block {block_uid} with pattern: {find_pattern}")
741 | flags = re.MULTILINE
742 | count = 0 if global_replace else 1
743 |
744 | try:
745 | new_content = re.sub(find_pattern, replace_with, current_content, count=count, flags=flags)
746 | except re.error as e:
747 | raise ValidationError(f"Invalid regex pattern: {str(e)}", "find_pattern", {"pattern": find_pattern})
748 |
749 | # Update the block
750 | update_block(block_uid, new_content)
751 |
752 | return new_content
753 | except (BlockNotFoundError, ValidationError, QueryError, TransactionError):
754 | # Rethrow existing errors
755 | raise
756 | except Exception as e:
757 | error_msg = f"Failed to transform block: {str(e)}"
758 | logger.error(error_msg)
759 | raise TransactionError(error_msg, "transform-block", {"block_uid": block_uid}) from e
760 |
761 |
762 | def batch_update_blocks(updates: List[Dict[str, Any]], chunk_size: int = 50) -> List[Dict[str, Any]]:
763 | """
764 | Update multiple blocks in a single operation.
765 |
766 | Args:
767 | updates: List of update operations
768 | chunk_size: Maximum number of actions per batch
769 |
770 | Returns:
771 | List of results
772 |
773 | Raises:
774 | ValidationError: If updates are not valid
775 | """
776 | if not isinstance(updates, list):
777 | raise ValidationError("Updates must be a list", "updates")
778 |
779 | if not updates:
780 | return []
781 |
782 | session, headers = get_session_and_headers()
783 | results = []
784 | batch_actions = []
785 |
786 | logger.debug(f"Batch updating {len(updates)} blocks")
787 |
788 | # Validate each update and prepare batch actions
789 | for i, update in enumerate(updates):
790 | try:
791 | block_uid = update.get("block_uid")
792 | if not block_uid:
793 | results.append({"success": False, "error": "Missing block_uid"})
794 | continue
795 |
796 | # Check block exists
797 | query = f'''[:find ?string .
798 | :where [?b :block/uid "{block_uid}"]
799 | [?b :block/string ?string]]'''
800 |
801 | current_content = execute_query(query)
802 | if not current_content:
803 | results.append({
804 | "success": False,
805 | "block_uid": block_uid,
806 | "error": f"Block with UID {block_uid} not found"
807 | })
808 | continue
809 |
810 | # Handle direct content update
811 | if "content" in update:
812 | batch_actions.append({
813 | "action": "update-block",
814 | "block": {
815 | "uid": block_uid,
816 | "string": update["content"]
817 | }
818 | })
819 |
820 | results.append({
821 | "success": True,
822 | "block_uid": block_uid,
823 | "content": update["content"]
824 | })
825 | # Handle pattern transformation
826 | elif "transform" in update:
827 | transform = update["transform"]
828 |
829 | try:
830 | find_pattern = transform["find"]
831 | replace_with = transform["replace"]
832 | global_replace = transform.get("global", True)
833 |
834 | # Apply transformation
835 | flags = re.MULTILINE
836 | count = 0 if global_replace else 1
837 | new_content = re.sub(find_pattern, replace_with, current_content, count=count, flags=flags)
838 |
839 | batch_actions.append({
840 | "action": "update-block",
841 | "block": {
842 | "uid": block_uid,
843 | "string": new_content
844 | }
845 | })
846 |
847 | results.append({
848 | "success": True,
849 | "block_uid": block_uid,
850 | "content": new_content
851 | })
852 | except re.error as e:
853 | results.append({
854 | "success": False,
855 | "block_uid": block_uid,
856 | "error": f"Invalid regex pattern: {str(e)}"
857 | })
858 | except KeyError as e:
859 | results.append({
860 | "success": False,
861 | "block_uid": block_uid,
862 | "error": f"Missing required transform key: {str(e)}"
863 | })
864 | else:
865 | results.append({
866 | "success": False,
867 | "block_uid": block_uid,
868 | "error": "Neither content nor transform provided"
869 | })
870 | except Exception as e:
871 | logger.error(f"Error preparing update for block {update.get('block_uid', 'unknown')}: {str(e)}")
872 | results.append({
873 | "success": False,
874 | "block_uid": update.get("block_uid", "unknown"),
875 | "error": str(e)
876 | })
877 |
878 | # Execute batch updates if we have any valid actions
879 | if batch_actions:
880 | try:
881 | execute_batch_actions(batch_actions, chunk_size)
882 | except Exception as e:
883 | logger.error(f"Error executing batch update: {str(e)}")
884 | # Mark all previously successful results as failed
885 | for result in results:
886 | if result.get("success"):
887 | result["success"] = False
888 | result["error"] = f"Batch update failed: {str(e)}"
889 |
890 | # Log success rate
891 | successful = sum(1 for r in results if r.get("success"))
892 | logger.debug(f"Batch update completed: {successful}/{len(updates)} successful")
893 |
894 | return results
895 |
896 |
897 | def get_page_content(title: str, resolve_refs: bool = True, max_depth: int = 5) -> str:
898 | """
899 | Get the content of a page with optional block reference resolution.
900 |
901 | Args:
902 | title: Page title
903 | resolve_refs: Whether to resolve block references
904 | max_depth: Maximum depth of nested blocks to retrieve (default: 5)
905 |
906 | Returns:
907 | Page content as markdown
908 |
909 | Raises:
910 | PageNotFoundError: If page retrieval fails
911 | QueryError: If query execution fails
912 | """
913 | session, headers = get_session_and_headers()
914 |
915 | # First find the page UID
916 | logger.debug(f"Getting content for page: {title}")
917 | page_uid = find_page_by_title(session, headers, GRAPH_NAME, title)
918 |
919 | if not page_uid:
920 | raise PageNotFoundError(title)
921 |
922 | # Build block hierarchy iteratively
923 | block_map = {}
924 | top_level_blocks = []
925 |
926 | # Query to get immediate children of a parent (page or block)
927 | def get_children(parent_uid: str, depth: int = 0) -> None:
928 | if depth >= max_depth:
929 | return
930 |
931 | query = f"""[:find ?uid ?string ?order
932 | :where
933 | [?parent :block/uid "{parent_uid}"]
934 | [?parent :block/children ?child]
935 | [?child :block/uid ?uid]
936 | [?child :block/string ?string]
937 | [?child :block/order ?order]]"""
938 |
939 | try:
940 | results = execute_query(query)
941 | if not results:
942 | return
943 |
944 | for uid, content, order in results:
945 | # Resolve references if requested
946 | if resolve_refs:
947 | content = resolve_block_references(session, headers, GRAPH_NAME, content)
948 |
949 | # Create block object
950 | block = {
951 | "uid": uid,
952 | "content": content,
953 | "order": order,
954 | "children": []
955 | }
956 |
957 | block_map[uid] = block
958 |
959 | # Add to top-level or parent's children
960 | if parent_uid == page_uid:
961 | top_level_blocks.append(block)
962 | elif parent_uid in block_map:
963 | block_map[parent_uid]["children"].append(block)
964 |
965 | # Recursively fetch children
966 | get_children(uid, depth + 1)
967 |
968 | except QueryError as e:
969 | logger.warning(f"Failed to fetch children for {parent_uid}: {str(e)}")
970 | raise
971 |
972 | try:
973 | # Start with the page's top-level blocks
974 | get_children(page_uid)
975 |
976 | if not top_level_blocks:
977 | logger.debug(f"No content found on page: {title}")
978 | return f"# {title}\n\nNo content found on this page."
979 |
980 | # Sort blocks by order
981 | def sort_blocks(blocks):
982 | blocks.sort(key=lambda b: b["order"])
983 | for block in blocks:
984 | sort_blocks(block["children"])
985 |
986 | sort_blocks(top_level_blocks)
987 |
988 | # Convert to markdown
989 | markdown = f"# {title}\n\n"
990 |
991 | def blocks_to_md(blocks, level=0):
992 | result = ""
993 | for block in blocks:
994 | indent = " " * level
995 | result += f"{indent}- {block['content']}\n"
996 | if block["children"]:
997 | result += blocks_to_md(block["children"], level + 1)
998 | return result
999 |
1000 | markdown += blocks_to_md(top_level_blocks)
1001 |
1002 | logger.debug(f"Retrieved page content for: {title}")
1003 | return markdown
1004 | except QueryError:
1005 | # Rethrow existing QueryError
1006 | raise
1007 | except Exception as e:
1008 | error_msg = f"Failed to get page content: {str(e)}"
1009 | logger.error(error_msg)
1010 | raise QueryError(error_msg, "Iterative child fetch", {"page_title": title, "page_uid": page_uid}) from e
```
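For reference, the chunking strategy that `execute_batch_actions` applies before dispatching writes can be sketched in isolation. This is a standalone illustration (the `chunk_actions` helper is hypothetical, not part of the module), showing only the order-preserving split, not the Roam API calls:

```python
# Split a list of write actions into order-preserving chunks of at most
# chunk_size, mirroring how execute_batch_actions batches its requests.
def chunk_actions(actions, chunk_size=50):
    return [actions[i:i + chunk_size] for i in range(0, len(actions), chunk_size)]

# 120 placeholder actions split into chunks of 50, 50, and 20
actions = [{"action": "create-block", "n": n} for n in range(120)]
chunks = chunk_actions(actions)
```

Because later chunks may reference blocks created by earlier ones, the real implementation also rewrites `temp_`-prefixed parent UIDs between chunks and sleeps briefly so earlier writes commit first.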
--------------------------------------------------------------------------------
/roam_mcp/server.py:
--------------------------------------------------------------------------------
```python
1 | """Core server module for Roam MCP server."""
2 |
3 | import os
4 | import sys
5 | import logging
6 | import traceback
7 | from typing import Dict, List, Any, Optional, Union
8 | from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled
9 | from mcp.server.fastmcp import FastMCP
10 | from datetime import datetime
11 |
12 | # Import operations
13 | from roam_mcp.api import (
14 | API_TOKEN,
15 | GRAPH_NAME,
16 | MEMORIES_TAG,
17 | get_page_content,
18 | ValidationError,
19 | QueryError,
20 | PageNotFoundError,
21 | BlockNotFoundError,
22 | TransactionError,
23 | AuthenticationError,
24 | RateLimitError
25 | )
26 | from roam_mcp.search import (
27 | search_by_text,
28 | search_by_tag,
29 | search_by_status,
30 | search_block_refs,
31 | search_hierarchy,
32 | search_by_date,
33 | find_pages_modified_today,
34 | execute_datomic_query
35 | )
36 | from roam_mcp.content import (
37 | create_page,
38 | create_block,
39 | create_outline,
40 | import_markdown,
41 | add_todos,
42 | update_content,
43 | update_multiple_contents
44 | )
45 | from roam_mcp.memory import (
46 | remember,
47 | recall
48 | )
49 | from roam_mcp.utils import (
50 | extract_youtube_video_id,
51 | detect_url_type
52 | )
53 | from roam_mcp.content_parsers import parse_webpage, parse_pdf
54 |
55 | # Initialize FastMCP server
56 | mcp = FastMCP("roam-research")
57 |
58 | # Configure logging
59 | logger = logging.getLogger("roam-mcp")
60 |
61 |
62 | def setup_logging(verbose=False):
63 | """Configure logging with appropriate level of detail."""
64 | log_level = logging.DEBUG if verbose else logging.INFO
65 |
66 | # Configure root logger
67 | root_logger = logging.getLogger()
68 | root_logger.setLevel(log_level)
69 |
70 | # Clear any existing handlers
71 | for handler in root_logger.handlers[:]:
72 | root_logger.removeHandler(handler)
73 |
74 | # Add console handler
75 | console_handler = logging.StreamHandler(sys.stderr)
76 | console_handler.setLevel(log_level)
77 | formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
78 | console_handler.setFormatter(formatter)
79 | root_logger.addHandler(console_handler)
80 |
81 |
82 | def validate_environment():
83 | """Validate that required environment variables are set."""
84 | if not API_TOKEN or not GRAPH_NAME:
85 | missing = []
86 | if not API_TOKEN:
87 | missing.append("ROAM_API_TOKEN")
88 | if not GRAPH_NAME:
89 | missing.append("ROAM_GRAPH_NAME")
90 |
91 | error_msg = f"""
92 | Missing required environment variables: {', '.join(missing)}
93 |
94 | Please configure these variables either:
95 | 1. In your MCP settings file:
96 | - For Claude: ~/Library/Application Support/Claude/claude_desktop_config.json
97 | - For Cline: ~/Library/Application Support/Code/User/globalStorage/saoudrizwan.claude-dev/settings/cline_mcp_settings.json
98 |
99 | Example configuration:
100 | {{
101 | "mcpServers": {{
102 | "roam-helper": {{
103 | "command": "uvx",
104 | "args": ["git+https://github.com/PhiloSolares/roam-mcp.git"],
105 | "env": {{
106 | "ROAM_API_TOKEN": "your-api-token",
107 | "ROAM_GRAPH_NAME": "your-graph-name"
108 | }}
109 | }}
110 | }}
111 | }}
112 |
113 | 2. Or in a .env file in the roam-mcp directory:
114 | ROAM_API_TOKEN=your-api-token
115 | ROAM_GRAPH_NAME=your-graph-name
116 | """
117 | logger.error(error_msg)
118 | return False
119 |
120 | return True
121 |
122 |
123 | def format_error_response(error: Exception) -> str:
124 | """Format an error for user-friendly display."""
125 | if isinstance(error, ValidationError):
126 | return f"Validation error: {str(error)}"
127 | elif isinstance(error, PageNotFoundError):
128 | return f"Page not found: {str(error)}"
129 | elif isinstance(error, BlockNotFoundError):
130 | return f"Block not found: {str(error)}"
131 | elif isinstance(error, QueryError):
132 | return f"Query error: {str(error)}"
133 | elif isinstance(error, TransactionError):
134 | return f"Transaction error: {str(error)}"
135 | elif isinstance(error, AuthenticationError):
136 | return f"Authentication error: {str(error)}"
137 | elif isinstance(error, RateLimitError):
138 | return f"Rate limit exceeded: {str(error)}"
139 | else:
140 | return f"Error: {str(error)}"
141 |
142 |
143 | @mcp.tool()
144 | async def search_roam(search_terms: List[str]) -> str:
145 | """Search Roam database for content containing the specified terms.
146 |
147 | Args:
148 | search_terms: List of keywords to search for
149 | """
150 | if not validate_environment():
151 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
152 |
153 | try:
154 | if not search_terms:
155 | return "Please provide at least one search term"
156 |
157 | all_results = []
158 | for term in search_terms:
159 | result = search_by_text(term)
160 | if result["success"]:
161 | all_results.extend(result["matches"])
162 |
163 | # Limit to 3000 words
164 | word_count = 0
165 | max_word_count = 3000
166 | filtered_results = []
167 |
168 | for match in all_results:
169 | content = match["content"]
170 | block_word_count = len(content.split())
171 |
172 | if word_count + block_word_count <= max_word_count:
173 | filtered_results.append(f"Page: {match.get('page_title', 'Unknown')}\n{content}")
174 | word_count += block_word_count
175 | else:
176 | break
177 |
178 | if not filtered_results:
179 | return f"No results found for terms: {', '.join(search_terms)}"
180 |
181 | return "\n\n".join(filtered_results)
182 | except Exception as e:
183 | logger.error(f"Error searching Roam: {str(e)}", exc_info=True)
184 | return format_error_response(e)
185 |
186 |
187 | @mcp.tool()
188 | async def roam_fetch_page_by_title(title: str) -> str:
189 | """Retrieve complete page contents by exact title, including all nested blocks and resolved block references.
190 |
191 | Args:
192 | title: Title of the page
193 | """
194 | if not validate_environment():
195 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
196 |
197 | try:
198 | if not title:
199 | return "Error: title is required"
200 |
201 | content = get_page_content(title)
202 | return content
203 | except Exception as e:
204 | logger.error(f"Error fetching page: {str(e)}", exc_info=True)
205 | return format_error_response(e)
206 |
207 |
208 | @mcp.tool()
209 | async def roam_create_page(title: str, content: Optional[List[Dict[str, Any]]] = None) -> str:
210 | """Create a new page in Roam Research with optional content using explicit nesting levels.
211 |
212 | Args:
213 | title: Title of the new page
214 | content: Initial content for the page as an array of blocks with explicit nesting levels.
215 | Each block must have a 'text' field with the content as a string.
216 | Example:
217 | [
218 | {"text": "Heading", "level": 0},
219 | {"text": "Bullet point", "level": 1},
220 | {"text": "Another point", "level": 1, "children": [
221 | {"text": "Nested point", "level": 2}
222 | ]}
223 | ]
224 | """
225 | if not validate_environment():
226 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
227 |
228 | try:
229 | if not title:
230 | return "Error: title is required"
231 |
232 | result = create_page(title, content)
233 | if result["success"]:
234 | return f"Page created successfully: {result['page_url']}"
235 | else:
236 | return f"Error creating page: {result.get('error', 'Unknown error')}"
237 | except Exception as e:
238 | logger.error(f"Error creating page: {str(e)}", exc_info=True)
239 | return format_error_response(e)
240 |
241 |
242 | @mcp.tool()
243 | async def roam_create_block(content: str, page_uid: Optional[str] = None, title: Optional[str] = None) -> str:
244 | """Add a new block to an existing Roam page. If no page specified, adds to today's daily note.
245 |
246 | Args:
247 | content: Content of the block
248 | page_uid: Optional: UID of the page to add block to
249 | title: Optional: Title of the page to add block to
250 | """
251 | if not validate_environment():
252 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
253 |
254 | try:
255 | if not content:
256 | return "Error: content is required"
257 |
258 | result = create_block(content, page_uid, title)
259 | if result["success"]:
260 | block_uid = result.get("block_uid", "unknown")
261 | parent_uid = result.get("parent_uid", "unknown")
262 | return f"Block created successfully with UID: {block_uid} under parent: {parent_uid}"
263 | else:
264 | return f"Error creating block: {result.get('error', 'Unknown error')}"
265 | except Exception as e:
266 | logger.error(f"Error creating block: {str(e)}", exc_info=True)
267 | return format_error_response(e)
268 |
269 |
270 | @mcp.tool()
271 | async def roam_create_outline(outline: List[Dict[str, Any]], page_title_uid: Optional[str] = None, block_text_uid: Optional[str] = None) -> str:
272 | """Add a structured outline to an existing page or block with customizable nesting levels.
273 |
274 | Args:
275 | outline: Array of outline items with block text and explicit nesting level
276 | page_title_uid: Title or UID of the page. Leave blank to use the default daily page
277 | block_text_uid: A title heading for the outline or the UID of the block under which content will be nested
278 | """
279 | if not validate_environment():
280 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
281 |
282 | try:
283 | if not outline:
284 | return "Error: outline is required and cannot be empty"
285 |
286 | result = create_outline(outline, page_title_uid, block_text_uid)
287 | if result["success"]:
288 | created_count = len(result.get("created_uids", []))
289 | page_uid = result.get("page_uid", "unknown")
290 | parent_uid = result.get("parent_uid", "unknown")
291 | return f"Outline created successfully with {created_count} blocks on page {page_uid} under parent {parent_uid}"
292 | else:
293 | return f"Error creating outline: {result.get('error', 'Unknown error')}"
294 | except Exception as e:
295 | logger.error(f"Error creating outline: {str(e)}", exc_info=True)
296 | return format_error_response(e)
297 |
298 |
299 | @mcp.tool()
300 | async def roam_import_markdown(content: str, page_uid: Optional[str] = None, page_title: Optional[str] = None,
301 | parent_uid: Optional[str] = None, parent_string: Optional[str] = None,
302 | order: str = "last") -> str:
303 | """Import nested markdown content into Roam under a specific block.
304 |
305 | Args:
306 | content: Nested markdown content to import
307 | page_uid: Optional: UID of the page containing the parent block
308 | page_title: Optional: Title of the page containing the parent block
309 | parent_uid: Optional: UID of the parent block to add content under
310 | parent_string: Optional: Exact string content of the parent block to add content under
311 | order: Optional: Where to add the content under the parent ("first" or "last")
312 | """
313 | if not validate_environment():
314 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
315 |
316 | try:
317 | if not content:
318 | return "Error: content is required and cannot be empty"
319 |
320 | result = import_markdown(content, page_uid, page_title, parent_uid, parent_string, order)
321 | if result["success"]:
322 | created_count = len(result.get("created_uids", []))
323 | page_uid = result.get("page_uid", "unknown")
324 | parent_uid = result.get("parent_uid", "unknown")
325 | return f"Markdown imported successfully with {created_count} blocks on page {page_uid} under parent {parent_uid}"
326 | else:
327 | return f"Error importing markdown: {result.get('error', 'Unknown error')}"
328 | except Exception as e:
329 | logger.error(f"Error importing markdown: {str(e)}", exc_info=True)
330 | return format_error_response(e)
331 |
332 |
333 | @mcp.tool()
334 | async def roam_add_todo(todos: List[str]) -> str:
335 | """Add a list of todo items as individual blocks to today's daily page in Roam.
336 |
337 | Args:
338 | todos: List of todo items to add
339 | """
340 | if not validate_environment():
341 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
342 |
343 | try:
344 | if not todos:
345 | return "Error: todos list cannot be empty"
346 |
347 | result = add_todos(todos)
348 | if result["success"]:
349 | return f"Added {len(todos)} todo items to today's daily page"
350 | else:
351 | return f"Error adding todos: {result.get('error', 'Unknown error')}"
352 | except Exception as e:
353 | logger.error(f"Error adding todos: {str(e)}", exc_info=True)
354 | return format_error_response(e)
355 |
356 |
357 | @mcp.tool()
358 | async def roam_search_for_tag(primary_tag: str, page_title_uid: Optional[str] = None, near_tag: Optional[str] = None) -> str:
359 | """Search for blocks containing a specific tag and optionally filter by blocks that also contain another tag nearby.
360 |
361 | Args:
362 | primary_tag: The main tag to search for (without the [[ ]] brackets)
363 | page_title_uid: Optional: Title or UID of the page to search in
364 | near_tag: Optional: Another tag to filter results by - will only return blocks where both tags appear
365 | """
366 | if not validate_environment():
367 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
368 |
369 | try:
370 | if not primary_tag:
371 | return "Error: primary_tag is required"
372 |
373 | result = search_by_tag(primary_tag, page_title_uid, near_tag)
374 | if result["success"]:
375 | # Format the results
376 | formatted = f"{result['message']}\n\n"
377 |
378 | for match in result["matches"]:
379 | page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
380 | formatted += f"- {match['content']}{page_info}\n"
381 |
382 | return formatted
383 | else:
384 | return f"Error searching for tag: {result.get('message', 'Unknown error')}"
385 | except Exception as e:
386 | logger.error(f"Error searching for tag: {str(e)}", exc_info=True)
387 | return format_error_response(e)
388 |
389 |
390 | @mcp.tool()
391 | async def roam_search_by_status(status: str, page_title_uid: Optional[str] = None,
392 | include: Optional[str] = None, exclude: Optional[str] = None) -> str:
393 | """Search for blocks with a specific status (TODO/DONE) across all pages or within a specific page.
394 |
395 | Args:
396 | status: Status to search for (TODO or DONE)
397 | page_title_uid: Optional: Title or UID of the page to search in
398 | include: Optional: Comma-separated list of terms to filter results by inclusion
399 | exclude: Optional: Comma-separated list of terms to filter results by exclusion
400 | """
401 | if not validate_environment():
402 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
403 |
404 | try:
405 | if not status or status not in ["TODO", "DONE"]:
406 | return "Error: status must be either 'TODO' or 'DONE'"
407 |
408 | result = search_by_status(status, page_title_uid, include, exclude)
409 | if result["success"]:
410 | # Format the results
411 | formatted = f"{result['message']}\n\n"
412 |
413 | for match in result["matches"]:
414 | page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
415 | formatted += f"- {match['content']}{page_info}\n"
416 |
417 | return formatted
418 | else:
419 | return f"Error searching by status: {result.get('message', 'Unknown error')}"
420 | except Exception as e:
421 | logger.error(f"Error searching by status: {str(e)}", exc_info=True)
422 | return format_error_response(e)
423 |
424 |
425 | @mcp.tool()
426 | async def roam_search_block_refs(block_uid: Optional[str] = None, page_title_uid: Optional[str] = None) -> str:
427 | """Search for block references within a page or across the entire graph.
428 |
429 | Args:
430 | block_uid: Optional: UID of the block to find references to
431 | page_title_uid: Optional: Title or UID of the page to search in
432 | """
433 | if not validate_environment():
434 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
435 |
436 | try:
437 | result = search_block_refs(block_uid, page_title_uid)
438 | if result["success"]:
439 | # Format the results
440 | formatted = f"{result['message']}\n\n"
441 |
442 | for match in result["matches"]:
443 | page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
444 | formatted += f"- {match['content']}{page_info}\n"
445 |
446 | return formatted
447 | else:
448 | return f"Error searching block references: {result.get('message', 'Unknown error')}"
449 | except Exception as e:
450 | logger.error(f"Error searching block references: {str(e)}", exc_info=True)
451 | return format_error_response(e)
452 |
453 |
454 | @mcp.tool()
455 | async def roam_search_hierarchy(parent_uid: Optional[str] = None, child_uid: Optional[str] = None,
456 | page_title_uid: Optional[str] = None, max_depth: int = 1) -> str:
457 | """Search for parent or child blocks in the block hierarchy.
458 |
459 | Args:
460 | parent_uid: Optional: UID of the block to find children of
461 | child_uid: Optional: UID of the block to find parents of
462 | page_title_uid: Optional: Title or UID of the page to search in
463 | max_depth: Optional: How many levels deep to search (default: 1)
464 | """
465 | if not validate_environment():
466 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
467 |
468 | try:
469 | if not parent_uid and not child_uid:
470 | return "Error: Either parent_uid or child_uid must be provided"
471 |
472 | result = search_hierarchy(parent_uid, child_uid, page_title_uid, max_depth)
473 | if result["success"]:
474 | # Format the results
475 | formatted = f"{result['message']}\n\n"
476 |
477 | for match in result["matches"]:
478 | page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
479 | depth_info = f" (depth: {match['depth']})"
480 | formatted += f"- {match['content']}{page_info}{depth_info}\n"
481 |
482 | return formatted
483 | else:
484 | return f"Error searching hierarchy: {result.get('message', 'Unknown error')}"
485 | except Exception as e:
486 | logger.error(f"Error searching hierarchy: {str(e)}", exc_info=True)
487 | return format_error_response(e)
488 |
489 |
490 | @mcp.tool()
491 | async def roam_find_pages_modified_today(max_num_pages: int = 50) -> str:
492 | """Find pages that have been modified today (since midnight).
493 |
494 | Args:
495 | max_num_pages: Max number of pages to retrieve (default: 50)
496 | """
497 | if not validate_environment():
498 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
499 |
500 | try:
501 | if max_num_pages < 1:
502 | return "Error: max_num_pages must be at least 1"
503 |
504 | result = find_pages_modified_today(max_num_pages)
505 | if result["success"]:
506 | # Format the results
507 | formatted = f"{result['message']}\n\n"
508 |
509 | for page in result["pages"]:
510 | formatted += f"- {page}\n"
511 |
512 | return formatted
513 | else:
514 | return f"Error finding modified pages: {result.get('message', 'Unknown error')}"
515 | except Exception as e:
516 | logger.error(f"Error finding modified pages: {str(e)}", exc_info=True)
517 | return format_error_response(e)
518 |
519 |
520 | @mcp.tool()
521 | async def roam_search_by_text(text: str, page_title_uid: Optional[str] = None) -> str:
522 | """Search for blocks containing specific text across all pages or within a specific page.
523 |
524 | Args:
525 | text: The text to search for
526 | page_title_uid: Optional: Title or UID of the page to search in
527 | """
528 | if not validate_environment():
529 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
530 |
531 | try:
532 | if not text:
533 | return "Error: text is required"
534 |
535 | result = search_by_text(text, page_title_uid)
536 | if result["success"]:
537 | # Format the results
538 | formatted = f"{result['message']}\n\n"
539 |
540 | for match in result["matches"]:
541 | page_info = f" (in page: {match['page_title']})" if "page_title" in match else ""
542 | formatted += f"- {match['content']}{page_info}\n"
543 |
544 | return formatted
545 | else:
546 | return f"Error searching by text: {result.get('message', 'Unknown error')}"
547 | except Exception as e:
548 | logger.error(f"Error searching by text: {str(e)}", exc_info=True)
549 | return format_error_response(e)
550 |
551 |
552 | @mcp.tool()
553 | async def roam_update_block(block_uid: str, content: Optional[str] = None,
554 | transform_pattern: Optional[Dict[str, Any]] = None) -> str:
555 | """Update a single block identified by its UID.
556 |
557 | Args:
558 | block_uid: UID of the block to update
559 |         content: New content for the block (required if transform_pattern is not provided)
560 |         transform_pattern: Pattern to transform the current content (required if content is not provided)
561 | """
562 | if not validate_environment():
563 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
564 |
565 | try:
566 | if not block_uid:
567 | return "Error: block_uid is required"
568 |
569 | if not content and not transform_pattern:
570 | return "Error: Either content or transform_pattern must be provided"
571 |
572 | result = update_content(block_uid, content, transform_pattern)
573 | if result["success"]:
574 | return f"Block updated successfully: {result['content']}"
575 | else:
576 | return f"Error updating block: {result.get('error', 'Unknown error')}"
577 | except Exception as e:
578 | logger.error(f"Error updating block: {str(e)}", exc_info=True)
579 | return format_error_response(e)
580 |
581 |
582 | @mcp.tool()
583 | async def roam_update_multiple_blocks(updates: List[Dict[str, Any]]) -> str:
584 | """Efficiently update multiple blocks in a single batch operation.
585 |
586 | Args:
587 | updates: Array of block updates to perform
588 | """
589 | if not validate_environment():
590 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
591 |
592 | try:
593 | if not updates or not isinstance(updates, list):
594 | return "Error: updates must be a non-empty list"
595 |
596 | result = update_multiple_contents(updates)
597 | if result["success"]:
598 | successful = sum(1 for r in result["results"] if r.get("success"))
599 | return f"Updated {successful}/{len(updates)} blocks successfully"
600 | else:
601 | return f"Error updating blocks: {result.get('error', 'Unknown error')}"
602 | except Exception as e:
603 | logger.error(f"Error updating blocks: {str(e)}", exc_info=True)
604 | return format_error_response(e)
605 |
606 |
607 | @mcp.tool()
608 | async def roam_search_by_date(start_date: str, end_date: Optional[str] = None,
609 | type_filter: str = "created", scope: str = "blocks",
610 | include_content: bool = True) -> str:
611 | """Search for blocks or pages based on creation or modification dates.
612 |
613 | Args:
614 | start_date: Start date in ISO format (YYYY-MM-DD)
615 | end_date: Optional: End date in ISO format (YYYY-MM-DD)
616 | type_filter: Whether to search by "created", "modified", or "both"
617 | scope: Whether to search "blocks", "pages", or "both"
618 | include_content: Whether to include the content of matching blocks/pages
619 | """
620 | if not validate_environment():
621 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
622 |
623 | try:
624 | if not start_date:
625 | return "Error: start_date is required"
626 |
627 | if type_filter not in ["created", "modified", "both"]:
628 | return "Error: type_filter must be 'created', 'modified', or 'both'"
629 |
630 | if scope not in ["blocks", "pages", "both"]:
631 | return "Error: scope must be 'blocks', 'pages', or 'both'"
632 |
633 | result = search_by_date(start_date, end_date, type_filter, scope, include_content)
634 | if result["success"]:
635 | # Format the results
636 | formatted = f"{result['message']}\n\n"
637 |
638 | for match in result["matches"]:
639 | date_info = datetime.fromtimestamp(match["time"] / 1000).strftime("%Y-%m-%d %H:%M:%S")
640 |
641 | if match["type"] == "block":
642 | page_info = f" (in page: {match.get('page_title', 'Unknown')})"
643 | content_info = f": {match.get('content', '')}" if include_content else ""
644 | formatted += f"- Block {match['uid']} {date_info}{page_info}{content_info}\n"
645 | else: # page
646 | title_info = f" (title: {match.get('title', 'Unknown')})"
647 | content_info = f": {match.get('content', '')}" if include_content else ""
648 | formatted += f"- Page {match['uid']} {date_info}{title_info}{content_info}\n"
649 |
650 | return formatted
651 | else:
652 | return f"Error searching by date: {result.get('message', 'Unknown error')}"
653 | except Exception as e:
654 | logger.error(f"Error searching by date: {str(e)}", exc_info=True)
655 | return format_error_response(e)
656 |
657 |
658 | @mcp.tool()
659 | async def roam_remember(memory: str, categories: Optional[List[str]] = None) -> str:
660 | """Add a memory or piece of information to remember, stored on the daily page with tag.
661 |
662 | Args:
663 | memory: The memory detail or information to remember
664 | categories: Optional categories to tag the memory with
665 | """
666 | if not validate_environment():
667 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
668 |
669 | try:
670 | if not memory:
671 | return "Error: memory is required"
672 |
673 | result = remember(memory, categories)
674 | if result["success"]:
675 | return f"Memory stored successfully: {result['content']}"
676 | else:
677 | return f"Error storing memory: {result.get('error', 'Unknown error')}"
678 | except Exception as e:
679 | logger.error(f"Error storing memory: {str(e)}", exc_info=True)
680 | return format_error_response(e)
681 |
682 |
683 | @mcp.tool()
684 | async def roam_recall(sort_by: str = "newest", filter_tag: Optional[str] = None) -> str:
685 | """Retrieve stored memories, optionally filtered by tag and sorted by creation date.
686 |
687 | Args:
688 |         sort_by: Sort order based on creation date ("newest" or "oldest")
689 | filter_tag: Include only memories with a specific filter tag
690 | """
691 | if not validate_environment():
692 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
693 |
694 | try:
695 | if sort_by not in ["newest", "oldest"]:
696 | return "Error: sort_by must be 'newest' or 'oldest'"
697 |
698 | result = recall(sort_by, filter_tag)
699 | if result["success"]:
700 | # Format the results
701 | formatted = f"{result['message']}\n\n"
702 |
703 | for memory in result["memories"]:
704 | formatted += f"- {memory}\n"
705 |
706 | return formatted
707 | else:
708 | return f"Error recalling memories: {result.get('error', 'Unknown error')}"
709 | except Exception as e:
710 | logger.error(f"Error recalling memories: {str(e)}", exc_info=True)
711 | return format_error_response(e)
712 |
713 |
714 | @mcp.tool()
715 | async def roam_datomic_query(query: str, inputs: Optional[List[Any]] = None) -> str:
716 | """Execute a custom Datomic query on the Roam graph beyond the available search tools.
717 |
718 | Args:
719 | query: The Datomic query to execute (in Datalog syntax)
720 | inputs: Optional array of input parameters for the query
721 | """
722 | if not validate_environment():
723 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
724 |
725 | try:
726 | if not query:
727 | return "Error: query is required"
728 |
729 | result = execute_datomic_query(query, inputs)
730 | if result["success"]:
731 | # Format the results
732 | formatted = f"{result['message']}\n\n"
733 |
734 | for match in result["matches"]:
735 | formatted += f"- {match['content']}\n"
736 |
737 | return formatted
738 | else:
739 | return f"Error executing query: {result.get('message', 'Unknown error')}"
740 | except Exception as e:
741 | logger.error(f"Error executing query: {str(e)}", exc_info=True)
742 | return format_error_response(e)
743 |
744 |
745 | @mcp.tool()
746 | async def get_youtube_transcript(url: str) -> str:
747 | """Fetch and return the transcript of a YouTube video.
748 |
749 | Args:
750 | url: URL of the YouTube video
751 | """
752 | video_id = extract_youtube_video_id(url)
753 | if not video_id:
754 | return "Invalid YouTube URL. Unable to extract video ID."
755 |
756 | try:
757 | # Define the prioritized list of language codes
758 | languages = [
759 | 'en', 'en-US', 'en-GB', 'de', 'es', 'hi', 'zh', 'ar', 'bn', 'pt',
760 | 'ru', 'ja', 'pa'
761 | ]
762 |
763 | # Attempt to retrieve the available transcripts
764 | transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
765 |
766 | # Try to find a transcript in the prioritized languages
767 | for language in languages:
768 | try:
769 | transcript = transcript_list.find_transcript([language])
770 |                 # Skip auto-generated transcripts here; prefer a manually created one
771 | if transcript.is_generated:
772 | continue
773 | text = " ".join([line["text"] for line in transcript.fetch()])
774 | return text
775 | except Exception:
776 | continue
777 |
778 | # If no suitable transcript is found in the specified languages, try to fetch a generated transcript
779 | try:
780 | generated_transcript = transcript_list.find_generated_transcript(
781 | languages)
782 | text = " ".join(
783 | [line["text"] for line in generated_transcript.fetch()])
784 | return text
785 | except Exception:
786 | return "No suitable transcript found for this video."
787 |
788 | except TranscriptsDisabled:
789 | return "Transcripts are disabled for this video."
790 | except Exception as e:
791 | logger.error(f"Error fetching YouTube transcript: {str(e)}", exc_info=True)
792 | return f"An error occurred while fetching the transcript: {str(e)}"
793 |
794 |
795 | @mcp.tool()
796 | async def fetch_webpage_content(url: str) -> str:
797 | """Fetch and extract the main content from a web page.
798 |
799 | Args:
800 | url: URL of the web page to fetch
801 | """
802 | if not validate_environment():
803 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
804 |
805 | try:
806 | logger.debug(f"Fetching webpage content: {url}")
807 | result = await parse_webpage(url)
808 |
809 | if result["success"]:
810 | return f"# {result['title']}\n\nSource: {url}\n\n{result['content']}"
811 | else:
812 | return f"Error fetching webpage: {result.get('error', 'Unknown error')}"
813 | except Exception as e:
814 | logger.error(f"Error in fetch_webpage_content: {str(e)}", exc_info=True)
815 | return f"Error fetching webpage: {str(e)}"
816 |
817 |
818 | @mcp.tool()
819 | async def fetch_pdf_content(url: str) -> str:
820 | """Fetch and extract the content from a PDF file.
821 |
822 | Args:
823 | url: URL of the PDF file to fetch
824 | """
825 | if not validate_environment():
826 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
827 |
828 | try:
829 | logger.debug(f"Fetching PDF content: {url}")
830 | result = await parse_pdf(url)
831 |
832 | if result["success"]:
833 | return f"# {result['title']}\n\nSource: {url}\n\n{result['content']}"
834 | else:
835 | return f"Error fetching PDF: {result.get('error', 'Unknown error')}"
836 | except Exception as e:
837 | logger.error(f"Error in fetch_pdf_content: {str(e)}", exc_info=True)
838 | return f"Error fetching PDF: {str(e)}"
839 |
840 |
841 | @mcp.tool()
842 | async def parse_url(url: str) -> str:
843 | """Intelligently parse content from a URL - supports webpages, PDFs, and YouTube videos.
844 |
845 | Args:
846 | url: URL to parse
847 | """
848 | if not validate_environment():
849 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
850 |
851 | try:
852 | # Detect URL type
853 | url_type = detect_url_type(url)
854 |
855 | if url_type == "youtube":
856 | # Use existing YouTube transcript function
857 | return await get_youtube_transcript(url)
858 | elif url_type == "pdf":
859 | return await fetch_pdf_content(url)
860 | else: # webpage or unknown
861 | return await fetch_webpage_content(url)
862 | except Exception as e:
863 | logger.error(f"Error parsing URL: {str(e)}", exc_info=True)
864 | return f"Error parsing URL: {str(e)}"
865 |
866 |
867 | @mcp.tool()
868 | async def get_roam_graph_info() -> str:
869 | """Get information about your Roam Research graph.
870 | """
871 | if not validate_environment():
872 | return "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
873 |
874 | try:
875 | # Get page count
876 | query = """[:find (count ?p)
877 | :where [?p :node/title]]"""
878 |
879 | result = execute_datomic_query(query)
880 |
881 | if result["success"] and result["matches"]:
882 | page_count = result["matches"][0]["content"]
883 | else:
884 | page_count = "Unknown"
885 |
886 | # Get block count
887 | query = """[:find (count ?b)
888 | :where [?b :block/string]]"""
889 |
890 | result = execute_datomic_query(query)
891 |
892 | if result["success"] and result["matches"]:
893 | block_count = result["matches"][0]["content"]
894 | else:
895 | block_count = "Unknown"
896 |
897 | # Format the output
898 | memory_tag = MEMORIES_TAG if MEMORIES_TAG else "Not set (using default #[[Memories]])"
899 |
900 | formatted_info = f"""
901 | Graph Name: {GRAPH_NAME}
902 | Pages: {page_count}
903 | Blocks: {block_count}
904 | API Access: Enabled
905 | Memory Tag: {memory_tag}
906 | """
907 |
908 | return formatted_info
909 | except Exception as e:
910 | logger.error(f"Error retrieving graph information: {str(e)}", exc_info=True)
911 | return format_error_response(e)
912 |
913 |
914 | @mcp.prompt()
915 | async def summarize_page(page_title: str) -> dict:
916 | """
917 | Create a prompt to summarize a page in Roam Research.
918 |
919 | Args:
920 | page_title: Title of the page to summarize
921 | """
922 | if not validate_environment():
923 | return {
924 | "messages": [{
925 | "role": "user",
926 | "content": "Error: ROAM_API_TOKEN and ROAM_GRAPH_NAME environment variables must be set"
927 | }]
928 | }
929 |
930 | try:
931 | content = get_page_content(page_title)
932 |
933 | return {
934 | "messages": [{
935 | "role": "user",
936 | "content": f"Please provide a concise summary of the following page content from my Roam Research database:\n\n{content}"
937 | }]
938 | }
939 | except Exception as e:
940 | logger.error(f"Error creating summary prompt: {str(e)}", exc_info=True)
941 | return {
942 | "messages": [{
943 | "role": "user",
944 | "content": f"I wanted to summarize my Roam page titled '{page_title}', but there was an error retrieving the content: {format_error_response(e)}. Can you help me troubleshoot this issue with my Roam Research integration?"
945 | }]
946 | }
947 |
948 |
949 | def run_server(transport="stdio", port=None, verbose=False):
950 | """Run the MCP server with the specified transport."""
951 | # Configure logging based on verbosity
952 | setup_logging(verbose)
953 |
954 | logger.info("Server starting...")
955 |
956 | # Validate environment variables
957 | valid_env = validate_environment()
958 | if valid_env:
959 |         logger.info("API token and graph name are set")
960 | logger.info(f"MEMORIES_TAG is set to: {MEMORIES_TAG}")
961 | else:
962 | logger.warning("Missing required environment variables")
963 |
964 | # Run the server
965 | try:
966 | if transport == "stdio":
967 | logger.info("Starting server with stdio transport")
968 | mcp.run(transport="stdio")
969 | elif transport == "sse":
970 | if not port:
971 | port = 3000
972 | logger.info(f"Starting server with SSE transport on port {port}")
973 | mcp.run(transport="sse", port=port)
974 | else:
975 | logger.error(f"Unsupported transport: {transport}")
976 | sys.exit(1)
977 | except KeyboardInterrupt:
978 | logger.info("Server stopped by user")
979 | except Exception as e:
980 | logger.error(f"Error running server: {str(e)}")
981 | traceback.print_exc()
```
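Note: the hierarchical content payload accepted by `create_page` and `process_hierarchical_content` in `roam_mcp/content.py` below can be checked locally before any API calls are made. The following is a minimal, self-contained sketch of those validation rules (it mirrors the internal `validate_item` helper; the sample payloads are hypothetical and nothing here talks to Roam):

```python
from typing import Any, Dict, List

def validate_item(item: Dict[str, Any], path: str = "root") -> List[str]:
    """Collect validation errors for one content item and its children."""
    errors = []
    # 'text' (or legacy 'string') is the only required field
    if not item.get("text") and not item.get("string"):
        errors.append(f"Item at {path} is missing required 'text' field")
    # 'level' is optional, but must be an integer when present
    level = item.get("level")
    if level is not None and not isinstance(level, int):
        errors.append(f"Item at {path} has invalid 'level', must be an integer")
    # 'heading_level' maps to Roam's H1-H3; 0 means a plain block
    heading_level = item.get("heading_level", 0)
    if heading_level and (not isinstance(heading_level, int)
                          or not 0 <= heading_level <= 3):
        errors.append(f"Item at {path} has invalid 'heading_level', "
                      f"must be an integer between 0 and 3")
    # 'children' must be a list; each child is validated recursively
    children = item.get("children", [])
    if not isinstance(children, list):
        errors.append(f"Item at {path} has invalid 'children', must be a list")
    else:
        for i, child in enumerate(children):
            errors.extend(validate_item(child, f"{path}.children[{i}]"))
    return errors

# Hypothetical payloads illustrating a valid and an invalid item
good = {"text": "Heading", "heading_level": 2,
        "children": [{"text": "Child", "level": 1}]}
bad = {"heading_level": 5, "children": "oops"}

print(validate_item(good))  # -> []
print(validate_item(bad))   # three errors: missing text, bad heading, bad children
```

The real server runs these checks up front and refuses the whole batch on any error, so validating locally gives the same pass/fail verdict without spending API calls.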
--------------------------------------------------------------------------------
/roam_mcp/content.py:
--------------------------------------------------------------------------------
```python
1 | """Content operations for the Roam MCP server (pages, blocks, and outlines)."""
2 |
3 | from typing import Dict, List, Any, Optional, Union
4 | from datetime import datetime
5 | import re
6 | import logging
7 | import uuid
8 | import time
9 | import json
10 |
11 | from roam_mcp.api import (
12 | execute_query,
13 | execute_write_action,
14 | execute_batch_actions,
15 | get_session_and_headers,
16 | GRAPH_NAME,
17 | find_or_create_page,
18 | get_daily_page,
19 | add_block_to_page,
20 | update_block,
21 | batch_update_blocks,
22 | find_page_by_title,
23 | ValidationError,
24 | BlockNotFoundError,
25 | PageNotFoundError,
26 | TransactionError
27 | )
28 | from roam_mcp.utils import (
29 | format_roam_date,
30 | convert_to_roam_markdown,
31 | parse_markdown_list,
32 | process_nested_content,
33 | find_block_uid,
34 | create_block_action
35 | )
36 |
37 | # Set up logging
38 | logger = logging.getLogger("roam-mcp.content")
39 |
40 |
41 | def process_hierarchical_content(parent_uid: str, content_data: List[Dict[str, Any]], order: str = "last") -> Dict[str, Any]:
42 | """
43 | Process hierarchical content with proper parent-child relationships.
44 | This is a standardized utility function used across different content creation methods.
45 |
46 | Args:
47 | parent_uid: UID of the parent block/page
48 | content_data: List of content items with text, level, and optional children/heading_level attributes
49 | order: Where to add content ("first" or "last")
50 |
51 | Returns:
52 | Dictionary with success status and created block UIDs
53 | """
54 | if not content_data:
55 | return {
56 | "success": True,
57 | "created_uids": []
58 | }
59 |
60 | # First, validate the hierarchical structure
61 | def validate_item(item, path="root"):
62 | errors = []
63 | # Check required fields
64 | if not item.get("text") and not item.get("string"):
65 | errors.append(f"Item at {path} is missing required 'text' field")
66 |
67 | # Ensure level is valid
68 | level = item.get("level")
69 | if level is not None and not isinstance(level, int):
70 | errors.append(f"Item at {path} has invalid 'level', must be an integer")
71 |
72 | # Validate heading level
73 | heading_level = item.get("heading_level", 0)
74 | if heading_level and (not isinstance(heading_level, int) or heading_level < 0 or heading_level > 3):
75 | errors.append(f"Item at {path} has invalid 'heading_level', must be an integer between 0 and 3")
76 |
77 | # Validate children recursively
78 | children = item.get("children", [])
79 | if not isinstance(children, list):
80 | errors.append(f"Item at {path} has invalid 'children', must be a list")
81 | else:
82 | for i, child in enumerate(children):
83 | child_path = f"{path}.children[{i}]"
84 | child_errors = validate_item(child, child_path)
85 | errors.extend(child_errors)
86 |
87 | return errors
88 |
89 | # Validate all items
90 | all_errors = []
91 | for i, item in enumerate(content_data):
92 | item_path = f"item[{i}]"
93 | errors = validate_item(item, item_path)
94 | all_errors.extend(errors)
95 |
96 | if all_errors:
97 | return {
98 | "success": False,
99 | "error": f"Invalid content structure: {'; '.join(all_errors)}"
100 | }
101 |
102 | # Process hierarchical content with proper nesting
103 | session, headers = get_session_and_headers()
104 | all_created_uids = []
105 |
106 | # Define a recursive function to process items
107 | def process_item(item, parent_uid, level_to_uid, current_level):
108 | created_uids = []
109 |
110 | # Get item properties
111 | text = item.get("text", item.get("string", ""))
112 |
113 | # Strip leading dash characters that might cause double bullets
114 | text = re.sub(r'^-\s+', '', text)
115 |
116 | level = item.get("level", current_level)
117 | heading_level = item.get("heading_level", 0)
118 |
119 | # Find the appropriate parent for this level
120 | parent_level = level - 1
121 | if parent_level < -1:
122 | parent_level = -1
123 |
124 | effective_parent = level_to_uid.get(parent_level, parent_uid)
125 |
126 | # Create block with a unique UID
127 | block_uid = str(uuid.uuid4())[:9]
128 |
129 | action_data = {
130 | "action": "create-block",
131 | "location": {
132 | "parent-uid": effective_parent,
133 | "order": order if level == 0 else "last"
134 | },
135 | "block": {
136 | "string": text,
137 | "uid": block_uid
138 | }
139 | }
140 |
141 | # Add heading level if specified
142 | if heading_level and heading_level > 0 and heading_level <= 3:
143 | action_data["block"]["heading"] = heading_level
144 |
145 | # Execute the action
146 | result = execute_write_action(action_data)
147 |
148 | if result.get("success", False):
149 | created_uids.append(block_uid)
150 | level_to_uid[level] = block_uid
151 | logger.debug(f"Created block at level {level} with UID: {block_uid}")
152 |
153 | # Process children if any
154 | children = item.get("children", [])
155 | if children:
156 | for child in children:
157 | # Process each child with this block as parent
158 | child_result = process_item(child, block_uid, level_to_uid, level + 1)
159 | created_uids.extend(child_result)
160 |
161 | # Add a brief delay for API stability
162 | time.sleep(0.3)
163 | else:
164 | logger.error(f"Failed to create block: {result.get('error', 'Unknown error')}")
165 |
166 | return created_uids
167 |
168 | try:
169 | # Process each top-level item
170 | level_to_uid = {-1: parent_uid} # Start with parent as level -1
171 |
172 | for item in content_data:
173 | item_uids = process_item(item, parent_uid, level_to_uid, 0)
174 | all_created_uids.extend(item_uids)
175 |
176 | return {
177 | "success": True,
178 | "created_uids": all_created_uids
179 | }
180 | except Exception as e:
181 | error_msg = f"Failed to process hierarchical content: {str(e)}"
182 | logger.error(error_msg)
183 | return {
184 | "success": False,
185 | "error": error_msg,
186 | "created_uids": all_created_uids # Return any UIDs created before failure
187 | }
188 |
189 |
190 | def create_nested_blocks(parent_uid: str, blocks_data: List[Dict[str, Any]]) -> Dict[str, Any]:
191 | """
192 | Create nested blocks with proper parent-child relationships.
193 |
194 | Args:
195 | parent_uid: UID of the parent block/page
196 | blocks_data: List of block data (text, level, children)
197 |
198 | Returns:
199 | Dictionary with success status and created block UIDs
200 | """
201 | # For backward compatibility, now uses the standardized hierarchical content processor
202 | return process_hierarchical_content(parent_uid, blocks_data)
203 |
204 |
205 | def create_page(title: str, content: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
206 | """
207 | Create a new page in Roam Research with optional nested content.
208 |
209 | Args:
210 | title: Title for the new page
211 | content: Optional content as a list of dicts with 'text', optional 'level', and optional 'children'
212 | Each item should have:
213 | - 'text' or 'string': Content text
214 | - 'level': Nesting level (optional, defaults to parent_level + 1)
215 | - 'heading_level': Heading level 1-3 (optional)
216 | - 'children': List of child items (optional)
217 |
218 | Returns:
219 | Result with page UID and created block UIDs
220 | """
221 | if not title:
222 | return {
223 | "success": False,
224 | "error": "Title is required"
225 | }
226 |
227 | session, headers = get_session_and_headers()
228 |
229 | try:
230 | # Create the page
231 | page_uid = find_or_create_page(title)
232 |
233 | # Add content if provided
234 | if content:
235 | # Use the standardized hierarchical content processor
236 | result = process_hierarchical_content(page_uid, content)
237 |
238 | if result["success"]:
239 | return {
240 | "success": True,
241 | "uid": page_uid,
242 | "created_uids": result.get("created_uids", []),
243 | "page_url": f"https://roamresearch.com/#/app/{GRAPH_NAME}/page/{page_uid}"
244 | }
245 | else:
246 | return {
247 | "success": False,
248 | "error": result.get("error", "Failed to create content"),
249 | "uid": page_uid,
250 | "page_url": f"https://roamresearch.com/#/app/{GRAPH_NAME}/page/{page_uid}"
251 | }
252 |
253 | return {
254 | "success": True,
255 | "uid": page_uid,
256 | "page_url": f"https://roamresearch.com/#/app/{GRAPH_NAME}/page/{page_uid}"
257 | }
258 | except ValidationError as e:
259 | return {
260 | "success": False,
261 | "error": str(e)
262 | }
263 | except TransactionError as e:
264 | return {
265 | "success": False,
266 | "error": str(e)
267 | }
268 | except Exception as e:
269 | logger.error(f"Error creating page: {str(e)}")
270 | return {
271 | "success": False,
272 | "error": f"Error creating page: {str(e)}"
273 | }
274 |
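# Illustrative usage (hypothetical titles and content; assumes the module's
# Roam credentials and GRAPH_NAME are configured):
#
#     result = create_page("Reading List", content=[
#         {"text": "Fiction", "heading_level": 2, "children": [
#             {"text": "The Dispossessed"}
#         ]}
#     ])
#     if result["success"]:
#         print(result["page_url"])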
275 |
276 | def create_block(content: str, page_uid: Optional[str] = None, page_title: Optional[str] = None) -> Dict[str, Any]:
277 | """
278 | Create a new block in Roam Research.
279 |
280 | Args:
281 | content: Block content - can be single-line text or multi-line content
282 | that will be parsed into a hierarchical structure
283 | page_uid: Optional page UID
284 | page_title: Optional page title
285 |
286 | Returns:
287 | Result with block UID
288 | """
289 | if not content:
290 | return {
291 | "success": False,
292 | "error": "Content is required"
293 | }
294 |
295 | session, headers = get_session_and_headers()
296 |
297 | try:
298 | # Determine target page
299 | target_page_uid = None
300 |
301 | if page_uid:
302 | # Use provided page UID
303 | target_page_uid = page_uid
304 | elif page_title:
305 | # Find or create page by title
306 | target_page_uid = find_or_create_page(page_title)
307 | else:
308 | # Use today's daily page
309 | target_page_uid = get_daily_page()
310 |
311 | # Handle multi-line content
312 | if "\n" in content:
313 | # Parse as nested structure
314 | markdown_content = convert_to_roam_markdown(content)
315 | parsed_content = parse_markdown_list(markdown_content)
316 |
317 | # Check if there's any content
318 | if not parsed_content:
319 | return {
320 | "success": False,
321 | "error": "Failed to parse content"
322 | }
323 |
324 | # Build hierarchical structure
325 |             def build_hierarchy_from_parsed(items):
326 |                 # Attach each item to the nearest preceding item with a
327 |                 # shallower level. Items must be processed in document
328 |                 # order: sorting them by level first regroups siblings
329 |                 # and attaches children to the wrong parents.
330 |                 if not items:
331 |                     return []
332 | 
333 |                 # The minimum level present is the root level
334 |                 min_level = min(item.get("level", 0) for item in items)
335 | 
336 |                 hierarchical_items = []
337 | 
338 |                 # Track the most recent item seen at each level
339 |                 current_parents = {}
340 | 
341 |                 for item in items:
342 |                     level = item.get("level", 0)
343 | 
344 |                     if level == min_level:
345 |                         # Root level items
346 |                         hierarchical_items.append(item)
347 |                         current_parents[level] = item
348 |                     else:
349 |                         # Find the nearest preceding parent level
350 |                         parent_level = level - 1
351 |                         while parent_level >= min_level and parent_level not in current_parents:
352 |                             parent_level -= 1
353 | 
354 |                         if parent_level >= min_level:
355 |                             parent = current_parents[parent_level]
356 |                             if "children" not in parent:
357 |                                 parent["children"] = []
358 |                             parent["children"].append(item)
359 |                         else:
360 |                             # If no parent found, add as root
361 |                             hierarchical_items.append(item)
362 | 
363 |                         current_parents[level] = item
364 | 
365 |                 return hierarchical_items
371 |
372 | # Build hierarchical structure
373 | hierarchical_content = build_hierarchy_from_parsed(parsed_content)
374 |
375 | # Process using the standardized hierarchical content processor
376 | result = process_hierarchical_content(target_page_uid, hierarchical_content)
377 |
378 |             if result["success"]:
379 |                 return {
380 |                     "success": True,
381 |                     "block_uid": (result.get("created_uids") or [None])[0],
382 |                     "parent_uid": target_page_uid,
383 |                     "created_uids": result.get("created_uids", [])
384 |                 }
385 | else:
386 | return {
387 | "success": False,
388 | "error": result.get("error", "Failed to create hierarchical blocks"),
389 | "parent_uid": target_page_uid
390 | }
391 | else:
392 | # Create a simple block with explicit UID
393 | block_uid = str(uuid.uuid4())[:9]
394 |
395 | action_data = {
396 | "action": "create-block",
397 | "location": {
398 | "parent-uid": target_page_uid,
399 | "order": "last"
400 | },
401 | "block": {
402 | "string": content,
403 | "uid": block_uid
404 | }
405 | }
406 |
407 | result = execute_write_action(action_data)
408 | if result.get("success", False):
409 | # Verify the block exists after a brief delay
410 | time.sleep(0.5)
411 | found_uid = find_block_uid(session, headers, GRAPH_NAME, content)
412 |
413 | return {
414 | "success": True,
415 | "block_uid": found_uid or block_uid,
416 | "parent_uid": target_page_uid
417 | }
418 |             else:
419 |                 return {
420 |                     "success": False,
421 |                     "error": result.get("error", "Failed to create block")
422 |                 }
423 |     except (ValidationError, PageNotFoundError, BlockNotFoundError, TransactionError) as e:
424 |         return {
425 |             "success": False,
426 |             "error": str(e)
427 |         }
443 | except Exception as e:
444 | logger.error(f"Error creating block: {str(e)}")
445 | return {
446 | "success": False,
447 | "error": f"Error creating block: {str(e)}"
448 | }
449 |
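# Illustrative usage (hypothetical content): a single-line string becomes one
# block, while a multi-line string is parsed into nested blocks; with neither
# page_uid nor page_title given, blocks land on today's daily page:
#
#     create_block("Standalone thought", page_title="Inbox")
#     create_block("- Parent\n  - Child one\n  - Child two")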
450 |
451 | def create_outline(outline: List[Dict[str, Any]], page_title_uid: Optional[str] = None, block_text_uid: Optional[str] = None) -> Dict[str, Any]:
452 | """
453 | Create a structured outline in Roam Research.
454 |
455 | Args:
456 | outline: List of outline items with text and level
457 | Each item should have:
458 | - 'text': Content text (required)
459 | - 'level': Nesting level (required)
460 | - 'heading_level': Heading level 1-3 (optional)
461 | page_title_uid: Optional page title or UID
462 | block_text_uid: Optional block text or UID to add outline under
463 |
464 | Returns:
465 | Result with created block UIDs
466 | """
467 | # Validate outline
468 | if not outline:
469 | return {
470 | "success": False,
471 | "error": "Outline cannot be empty"
472 | }
473 |
474 |     # Check that every item has text and an integer level
475 | invalid_items = [item for item in outline if not item.get("text") or not isinstance(item.get("level"), int)]
476 | if invalid_items:
477 | return {
478 | "success": False,
479 | "error": "All outline items must have text and a valid level"
480 | }
481 |
482 | session, headers = get_session_and_headers()
483 |
484 | try:
485 | # Determine target page
486 | target_page_uid = None
487 |
488 | if page_title_uid:
489 | # Find page by title or UID
490 | page_uid = find_page_by_title(session, headers, GRAPH_NAME, page_title_uid)
491 |
492 | if page_uid:
493 | target_page_uid = page_uid
494 | else:
495 | # Create new page if not found
496 | target_page_uid = find_or_create_page(page_title_uid)
497 | else:
498 | # Use today's daily page
499 | target_page_uid = get_daily_page()
500 |
501 | # Determine parent block
502 | parent_uid = target_page_uid
503 |
504 | if block_text_uid:
505 | # Check if it's a valid block UID (9 characters)
506 | if len(block_text_uid) == 9 and re.match(r'^[a-zA-Z0-9_-]{9}$', block_text_uid):
507 | # Verify block exists
508 | query = f'''[:find ?uid
509 | :where [?b :block/uid "{block_text_uid}"]
510 | [?b :block/uid ?uid]]'''
511 |
512 | result = execute_query(query)
513 |
514 | if result:
515 | parent_uid = block_text_uid
516 | else:
517 | return {
518 | "success": False,
519 | "error": f"Block with UID {block_text_uid} not found"
520 | }
521 | else:
522 | # Create a header block with the given text
523 | action_data = {
524 | "action": "create-block",
525 | "location": {
526 | "parent-uid": target_page_uid,
527 | "order": "last"
528 | },
529 | "block": {
530 | "string": block_text_uid,
531 | "uid": str(uuid.uuid4())[:9]
532 | }
533 | }
534 |
535 | execute_write_action(action_data)
536 | time.sleep(0.5) # Add delay to ensure block is created
537 | header_uid = find_block_uid(session, headers, GRAPH_NAME, block_text_uid)
538 |
539 | if not header_uid:
540 | return {
541 | "success": False,
542 | "error": f"Failed to create header block with text: {block_text_uid}"
543 | }
544 |
545 | parent_uid = header_uid
546 |
547 | # Build hierarchical structure from flat outline items
548 |         def build_outline_hierarchy(items):
549 |             # Attach each item to the nearest preceding item with a
550 |             # shallower level, preserving the original outline order:
551 |             # sorting by level first would attach children to the wrong
552 |             # parents.
553 |             if not items:
554 |                 return []
555 | 
556 |             # The minimum level present is the root level
557 |             min_level = min(item.get("level", 0) for item in items)
558 | 
559 |             hierarchical_items = []
560 | 
561 |             # Track parent nodes at each level
562 |             level_parents = {}
563 | 
564 |             # Process items in their original order
565 |             for item in items:
566 |                 level = item.get("level", 0)
567 | 
568 |                 # If this is a root-level item, add it to the result directly
569 |                 if level == min_level:
570 |                     hierarchical_items.append(item)
571 |                     level_parents[level] = item
572 |                 else:
573 |                     # Find the nearest parent level
574 |                     parent_level = level - 1
575 |                     while parent_level >= min_level and parent_level not in level_parents:
576 |                         parent_level -= 1
577 | 
578 |                     # If we found a parent, add this item as its child
579 |                     if parent_level >= min_level:
580 |                         parent = level_parents[parent_level]
581 |                         if "children" not in parent:
582 |                             parent["children"] = []
583 |                         parent["children"].append(item)
584 |                         level_parents[level] = item
585 |                     else:
586 |                         # If no parent found, add it as a root item
587 |                         hierarchical_items.append(item)
588 |                         level_parents[level] = item
589 | 
590 |             return hierarchical_items
594 |
595 | # Build hierarchical structure from outline
596 | hierarchical_outline = build_outline_hierarchy(outline)
597 |
598 | # Use the standardized hierarchical content processor
599 | result = process_hierarchical_content(parent_uid, hierarchical_outline)
600 |
601 | if result["success"]:
602 | return {
603 | "success": True,
604 | "page_uid": target_page_uid,
605 | "parent_uid": parent_uid,
606 | "created_uids": result.get("created_uids", [])
607 | }
608 | else:
609 | return {
610 | "success": False,
611 | "error": result.get("error", "Failed to create outline"),
612 | "page_uid": target_page_uid,
613 | "parent_uid": parent_uid
614 | }
615 |     except (ValidationError, PageNotFoundError, BlockNotFoundError, TransactionError) as e:
616 |         return {
617 |             "success": False,
618 |             "error": str(e)
619 |         }
635 | except Exception as e:
636 | logger.error(f"Error creating outline: {str(e)}")
637 | return {
638 | "success": False,
639 | "error": f"Error creating outline: {str(e)}"
640 | }
641 |
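# Illustrative usage (hypothetical outline): 'level' drives nesting, and a
# block_text_uid that is not a 9-character UID is created as a new header
# block that the outline is nested under:
#
#     create_outline([
#         {"text": "Goals", "level": 1},
#         {"text": "Ship v1", "level": 2},
#         {"text": "Write docs", "level": 2}
#     ], page_title_uid="Planning", block_text_uid="Q3 Review")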
642 |
643 | def import_markdown(content: str, page_uid: Optional[str] = None, page_title: Optional[str] = None,
644 | parent_uid: Optional[str] = None, parent_string: Optional[str] = None,
645 | order: str = "last") -> Dict[str, Any]:
646 | """
647 | Import markdown content into Roam Research.
648 |
649 | Args:
650 | content: Markdown content to import
651 | page_uid: Optional page UID
652 | page_title: Optional page title
653 | parent_uid: Optional parent block UID
654 | parent_string: Optional parent block text
655 | order: Position ("first" or "last")
656 |
657 | Returns:
658 | Result with created block UIDs
659 | """
660 | if not content:
661 | return {
662 | "success": False,
663 | "error": "Content cannot be empty"
664 | }
665 |
666 | if order not in ["first", "last"]:
667 | return {
668 | "success": False,
669 | "error": "Order must be 'first' or 'last'"
670 | }
671 |
672 | session, headers = get_session_and_headers()
673 |
674 | try:
675 | # Determine target page
676 | target_page_uid = None
677 |
678 | if page_uid:
679 | # Use provided page UID
680 | target_page_uid = page_uid
681 | elif page_title:
682 | # Find or create page by title
683 | target_page_uid = find_or_create_page(page_title)
684 | else:
685 | # Use today's daily page
686 | target_page_uid = get_daily_page()
687 |
688 | # Determine parent block
689 | parent_block_uid = target_page_uid
690 |
691 | if parent_uid:
692 | # Verify block exists
693 | query = f'''[:find ?uid .
694 | :where [?b :block/uid "{parent_uid}"]
695 | [?b :block/uid ?uid]]'''
696 |
697 | result = execute_query(query)
698 |
699 | if result:
700 | parent_block_uid = parent_uid
701 | else:
702 | return {
703 | "success": False,
704 | "error": f"Block with UID {parent_uid} not found"
705 | }
706 | elif parent_string:
707 | # Find block by string
708 | found_uid = find_block_uid(session, headers, GRAPH_NAME, parent_string)
709 |
710 | if found_uid:
711 | parent_block_uid = found_uid
712 | else:
713 | # Create parent block if it doesn't exist
714 | block_uid = str(uuid.uuid4())[:9]
715 |
716 | action_data = {
717 | "action": "create-block",
718 | "location": {
719 | "parent-uid": target_page_uid,
720 | "order": "last"
721 | },
722 | "block": {
723 | "string": parent_string,
724 | "uid": block_uid
725 | }
726 | }
727 |
728 | execute_write_action(action_data)
729 | time.sleep(1) # Wait for block to be created
730 |
731 | found_uid = find_block_uid(session, headers, GRAPH_NAME, parent_string)
732 | if found_uid:
733 | parent_block_uid = found_uid
734 | else:
735 | parent_block_uid = block_uid
736 | logger.debug(f"Created parent block with UID: {block_uid}")
737 |
738 | # Convert markdown to Roam format
739 | roam_markdown = convert_to_roam_markdown(content)
740 |
741 | # Parse markdown into hierarchical structure
742 | parsed_content = parse_markdown_list(roam_markdown)
743 |
744 | if not parsed_content:
745 | return {
746 | "success": False,
747 | "error": "Failed to parse markdown content"
748 | }
749 |
750 | # Build a proper hierarchical structure from the parsed markdown
751 |         def build_hierarchy(items):
752 |             # Attach each item to the nearest preceding item with a
753 |             # shallower level. A single pass in document order replaces
754 |             # the positional items.index() lookups, which are O(n^2)
755 |             # and misattribute children when two parsed items compare
756 |             # equal.
757 |             if not items:
758 |                 return []
759 | 
760 |             # The minimum level present is the root level
761 |             min_level = min(item.get("level", 0) for item in items)
762 | 
763 |             root_items = []
764 | 
765 |             # Track the most recent item seen at each level
766 |             current_parents = {}
767 | 
768 |             for item in items:
769 |                 level = item.get("level", 0)
770 | 
771 |                 if level == min_level:
772 |                     root_items.append(item)
773 |                 else:
774 |                     # Find the nearest preceding parent level
775 |                     parent_level = level - 1
776 |                     while parent_level >= min_level and parent_level not in current_parents:
777 |                         parent_level -= 1
778 | 
779 |                     if parent_level >= min_level:
780 |                         parent = current_parents[parent_level]
781 |                         if "children" not in parent:
782 |                             parent["children"] = []
783 |                         parent["children"].append(item)
784 |                     else:
785 |                         # No parent found, so treat it as a root item
786 |                         root_items.append(item)
787 | 
788 |                 current_parents[level] = item
789 | 
790 |             return root_items
800 |
801 | # Build a hierarchical structure that preserves parent-child relationships
802 | hierarchical_content = build_hierarchy(parsed_content)
803 |
804 | # Process the hierarchical content using the standardized utility
805 | result = process_hierarchical_content(parent_block_uid, hierarchical_content, order)
806 |
807 | if result["success"]:
808 | return {
809 | "success": True,
810 | "page_uid": target_page_uid,
811 | "parent_uid": parent_block_uid,
812 | "created_uids": result.get("created_uids", [])
813 | }
814 | else:
815 | return {
816 | "success": False,
817 | "error": result.get("error", "Failed to import markdown"),
818 | "page_uid": target_page_uid,
819 | "parent_uid": parent_block_uid
820 | }
821 |     except (ValidationError, PageNotFoundError, BlockNotFoundError, TransactionError) as e:
822 |         return {
823 |             "success": False,
824 |             "error": str(e)
825 |         }
841 | except Exception as e:
842 | logger.error(f"Error importing markdown: {str(e)}")
843 | return {
844 | "success": False,
845 | "error": f"Error importing markdown: {str(e)}"
846 | }
847 |
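# Illustrative usage (hypothetical markdown): nested list items become nested
# blocks under the resolved parent block or page:
#
#     import_markdown(
#         "- Topic\n  - Detail A\n  - Detail B",
#         page_title="Imported Notes",
#         order="last"
#     )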
848 |
849 | def add_todos(todos: List[str]) -> Dict[str, Any]:
850 | """
851 | Add todo items to today's daily page.
852 |
853 | Args:
854 | todos: List of todo items
855 |
856 | Returns:
857 | Result with success status
858 | """
859 | if not todos:
860 | return {
861 | "success": False,
862 | "error": "Todo list cannot be empty"
863 | }
864 |
865 | if not all(isinstance(todo, str) for todo in todos):
866 | return {
867 | "success": False,
868 | "error": "All todo items must be strings"
869 | }
870 |
871 | session, headers = get_session_and_headers()
872 |
873 | try:
874 | # Get today's daily page
875 | daily_page_uid = get_daily_page()
876 |
877 | # Create batch actions for todos
878 | actions = []
879 | todo_uids = []
880 |
881 | for i, todo in enumerate(todos):
882 | # Format with TODO syntax
883 | todo_content = f"{{{{[[TODO]]}}}} {todo}"
884 |
885 | # Generate UID
886 | block_uid = str(uuid.uuid4())[:9]
887 | todo_uids.append(block_uid)
888 |
889 | # Create action
890 | action = {
891 | "action": "create-block",
892 | "location": {
893 | "parent-uid": daily_page_uid,
894 | "order": "last"
895 | },
896 | "block": {
897 | "string": todo_content,
898 | "uid": block_uid
899 | }
900 | }
901 |
902 | actions.append(action)
903 |
904 | # Execute batch actions
905 | result = execute_write_action(actions)
906 |
907 | if result.get("success", False) or "created_uids" in result:
908 | return {
909 | "success": True,
910 | "created_uids": result.get("created_uids", todo_uids),
911 | "page_uid": daily_page_uid
912 | }
913 |     else:
914 |         return {
915 |             "success": False,
916 |             "error": result.get("error", "Failed to create todo items")
917 |         }
918 |     except (ValidationError, PageNotFoundError, TransactionError) as e:
919 |         return {
920 |             "success": False,
921 |             "error": str(e)
922 |         }
923 |     except Exception as e:
924 |         logger.error(f"Error adding todos: {str(e)}")
925 |         return {
926 |             "success": False,
927 |             "error": f"Error adding todos: {str(e)}"
928 |         }
938 |
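# Illustrative usage: each string is written to today's daily page wrapped in
# Roam's checkbox syntax, e.g. "{{[[TODO]]}} Buy milk":
#
#     add_todos(["Buy milk", "Review pull request"])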
939 |
940 | def update_content(block_uid: str, content: Optional[str] = None, transform_pattern: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
941 |     """
942 |     Update a block's content or transform it with a regex pattern.
943 | 
944 |     Args:
945 |         block_uid: Block UID
946 |         content: New content (ignored when transform_pattern is given)
947 |         transform_pattern: Dict with 'find' (regex), 'replace', and optional 'global' (default True)
948 | 
949 |     Returns:
950 |         Result with the updated content
951 |     """
952 | if not block_uid:
953 | return {
954 | "success": False,
955 | "error": "Block UID is required"
956 | }
957 |
958 | if not content and not transform_pattern:
959 | return {
960 | "success": False,
961 | "error": "Either content or transform_pattern must be provided"
962 | }
963 |
964 | try:
965 | # Get current content if doing a transformation
966 | if transform_pattern:
967 | # Validate transform pattern
968 | if not isinstance(transform_pattern, dict):
969 | return {
970 | "success": False,
971 | "error": "Transform pattern must be an object"
972 | }
973 |
974 | if "find" not in transform_pattern or "replace" not in transform_pattern:
975 | return {
976 | "success": False,
977 | "error": "Transform pattern must include 'find' and 'replace' properties"
978 | }
979 |
980 | query = f'''[:find ?string .
981 | :where [?b :block/uid "{block_uid}"]
982 | [?b :block/string ?string]]'''
983 |
984 | current_content = execute_query(query)
985 |
986 | if not current_content:
987 | return {
988 | "success": False,
989 | "error": f"Block with UID {block_uid} not found"
990 | }
991 |
992 | # Apply transformation
993 | find = transform_pattern["find"]
994 | replace = transform_pattern["replace"]
995 | global_replace = transform_pattern.get("global", True)
996 |
997 | try:
998 | flags = re.MULTILINE
999 | count = 0 if global_replace else 1
1000 | new_content = re.sub(find, replace, current_content, count=count, flags=flags)
1001 |
1002 | # Update block
1003 | update_block(block_uid, new_content)
1004 |
1005 | return {
1006 | "success": True,
1007 | "content": new_content
1008 | }
1009 | except re.error as e:
1010 | return {
1011 | "success": False,
1012 | "error": f"Invalid regex pattern: {str(e)}"
1013 | }
1014 | else:
1015 | # Direct content update
1016 | update_block(block_uid, content)
1017 |
1018 | return {
1019 | "success": True,
1020 | "content": content
1021 | }
1022 |     except (ValidationError, BlockNotFoundError, TransactionError) as e:
1023 |         return {
1024 |             "success": False,
1025 |             "error": str(e)
1026 |         }
1027 |     except Exception as e:
1028 |         logger.error(f"Error updating content: {str(e)}")
1029 |         return {
1030 |             "success": False,
1031 |             "error": f"Error updating content: {str(e)}"
1032 |         }
1042 |
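# Illustrative usage (hypothetical block UID): either replace the whole string
# or apply a regex transform; "global": False limits it to the first match:
#
#     update_content("abc123def", content="Revised text")
#     update_content("abc123def", transform_pattern={
#         "find": "TODO", "replace": "DONE", "global": False
#     })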
1043 |
1044 | def update_multiple_contents(updates: List[Dict[str, Any]]) -> Dict[str, Any]:
1045 | """
1046 | Update multiple blocks in a single operation.
1047 |
1048 | Args:
1049 | updates: List of update operations
1050 |
1051 | Returns:
1052 | Results of updates
1053 | """
1054 | if not updates or not isinstance(updates, list):
1055 | return {
1056 | "success": False,
1057 | "error": "Updates must be a non-empty list"
1058 | }
1059 |
1060 | try:
1061 | # Validate each update
1062 | for i, update in enumerate(updates):
1063 | if "block_uid" not in update:
1064 | return {
1065 | "success": False,
1066 | "error": f"Update at index {i} is missing required 'block_uid' property"
1067 | }
1068 |
1069 | if "content" not in update and "transform" not in update:
1070 | return {
1071 | "success": False,
1072 | "error": f"Update at index {i} must include either 'content' or 'transform'"
1073 | }
1074 |
1075 | if "transform" in update:
1076 | transform = update["transform"]
1077 | if not isinstance(transform, dict):
1078 | return {
1079 | "success": False,
1080 | "error": f"Transform at index {i} must be an object"
1081 | }
1082 |
1083 | if "find" not in transform or "replace" not in transform:
1084 | return {
1085 | "success": False,
1086 | "error": f"Transform at index {i} must include 'find' and 'replace' properties"
1087 | }
1088 |
1089 | # Batch update blocks in chunks of 50
1090 | CHUNK_SIZE = 50
1091 | results = batch_update_blocks(updates, CHUNK_SIZE)
1092 |
1093 | # Count successful updates
1094 | successful = sum(1 for result in results if result.get("success"))
1095 |
1096 | return {
1097 | "success": successful == len(updates),
1098 | "results": results,
1099 | "message": f"Updated {successful}/{len(updates)} blocks successfully"
1100 | }
1101 | except ValidationError as e:
1102 | return {
1103 | "success": False,
1104 | "error": str(e)
1105 | }
1106 | except Exception as e:
1107 | return {
1108 | "success": False,
1109 | "error": str(e)
1110 | }
1111 | 
1112 | 
1113 | def create_nested_blocks(parent_uid: str, blocks_data: List[Dict[str, Any]]) -> Dict[str, Any]:
1114 |     """
1115 |     Create nested blocks with proper parent-child relationships.
1116 | 
1117 |     Args:
1118 |         parent_uid: UID of the parent block/page
1119 |         blocks_data: List of block data (text, level, children)
1120 | 
1121 |     Returns:
1122 |         Dictionary with success status and created block UIDs
1123 |     """
1124 |     # For backward compatibility; delegates to the standardized
1125 |     # hierarchical content processor
1126 |     return process_hierarchical_content(parent_uid, blocks_data)
```