# Directory Structure
```
├── .github
│   └── workflows
│       └── test.yml
├── .gitignore
├── behave.ini
├── features
│   ├── blackbox_tests.feature
│   ├── environment.py
│   └── steps
│       └── blackbox_steps.py
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   └── obsidian_mcp
│       ├── client.py
│       ├── search.py
│       ├── server.py
│       └── utils.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# IDEs
.idea/
.vscode/
*.swp
*.swo
*~
# OS generated files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Project specific
.claude/
*.output
*.log
# Test artifacts
.behave_output/
test-reports/
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Obsidian MCP Server
An MCP (Model Context Protocol) server that enables AI agents to perform sophisticated knowledge discovery and analysis across your Obsidian vault through the Local REST API plugin.
<a href="https://glama.ai/mcp/servers/@pmmvr/obsidian-api-mcp-server">
<img width="380" height="200" src="https://glama.ai/mcp/servers/@pmmvr/obsidian-api-mcp-server/badge" alt="Obsidian Server MCP server" />
</a>
## Why This Matters
This server transforms your Obsidian vault into a powerful knowledge base for AI agents, enabling complex multi-step workflows like:
- **"Retrieve notes from my 'Projects/Planning' folder containing 'roadmap' or 'timeline' in titles, created after April 1st, then analyze them for any blockers or dependencies and present a consolidated risk assessment with references to the source notes"**
- **"Find all notes tagged with 'research' or 'analysis' from the last month, scan their content for incomplete sections or open questions, then cross-reference with my 'Team/Expertise' notes to suggest which colleagues could help address each gap"**
- **"Get the complete content of meeting notes from 'Leadership/Quarterly' containing 'budget' or 'headcount', analyze them for action items assigned to my department, and create a chronological timeline with source note references"**
The server's advanced filtering, regex support, and full content retrieval capabilities allow agents to perform nuanced knowledge work that would take hours manually.
## Prerequisites
1. Install the [Obsidian Local REST API](https://github.com/coddingtonbear/obsidian-local-rest-api) plugin in your Obsidian vault
2. Configure and enable the plugin in Obsidian settings
3. Note the API URL (default: `https://localhost:27124`) and API key if you've set one
## Installation
### From PyPI (Recommended)
```bash
# Install from PyPI
pip install obsidian-api-mcp-server
# Or with uv
uv pip install obsidian-api-mcp-server
```
### Add to MCP Configuration
Add to your MCP client configuration (e.g., Claude Desktop):
```json
{
  "mcpServers": {
    "obsidian-api-mcp-server": {
      "command": "uvx",
      "args": [
        "--from",
        "obsidian-api-mcp-server>=1.0.1",
        "obsidian-api-mcp"
      ],
      "env": {
        "OBSIDIAN_API_URL": "https://localhost:27124",
        "OBSIDIAN_API_KEY": "your-api-key-here"
      }
    }
  }
}
```
### From Source (Development)
```bash
# Clone the repository
git clone https://github.com/pmmvr/obsidian-api-mcp-server
cd obsidian-api-mcp-server
# Install with uv
uv pip install -e .
# Or with pip
pip install -e .
```
## Configuration
Set environment variables for the Obsidian API:
```bash
# Optional: Obsidian API URL (defaults to https://localhost:27124 when unset)
export OBSIDIAN_API_URL="https://localhost:27124"
# Optional: API key if you've configured authentication
export OBSIDIAN_API_KEY="your-api-key-here"
```
**Important Security Note**: Avoid hardcoding your `OBSIDIAN_API_KEY` directly into scripts or committing it to version control. Consider using a `.env` file (which is included in the `.gitignore` of this project) and a library like `python-dotenv` to manage your API key, or use environment variables managed by your operating system or shell.
**Note**: The server defaults to HTTPS and disables SSL certificate verification for self-signed certificates commonly used with local Obsidian instances. For HTTP connections, set `OBSIDIAN_API_URL="http://localhost:27123"`.
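As a rough illustration of the settings above, the sketch below resolves the URL and the optional key from the environment and builds the request header. `resolve_settings` and `DEFAULT_URL` are hypothetical names used only for this example; the `Bearer` header format follows this project's client code.

```python
import os
from typing import Dict, Mapping, Optional, Tuple

# Default URL as documented in this README
DEFAULT_URL = "https://localhost:27124"


def resolve_settings(env: Optional[Mapping[str, str]] = None) -> Tuple[str, Dict[str, str]]:
    """Hypothetical helper: return (base_url, headers) from environment variables."""
    env = os.environ if env is None else env
    # Fall back to the default URL and drop any trailing slash
    base_url = env.get("OBSIDIAN_API_URL", DEFAULT_URL).rstrip("/")
    headers: Dict[str, str] = {}
    api_key = env.get("OBSIDIAN_API_KEY")
    if api_key:
        # The key is only sent when one has been configured
        headers["Authorization"] = f"Bearer {api_key}"
    return base_url, headers


if __name__ == "__main__":
    print(resolve_settings())
```

Passing a plain dictionary instead of `os.environ` makes the helper easy to exercise without touching the real environment.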
## Usage
Run the MCP server:
```bash
obsidian-api-mcp
```
## Available Tools
The server provides three powerful tools:
1. **`search_vault`** - Advanced search with flexible filters and full content retrieval:
   - `query` - Text or regex search across note content (optional)
   - `query_type` - Search type: "text" (default) or "regex"
   - `search_in_path` - Limit search to specific folder path
   - `title_contains` - Filter by text in note titles (string, array, or JSON string)
   - `title_match_mode` - How to match multiple terms: "any" (OR) or "all" (AND)
   - `tag` - Filter by tag (string, array, or JSON string - searches frontmatter and inline #tags)
   - `tag_match_mode` - How to match multiple tags: "any" (OR) or "all" (AND)
   - `context_length` - Characters of context returned around each match (set high for more content)
   - `include_content` - Boolean to retrieve complete content of all matching notes
   - `created_since/until` - Filter by creation date
   - `modified_since/until` - Filter by modification date
   - `page_size` - Results per page
   - `page` - Page of results to return (default: 1)
   - `max_matches_per_file` - Limit matches per note

   **Key Features**:
   - When no `query` is provided, automatically returns full content for filter-only searches
   - `include_content=True` forces full content retrieval for any search
   - Supports regex patterns for complex text matching (OR conditions, case-insensitive search, etc.)
2. **`get_note_content`** - Retrieve complete content and metadata of a specific note by path
3. **`browse_vault_structure`** - Navigate vault directory structure efficiently:
   - `path` - Directory to browse (defaults to vault root)
   - `include_files` - Boolean to include files (default: False, folders only for speed)
   - `recursive` - Boolean to browse all nested directories
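
The `title_contains` and `tag` filters listed above accept a string, an array, or a JSON string, combined with an "any" or "all" match mode. A minimal sketch of how such input might be normalized and matched, assuming two hypothetical helpers (`normalize_terms` and `matches_terms`) that are not the server's actual function names:

```python
import json
from typing import List, Optional, Union


def normalize_terms(value: Optional[Union[str, List[str]]]) -> List[str]:
    """Hypothetical helper: turn a string, list, or JSON-encoded list into a list of terms."""
    if value is None:
        return []
    if isinstance(value, list):
        return [str(item) for item in value]
    text = value.strip()
    if text.startswith("["):
        try:
            parsed = json.loads(text)
            if isinstance(parsed, list):
                return [str(item) for item in parsed]
        except json.JSONDecodeError:
            pass  # Not valid JSON: treat the input as a single plain term
    return [value]


def matches_terms(candidate: str, terms: List[str], mode: str = "any") -> bool:
    """Case-insensitive substring match using OR ("any") or AND ("all") logic."""
    lowered = candidate.lower()
    combine = all if mode == "all" else any
    return combine(term.lower() in lowered for term in terms)


if __name__ == "__main__":
    terms = normalize_terms('["project", "2024"]')
    print(matches_terms("Project Plan 2024.md", terms, mode="all"))
```

Accepting a JSON string alongside a native array is a convenience for MCP clients that serialize list arguments as text.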
## Example Use Cases
### Basic Searches
1. **Find notes by title in a specific folder:**
```
search_vault(
search_in_path="Work/Projects/",
title_contains="meeting"
)
```
2. **Find notes with multiple title terms (OR logic):**
```
search_vault(
title_contains=["foo", "bar", "fizz", "buzz"],
title_match_mode="any" # Default
)
```
3. **Find notes with ALL title terms (AND logic):**
```
search_vault(
title_contains=["project", "2024"],
title_match_mode="all"
)
```
4. **Get all recent notes with full content:**
```
search_vault(
modified_since="2025-05-20",
include_content=True
)
```
5. **Text search with context:**
```
search_vault(
query="API documentation",
search_in_path="Engineering/",
context_length=500
)
```
6. **Search by tag:**
```
search_vault(
tag="project"
)
```
7. **Regex search for OR conditions:**
```
search_vault(
query="foo|bar",
query_type="regex",
search_in_path="Projects/"
)
```
8. **Regex search for tasks assigned to specific people:**
```
search_vault(
query="(TODO|FIXME|ACTION).*@(alice|bob)",
query_type="regex",
search_in_path="Work/Meetings/"
)
```
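To see what a pattern like the one in example 8 would match, it can help to try it locally first. The sketch below assumes the behaviour described above (case-insensitive unless the pattern already starts with inline flags) and uses a hypothetical `find_matches` helper with a configurable context window; it is an approximation, not the server's code.

```python
import re
from typing import List


def find_matches(pattern: str, text: str, window: int = 50) -> List[str]:
    """Hypothetical helper: return each match with `window` characters of context."""
    # Default to case-insensitive matching when no inline flags are given
    if not pattern.startswith("(?"):
        pattern = f"(?i){pattern}"
    regex = re.compile(pattern)
    snippets = []
    for match in regex.finditer(text):
        start = max(0, match.start() - window)
        end = min(len(text), match.end() + window)
        snippets.append(text[start:end])
    return snippets


if __name__ == "__main__":
    note = "- todo: send slides @Alice\n- done: book room\n- FIXME broken link @bob\n"
    for snippet in find_matches(r"(TODO|FIXME|ACTION).*@(alice|bob)", note, window=0):
        print(snippet)
```

Because `.` does not cross line breaks by default, each task line is matched on its own.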
### Advanced Multi-Step Workflows
These examples demonstrate how agents can chain together sophisticated knowledge discovery tasks:
9. **Strategic Project Analysis:**
```
# Step 1: Get all project documentation
search_vault(
search_in_path="Projects/Infrastructure/",
title_contains=["planning", "requirements", "architecture"],
title_match_mode="any",
include_content=True
)
# Step 2: Find related technical discussions
search_vault(
tag=["infrastructure", "technical-debt"],
tag_match_mode="any",
modified_since="2025-04-01",
include_content=True
)
```
*Agent can then analyze dependencies, identify risks, and recommend resource allocation*
10. **Meeting Action Item Mining:**
```
# Get all recent meeting notes with full content
search_vault(
search_in_path="Meetings/",
title_contains=["standup", "planning", "retrospective"],
title_match_mode="any",
created_since="2025-05-01",
include_content=True
)
```
*Agent scans content for action items, extracts assignments, and creates chronological tracking*
11. **Research Gap Analysis:**
```
# Find research notes with questions or gaps
search_vault(
query="(TODO|QUESTION|INVESTIGATE|UNCLEAR)",
query_type="regex",
tag=["research", "analysis"],
tag_match_mode="any",
include_content=True
)
# Cross-reference with team expertise
search_vault(
search_in_path="Team/",
tag=["expertise", "skills"],
tag_match_mode="any",
include_content=True
)
```
*Agent identifies knowledge gaps and suggests team members who could help*
12. **Vault Structure Exploration:**
```
# Quick organizational overview
browse_vault_structure(recursive=True)
# Deep dive into specific areas
browse_vault_structure(
path="Projects/CurrentSprint/",
include_files=True,
recursive=True
)
```
13. **Tag-Based Knowledge Mapping:**
```
# Find notes with multiple tags (AND logic)
search_vault(
tag=["project", "urgent"],
tag_match_mode="all",
include_content=True
)
# Find notes with any relevant tags (OR logic)
search_vault(
tag=["architecture", "design", "implementation"],
tag_match_mode="any",
modified_since="2025-04-15"
)
```
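Several examples above pass bare dates such as `modified_since="2025-04-15"`. The sketch below shows one way such bounds could be applied to a note's millisecond timestamp. It uses the standard library's `datetime.fromisoformat` as a stand-in for the server's date parsing, and `in_date_range` is a hypothetical name, so treat it as an approximation under those assumptions.

```python
from datetime import datetime
from typing import Optional


def in_date_range(timestamp_ms: int, since: Optional[str] = None, until: Optional[str] = None) -> bool:
    """Hypothetical helper: check a millisecond timestamp against ISO date bounds."""
    moment = datetime.fromtimestamp(timestamp_ms / 1000)
    # A bare date parses to midnight at the start of that day
    if since and moment < datetime.fromisoformat(since):
        return False
    if until and moment > datetime.fromisoformat(until):
        return False
    return True


if __name__ == "__main__":
    # A note modified at noon (local time) on 2025-05-21
    noon_ms = int(datetime(2025, 5, 21, 12, 0).timestamp() * 1000)
    print(in_date_range(noon_ms, since="2025-05-20"))  # True
    print(in_date_range(noon_ms, until="2025-05-21"))  # False
```

Under this reading, an `until` bound given as a bare date excludes notes changed later on that same day, so use the following day when the whole day should be included.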
## Development
```bash
# Install with test dependencies
uv pip install -e ".[test]"
# Run the server
python -m obsidian_mcp.server
# Run tests
uv run behave features/blackbox_tests.feature
```
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
```
--------------------------------------------------------------------------------
/behave.ini:
--------------------------------------------------------------------------------
```
[behave]
paths = features
show_snippets = false
show_skipped = false
format = pretty
logging_level = WARNING
default_tags = ~@skip
```
--------------------------------------------------------------------------------
/features/environment.py:
--------------------------------------------------------------------------------
```python
import os
import asyncio
from unittest.mock import patch


def before_all(context):
    context.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(context.loop)


def after_all(context):
    context.loop.close()


def before_scenario(context, scenario):
    os.environ["OBSIDIAN_API_URL"] = "https://localhost:27124"
    os.environ["OBSIDIAN_API_KEY"] = "test-api-key"


def after_scenario(context, scenario):
    for key in ["OBSIDIAN_API_URL", "OBSIDIAN_API_KEY"]:
        if key in os.environ:
            del os.environ[key]
```
--------------------------------------------------------------------------------
/.github/workflows/test.yml:
--------------------------------------------------------------------------------
```yaml
name: Tests
on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12"]
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install uv
        uses: astral-sh/setup-uv@v2
      - name: Install dependencies
        run: |
          uv pip install --system -e ".[test]"
      - name: Run tests
        run: |
          uv run behave features/blackbox_tests.feature --summary
```
--------------------------------------------------------------------------------
/src/obsidian_mcp/utils.py:
--------------------------------------------------------------------------------
```python
import re
from datetime import datetime, timedelta

from dateutil import parser as date_parser


def format_timestamp(timestamp_ms: int) -> str:
    """Format a millisecond Unix timestamp as 'YYYY-MM-DD HH:MM:SS' in local time."""
    dt = datetime.fromtimestamp(timestamp_ms / 1000)
    return dt.strftime("%Y-%m-%d %H:%M:%S")


def parse_date_filter(date_str: str) -> datetime:
    """Parse 'N days ago', 'N weeks ago', 'today', or any date string dateutil accepts."""
    text = date_str.strip().lower()
    if "ago" in text:
        amount = re.search(r'(\d+)', text)
        if amount and "week" in text:
            return datetime.now() - timedelta(weeks=int(amount.group(1)))
        if amount and "day" in text:
            return datetime.now() - timedelta(days=int(amount.group(1)))
        # Only day and week units are supported; fail loudly instead of returning None
        raise ValueError(f"Unsupported relative date: {date_str!r}")
    if text == "today":
        return datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    return date_parser.parse(date_str)
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "obsidian-api-mcp-server"
version = "1.0.1"
description = "MCP server enabling AI agents to perform natural knowledge discovery and analysis across Obsidian vaults"
readme = "README.md"
authors = [
{name = "pmmvr", email = "[email protected]"}
]
license = {text = "MIT"}
keywords = ["obsidian", "mcp", "ai", "knowledge-management", "rest-api"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Text Processing :: General",
"Topic :: Office/Business :: Groupware",
]
requires-python = ">=3.10"
dependencies = [
"mcp>=1.9.1",
"mcp[cli]",
"httpx>=0.27.0",
"pydantic>=2.0.0",
"python-dateutil>=2.9.0",
"python-dotenv>=1.0.0",
]
[project.optional-dependencies]
test = [
"behave>=1.2.6",
"pytest-mock>=3.12.0",
"responses>=0.24.0",
]
[project.scripts]
obsidian-api-mcp = "obsidian_mcp.server:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/obsidian_mcp"]
[tool.hatch.metadata]
allow-direct-references = true
[tool.hatch.dependencies]
dev = [
"build>=1.2.2.post1",
"twine>=6.1.0",
]
```
--------------------------------------------------------------------------------
/src/obsidian_mcp/client.py:
--------------------------------------------------------------------------------
```python
import os
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import httpx


class ObsidianClient:
    def __init__(self, base_url: str, api_key: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        self.headers = {}
        if api_key:
            self.headers["Authorization"] = f"Bearer {api_key}"

    async def _request(self, method: str, endpoint: str, **kwargs) -> Any:
        headers = kwargs.pop("headers", {})
        headers.update(self.headers)
        async with httpx.AsyncClient(verify=False) as client:
            url = f"{self.base_url}/{endpoint.lstrip('/')}"
            response = await client.request(method, url, headers=headers, **kwargs)
            if response.status_code == 401:
                raise Exception("Obsidian API requires authentication. Please set OBSIDIAN_API_KEY environment variable.")
            response.raise_for_status()
            return response.json()

    async def search_simple(self, query: str, context_length: int = 100) -> List[Dict[str, Any]]:
        return await self._request(
            "POST",
            "/search/simple/",
            params={"query": query, "contextLength": context_length}
        )

    async def get_note_metadata(self, path: str) -> Dict[str, Any]:
        encoded_path = quote(path, safe='/')
        return await self._request(
            "GET",
            f"/vault/{encoded_path}",
            headers={"Accept": "application/vnd.olrapi.note+json"}
        )

    async def list_directory(self, path: str = "") -> List[str]:
        if path:
            # Just URL encode the path and try it directly
            encoded_path = quote(path, safe='/')
            endpoint = f"/vault/{encoded_path}/"
        else:
            endpoint = "/vault/"
        result = await self._request("GET", endpoint)
        return result.get("files", [])

    async def search_advanced(self, jsonlogic_query: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Execute advanced search using JsonLogic query format."""
        return await self._request(
            "POST",
            "/search/",
            json=jsonlogic_query,
            headers={"Content-Type": "application/vnd.olrapi.jsonlogic+json"}
        )

    async def browse_vault(self, base_path: str = "", include_files: bool = False, recursive: bool = False, max_depth: int = 10) -> List[str]:
        """Browse vault structure with flexible filtering options."""
        if not recursive:
            all_items = await self.list_directory(base_path)
            if not include_files:
                # Filter to only show directories (items ending with '/')
                return [item for item in all_items if item.endswith('/')]
            return all_items

        all_items = []

        async def _recursive_list(current_path: str, depth: int):
            if depth > max_depth:
                return
            try:
                items = await self.list_directory(current_path)
                for item in items:
                    if current_path:
                        full_path = f"{current_path}/{item}"
                    else:
                        full_path = item
                    # Apply file filtering
                    if include_files or item.endswith('/'):
                        all_items.append(full_path)
                    # If it's a directory, recurse into it
                    if item.endswith('/'):
                        await _recursive_list(full_path.rstrip('/'), depth + 1)
            except Exception:
                # Skip directories we can't access
                pass

        await _recursive_list(base_path, 0)
        return all_items

    async def list_all_files(self, base_path: str = "", max_depth: int = 10, max_files: int = 5000) -> List[str]:
        """Recursively list all files in the vault with safety limits."""
        all_files = []

        async def _recursive_list(current_path: str, depth: int):
            if depth > max_depth or len(all_files) >= max_files:
                return
            try:
                files = await self.list_directory(current_path)
                for file in files:
                    if len(all_files) >= max_files:
                        return
                    if current_path:
                        full_path = f"{current_path}/{file.rstrip('/')}"
                    else:
                        full_path = file.rstrip('/')
                    if file.endswith('/'):
                        # It's a directory, recurse into it
                        await _recursive_list(full_path, depth + 1)
                    else:
                        # It's a file, add it to our list
                        all_files.append(full_path)
            except Exception:
                # Skip directories we can't access
                pass

        await _recursive_list(base_path, 0)
        return all_files


def create_client() -> ObsidianClient:
    base_url = os.getenv("OBSIDIAN_API_URL", "https://localhost:27124")
    api_key = os.getenv("OBSIDIAN_API_KEY")
    return ObsidianClient(base_url, api_key)
```
--------------------------------------------------------------------------------
/src/obsidian_mcp/server.py:
--------------------------------------------------------------------------------
```python
import json
from typing import Any, Dict, Optional, Union, List

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

load_dotenv()

from obsidian_mcp.client import create_client
from obsidian_mcp.search import SearchProcessor

mcp = FastMCP("obsidian-mcp")
client = create_client()
search_processor = SearchProcessor(client)


@mcp.tool(
    annotations={
        "title": "Search Obsidian Vault",
        "readOnlyHint": True,
        "openWorldHint": False
    }
)
async def search_vault(
    query: Optional[str] = None,
    query_type: str = "text",
    search_in_path: Optional[str] = None,
    title_contains: Optional[Any] = None,
    title_match_mode: str = "any",
    tag: Optional[Any] = None,
    tag_match_mode: str = "any",
    context_length: int = 100,
    include_content: bool = False,
    modified_since: Optional[str] = None,
    modified_until: Optional[str] = None,
    created_since: Optional[str] = None,
    created_until: Optional[str] = None,
    page_size: int = 50,
    page: int = 1,
    max_matches_per_file: int = 5
) -> Dict[str, Any]:
    """
    Search Obsidian vault for notes matching criteria.

    Args:
        query: Text or regex pattern to search for
        query_type: "text" or "regex"
        search_in_path: Limit search to specific folder
        title_contains: Filter by title (string or array)
        title_match_mode: "any" or "all" for multiple title terms
        tag: Filter by tag (string, array, or JSON string like title_contains)
        tag_match_mode: "any" or "all" for multiple tag terms
        context_length: Characters of context around matches
        include_content: Return full note content
        modified_since/until: Filter by modification date (YYYY-MM-DD)
        created_since/until: Filter by creation date (YYYY-MM-DD)
        page_size/page: Pagination controls
        max_matches_per_file: Limit matches per file
    """
    parsed_title_contains = title_contains
    if title_contains:
        if isinstance(title_contains, list):
            parsed_title_contains = title_contains
        # Handle title_contains if JSON string representation of list
        elif isinstance(title_contains, str) and title_contains.strip().startswith('['):
            try:
                parsed_title_contains = json.loads(title_contains)
            except json.JSONDecodeError:
                pass

    # Handle tag in multiple formats (same logic as title_contains)
    parsed_tag = tag
    if tag:
        if isinstance(tag, list):
            parsed_tag = tag
        elif isinstance(tag, str) and tag.strip().startswith('['):
            try:
                parsed_tag = json.loads(tag)
            except json.JSONDecodeError:
                pass

    return await search_processor.search(
        query=query,
        query_type=query_type,
        search_in_path=search_in_path,
        title_contains=parsed_title_contains,
        title_match_mode=title_match_mode,
        tag=parsed_tag,
        tag_match_mode=tag_match_mode,
        context_length=context_length,
        include_content=include_content,
        modified_since=modified_since,
        modified_until=modified_until,
        created_since=created_since,
        created_until=created_until,
        page_size=page_size,
        page=page,
        max_matches_per_file=max_matches_per_file
    )


@mcp.tool(
    annotations={
        "title": "Get Obsidian Note Content",
        "readOnlyHint": True,
        "openWorldHint": False
    }
)
async def get_note_content(path: str) -> Dict[str, Any]:
    """
    Get the full content and metadata of a specific note by path.

    Args:
        path: Full path to the note within the vault
    """
    try:
        note_data = await client.get_note_metadata(path)
        return {
            "success": True,
            "data": note_data
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to get note at path '{path}': {str(e)}",
            "data": None
        }


@mcp.tool(
    annotations={
        "title": "Browse Obsidian Vault Structure",
        "readOnlyHint": True,
        "openWorldHint": False
    }
)
async def browse_vault_structure(path: str = "", include_files: bool = False, recursive: bool = False) -> Dict[str, Any]:
    """
    Browse vault directory structure.

    Args:
        path: Path to browse from (defaults to vault root)
        include_files: Include files in listing (default: False, folders only)
        recursive: List nested contents recursively
    """
    try:
        # Remove leading/trailing quotes and whitespace
        clean_path = path.strip().strip('"\'')
        items = await client.browse_vault(clean_path, include_files, recursive)
        directories = [item for item in items if item.endswith('/')]
        files = [item for item in items if not item.endswith('/')]
        return {
            "success": True,
            "path": clean_path,
            "include_files": include_files,
            "recursive": recursive,
            "directories": directories,
            "files": files if include_files else [],
            "total_directories": len(directories),
            "total_files": len(files) if include_files else 0,
            "total_items": len(items)
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to browse vault structure for path '{path}': {str(e)}",
            "path": path,
            "include_files": include_files,
            "recursive": recursive,
            "directories": [],
            "files": [],
            "total_directories": 0,
            "total_files": 0,
            "total_items": 0
        }


def main():
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/src/obsidian_mcp/search.py:
--------------------------------------------------------------------------------
```python
import math
import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from obsidian_mcp.client import ObsidianClient
from obsidian_mcp.utils import format_timestamp, parse_date_filter
class SearchProcessor:
    """
    Processes search queries against an Obsidian vault, handling various filters,
    pagination, and result formatting.
    """

    def __init__(self, client: ObsidianClient):
        self.client = client

    async def _get_file_metadata(self, file_path: str, include_content_for_tags: bool = False) -> Optional[Dict[str, Any]]:
        try:
            note_metadata = await self.client.get_note_metadata(file_path)
            result = {
                "mtime": note_metadata["stat"]["mtime"],
                "ctime": note_metadata["stat"]["ctime"]
            }
            # Include content and tags if needed for tag filtering
            if include_content_for_tags:
                result["content"] = note_metadata.get("content", "")
                result["tags"] = note_metadata.get("frontmatter", {}).get("tags", [])
            return result
        except Exception:
            return None
    def _apply_filters(self, file_path: str, metadata: Dict[str, Any],
                       search_path_prefix: str, title_contains: Optional[Union[str, List[str]]], title_match_mode: str,
                       tag: Optional[Union[str, List[str]]], tag_match_mode: str, since_date: Optional[datetime], until_date: Optional[datetime],
                       created_since_date: Optional[datetime], created_until_date: Optional[datetime]) -> bool:
        """
        Applies various filters to a file based on its path, metadata, and specified criteria.
        Returns True if the file passes all filters, False otherwise.
        """
        if search_path_prefix and not file_path.startswith(search_path_prefix):
            return False

        if title_contains:
            filename = os.path.basename(file_path).lower()
            if isinstance(title_contains, str):
                if title_contains.lower() not in filename:
                    return False
            else:
                terms = [term.lower() for term in title_contains]
                if title_match_mode == "all":
                    if not all(term in filename for term in terms):
                        return False
                else:
                    if not any(term in filename for term in terms):
                        return False

        # Check tag filter - tags are stored in frontmatter or content
        if tag:
            # Frontmatter tags may be missing, a single string, or a list
            frontmatter_tags = metadata.get("tags") or []
            if isinstance(frontmatter_tags, str):
                frontmatter_tags = [frontmatter_tags]
            # Build a new list so the metadata dict is not mutated
            tags_found = [str(t) for t in frontmatter_tags]
            # Also check for inline tags in content if available
            content = metadata.get("content", "")
            if content:
                # Look for #tag format in content, including hyphenated and nested tags
                import re
                inline_tags = re.findall(r'#([\w/-]+)', content)
                tags_found.extend(inline_tags)
            # Convert to lowercase for case-insensitive matching
            tags_found = [t.lower() for t in tags_found]
            # Handle multiple tags with AND/OR logic
            if isinstance(tag, str):
                # Single tag
                if tag.lower() not in tags_found:
                    return False
            else:
                # Multiple tags - apply OR/AND logic
                tags_to_match = [t.lower() for t in tag]
                if tag_match_mode == "all":
                    # ALL tags must be present (AND logic)
                    if not all(tag_term in tags_found for tag_term in tags_to_match):
                        return False
                else:  # tag_match_mode == "any" (default)
                    # ANY tag must be present (OR logic)
                    if not any(tag_term in tags_found for tag_term in tags_to_match):
                        return False

        file_mod_time = datetime.fromtimestamp(metadata["mtime"] / 1000)
        if since_date and file_mod_time < since_date:
            return False
        if until_date and file_mod_time > until_date:
            return False

        file_created_time = datetime.fromtimestamp(metadata["ctime"] / 1000)
        if created_since_date and file_created_time < created_since_date:
            return False
        if created_until_date and file_created_time > created_until_date:
            return False
        return True
    def _process_matches(self, api_result: Dict[str, Any], max_matches_per_file: int) -> List[Dict[str, Any]]:
        matches = []
        for match in api_result.get("matches", []):
            matches.append({
                "context": match.get("context", ""),
                "match_start": match.get("match", {}).get("start", 0),
                "match_end": match.get("match", {}).get("end", 0)
            })
        return matches[:max_matches_per_file]

    def _create_result_item(self, file_path: str, matches: List[Dict[str, Any]],
                            metadata: Dict[str, Any], score: int) -> Dict[str, Any]:
        return {
            "path": file_path,
            "filename": os.path.basename(file_path),
            "matches": matches,
            "modified_time": format_timestamp(metadata["mtime"]),
            "created_time": format_timestamp(metadata["ctime"]),
            "score": score
        }

    def _paginate_results(self, results: List[Dict[str, Any]], page: int, page_size: int) -> tuple:
        total_files_found = len(results)
        total_pages = math.ceil(total_files_found / page_size)
        start_index = (page - 1) * page_size
        end_index = start_index + page_size
        paginated_results = results[start_index:end_index]
        also_found_in_files = None
        if total_pages > 1:
            # Collect filenames from other pages if pagination is active
            paginated_paths = {result["path"] for result in paginated_results}
            also_found_in_files = [
                result["filename"] for result in results
                if result["path"] not in paginated_paths
            ]
        return paginated_results, total_files_found, total_pages, also_found_in_files
    async def search(self, query: Optional[str] = None, query_type: str = "text", search_in_path: Optional[str] = None,
                     title_contains: Optional[Union[str, List[str]]] = None, title_match_mode: str = "any",
                     tag: Optional[Union[str, List[str]]] = None, tag_match_mode: str = "any",
                     context_length: int = 100, include_content: bool = False,
                     modified_since: Optional[str] = None, modified_until: Optional[str] = None,
                     created_since: Optional[str] = None, created_until: Optional[str] = None,
                     page_size: int = 50, page: int = 1, max_matches_per_file: int = 5) -> Dict[str, Any]:
        date_filters = self._parse_date_filters(modified_since, modified_until, created_since, created_until)
        search_path_prefix = self._normalize_search_path(search_in_path)
        try:
            # Determine the base path for API search if a prefix is provided
            base_search_path = search_path_prefix.rstrip('/') if search_path_prefix else ""
            api_results = await self._get_api_results(query, query_type, context_length, base_search_path)
            filtered_results, total_matches_count = await self._process_results(
                api_results, search_path_prefix, title_contains, title_match_mode, tag, tag_match_mode, date_filters, max_matches_per_file, query, include_content
            )
            filtered_results.sort(key=lambda x: x["modified_time"], reverse=True)
            paginated_results, total_files_found, total_pages, also_found_in_files = self._paginate_results(
                filtered_results, page, page_size
            )
            message = self._create_response_message(
                total_matches_count, total_files_found, page, total_pages,
                len(paginated_results), search_path_prefix
            )
            return {
                "success": True,
                "message": message,
                "results": paginated_results,
                "total_files_found": total_files_found,
                "total_matches_found": total_matches_count,
                "current_page": page,
                "page_size": page_size,
                "total_pages": total_pages,
                "also_found_in_files": also_found_in_files
            }
        except Exception as e:
            return self._create_error_response(str(e), page, page_size)

    def _parse_date_filters(self, modified_since: Optional[str], modified_until: Optional[str],
                            created_since: Optional[str], created_until: Optional[str]) -> Dict[str, Optional[datetime]]:
        return {
            "since_date": parse_date_filter(modified_since) if modified_since else None,
            "until_date": parse_date_filter(modified_until) if modified_until else None,
            "created_since_date": parse_date_filter(created_since) if created_since else None,
            "created_until_date": parse_date_filter(created_until) if created_until else None
        }

    def _normalize_search_path(self, search_in_path: Optional[str]) -> str:
        if not search_in_path:
            return ""
        search_path_prefix = search_in_path.strip("/")
        return search_path_prefix + "/" if search_path_prefix else ""
    async def _get_api_results(self, query: Optional[str], query_type: str, context_length: int, search_path: str = "") -> List[Dict[str, Any]]:
        if query and query.strip():
            if query_type == "regex":
                return await self._execute_regex_search(query, search_path)
            else:
                # Default to simple text search if query type is not regex or not specified
                return await self.client.search_simple(query, context_length)
        else:
            # If no query is provided, list all markdown files in the specified path
            all_files = await self.client.list_all_files(search_path, max_depth=8, max_files=1000)
            return [
                {
                    "filename": file_path,
                    "score": 0,
                    "matches": []
                }
                for file_path in all_files
                if file_path.endswith('.md')
            ]

    async def _execute_regex_search(self, regex_pattern: str, search_path: str = "") -> List[Dict[str, Any]]:
        import re
        try:
            if not regex_pattern.startswith('(?'):
                # Default to case-insensitive regex search if no flags are provided
                case_insensitive_pattern = f"(?i){regex_pattern}"
            else:
                case_insensitive_pattern = regex_pattern
            regex = re.compile(case_insensitive_pattern)
            all_files = await self.client.list_all_files(search_path, max_depth=8, max_files=1000)
            md_files = [f for f in all_files if f.endswith('.md')]
            formatted_results = []
            for file_path in md_files:
                try:
                    note_data = await self.client.get_note_metadata(file_path)
                    content = note_data.get("content", "")
                    matches = list(regex.finditer(content))
                    if matches:
                        match_data = []
                        for match in matches[:5]:
                            # Create a context window around each match
                            start = max(0, match.start() - 50)
                            end = min(len(content), match.end() + 50)
                            context = content[start:end]
                            match_data.append({
                                "context": context,
                                "match": {
                                    "start": match.start() - start,
                                    "end": match.end() - start
                                }
                            })
                        formatted_results.append({
                            "filename": file_path,
                            "score": len(matches),
                            "matches": match_data
                        })
                except Exception:
                    continue
            return formatted_results
        except Exception as e:
            # stdout carries the MCP stdio protocol, so diagnostics go to stderr
            print(f"Regex search failed: {e}, falling back to simple search", file=sys.stderr)
            return await self.client.search_simple(regex_pattern, 100)
    async def _process_results(self, api_results: List[Dict[str, Any]], search_path_prefix: str,
                               title_contains: Optional[Union[str, List[str]]], title_match_mode: str, tag: Optional[Union[str, List[str]]], tag_match_mode: str,
                               date_filters: Dict[str, Optional[datetime]], max_matches_per_file: int, query: Optional[str], include_content: bool = False) -> tuple:
        filtered_results = []
        total_matches_count = 0
        for api_result in api_results:
            file_path = api_result["filename"]
            # Include content if we need to filter by tags
            metadata = await self._get_file_metadata(file_path, include_content_for_tags=bool(tag))
            if not metadata:
                continue
            if not self._apply_filters(
                file_path, metadata, search_path_prefix, title_contains, title_match_mode, tag, tag_match_mode,
                date_filters["since_date"], date_filters["until_date"],
                date_filters["created_since_date"], date_filters["created_until_date"]
            ):
                continue
            all_matches = api_result.get("matches", [])
            matches = self._process_matches(api_result, max_matches_per_file)
            total_matches_count += len(all_matches)
            if include_content or (query is None or query.strip() == ""):
                # If include_content is true, or if there's no search query (listing all files),
                # attempt to fetch and include the full note content.
                try:
                    full_content = await self.client.get_note_metadata(file_path)
                    content_text = full_content.get("content", "")
                    if content_text:
                        matches = [{
                            "context": content_text,
                            "match_start": 0,
                            "match_end": len(content_text)
                        }]
                except Exception:
                    pass
            if matches or (query is None or query.strip() == ""):
                result_item = self._create_result_item(
                    file_path, matches, metadata, api_result.get("score", 0)
                )
                filtered_results.append(result_item)
        return filtered_results, total_matches_count
    def _create_response_message(self, total_matches_count: int, total_files_found: int,
                                 page: int, total_pages: int, current_page_files: int,
                                 search_path_prefix: str) -> str:
        message = (f"Found {total_matches_count} matches across {total_files_found} files. "
                   f"Showing page {page} of {total_pages} ({current_page_files} files on this page).")
        if search_path_prefix:
            message += f" Searched in path: {search_path_prefix}"
        return message

    def _create_error_response(self, error_msg: str, page: int, page_size: int) -> Dict[str, Any]:
        return {
            "success": False,
            "message": f"Search failed: {error_msg}",
            "results": [],
            "total_files_found": 0,
            "total_matches_found": 0,
            "current_page": page,
            "page_size": page_size,
            "total_pages": 0
        }
```
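The regex search above reports each match with offsets relative to a 50-character context window rather than to the whole note. The following is a minimal standalone sketch of that windowing, not code from the repository: the helper names `context_window` and `find_with_context` are hypothetical, and the 50-character margin and five-match limit simply mirror the values used in `_execute_regex_search`.

```python
import re
from typing import Any, Dict, List


def context_window(content: str, match: re.Match, margin: int = 50) -> Dict[str, Any]:
    """Return the text around a match plus offsets relative to that text."""
    # Clamp the window to the bounds of the note content
    start = max(0, match.start() - margin)
    end = min(len(content), match.end() + margin)
    return {
        "context": content[start:end],
        "match": {"start": match.start() - start, "end": match.end() - start},
    }


def find_with_context(pattern: str, content: str, limit: int = 5) -> List[Dict[str, Any]]:
    """Case-insensitive search that keeps at most `limit` matches."""
    regex = re.compile(pattern, re.IGNORECASE)
    return [context_window(content, m) for m in list(regex.finditer(content))[:limit]]


# Hypothetical sample note: the match sits 60 characters into the text
sample = "x" * 60 + "Foo" + "y" * 60
hits = find_with_context("foo", sample)
```

Because the offsets are window-relative, a consumer can highlight the match with `hit["context"][hit["match"]["start"]:hit["match"]["end"]]` without needing the full note.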
--------------------------------------------------------------------------------
/features/steps/blackbox_steps.py:
--------------------------------------------------------------------------------
```python
import json
from unittest.mock import patch

from behave import given, when, then


@given('the Obsidian API is available')
def step_obsidian_api_available(context):
    context.base_url = "https://localhost:27124"
    context.api_key = "test-api-key"


@given('the vault contains notes with content "{content}"')
def step_vault_contains_content(context, content):
    context.mock_search_results = [
        {
            "filename": "test-note.md",
            "score": 100,
            "matches": [
                {
                    "context": f"Some text with {content} in the middle",
                    "match": {"start": 15, "end": 15 + len(content)}
                }
            ]
        }
    ]


@given('the vault has a directory structure with files and folders')
def step_vault_has_structure(context):
    context.mock_api_files = ["daily/", "projects/", "README.md", "index.md"]


@given('the vault contains notes created on different dates')
def step_vault_notes_different_create_dates(context):
    context.mock_files_list = ["old-note.md", "new-note.md"]
    context.mock_metadata_responses = {
        "old-note.md": {"stat": {"mtime": 1703462400000, "ctime": 1703462400000}},  # Dec 2023
        "new-note.md": {"stat": {"mtime": 1704672000000, "ctime": 1704672000000}}  # Jan 2024
    }


@given('the vault contains notes with titles "{title1}", "{title2}", and "{title3}"')
def step_vault_notes_with_titles(context, title1, title2, title3):
    context.mock_files_list = [f"{title1}.md", f"{title2}.md", f"{title3}.md"]
    context.mock_metadata_base = {"stat": {"mtime": 1704067200000, "ctime": 1704067200000}}


@given('the vault contains notes in projects and daily directories')
def step_vault_notes_in_directories(context):
    context.mock_files_list = ["projects/work.md", "projects/personal.md", "daily/2024-01-01.md", "other/random.md"]
    context.mock_metadata_base = {"stat": {"mtime": 1704067200000, "ctime": 1704067200000}}


@given('the vault contains notes with "{content1}" and "{content2}"')
def step_vault_notes_with_content(context, content1, content2):
    context.mock_files_list = ["note1.md", "note2.md"]
    context.mock_note_contents = {
        "note1.md": {"content": f"Some {content1} here", "stat": {"mtime": 1704067200000, "ctime": 1704067200000}},
        "note2.md": {"content": f"Another {content2} there", "stat": {"mtime": 1704067200000, "ctime": 1704067200000}}
    }
@given('the vault contains notes with tags "{tag1}" and "{tag2}"')
def step_vault_notes_with_tags(context, tag1, tag2):
    context.mock_files_list = ["project-note.md", "meeting-note.md", "other-note.md"]
    context.mock_tag_contents = {
        "project-note.md": {
            "content": f"This is a project note #{tag1}",
            "frontmatter": {"tags": [tag1]},
            "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
        },
        "meeting-note.md": {
            "content": f"This is a meeting note #{tag2}",
            "frontmatter": {"tags": [tag2]},
            "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
        },
        "other-note.md": {
            "content": "This note has no tags",
            "frontmatter": {},
            "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
        }
    }


@given('the vault contains notes with multiple tags')
def step_vault_notes_with_multiple_tags(context):
    context.mock_files_list = ["urgent-project.md", "project-only.md", "urgent-only.md", "no-tags.md"]
    context.mock_multi_tag_contents = {
        "urgent-project.md": {
            "content": "This is urgent project work #project #urgent",
            "frontmatter": {"tags": ["project", "urgent"]},
            "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
        },
        "project-only.md": {
            "content": "This is project work #project",
            "frontmatter": {"tags": ["project"]},
            "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
        },
        "urgent-only.md": {
            "content": "This is urgent #urgent",
            "frontmatter": {"tags": ["urgent"]},
            "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
        },
        "no-tags.md": {
            "content": "This note has no tags",
            "frontmatter": {},
            "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
        }
    }
@when('I call the search_vault tool with query "{query}"')
def step_call_search_tool(context, query):
    from obsidian_mcp.server import search_vault

    async def run_tool():
        # Mock only the external HTTP calls to the Obsidian API
        with patch('httpx.AsyncClient.request') as mock_request:
            # Set up mock responses for different API endpoints
            def mock_api_response(method, url, **kwargs):
                if '/search/simple/' in url:
                    # Mock search endpoint
                    response = type('MockResponse', (), {
                        'status_code': 200,
                        'json': lambda *args, **kwargs: context.mock_search_results,
                        'raise_for_status': lambda *args, **kwargs: None
                    })()
                    return response
                elif '/vault/' in url and not url.endswith('/'):
                    # Mock note metadata endpoint
                    response = type('MockResponse', (), {
                        'status_code': 200,
                        'json': lambda *args, **kwargs: {
                            "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
                        },
                        'raise_for_status': lambda *args, **kwargs: None
                    })()
                    return response
                else:
                    # Default response
                    response = type('MockResponse', (), {
                        'status_code': 404,
                        'raise_for_status': lambda *args, **kwargs: None
                    })()
                    return response

            mock_request.side_effect = mock_api_response
            # Call the actual MCP tool function; this is the black-box interface
            return await search_vault(query=query)

    context.tool_result = context.loop.run_until_complete(run_tool())
@when('I call the browse_vault_structure tool with include_files True')
def step_call_browse_tool_with_files(context):
    from obsidian_mcp.server import browse_vault_structure

    async def run_tool():
        # Mock only the external HTTP calls to the API
        with patch('httpx.AsyncClient.request') as mock_request:
            # Mock vault listing endpoint to return files and folders
            response = type('MockResponse', (), {
                'status_code': 200,
                'json': lambda *args, **kwargs: {"files": context.mock_api_files},
                'raise_for_status': lambda *args, **kwargs: None
            })()
            mock_request.return_value = response
            # Call the actual MCP tool function with include_files=True
            return await browse_vault_structure(include_files=True)

    context.tool_result = context.loop.run_until_complete(run_tool())


@when('I call the get_note_content tool with path "{path}"')
def step_call_get_note_tool(context, path):
    from obsidian_mcp.server import get_note_content

    async def run_tool():
        # Mock only the external HTTP calls to the API
        with patch('httpx.AsyncClient.request') as mock_request:
            if path == "missing-note.md":
                # Mock 404 for missing note
                def raise_error(*args, **kwargs):
                    raise Exception("Note not found")

                response = type('MockResponse', (), {
                    'status_code': 404,
                    'raise_for_status': raise_error
                })()
                mock_request.return_value = response
            else:
                # Mock successful retrieval
                response = type('MockResponse', (), {
                    'status_code': 200,
                    'json': lambda *args, **kwargs: {
                        "content": "Daily note content for January 15th",
                        "stat": {"mtime": 1704067200000, "ctime": 1704067200000},
                        "frontmatter": {}
                    },
                    'raise_for_status': lambda *args, **kwargs: None
                })()
                mock_request.return_value = response
            # Call the actual tool function
            return await get_note_content(path)

    context.tool_result = context.loop.run_until_complete(run_tool())
@then('the tool should return successful results')
def step_verify_successful_results(context):
    assert context.tool_result.get("success") is True
    assert "results" in context.tool_result or "data" in context.tool_result


@then('the results should contain the searched content')
def step_verify_search_content(context):
    assert context.tool_result["success"] is True
    assert len(context.tool_result["results"]) > 0
    # Verify actual search result structure
    result = context.tool_result["results"][0]
    assert "matches" in result
    assert len(result["matches"]) > 0


@then('the tool should return both files and folders')
def step_verify_files_and_folders_returned(context):
    assert context.tool_result["success"] is True
    assert len(context.tool_result["directories"]) > 0  # Should have directories
    assert len(context.tool_result["files"]) > 0  # Should have files when include_files=True
    assert context.tool_result["include_files"] is True


@then('the tool should return an error')
def step_verify_error_result(context):
    assert context.tool_result.get("success") is False
    assert "error" in context.tool_result
@when('I call search_vault tool with created_since "{date}"')
def step_call_search_with_created_since(context, date):
    from obsidian_mcp.server import search_vault

    async def run_tool():
        with patch('httpx.AsyncClient.request') as mock_request:
            def mock_api_response(method, url, **kwargs):
                if '/vault/' in url and not url.endswith('/'):
                    # Extract filename from URL to return correct metadata
                    filename = url.split('/')[-1]
                    if filename in context.mock_metadata_responses:
                        response = type('MockResponse', (), {
                            'status_code': 200,
                            'json': lambda *args, **kwargs: context.mock_metadata_responses[filename],
                            'raise_for_status': lambda *args, **kwargs: None
                        })()
                        return response
                # Default: return file list for filter-only search
                response = type('MockResponse', (), {
                    'status_code': 200,
                    'json': lambda *args, **kwargs: {"files": context.mock_files_list},
                    'raise_for_status': lambda *args, **kwargs: None
                })()
                return response

            mock_request.side_effect = mock_api_response
            return await search_vault(created_since=date)

    context.tool_result = context.loop.run_until_complete(run_tool())
@when('I call search_vault tool with title_contains {title_list} and match mode "{mode}"')
def step_call_search_with_title_contains(context, title_list, mode):
    from obsidian_mcp.server import search_vault

    # Parse the title list from its JSON string representation
    # (json is already imported at module level)
    title_contains = json.loads(title_list)

    async def run_tool():
        with patch('httpx.AsyncClient.request') as mock_request:
            def mock_api_response(method, url, **kwargs):
                if '/vault/' in url and not url.endswith('/'):
                    response = type('MockResponse', (), {
                        'status_code': 200,
                        'json': lambda *args, **kwargs: context.mock_metadata_base,
                        'raise_for_status': lambda *args, **kwargs: None
                    })()
                    return response
                # Return file list for filter-only search
                response = type('MockResponse', (), {
                    'status_code': 200,
                    'json': lambda *args, **kwargs: {"files": context.mock_files_list},
                    'raise_for_status': lambda *args, **kwargs: None
                })()
                return response

            mock_request.side_effect = mock_api_response
            return await search_vault(title_contains=title_contains, title_match_mode=mode)

    context.tool_result = context.loop.run_until_complete(run_tool())


@when('I call search_vault tool with search_in_path "{path}"')
def step_call_search_with_path(context, path):
    from obsidian_mcp.server import search_vault

    async def run_tool():
        with patch('httpx.AsyncClient.request') as mock_request:
            def mock_api_response(method, url, **kwargs):
                if '/vault/' in url and not url.endswith('/'):
                    response = type('MockResponse', (), {
                        'status_code': 200,
                        'json': lambda *args, **kwargs: context.mock_metadata_base,
                        'raise_for_status': lambda *args, **kwargs: None
                    })()
                    return response
                # Return file list for filter-only search
                response = type('MockResponse', (), {
                    'status_code': 200,
                    'json': lambda *args, **kwargs: {"files": context.mock_files_list},
                    'raise_for_status': lambda *args, **kwargs: None
                })()
                return response

            mock_request.side_effect = mock_api_response
            return await search_vault(search_in_path=path)

    context.tool_result = context.loop.run_until_complete(run_tool())
@when('I call search_vault tool with regex "{pattern}"')
def step_call_search_with_regex(context, pattern):
    from obsidian_mcp.server import search_vault

    async def run_tool():
        with patch('httpx.AsyncClient.request') as mock_request:
            def mock_api_response(method, url, **kwargs):
                if '/vault/' in url and not url.endswith('/'):
                    # Extract filename from URL to return appropriate content
                    filename = url.split('/')[-1]
                    if filename in context.mock_note_contents:
                        response = type('MockResponse', (), {
                            'status_code': 200,
                            'json': lambda *args, **kwargs: context.mock_note_contents[filename],
                            'raise_for_status': lambda *args, **kwargs: None
                        })()
                        return response
                # Return file list for regex search
                response = type('MockResponse', (), {
                    'status_code': 200,
                    'json': lambda *args, **kwargs: {"files": context.mock_files_list},
                    'raise_for_status': lambda *args, **kwargs: None
                })()
                return response

            mock_request.side_effect = mock_api_response
            return await search_vault(query=pattern, query_type="regex")

    context.tool_result = context.loop.run_until_complete(run_tool())
@when('I call search_vault tool with tag "{tag}"')
def step_call_search_with_tag(context, tag):
    from obsidian_mcp.server import search_vault

    async def run_tool():
        with patch('httpx.AsyncClient.request') as mock_request:
            def mock_api_response(method, url, **kwargs):
                if '/vault/' in url and not url.endswith('/'):
                    # Extract filename from URL to return appropriate content
                    filename = url.split('/')[-1]
                    if filename in context.mock_tag_contents:
                        response = type('MockResponse', (), {
                            'status_code': 200,
                            'json': lambda *args, **kwargs: context.mock_tag_contents[filename],
                            'raise_for_status': lambda *args, **kwargs: None
                        })()
                        return response
                # Return file list for tag search
                response = type('MockResponse', (), {
                    'status_code': 200,
                    'json': lambda *args, **kwargs: {"files": context.mock_files_list},
                    'raise_for_status': lambda *args, **kwargs: None
                })()
                return response

            mock_request.side_effect = mock_api_response
            return await search_vault(tag=tag)

    context.tool_result = context.loop.run_until_complete(run_tool())


@when('I call search_vault tool with tags {tag_list} and match mode "{mode}"')
def step_call_search_with_multiple_tags(context, tag_list, mode):
    from obsidian_mcp.server import search_vault

    # Parse the tag list from its JSON string representation
    # (json is already imported at module level)
    tags = json.loads(tag_list)

    async def run_tool():
        with patch('httpx.AsyncClient.request') as mock_request:
            def mock_api_response(method, url, **kwargs):
                if '/vault/' in url and not url.endswith('/'):
                    # Extract filename from URL to return appropriate content
                    filename = url.split('/')[-1]
                    if filename in context.mock_multi_tag_contents:
                        response = type('MockResponse', (), {
                            'status_code': 200,
                            'json': lambda *args, **kwargs: context.mock_multi_tag_contents[filename],
                            'raise_for_status': lambda *args, **kwargs: None
                        })()
                        return response
                # Return file list for tag search
                response = type('MockResponse', (), {
                    'status_code': 200,
                    'json': lambda *args, **kwargs: {"files": context.mock_files_list},
                    'raise_for_status': lambda *args, **kwargs: None
                })()
                return response

            mock_request.side_effect = mock_api_response
            return await search_vault(tag=tags, tag_match_mode=mode)

    context.tool_result = context.loop.run_until_complete(run_tool())
@then('the tool should return only notes created after that date')
def step_verify_created_since_filter(context):
    assert context.tool_result["success"] is True
    assert len(context.tool_result["results"]) == 1  # Only new-note.md should match
    assert context.tool_result["results"][0]["path"] == "new-note.md"


@then('the tool should return notes matching either foo or bar')
def step_verify_title_or_match(context):
    assert context.tool_result["success"] is True
    assert len(context.tool_result["results"]) == 2  # foo project.md and bar chart.md
    paths = [result["path"] for result in context.tool_result["results"]]
    assert "foo project.md" in paths
    assert "bar chart.md" in paths
    assert "baz notes.md" not in paths


@then('the tool should return only notes containing both foo and bar')
def step_verify_title_and_match(context):
    assert context.tool_result["success"] is True
    assert len(context.tool_result["results"]) == 1  # Only "foo and bar project.md"
    assert context.tool_result["results"][0]["path"] == "foo and bar project.md"


@then('the tool should return only notes from projects directory')
def step_verify_path_filter(context):
    assert context.tool_result["success"] is True
    for result in context.tool_result["results"]:
        assert result["path"].startswith("projects/")


@then('the tool should return notes matching the regex pattern')
def step_verify_regex_match(context):
    assert context.tool_result["success"] is True
    assert len(context.tool_result["results"]) > 0  # Should find notes with foo OR bar content


@then('the tool should return only notes tagged with project')
def step_verify_tag_filter(context):
    assert context.tool_result["success"] is True
    assert len(context.tool_result["results"]) == 1  # Only project-note.md should match
    assert context.tool_result["results"][0]["path"] == "project-note.md"


@then('the tool should return notes with either project or urgent tags')
def step_verify_multiple_tags_or_filter(context):
    assert context.tool_result["success"] is True
    assert len(context.tool_result["results"]) == 3  # urgent-project.md, project-only.md, urgent-only.md
    paths = [result["path"] for result in context.tool_result["results"]]
    assert "urgent-project.md" in paths
    assert "project-only.md" in paths
    assert "urgent-only.md" in paths
    assert "no-tags.md" not in paths


@then('the tool should return only notes with both project and urgent tags')
def step_verify_multiple_tags_and_filter(context):
    assert context.tool_result["success"] is True
    assert len(context.tool_result["results"]) == 1  # Only urgent-project.md should match
    assert context.tool_result["results"][0]["path"] == "urgent-project.md"
```
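Most of the steps above build the same ad-hoc `MockResponse` type inline. One possible way to factor that out is sketched below. The helper name `make_mock_response` is hypothetical and does not exist in the repository, and the sketch assumes the client only relies on the three attributes the steps already fake (`status_code`, `json()`, and `raise_for_status()`).

```python
from types import SimpleNamespace
from typing import Any, Optional


def make_mock_response(status_code: int = 200, payload: Any = None,
                       error: Optional[Exception] = None) -> SimpleNamespace:
    """Build a stand-in for an HTTP response (hypothetical test helper)."""
    def raise_for_status() -> None:
        # Raise only when the caller asked for a failing response
        if error is not None:
            raise error

    return SimpleNamespace(
        status_code=status_code,
        json=lambda: payload,
        raise_for_status=raise_for_status,
    )


# Example responses mirroring the file-list and missing-note cases
ok = make_mock_response(payload={"files": ["README.md", "daily/"]})
missing = make_mock_response(status_code=404, error=Exception("Note not found"))
```

With such a helper, a step could set `mock_request.return_value = make_mock_response(payload=...)` or return one from a `side_effect` function, keeping each scenario's URL routing visible while removing the repeated `type(...)` boilerplate.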