# Directory Structure
```
├── .github
│   └── workflows
│       └── test.yml
├── .gitignore
├── behave.ini
├── features
│   ├── blackbox_tests.feature
│   ├── environment.py
│   └── steps
│       └── blackbox_steps.py
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   └── obsidian_mcp
│       ├── client.py
│       ├── search.py
│       ├── server.py
│       └── utils.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # Byte-compiled / optimized / DLL files
2 | __pycache__/
3 | *.py[cod]
4 | *$py.class
5 | *.so
6 |
7 | # C extensions
8 | *.so
9 |
10 | # Distribution / packaging
11 | .Python
12 | build/
13 | develop-eggs/
14 | dist/
15 | downloads/
16 | eggs/
17 | .eggs/
18 | lib/
19 | lib64/
20 | parts/
21 | sdist/
22 | var/
23 | wheels/
24 | share/python-wheels/
25 | *.egg-info/
26 | .installed.cfg
27 | *.egg
28 | MANIFEST
29 |
30 | # PyInstaller
31 | *.manifest
32 | *.spec
33 |
34 | # Unit test / coverage reports
35 | htmlcov/
36 | .tox/
37 | .nox/
38 | .coverage
39 | .coverage.*
40 | .cache
41 | nosetests.xml
42 | coverage.xml
43 | *.cover
44 | *.py,cover
45 | .hypothesis/
46 | .pytest_cache/
47 | cover/
48 |
49 | # Environments
50 | .env
51 | .venv
52 | env/
53 | venv/
54 | ENV/
55 | env.bak/
56 | venv.bak/
57 |
58 | # mypy
59 | .mypy_cache/
60 | .dmypy.json
61 | dmypy.json
62 |
63 | # Pyre type checker
64 | .pyre/
65 |
66 | # pytype static type analyzer
67 | .pytype/
68 |
69 | # IDEs
70 | .idea/
71 | .vscode/
72 | *.swp
73 | *.swo
74 | *~
75 |
76 | # OS generated files
77 | .DS_Store
78 | .DS_Store?
79 | ._*
80 | .Spotlight-V100
81 | .Trashes
82 | ehthumbs.db
83 | Thumbs.db
84 |
85 | # Project specific
86 | .claude/
87 | *.output
88 | *.log
89 |
90 | # Test artifacts
91 | .behave_output/
92 | test-reports/
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # Obsidian MCP Server
2 |
3 | An MCP (Model Context Protocol) server that enables AI agents to perform sophisticated knowledge discovery and analysis across your Obsidian vault through the Local REST API plugin.
4 |
5 | <a href="https://glama.ai/mcp/servers/@pmmvr/obsidian-api-mcp-server">
6 | <img width="380" height="200" src="https://glama.ai/mcp/servers/@pmmvr/obsidian-api-mcp-server/badge" alt="Obsidian Server MCP server" />
7 | </a>
8 |
9 | ## Why This Matters
10 |
11 | This server transforms your Obsidian vault into a powerful knowledge base for AI agents, enabling complex multi-step workflows like:
12 |
13 | - **"Retrieve notes from my 'Projects/Planning' folder containing 'roadmap' or 'timeline' in titles, created after April 1st, then analyze them for any blockers or dependencies and present a consolidated risk assessment with references to the source notes"**
14 |
15 | - **"Find all notes tagged with 'research' or 'analysis' from the last month, scan their content for incomplete sections or open questions, then cross-reference with my 'Team/Expertise' notes to suggest which colleagues could help address each gap"**
16 |
17 | - **"Get the complete content of meeting notes from 'Leadership/Quarterly' containing 'budget' or 'headcount', analyze them for action items assigned to my department, and create a chronological timeline with source note references"**
18 |
19 | The server's advanced filtering, regex support, and full content retrieval capabilities allow agents to perform nuanced knowledge work that would take hours manually.
20 |
21 | ## Prerequisites
22 |
23 | 1. Install the [Obsidian Local REST API](https://github.com/coddingtonbear/obsidian-local-rest-api) plugin in your Obsidian vault
24 | 2. Configure and enable the plugin in Obsidian settings
25 | 3. Note the API URL (default: `https://localhost:27124`) and API key if you've set one
26 |
27 | ## Installation
28 |
29 | ### From PyPI (Recommended)
30 |
31 | ```bash
32 | # Install from PyPI
33 | pip install obsidian-api-mcp-server
34 |
35 | # Or with uv
36 | uv pip install obsidian-api-mcp-server
37 | ```
38 |
39 | ### Add to MCP Configuration
40 |
41 | Add to your MCP client configuration (e.g., Claude Desktop):
42 |
43 | ```json
44 | {
45 | "mcpServers": {
46 | "obsidian-api-mcp-server": {
47 | "command": "uvx",
48 | "args": [
49 | "--from",
50 | "obsidian-api-mcp-server>=1.0.1",
51 | "obsidian-api-mcp"
52 | ],
53 | "env": {
54 | "OBSIDIAN_API_URL": "https://localhost:27124",
55 | "OBSIDIAN_API_KEY": "your-api-key-here"
56 | }
57 | }
58 | }
59 | }
60 | ```
61 |
62 | ### From Source (Development)
63 |
64 | ```bash
65 | # Clone the repository
66 | git clone https://github.com/pmmvr/obsidian-api-mcp-server
67 | cd obsidian-api-mcp-server
68 |
69 | # Install with uv
70 | uv pip install -e .
71 |
72 | # Or with pip
73 | pip install -e .
74 | ```
75 |
76 | ## Configuration
77 |
78 | Set environment variables for the Obsidian API:
79 |
80 | ```bash
81 | # Required: Obsidian API URL (HTTPS by default)
82 | export OBSIDIAN_API_URL="https://localhost:27124" # Default
83 |
84 | # Optional: API key if you've configured authentication
85 | export OBSIDIAN_API_KEY="your-api-key-here"
86 | ```
87 |
88 | **Important Security Note**: Avoid hardcoding your `OBSIDIAN_API_KEY` directly into scripts or committing it to version control. Consider using a `.env` file (which is included in the `.gitignore` of this project) and a library like `python-dotenv` to manage your API key, or use environment variables managed by your operating system or shell.
89 |
90 | **Note**: The server defaults to HTTPS and disables SSL certificate verification for self-signed certificates commonly used with local Obsidian instances. For HTTP connections, set `OBSIDIAN_API_URL="http://localhost:27123"`.
91 |
92 | ## Usage
93 |
94 | Run the MCP server:
95 |
96 | ```bash
 97 | obsidian-api-mcp
98 | ```
99 |
100 | ## Available Tools
101 |
102 | The server provides three powerful tools:
103 |
104 | 1. **`search_vault`** - Advanced search with flexible filters and full content retrieval:
105 | - `query` - Text or regex search across note content (optional)
106 | - `query_type` - Search type: "text" (default) or "regex"
107 | - `search_in_path` - Limit search to specific folder path
108 | - `title_contains` - Filter by text in note titles (string, array, or JSON string)
109 | - `title_match_mode` - How to match multiple terms: "any" (OR) or "all" (AND)
110 | - `tag` - Filter by tag (string, array, or JSON string - searches frontmatter and inline #tags)
111 | - `tag_match_mode` - How to match multiple tags: "any" (OR) or "all" (AND)
112 | - `context_length` - Amount of content to return (set high for full content)
113 | - `include_content` - Boolean to retrieve complete content of all matching notes
114 | - `created_since/until` - Filter by creation date
115 | - `modified_since/until` - Filter by modification date
116 | - `page_size` - Results per page
117 | - `max_matches_per_file` - Limit matches per note
118 |
119 | **Key Features**:
120 | - When no `query` is provided, automatically returns full content for filter-only searches
121 | - `include_content=True` forces full content retrieval for any search
122 | - Supports regex patterns for complex text matching (OR conditions, case-insensitive search, etc.)
123 |
124 | 2. **`get_note_content`** - Retrieve complete content and metadata of a specific note by path
125 |
126 | 3. **`browse_vault_structure`** - Navigate vault directory structure efficiently:
127 | - `path` - Directory to browse (defaults to vault root)
128 | - `include_files` - Boolean to include files (default: False, folders only for speed)
129 | - `recursive` - Boolean to browse all nested directories
130 |
131 | ## Example Use Cases
132 |
133 | ### Basic Searches
134 | 1. **Find notes by title in a specific folder:**
135 | ```
136 | search_vault(
137 | search_in_path="Work/Projects/",
138 | title_contains="meeting"
139 | )
140 | ```
141 |
142 | 2. **Find notes with multiple title terms (OR logic):**
143 | ```
144 | search_vault(
145 | title_contains=["foo", "bar", "fizz", "buzz"],
146 | title_match_mode="any" # Default
147 | )
148 | ```
149 |
150 | 3. **Find notes with ALL title terms (AND logic):**
151 | ```
152 | search_vault(
153 | title_contains=["project", "2024"],
154 | title_match_mode="all"
155 | )
156 | ```
157 |
158 | 4. **Get all recent notes with full content:**
159 | ```
160 | search_vault(
161 | modified_since="2025-05-20",
162 | include_content=True
163 | )
164 | ```
165 |
166 | 5. **Text search with context:**
167 | ```
168 | search_vault(
169 | query="API documentation",
170 | search_in_path="Engineering/",
171 | context_length=500
172 | )
173 | ```
174 |
175 | 6. **Search by tag:**
176 | ```
177 | search_vault(
178 | tag="project"
179 | )
180 | ```
181 |
182 | 7. **Regex search for OR conditions:**
183 | ```
184 | search_vault(
185 | query="foo|bar",
186 | query_type="regex",
187 | search_in_path="Projects/"
188 | )
189 | ```
190 |
191 | 8. **Regex search for tasks assigned to specific people:**
192 | ```
193 | search_vault(
194 | query="(TODO|FIXME|ACTION).*@(alice|bob)",
195 | query_type="regex",
196 | search_in_path="Work/Meetings/"
197 | )
198 | ```
199 |
200 | ### Advanced Multi-Step Workflows
201 |
202 | These examples demonstrate how agents can chain together sophisticated knowledge discovery tasks:
203 |
204 | 9. **Strategic Project Analysis:**
205 | ```
206 | # Step 1: Get all project documentation
207 | search_vault(
208 | search_in_path="Projects/Infrastructure/",
209 | title_contains=["planning", "requirements", "architecture"],
210 | title_match_mode="any",
211 | include_content=True
212 | )
213 |
214 | # Step 2: Find related technical discussions
215 | search_vault(
216 | tag=["infrastructure", "technical-debt"],
217 | tag_match_mode="any",
218 | modified_since="2025-04-01",
219 | include_content=True
220 | )
221 | ```
222 | *Agent can then analyze dependencies, identify risks, and recommend resource allocation*
223 |
224 | 10. **Meeting Action Item Mining:**
225 | ```
226 | # Get all recent meeting notes with full content
227 | search_vault(
228 | search_in_path="Meetings/",
229 | title_contains=["standup", "planning", "retrospective"],
230 | title_match_mode="any",
231 | created_since="2025-05-01",
232 | include_content=True
233 | )
234 | ```
235 | *Agent scans content for action items, extracts assignments, and creates chronological tracking*
236 |
237 | 11. **Research Gap Analysis:**
238 | ```
239 | # Find research notes with questions or gaps
240 | search_vault(
241 | query="(TODO|QUESTION|INVESTIGATE|UNCLEAR)",
242 | query_type="regex",
243 | tag=["research", "analysis"],
244 | tag_match_mode="any",
245 | include_content=True
246 | )
247 |
248 | # Cross-reference with team expertise
249 | search_vault(
250 | search_in_path="Team/",
251 | tag=["expertise", "skills"],
252 | tag_match_mode="any",
253 | include_content=True
254 | )
255 | ```
256 | *Agent identifies knowledge gaps and suggests team members who could help*
257 |
258 | 12. **Vault Structure Exploration:**
259 | ```
260 | # Quick organizational overview
261 | browse_vault_structure(recursive=True)
262 |
263 | # Deep dive into specific areas
264 | browse_vault_structure(
265 | path="Projects/CurrentSprint/",
266 | include_files=True,
267 | recursive=True
268 | )
269 | ```
270 |
271 | 13. **Tag-Based Knowledge Mapping:**
272 | ```
273 | # Find notes with multiple tags (AND logic)
274 | search_vault(
275 | tag=["project", "urgent"],
276 | tag_match_mode="all",
277 | include_content=True
278 | )
279 |
280 | # Find notes with any relevant tags (OR logic)
281 | search_vault(
282 | tag=["architecture", "design", "implementation"],
283 | tag_match_mode="any",
284 | modified_since="2025-04-15"
285 | )
286 | ```
287 |
288 | ## Development
289 |
290 | ```bash
291 | # Install with test dependencies
292 | uv pip install -e ".[test]"
293 |
294 | # Run the server
295 | python -m obsidian_mcp.server
296 |
297 | # Run tests
298 | uv run behave features/blackbox_tests.feature
299 | # Or run the full feature suite
300 | uv run behave
301 | ```
302 |
303 | ## License
304 |
305 | This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
```
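
A minimal sketch of the `.env` approach recommended in the security note above (values are placeholders; this works because `server.py` calls `load_dotenv()` at startup):

```python
# Sketch of the .env workflow described in the README; values are placeholders.
# .env (already covered by this project's .gitignore):
#   OBSIDIAN_API_URL=https://localhost:27124
#   OBSIDIAN_API_KEY=your-api-key-here
import os

from dotenv import load_dotenv

load_dotenv()  # server.py does this at import time, so a .env file is picked up
api_url = os.getenv("OBSIDIAN_API_URL", "https://localhost:27124")
api_key = os.getenv("OBSIDIAN_API_KEY")  # None is fine if plugin auth is disabled
```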
--------------------------------------------------------------------------------
/behave.ini:
--------------------------------------------------------------------------------
```
1 | [behave]
2 | paths = features
3 | show_snippets = false
4 | show_skipped = false
5 | format = pretty
6 | logging_level = WARNING
7 | default_tags = ~@skip
```
--------------------------------------------------------------------------------
/features/environment.py:
--------------------------------------------------------------------------------
```python
 1 | import os
 2 | import asyncio
4 |
5 |
6 | def before_all(context):
7 | context.loop = asyncio.new_event_loop()
8 | asyncio.set_event_loop(context.loop)
9 |
10 |
11 | def after_all(context):
12 | context.loop.close()
13 |
14 |
15 | def before_scenario(context, scenario):
16 | os.environ["OBSIDIAN_API_URL"] = "https://localhost:27124"
17 | os.environ["OBSIDIAN_API_KEY"] = "test-api-key"
18 |
19 |
20 | def after_scenario(context, scenario):
21 | for key in ["OBSIDIAN_API_URL", "OBSIDIAN_API_KEY"]:
22 | if key in os.environ:
23 | del os.environ[key]
```
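
Step definitions drive the async MCP tools on the loop created in `before_all`. A hypothetical `run_async` helper mirroring the inline pattern used throughout `blackbox_steps.py`:

```python
# Hypothetical helper mirroring how steps run coroutines on context.loop.
def run_async(context, coro):
    """Run a coroutine on the suite-wide event loop and return its result."""
    return context.loop.run_until_complete(coro)
```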
--------------------------------------------------------------------------------
/.github/workflows/test.yml:
--------------------------------------------------------------------------------
```yaml
1 | name: Tests
2 |
3 | on:
4 | push:
5 | branches: [ main ]
6 | pull_request:
7 | branches: [ main ]
8 |
9 | jobs:
10 | test:
11 | runs-on: ubuntu-latest
12 | strategy:
13 | matrix:
14 | python-version: ["3.10", "3.11", "3.12"]
15 |
16 | steps:
17 | - uses: actions/checkout@v4
18 |
19 | - name: Set up Python ${{ matrix.python-version }}
20 | uses: actions/setup-python@v4
21 | with:
22 | python-version: ${{ matrix.python-version }}
23 |
24 | - name: Install uv
25 | uses: astral-sh/setup-uv@v2
26 |
27 | - name: Install dependencies
28 | run: |
29 | uv pip install --system -e ".[test]"
30 |
31 | - name: Run tests
32 | run: |
33 | uv run behave features/blackbox_tests.feature --summary
```
--------------------------------------------------------------------------------
/src/obsidian_mcp/utils.py:
--------------------------------------------------------------------------------
```python
1 | import re
2 | from datetime import datetime, timedelta
3 | from dateutil import parser as date_parser
4 |
5 |
6 | def format_timestamp(timestamp_ms: int) -> str:
7 | dt = datetime.fromtimestamp(timestamp_ms / 1000)
8 | return dt.strftime("%Y-%m-%d %H:%M:%S")
9 |
10 |
11 | def parse_date_filter(date_str: str) -> datetime:
12 |     """Parse relative filters like "2 weeks ago" or "today"; defer to dateutil otherwise."""
13 |     lowered = date_str.lower()
14 |     if "ago" in lowered:
15 |         match = re.search(r'(\d+)', date_str)
16 |         if match:
17 |             count = int(match.group(1))
18 |             if "week" in lowered:
19 |                 return datetime.now() - timedelta(weeks=count)
20 |             if "day" in lowered:
21 |                 return datetime.now() - timedelta(days=count)
22 |     elif lowered == "today":
23 |         return datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
24 |     # Absolute dates (e.g. "2025-05-20") and unrecognized phrases go to dateutil,
25 |     # which raises a clear error instead of silently returning None
26 |     return date_parser.parse(date_str)
```
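
A few illustrative calls showing what these helpers are expected to return (rendered times depend on the local timezone):

```python
# Illustrative behavior of the helpers above; printed values depend on local time.
from obsidian_mcp.utils import format_timestamp, parse_date_filter

print(format_timestamp(1704067200000))   # epoch ms -> "2024-01-01 00:00:00" in UTC locales
print(parse_date_filter("2 weeks ago"))  # datetime two weeks before now
print(parse_date_filter("today"))        # today at midnight
print(parse_date_filter("2025-05-20"))   # absolute dates parsed by dateutil
```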
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [project]
2 | name = "obsidian-api-mcp-server"
3 | version = "1.0.1"
4 | description = "MCP server enabling AI agents to perform natural knowledge discovery and analysis across Obsidian vaults"
5 | readme = "README.md"
6 | authors = [
7 | {name = "pmmvr", email = "[email protected]"}
8 | ]
9 | license = {text = "MIT"}
10 | keywords = ["obsidian", "mcp", "ai", "knowledge-management", "rest-api"]
11 | classifiers = [
12 | "Development Status :: 4 - Beta",
13 | "Intended Audience :: Developers",
14 | "License :: OSI Approved :: MIT License",
15 | "Programming Language :: Python :: 3",
16 | "Programming Language :: Python :: 3.10",
17 | "Programming Language :: Python :: 3.11",
18 | "Programming Language :: Python :: 3.12",
19 | "Topic :: Software Development :: Libraries :: Python Modules",
20 | "Topic :: Text Processing :: General",
21 | "Topic :: Office/Business :: Groupware",
22 | ]
23 |
24 | requires-python = ">=3.10"
25 | dependencies = [
26 | "mcp>=1.9.1",
27 | "mcp[cli]",
28 | "httpx>=0.27.0",
29 | "pydantic>=2.0.0",
30 | "python-dateutil>=2.9.0",
31 | "python-dotenv>=1.0.0",
32 | ]
33 |
34 | [project.optional-dependencies]
35 | test = [
36 | "behave>=1.2.6",
37 | "pytest-mock>=3.12.0",
38 | "responses>=0.24.0",
39 | ]
40 |
41 | [project.scripts]
42 | obsidian-api-mcp = "obsidian_mcp.server:main"
43 |
44 | [build-system]
45 | requires = ["hatchling"]
46 | build-backend = "hatchling.build"
47 |
48 | [tool.hatch.build.targets.wheel]
49 | packages = ["src/obsidian_mcp"]
50 |
51 | [tool.hatch.metadata]
52 | allow-direct-references = true
53 |
54 | [dependency-groups]
55 | dev = [
56 | "build>=1.2.2.post1",
57 | "twine>=6.1.0",
58 | ]
59 |
```
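
The `[project.scripts]` entry makes the installed `obsidian-api-mcp` command a thin wrapper around the server's `main`; the equivalent in plain Python:

```python
# What the console script resolves to: "obsidian-api-mcp = obsidian_mcp.server:main"
from obsidian_mcp.server import main

if __name__ == "__main__":
    main()  # same effect as running the installed obsidian-api-mcp command
```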
--------------------------------------------------------------------------------
/src/obsidian_mcp/client.py:
--------------------------------------------------------------------------------
```python
1 | import os
2 | from typing import Any, Dict, List, Optional
3 | from urllib.parse import quote
4 |
5 | import httpx
6 |
7 |
8 | class ObsidianClient:
9 | def __init__(self, base_url: str, api_key: Optional[str] = None):
10 | self.base_url = base_url.rstrip('/')
11 | self.headers = {}
12 | if api_key:
13 | self.headers["Authorization"] = f"Bearer {api_key}"
14 |
15 | async def _request(self, method: str, endpoint: str, **kwargs) -> Any:
16 | headers = kwargs.pop("headers", {})
17 | headers.update(self.headers)
18 |
 19 |         async with httpx.AsyncClient(verify=False) as client:  # self-signed certs are the norm for the local plugin (see README)
20 | url = f"{self.base_url}/{endpoint.lstrip('/')}"
21 | response = await client.request(method, url, headers=headers, **kwargs)
22 | if response.status_code == 401:
23 | raise Exception("Obsidian API requires authentication. Please set OBSIDIAN_API_KEY environment variable.")
24 | response.raise_for_status()
25 | return response.json()
26 |
27 | async def search_simple(self, query: str, context_length: int = 100) -> List[Dict[str, Any]]:
28 | return await self._request(
29 | "POST",
30 | "/search/simple/",
31 | params={"query": query, "contextLength": context_length}
32 | )
33 |
34 | async def get_note_metadata(self, path: str) -> Dict[str, Any]:
35 | encoded_path = quote(path, safe='/')
36 | return await self._request(
37 | "GET",
38 | f"/vault/{encoded_path}",
39 | headers={"Accept": "application/vnd.olrapi.note+json"}
40 | )
41 |
42 | async def list_directory(self, path: str = "") -> List[str]:
43 | if path:
44 | # Just URL encode the path and try it directly
45 | encoded_path = quote(path, safe='/')
46 | endpoint = f"/vault/{encoded_path}/"
47 | else:
48 | endpoint = "/vault/"
49 |
50 | result = await self._request("GET", endpoint)
51 | return result.get("files", [])
52 |
53 | async def search_advanced(self, jsonlogic_query: Dict[str, Any]) -> List[Dict[str, Any]]:
54 | """Execute advanced search using JsonLogic query format."""
55 | return await self._request(
56 | "POST",
57 | "/search/",
58 | json=jsonlogic_query,
59 | headers={"Content-Type": "application/vnd.olrapi.jsonlogic+json"}
60 | )
61 |
62 | async def browse_vault(self, base_path: str = "", include_files: bool = False, recursive: bool = False, max_depth: int = 10) -> List[str]:
63 | """Browse vault structure with flexible filtering options."""
64 | if not recursive:
65 | all_items = await self.list_directory(base_path)
66 | if not include_files:
67 | # Filter to only show directories (items ending with '/')
68 | return [item for item in all_items if item.endswith('/')]
69 | return all_items
70 |
71 | all_items = []
72 |
73 | async def _recursive_list(current_path: str, depth: int):
74 | if depth > max_depth:
75 | return
76 |
77 | try:
78 | items = await self.list_directory(current_path)
79 | for item in items:
80 | if current_path:
81 | full_path = f"{current_path}/{item}"
82 | else:
83 | full_path = item
84 |
85 | # Apply file filtering
86 | if include_files or item.endswith('/'):
87 | all_items.append(full_path)
88 |
89 | # If it's a directory, recurse into it
90 | if item.endswith('/'):
91 | await _recursive_list(full_path.rstrip('/'), depth + 1)
92 | except Exception:
93 | # Skip directories we can't access
94 | pass
95 |
96 | await _recursive_list(base_path, 0)
97 | return all_items
98 |
99 | async def list_all_files(self, base_path: str = "", max_depth: int = 10, max_files: int = 5000) -> List[str]:
100 | """Recursively list all files in the vault with safety limits."""
101 | all_files = []
102 |
103 | async def _recursive_list(current_path: str, depth: int):
104 | if depth > max_depth or len(all_files) >= max_files:
105 | return
106 |
107 | try:
108 | files = await self.list_directory(current_path)
109 | for file in files:
110 | if len(all_files) >= max_files:
111 | return
112 |
113 | if current_path:
114 | full_path = f"{current_path}/{file.rstrip('/')}"
115 | else:
116 | full_path = file.rstrip('/')
117 |
118 | if file.endswith('/'):
119 | # It's a directory, recurse into it
120 | await _recursive_list(full_path, depth + 1)
121 | else:
122 | # It's a file, add it to our list
123 | all_files.append(full_path)
124 | except Exception:
125 | # Skip directories we can't access
126 | pass
127 |
128 | await _recursive_list(base_path, 0)
129 | return all_files
130 |
131 |
132 | def create_client() -> ObsidianClient:
133 | base_url = os.getenv("OBSIDIAN_API_URL", "https://localhost:27124")
134 | api_key = os.getenv("OBSIDIAN_API_KEY")
135 | return ObsidianClient(base_url, api_key)
```
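
A short sketch of exercising `ObsidianClient` outside the MCP server (assumes the Local REST API plugin is running; `"README.md"` is a hypothetical note path):

```python
# Sketch: drive ObsidianClient directly. Assumes a running Local REST API plugin;
# "README.md" is a hypothetical note path inside the vault.
import asyncio

from obsidian_mcp.client import create_client

async def main() -> None:
    client = create_client()  # reads OBSIDIAN_API_URL / OBSIDIAN_API_KEY
    entries = await client.list_directory()  # root listing; directories end with "/"
    note = await client.get_note_metadata("README.md")
    print(entries, note["stat"]["mtime"])

asyncio.run(main())
```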
--------------------------------------------------------------------------------
/src/obsidian_mcp/server.py:
--------------------------------------------------------------------------------
```python
1 | import json
2 | from typing import Any, Dict, Optional, Union, List
3 |
4 | from dotenv import load_dotenv
5 | from mcp.server.fastmcp import FastMCP
6 |
  7 | load_dotenv()  # read .env before create_client() resolves OBSIDIAN_* variables
8 |
9 | from obsidian_mcp.client import create_client
10 | from obsidian_mcp.search import SearchProcessor
11 |
12 | mcp = FastMCP("obsidian-mcp")
13 | client = create_client()
14 | search_processor = SearchProcessor(client)
15 |
16 | @mcp.tool(
17 | annotations={
18 | "title": "Search Obsidian Vault",
19 | "readOnlyHint": True,
20 | "openWorldHint": False
21 | }
22 | )
23 | async def search_vault(
24 | query: Optional[str] = None,
25 | query_type: str = "text",
26 | search_in_path: Optional[str] = None,
27 | title_contains: Optional[Any] = None,
28 | title_match_mode: str = "any",
29 | tag: Optional[Any] = None,
30 | tag_match_mode: str = "any",
31 | context_length: int = 100,
32 | include_content: bool = False,
33 | modified_since: Optional[str] = None,
34 | modified_until: Optional[str] = None,
35 | created_since: Optional[str] = None,
36 | created_until: Optional[str] = None,
37 | page_size: int = 50,
38 | page: int = 1,
39 | max_matches_per_file: int = 5
40 | ) -> Dict[str, Any]:
41 | """
42 | Search Obsidian vault for notes matching criteria.
43 |
44 | Args:
45 | query: Text or regex pattern to search for
46 | query_type: "text" or "regex"
47 | search_in_path: Limit search to specific folder
48 | title_contains: Filter by title (string or array)
49 | title_match_mode: "any" or "all" for multiple title terms
50 | tag: Filter by tag (string, array, or JSON string like title_contains)
51 | tag_match_mode: "any" or "all" for multiple tag terms
52 | context_length: Characters of context around matches
53 | include_content: Return full note content
54 | modified_since/until: Filter by modification date (YYYY-MM-DD)
55 | created_since/until: Filter by creation date (YYYY-MM-DD)
56 | page_size/page: Pagination controls
57 | max_matches_per_file: Limit matches per file
58 | """
59 | parsed_title_contains = title_contains
60 | if title_contains:
61 | if isinstance(title_contains, list):
62 | parsed_title_contains = title_contains
63 | # Handle title_contains if JSON string representation of list
64 | elif isinstance(title_contains, str) and title_contains.strip().startswith('['):
65 | try:
66 | parsed_title_contains = json.loads(title_contains)
67 | except json.JSONDecodeError:
68 | pass
69 |
70 | # Handle tag in multiple formats (same logic as title_contains)
71 | parsed_tag = tag
72 | if tag:
73 | if isinstance(tag, list):
74 | parsed_tag = tag
75 | elif isinstance(tag, str) and tag.strip().startswith('['):
76 | try:
77 | parsed_tag = json.loads(tag)
78 | except json.JSONDecodeError:
79 | pass
80 |
81 | return await search_processor.search(
82 | query=query,
83 | query_type=query_type,
84 | search_in_path=search_in_path,
85 | title_contains=parsed_title_contains,
86 | title_match_mode=title_match_mode,
87 | tag=parsed_tag,
88 | tag_match_mode=tag_match_mode,
89 | context_length=context_length,
90 | include_content=include_content,
91 | modified_since=modified_since,
92 | modified_until=modified_until,
93 | created_since=created_since,
94 | created_until=created_until,
95 | page_size=page_size,
96 | page=page,
97 | max_matches_per_file=max_matches_per_file
98 | )
99 |
100 | @mcp.tool(
101 | annotations={
102 | "title": "Get Obsidian Note Content",
103 | "readOnlyHint": True,
104 | "openWorldHint": False
105 | }
106 | )
107 | async def get_note_content(path: str) -> Dict[str, Any]:
108 | """
109 | Get the full content and metadata of a specific note by path.
110 |
111 | Args:
112 | path: Full path to the note within the vault
113 | """
114 | try:
115 | note_data = await client.get_note_metadata(path)
116 | return {
117 | "success": True,
118 | "data": note_data
119 | }
120 | except Exception as e:
121 | return {
122 | "success": False,
123 | "error": f"Failed to get note at path '{path}': {str(e)}",
124 | "data": None
125 | }
126 |
127 | @mcp.tool(
128 | annotations={
129 | "title": "Browse Obsidian Vault Structure",
130 | "readOnlyHint": True,
131 | "openWorldHint": False
132 | }
133 | )
134 | async def browse_vault_structure(path: str = "", include_files: bool = False, recursive: bool = False) -> Dict[str, Any]:
135 | """
136 | Browse vault directory structure.
137 |
138 | Args:
139 | path: Path to browse from (defaults to vault root)
140 | include_files: Include files in listing (default: False, folders only)
141 | recursive: List nested contents recursively
142 | """
143 | try:
144 | # Remove leading/trailing quotes and whitespace
145 | clean_path = path.strip().strip('"\'')
146 | items = await client.browse_vault(clean_path, include_files, recursive)
147 |
148 | directories = [item for item in items if item.endswith('/')]
149 | files = [item for item in items if not item.endswith('/')]
150 |
151 | return {
152 | "success": True,
153 | "path": clean_path,
154 | "include_files": include_files,
155 | "recursive": recursive,
156 | "directories": directories,
157 | "files": files if include_files else [],
158 | "total_directories": len(directories),
159 | "total_files": len(files) if include_files else 0,
160 | "total_items": len(items)
161 | }
162 | except Exception as e:
163 | return {
164 | "success": False,
165 | "error": f"Failed to browse vault structure for path '{path}': {str(e)}",
166 | "path": path,
167 | "include_files": include_files,
168 | "recursive": recursive,
169 | "directories": [],
170 | "files": [],
171 | "total_directories": 0,
172 | "total_files": 0,
173 | "total_items": 0
174 | }
175 |
176 | def main():
177 | mcp.run(transport="stdio")
178 |
179 | if __name__ == "__main__":
180 | main()
```
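
`search_vault` accepts `title_contains` and `tag` as a list, a plain string, or a JSON-encoded list; `coerce_list_arg` below is a hypothetical helper that mirrors that inline parsing:

```python
# coerce_list_arg is a hypothetical stand-in for the inline parsing in search_vault.
import json

def coerce_list_arg(value):
    """Accept a list, a plain string, or a JSON-encoded list like '["a", "b"]'."""
    if isinstance(value, str) and value.strip().startswith('['):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            pass  # not valid JSON: fall through and treat as a literal string
    return value

assert coerce_list_arg('["foo", "bar"]') == ["foo", "bar"]
assert coerce_list_arg(["foo"]) == ["foo"]
assert coerce_list_arg("meeting") == "meeting"
```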
--------------------------------------------------------------------------------
/src/obsidian_mcp/search.py:
--------------------------------------------------------------------------------
```python
  1 | import math
  2 | import os
  3 | import re
  4 | import sys
  5 | from datetime import datetime
  6 | from typing import Any, Dict, List, Optional, Union
  7 |
  8 | from obsidian_mcp.client import ObsidianClient
  9 | from obsidian_mcp.utils import format_timestamp, parse_date_filter
12 |
13 |
14 | class SearchProcessor:
15 | """
16 | Processes search queries against an Obsidian vault, handling various filters,
17 | pagination, and result formatting.
18 | """
19 | def __init__(self, client: ObsidianClient):
20 | self.client = client
21 |
22 | async def _get_file_metadata(self, file_path: str, include_content_for_tags: bool = False) -> Optional[Dict[str, Any]]:
23 | try:
24 | note_metadata = await self.client.get_note_metadata(file_path)
25 | result = {
26 | "mtime": note_metadata["stat"]["mtime"],
27 | "ctime": note_metadata["stat"]["ctime"]
28 | }
29 |
30 | # Include content and tags if needed for tag filtering
31 | if include_content_for_tags:
32 | result["content"] = note_metadata.get("content", "")
33 | result["tags"] = note_metadata.get("frontmatter", {}).get("tags", [])
34 |
35 | return result
36 | except Exception:
37 | return None
38 |
39 | def _apply_filters(self, file_path: str, metadata: Dict[str, Any],
40 | search_path_prefix: str, title_contains: Optional[Union[str, List[str]]], title_match_mode: str,
41 | tag: Optional[Union[str, List[str]]], tag_match_mode: str, since_date: Optional[datetime], until_date: Optional[datetime],
42 | created_since_date: Optional[datetime], created_until_date: Optional[datetime]) -> bool:
43 | """
44 | Applies various filters to a file based on its path, metadata, and specified criteria.
45 | Returns True if the file passes all filters, False otherwise.
46 | """
47 |
48 | if search_path_prefix and not file_path.startswith(search_path_prefix):
49 | return False
50 |
51 | if title_contains:
52 | filename = os.path.basename(file_path).lower()
53 | if isinstance(title_contains, str):
54 | if title_contains.lower() not in filename:
55 | return False
56 | else:
57 | terms = [term.lower() for term in title_contains]
58 | if title_match_mode == "all":
59 | if not all(term in filename for term in terms):
60 | return False
61 | else:
62 | if not any(term in filename for term in terms):
63 | return False
64 |
65 | # Check tag filter - tags are stored in frontmatter or content
66 | if tag:
 67 |             tags_found = list(metadata.get("tags", []))  # copy; we extend it below
68 | # Also check for inline tags in content if available
69 | content = metadata.get("content", "")
70 | if content:
 71 |             # Look for inline #tag tokens, allowing nested or hyphenated
 72 |             # tags such as #technical-debt or #area/work
 73 |             inline_tags = re.findall(r'#([\w/-]+)', content)
74 | tags_found.extend(inline_tags)
75 |
76 | # Convert to lowercase for case-insensitive matching
77 | tags_found = [t.lower() for t in tags_found]
78 |
79 | # Handle multiple tags with AND/OR logic
80 | if isinstance(tag, str):
81 | # Single tag
82 | if tag.lower() not in tags_found:
83 | return False
84 | else:
85 | # Multiple tags - apply OR/AND logic
86 | tags_to_match = [t.lower() for t in tag]
87 | if tag_match_mode == "all":
88 | # ALL tags must be present (AND logic)
89 | if not all(tag_term in tags_found for tag_term in tags_to_match):
90 | return False
91 | else: # tag_match_mode == "any" (default)
92 | # ANY tag must be present (OR logic)
93 | if not any(tag_term in tags_found for tag_term in tags_to_match):
94 | return False
95 |
96 | file_mod_time = datetime.fromtimestamp(metadata["mtime"] / 1000)
97 | if since_date and file_mod_time < since_date:
98 | return False
99 | if until_date and file_mod_time > until_date:
100 | return False
101 |
102 | file_created_time = datetime.fromtimestamp(metadata["ctime"] / 1000)
103 | if created_since_date and file_created_time < created_since_date:
104 | return False
105 | if created_until_date and file_created_time > created_until_date:
106 | return False
107 |
108 | return True
109 |
110 | def _process_matches(self, api_result: Dict[str, Any], max_matches_per_file: int) -> List[Dict[str, Any]]:
111 | matches = []
112 | for match in api_result.get("matches", []):
113 | matches.append({
114 | "context": match.get("context", ""),
115 | "match_start": match.get("match", {}).get("start", 0),
116 | "match_end": match.get("match", {}).get("end", 0)
117 | })
118 | return matches[:max_matches_per_file]
119 |
120 | def _create_result_item(self, file_path: str, matches: List[Dict[str, Any]],
121 | metadata: Dict[str, Any], score: int) -> Dict[str, Any]:
122 | return {
123 | "path": file_path,
124 | "filename": os.path.basename(file_path),
125 | "matches": matches,
126 | "modified_time": format_timestamp(metadata["mtime"]),
127 | "created_time": format_timestamp(metadata["ctime"]),
128 | "score": score
129 | }
130 |
131 | def _paginate_results(self, results: List[Dict[str, Any]], page: int, page_size: int) -> tuple:
132 | total_files_found = len(results)
133 | total_pages = math.ceil(total_files_found / page_size)
134 | start_index = (page - 1) * page_size
135 | end_index = start_index + page_size
136 | paginated_results = results[start_index:end_index]
137 |
138 | also_found_in_files = None
139 | if total_pages > 1:
140 | # Collect filenames from other pages if pagination is active
141 | paginated_paths = {result["path"] for result in paginated_results}
142 | also_found_in_files = [
143 | result["filename"] for result in results
144 | if result["path"] not in paginated_paths
145 | ]
146 |
147 | return paginated_results, total_files_found, total_pages, also_found_in_files
148 |
149 | async def search(self, query: Optional[str] = None, query_type: str = "text", search_in_path: Optional[str] = None,
150 | title_contains: Optional[Union[str, List[str]]] = None, title_match_mode: str = "any",
151 | tag: Optional[Union[str, List[str]]] = None, tag_match_mode: str = "any",
152 | context_length: int = 100, include_content: bool = False,
153 | modified_since: Optional[str] = None, modified_until: Optional[str] = None,
154 | created_since: Optional[str] = None, created_until: Optional[str] = None,
155 | page_size: int = 50, page: int = 1, max_matches_per_file: int = 5) -> Dict[str, Any]:
156 |
157 | date_filters = self._parse_date_filters(modified_since, modified_until, created_since, created_until)
158 | search_path_prefix = self._normalize_search_path(search_in_path)
159 |
160 | try:
161 | # Determine the base path for API search if a prefix is provided
162 | base_search_path = search_path_prefix.rstrip('/') if search_path_prefix else ""
163 | api_results = await self._get_api_results(query, query_type, context_length, base_search_path)
164 | filtered_results, total_matches_count = await self._process_results(
165 | api_results, search_path_prefix, title_contains, title_match_mode, tag, tag_match_mode, date_filters, max_matches_per_file, query, include_content
166 | )
167 |
168 | filtered_results.sort(key=lambda x: x["modified_time"], reverse=True)
169 |
170 | paginated_results, total_files_found, total_pages, also_found_in_files = self._paginate_results(
171 | filtered_results, page, page_size
172 | )
173 |
174 | message = self._create_response_message(
175 | total_matches_count, total_files_found, page, total_pages,
176 | len(paginated_results), search_path_prefix
177 | )
178 |
179 | return {
180 | "success": True,
181 | "message": message,
182 | "results": paginated_results,
183 | "total_files_found": total_files_found,
184 | "total_matches_found": total_matches_count,
185 | "current_page": page,
186 | "page_size": page_size,
187 | "total_pages": total_pages,
188 | "also_found_in_files": also_found_in_files
189 | }
190 |
191 | except Exception as e:
192 | return self._create_error_response(str(e), page, page_size)
193 |
194 | def _parse_date_filters(self, modified_since: Optional[str], modified_until: Optional[str],
195 | created_since: Optional[str], created_until: Optional[str]) -> Dict[str, Optional[datetime]]:
196 | return {
197 | "since_date": parse_date_filter(modified_since) if modified_since else None,
198 | "until_date": parse_date_filter(modified_until) if modified_until else None,
199 | "created_since_date": parse_date_filter(created_since) if created_since else None,
200 | "created_until_date": parse_date_filter(created_until) if created_until else None
201 | }
202 |
203 | def _normalize_search_path(self, search_in_path: Optional[str]) -> str:
204 | if not search_in_path:
205 | return ""
206 | search_path_prefix = search_in_path.strip("/")
207 | return search_path_prefix + "/" if search_path_prefix else ""
208 |
209 | async def _get_api_results(self, query: Optional[str], query_type: str, context_length: int, search_path: str = "") -> List[Dict[str, Any]]:
210 | if query and query.strip():
211 | if query_type == "regex":
212 | return await self._execute_regex_search(query, search_path)
213 | else:
214 | # Default to simple text search if query type is not regex or not specified
215 | return await self.client.search_simple(query, context_length)
216 | else:
217 | # If no query is provided, list all markdown files in the specified path
218 | all_files = await self.client.list_all_files(search_path, max_depth=8, max_files=1000)
219 | return [
220 | {
221 | "filename": file_path,
222 | "score": 0,
223 | "matches": []
224 | }
225 | for file_path in all_files
226 | if file_path.endswith('.md')
227 | ]
228 |
229 |     async def _execute_regex_search(self, regex_pattern: str, search_path: str = "") -> List[Dict[str, Any]]:
230 |         """Scan markdown files client-side and collect regex matches with context."""
231 |
232 | try:
233 | if not regex_pattern.startswith('(?'):
234 | # Default to case-insensitive regex search if no flags are provided
235 | case_insensitive_pattern = f"(?i){regex_pattern}"
236 | else:
237 | case_insensitive_pattern = regex_pattern
238 |
239 | regex = re.compile(case_insensitive_pattern)
240 | all_files = await self.client.list_all_files(search_path, max_depth=8, max_files=1000)
241 | md_files = [f for f in all_files if f.endswith('.md')]
242 |
243 | formatted_results = []
244 | for file_path in md_files:
245 | try:
246 | note_data = await self.client.get_note_metadata(file_path)
247 | content = note_data.get("content", "")
248 |
249 | matches = list(regex.finditer(content))
250 | if matches:
251 | match_data = []
252 | for match in matches[:5]:
253 | # Create a context window around each match
254 | start = max(0, match.start() - 50)
255 | end = min(len(content), match.end() + 50)
256 | context = content[start:end]
257 | match_data.append({
258 | "context": context,
259 | "match": {
260 | "start": match.start() - start,
261 | "end": match.end() - start
262 | }
263 | })
264 |
265 | formatted_results.append({
266 | "filename": file_path,
267 | "score": len(matches),
268 | "matches": match_data
269 | })
270 | except Exception:
271 | continue
272 |
273 | return formatted_results
274 |
275 | except Exception as e:
276 |             print(f"Regex search failed: {e}, falling back to simple search", file=sys.stderr)  # stdout carries the MCP stdio transport
277 | return await self.client.search_simple(regex_pattern, 100)
278 |
279 | async def _process_results(self, api_results: List[Dict[str, Any]], search_path_prefix: str,
280 | title_contains: Optional[Union[str, List[str]]], title_match_mode: str, tag: Optional[Union[str, List[str]]], tag_match_mode: str,
281 | date_filters: Dict[str, Optional[datetime]], max_matches_per_file: int, query: Optional[str], include_content: bool = False) -> tuple:
282 | filtered_results = []
283 | total_matches_count = 0
284 |
285 | for api_result in api_results:
286 | file_path = api_result["filename"]
287 | # Include content if we need to filter by tags
288 | metadata = await self._get_file_metadata(file_path, include_content_for_tags=bool(tag))
289 |
290 | if not metadata:
291 | continue
292 |
293 | if not self._apply_filters(
294 | file_path, metadata, search_path_prefix, title_contains, title_match_mode, tag, tag_match_mode,
295 | date_filters["since_date"], date_filters["until_date"],
296 | date_filters["created_since_date"], date_filters["created_until_date"]
297 | ):
298 | continue
299 |
300 | all_matches = api_result.get("matches", [])
301 | matches = self._process_matches(api_result, max_matches_per_file)
302 | total_matches_count += len(all_matches)
303 |
304 | if include_content or (query is None or query.strip() == ""):
305 | # If include_content is true, or if there's no search query (listing all files),
306 | # attempt to fetch and include the full note content.
307 | try:
308 | full_content = await self.client.get_note_metadata(file_path)
309 | content_text = full_content.get("content", "")
310 | if content_text:
311 | matches = [{
312 | "context": content_text,
313 | "match_start": 0,
314 | "match_end": len(content_text)
315 | }]
316 | except Exception:
317 | pass
318 |
319 | if matches or (query is None or query.strip() == ""):
320 | result_item = self._create_result_item(
321 | file_path, matches, metadata, api_result.get("score", 0)
322 | )
323 | filtered_results.append(result_item)
324 |
325 | return filtered_results, total_matches_count
326 |
327 | def _create_response_message(self, total_matches_count: int, total_files_found: int,
328 | page: int, total_pages: int, current_page_files: int,
329 | search_path_prefix: str) -> str:
330 | message = (f"Found {total_matches_count} matches across {total_files_found} files. "
331 | f"Showing page {page} of {total_pages} ({current_page_files} files on this page).")
332 |
333 | if search_path_prefix:
334 | message += f" Searched in path: {search_path_prefix}"
335 |
336 | return message
337 |
338 | def _create_error_response(self, error_msg: str, page: int, page_size: int) -> Dict[str, Any]:
339 | return {
340 | "success": False,
341 | "message": f"Search failed: {error_msg}",
342 | "results": [],
343 | "total_files_found": 0,
344 | "total_matches_found": 0,
345 | "current_page": page,
346 | "page_size": page_size,
347 | "total_pages": 0
348 | }
```
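
A standalone worked example of the pagination arithmetic in `_paginate_results`, including how `also_found_in_files` is assembled:

```python
# Worked example of _paginate_results' arithmetic (self-contained sketch).
import math

results = [{"path": f"note-{i}.md", "filename": f"note-{i}.md"} for i in range(7)]
page, page_size = 2, 3

total_pages = math.ceil(len(results) / page_size)  # 7 results, 3 per page -> 3 pages
start = (page - 1) * page_size
page_items = results[start:start + page_size]      # note-3.md .. note-5.md

shown = {r["path"] for r in page_items}
also_found = [r["filename"] for r in results if r["path"] not in shown]
print(total_pages, [r["path"] for r in page_items], also_found)
```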
--------------------------------------------------------------------------------
/features/steps/blackbox_steps.py:
--------------------------------------------------------------------------------
```python
1 | from unittest.mock import patch
2 | from behave import given, when, then
3 | import json
4 |
5 |
6 | @given('the Obsidian API is available')
7 | def step_obsidian_api_available(context):
8 | context.base_url = "https://localhost:27124"
9 | context.api_key = "test-api-key"
10 |
11 |
12 | @given('the vault contains notes with content "{content}"')
13 | def step_vault_contains_content(context, content):
14 | context.mock_search_results = [
15 | {
16 | "filename": "test-note.md",
17 | "score": 100,
18 | "matches": [
19 | {
20 | "context": f"Some text with {content} in the middle",
21 | "match": {"start": 15, "end": 15 + len(content)}
22 | }
23 | ]
24 | }
25 | ]
26 |
27 |
28 | @given('the vault has a directory structure with files and folders')
29 | def step_vault_has_structure(context):
30 | context.mock_api_files = ["daily/", "projects/", "README.md", "index.md"]
31 |
32 |
33 | @given('the vault contains notes created on different dates')
34 | def step_vault_notes_different_create_dates(context):
35 | context.mock_files_list = ["old-note.md", "new-note.md"]
36 | context.mock_metadata_responses = {
37 | "old-note.md": {"stat": {"mtime": 1703462400000, "ctime": 1703462400000}}, # Dec 2023
38 | "new-note.md": {"stat": {"mtime": 1704672000000, "ctime": 1704672000000}} # Jan 2024
39 | }
40 |
41 |
42 | @given('the vault contains notes with titles "{title1}", "{title2}", and "{title3}"')
43 | def step_vault_notes_with_titles(context, title1, title2, title3):
44 | context.mock_files_list = [f"{title1}.md", f"{title2}.md", f"{title3}.md"]
45 | context.mock_metadata_base = {"stat": {"mtime": 1704067200000, "ctime": 1704067200000}}
46 |
47 |
48 | @given('the vault contains notes in projects and daily directories')
49 | def step_vault_notes_in_directories(context):
50 | context.mock_files_list = ["projects/work.md", "projects/personal.md", "daily/2024-01-01.md", "other/random.md"]
51 | context.mock_metadata_base = {"stat": {"mtime": 1704067200000, "ctime": 1704067200000}}
52 |
53 |
54 | @given('the vault contains notes with "{content1}" and "{content2}"')
55 | def step_vault_notes_with_content(context, content1, content2):
56 | context.mock_files_list = ["note1.md", "note2.md"]
57 | context.mock_note_contents = {
58 | "note1.md": {"content": f"Some {content1} here", "stat": {"mtime": 1704067200000, "ctime": 1704067200000}},
59 | "note2.md": {"content": f"Another {content2} there", "stat": {"mtime": 1704067200000, "ctime": 1704067200000}}
60 | }
61 |
62 |
63 | @given('the vault contains notes with tags "{tag1}" and "{tag2}"')
64 | def step_vault_notes_with_tags(context, tag1, tag2):
65 | context.mock_files_list = ["project-note.md", "meeting-note.md", "other-note.md"]
66 | context.mock_tag_contents = {
67 | "project-note.md": {
68 | "content": f"This is a project note #{tag1}",
69 | "frontmatter": {"tags": [tag1]},
70 | "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
71 | },
72 | "meeting-note.md": {
73 | "content": f"This is a meeting note #{tag2}",
74 | "frontmatter": {"tags": [tag2]},
75 | "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
76 | },
77 | "other-note.md": {
78 | "content": "This note has no tags",
79 | "frontmatter": {},
80 | "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
81 | }
82 | }
83 |
84 |
85 | @given('the vault contains notes with multiple tags')
86 | def step_vault_notes_with_multiple_tags(context):
87 | context.mock_files_list = ["urgent-project.md", "project-only.md", "urgent-only.md", "no-tags.md"]
88 | context.mock_multi_tag_contents = {
89 | "urgent-project.md": {
90 | "content": "This is urgent project work #project #urgent",
91 | "frontmatter": {"tags": ["project", "urgent"]},
92 | "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
93 | },
94 | "project-only.md": {
95 | "content": "This is project work #project",
96 | "frontmatter": {"tags": ["project"]},
97 | "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
98 | },
99 | "urgent-only.md": {
100 | "content": "This is urgent #urgent",
101 | "frontmatter": {"tags": ["urgent"]},
102 | "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
103 | },
104 | "no-tags.md": {
105 | "content": "This note has no tags",
106 | "frontmatter": {},
107 | "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
108 | }
109 | }
110 |
111 |
112 | @when('I call the search_vault tool with query "{query}"')
113 | def step_call_search_tool(context, query):
114 | from obsidian_mcp.server import search_vault
115 |
116 | async def run_tool():
117 | # Mock only the external HTTP calls to Obsidian API
118 | with patch('httpx.AsyncClient.request') as mock_request:
119 | # Set up mock responses for different API endpoints
120 | def mock_api_response(method, url, **kwargs):
121 | if '/search/simple/' in url:
122 | # Mock search endpoint
123 | response = type('MockResponse', (), {
124 | 'status_code': 200,
125 | 'json': lambda *args, **kwargs: context.mock_search_results,
126 | 'raise_for_status': lambda *args, **kwargs: None
127 | })()
128 | return response
129 | elif '/vault/' in url and not url.endswith('/'):
130 | # Mock note metadata endpoint
131 | response = type('MockResponse', (), {
132 | 'status_code': 200,
133 | 'json': lambda *args, **kwargs: {
134 | "stat": {"mtime": 1704067200000, "ctime": 1704067200000}
135 | },
136 | 'raise_for_status': lambda *args, **kwargs: None
137 | })()
138 | return response
139 | else:
140 | # Default response
141 | response = type('MockResponse', (), {
142 | 'status_code': 404,
143 | 'raise_for_status': lambda *args, **kwargs: None
144 | })()
145 | return response
146 |
147 | mock_request.side_effect = mock_api_response
148 |
149 | # Call the actual MCP tool function - this is blackbox interface
150 | return await search_vault(query=query)
151 |
152 | context.tool_result = context.loop.run_until_complete(run_tool())
153 |
154 |
155 | @when('I call the browse_vault_structure tool with include_files True')
156 | def step_call_browse_tool_with_files(context):
157 | from obsidian_mcp.server import browse_vault_structure
158 |
159 | async def run_tool():
160 | # Mock only external HTTP calls to API
161 | with patch('httpx.AsyncClient.request') as mock_request:
162 | # Mock vault listing endpoint to return files and folders
163 | response = type('MockResponse', (), {
164 | 'status_code': 200,
165 | 'json': lambda *args, **kwargs: {"files": context.mock_api_files},
166 | 'raise_for_status': lambda *args, **kwargs: None
167 | })()
168 | mock_request.return_value = response
169 |
170 | # Call actual MCP tool function with include_files=True
171 | return await browse_vault_structure(include_files=True)
172 |
173 | context.tool_result = context.loop.run_until_complete(run_tool())
174 |
175 |
176 | @when('I call the get_note_content tool with path "{path}"')
177 | def step_call_get_note_tool(context, path):
178 | from obsidian_mcp.server import get_note_content
179 |
180 | async def run_tool():
181 | # Mock only external HTTP calls to API
182 | with patch('httpx.AsyncClient.request') as mock_request:
183 | if path == "missing-note.md":
184 | # Mock 404 for missing note
185 | def raise_error(*args, **kwargs):
186 | raise Exception("Note not found")
187 |
188 | response = type('MockResponse', (), {
189 | 'status_code': 404,
190 | 'raise_for_status': raise_error
191 | })()
192 | mock_request.return_value = response
193 | else:
194 | # Mock successful retrieval
195 | response = type('MockResponse', (), {
196 | 'status_code': 200,
197 | 'json': lambda *args, **kwargs: {
198 | "content": "Daily note content for January 15th",
199 | "stat": {"mtime": 1704067200000, "ctime": 1704067200000},
200 | "frontmatter": {}
201 | },
202 | 'raise_for_status': lambda *args, **kwargs: None
203 | })()
204 | mock_request.return_value = response
205 |
206 | # Call actual tool function
207 | return await get_note_content(path)
208 |
209 | context.tool_result = context.loop.run_until_complete(run_tool())
210 |
211 |
212 | @then('the tool should return successful results')
213 | def step_verify_successful_results(context):
214 | assert context.tool_result.get("success") is True
215 | assert "results" in context.tool_result or "data" in context.tool_result
216 |
217 |
218 | @then('the results should contain the searched content')
219 | def step_verify_search_content(context):
220 | assert context.tool_result["success"] is True
221 | assert len(context.tool_result["results"]) > 0
222 | # Verify actual search result structure
223 | result = context.tool_result["results"][0]
224 | assert "matches" in result
225 | assert len(result["matches"]) > 0
226 |
227 |
228 | @then('the tool should return both files and folders')
229 | def step_verify_files_and_folders_returned(context):
230 | assert context.tool_result["success"] is True
231 |     assert len(context.tool_result["directories"]) > 0  # Should have directories
232 | assert len(context.tool_result["files"]) > 0 # Should have files when include_files=True
233 | assert context.tool_result["include_files"] is True
234 |
235 |
236 | @then('the tool should return an error')
237 | def step_verify_error_result(context):
238 | assert context.tool_result.get("success") is False
239 | assert "error" in context.tool_result
240 |
241 |
242 | @when('I call search_vault tool with created_since "{date}"')
243 | def step_call_search_with_created_since(context, date):
244 | from obsidian_mcp.server import search_vault
245 |
246 | async def run_tool():
247 | with patch('httpx.AsyncClient.request') as mock_request:
248 | def mock_api_response(method, url, **kwargs):
249 | if '/vault/' in url and not url.endswith('/'):
250 | # Extract filename from URL to return correct metadata
251 | filename = url.split('/')[-1]
252 | if filename in context.mock_metadata_responses:
253 | response = type('MockResponse', (), {
254 | 'status_code': 200,
255 | 'json': lambda *args, **kwargs: context.mock_metadata_responses[filename],
256 | 'raise_for_status': lambda *args, **kwargs: None
257 | })()
258 | return response
259 |
260 | # Default: return file list for filter-only search
261 | response = type('MockResponse', (), {
262 | 'status_code': 200,
263 | 'json': lambda *args, **kwargs: {"files": context.mock_files_list},
264 | 'raise_for_status': lambda *args, **kwargs: None
265 | })()
266 | return response
267 |
268 | mock_request.side_effect = mock_api_response
269 | return await search_vault(created_since=date)
270 |
271 | context.tool_result = context.loop.run_until_complete(run_tool())
272 |
273 |
274 | @when('I call search_vault tool with title_contains {title_list} and match mode "{mode}"')
275 | def step_call_search_with_title_contains(context, title_list, mode):
276 | from obsidian_mcp.server import search_vault
277 | import json
278 |
279 | # Parse the title list from string representation
280 | title_contains = json.loads(title_list)
281 |
282 | async def run_tool():
283 | with patch('httpx.AsyncClient.request') as mock_request:
284 | def mock_api_response(method, url, **kwargs):
285 | if '/vault/' in url and not url.endswith('/'):
286 | response = type('MockResponse', (), {
287 | 'status_code': 200,
288 | 'json': lambda *args, **kwargs: context.mock_metadata_base,
289 | 'raise_for_status': lambda *args, **kwargs: None
290 | })()
291 | return response
292 |
293 | # Return file list for filter-only search
294 | response = type('MockResponse', (), {
295 | 'status_code': 200,
296 | 'json': lambda *args, **kwargs: {"files": context.mock_files_list},
297 | 'raise_for_status': lambda *args, **kwargs: None
298 | })()
299 | return response
300 |
301 | mock_request.side_effect = mock_api_response
302 | return await search_vault(title_contains=title_contains, title_match_mode=mode)
303 |
304 | context.tool_result = context.loop.run_until_complete(run_tool())
305 |
306 |
307 | @when('I call search_vault tool with search_in_path "{path}"')
308 | def step_call_search_with_path(context, path):
309 | from obsidian_mcp.server import search_vault
310 |
311 | async def run_tool():
312 | with patch('httpx.AsyncClient.request') as mock_request:
313 | def mock_api_response(method, url, **kwargs):
314 | if '/vault/' in url and not url.endswith('/'):
315 | response = type('MockResponse', (), {
316 | 'status_code': 200,
317 | 'json': lambda *args, **kwargs: context.mock_metadata_base,
318 | 'raise_for_status': lambda *args, **kwargs: None
319 | })()
320 | return response
321 |
322 | # Return file list for filter-only search
323 | response = type('MockResponse', (), {
324 | 'status_code': 200,
325 | 'json': lambda *args, **kwargs: {"files": context.mock_files_list},
326 | 'raise_for_status': lambda *args, **kwargs: None
327 | })()
328 | return response
329 |
330 | mock_request.side_effect = mock_api_response
331 | return await search_vault(search_in_path=path)
332 |
333 | context.tool_result = context.loop.run_until_complete(run_tool())
334 |
335 |
336 | @when('I call search_vault tool with regex "{pattern}"')
337 | def step_call_search_with_regex(context, pattern):
338 | from obsidian_mcp.server import search_vault
339 |
340 | async def run_tool():
341 | with patch('httpx.AsyncClient.request') as mock_request:
342 | def mock_api_response(method, url, **kwargs):
343 | if '/vault/' in url and not url.endswith('/'):
344 |                     # Extract filename from URL to return appropriate content
345 | filename = url.split('/')[-1]
346 | if filename in context.mock_note_contents:
347 | response = type('MockResponse', (), {
348 | 'status_code': 200,
349 | 'json': lambda *args, **kwargs: context.mock_note_contents[filename],
350 | 'raise_for_status': lambda *args, **kwargs: None
351 | })()
352 | return response
353 |
354 | # Return file list for Regex search
355 | response = type('MockResponse', (), {
356 | 'status_code': 200,
357 | 'json': lambda *args, **kwargs: {"files": context.mock_files_list},
358 | 'raise_for_status': lambda *args, **kwargs: None
359 | })()
360 | return response
361 |
362 | mock_request.side_effect = mock_api_response
363 | return await search_vault(query=pattern, query_type="regex")
364 |
365 | context.tool_result = context.loop.run_until_complete(run_tool())
366 |
367 |
368 | @when('I call search_vault tool with tag "{tag}"')
369 | def step_call_search_with_tag(context, tag):
370 | from obsidian_mcp.server import search_vault
371 |
372 | async def run_tool():
373 | with patch('httpx.AsyncClient.request') as mock_request:
374 | def mock_api_response(method, url, **kwargs):
375 | if '/vault/' in url and not url.endswith('/'):
376 | # Extract filename from URL to return appropriate content
377 | filename = url.split('/')[-1]
378 | if filename in context.mock_tag_contents:
379 | response = type('MockResponse', (), {
380 | 'status_code': 200,
381 | 'json': lambda *args, **kwargs: context.mock_tag_contents[filename],
382 | 'raise_for_status': lambda *args, **kwargs: None
383 | })()
384 | return response
385 |
386 | # Return file list for tag search
387 | response = type('MockResponse', (), {
388 | 'status_code': 200,
389 | 'json': lambda *args, **kwargs: {"files": context.mock_files_list},
390 | 'raise_for_status': lambda *args, **kwargs: None
391 | })()
392 | return response
393 |
394 | mock_request.side_effect = mock_api_response
395 | return await search_vault(tag=tag)
396 |
397 | context.tool_result = context.loop.run_until_complete(run_tool())
398 |
399 |
400 | @when('I call search_vault tool with tags {tag_list} and match mode "{mode}"')
401 | def step_call_search_with_multiple_tags(context, tag_list, mode):
402 | from obsidian_mcp.server import search_vault
403 | import json
404 |
405 | # Parse the tag list from string representation
406 | tags = json.loads(tag_list)
407 |
408 | async def run_tool():
409 | with patch('httpx.AsyncClient.request') as mock_request:
410 | def mock_api_response(method, url, **kwargs):
411 | if '/vault/' in url and not url.endswith('/'):
412 | # Extract filename from URL to return appropriate content
413 | filename = url.split('/')[-1]
414 | if filename in context.mock_multi_tag_contents:
415 | response = type('MockResponse', (), {
416 | 'status_code': 200,
417 | 'json': lambda *args, **kwargs: context.mock_multi_tag_contents[filename],
418 | 'raise_for_status': lambda *args, **kwargs: None
419 | })()
420 | return response
421 |
422 | # Return file list for tag search
423 | response = type('MockResponse', (), {
424 | 'status_code': 200,
425 | 'json': lambda *args, **kwargs: {"files": context.mock_files_list},
426 | 'raise_for_status': lambda *args, **kwargs: None
427 | })()
428 | return response
429 |
430 | mock_request.side_effect = mock_api_response
431 | return await search_vault(tag=tags, tag_match_mode=mode)
432 |
433 | context.tool_result = context.loop.run_until_complete(run_tool())
434 |
435 |
436 | @then('the tool should return only notes created after that date')
437 | def step_verify_created_since_filter(context):
438 | assert context.tool_result["success"] is True
439 | assert len(context.tool_result["results"]) == 1 # Only new-note.md should match
440 | assert context.tool_result["results"][0]["path"] == "new-note.md"
441 |
442 |
443 | @then('the tool should return notes matching either foo or bar')
444 | def step_verify_title_or_match(context):
445 | assert context.tool_result["success"] is True
446 | assert len(context.tool_result["results"]) == 2 # foo project.md and bar chart.md
447 | paths = [result["path"] for result in context.tool_result["results"]]
448 | assert "foo project.md" in paths
449 | assert "bar chart.md" in paths
450 | assert "baz notes.md" not in paths
451 |
452 |
453 | @then('the tool should return only notes containing both foo and bar')
454 | def step_verify_title_and_match(context):
455 | assert context.tool_result["success"] is True
456 | assert len(context.tool_result["results"]) == 1 # Only "foo AND bar project.md"
457 | assert context.tool_result["results"][0]["path"] == "foo and bar project.md"
458 |
459 |
460 | @then('the tool should return only notes from projects directory')
461 | def step_verify_path_filter(context):
462 | assert context.tool_result["success"] is True
463 | for result in context.tool_result["results"]:
464 | assert result["path"].startswith("projects/")
465 |
466 |
467 | @then('the tool should return notes matching the regex pattern')
468 | def step_verify_regex_match(context):
469 | assert context.tool_result["success"] is True
470 | assert len(context.tool_result["results"]) > 0 # Should find notes with foo OR bar content
471 |
472 |
473 | @then('the tool should return only notes tagged with project')
474 | def step_verify_tag_filter(context):
475 | assert context.tool_result["success"] is True
476 | assert len(context.tool_result["results"]) == 1 # Only project-note.md should match
477 | assert context.tool_result["results"][0]["path"] == "project-note.md"
478 |
479 |
480 | @then('the tool should return notes with either project or urgent tags')
481 | def step_verify_multiple_tags_or_filter(context):
482 | assert context.tool_result["success"] is True
483 | assert len(context.tool_result["results"]) == 3 # urgent-project.md, project-only.md, urgent-only.md
484 | paths = [result["path"] for result in context.tool_result["results"]]
485 | assert "urgent-project.md" in paths
486 | assert "project-only.md" in paths
487 | assert "urgent-only.md" in paths
488 | assert "no-tags.md" not in paths
489 |
490 |
491 | @then('the tool should return only notes with both project and urgent tags')
492 | def step_verify_multiple_tags_and_filter(context):
493 | assert context.tool_result["success"] is True
494 | assert len(context.tool_result["results"]) == 1 # Only urgent-project.md should match
495 | assert context.tool_result["results"][0]["path"] == "urgent-project.md"
```