# Directory Structure
```
├── .gitignore
├── .python-version
├── main.py
├── pyproject.toml
├── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
1 | 3.13
2 |
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | *~
2 |
3 | # Python-generated files
4 | __pycache__/
5 | *.py[oc]
6 | build/
7 | dist/
8 | wheels/
9 | *.egg-info
10 |
11 | # Virtual environments
12 | .venv
13 |
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 |
2 | [library-mcp](https://github.com/lethain/library-mcp) is an MCP server for interacting with
3 | Markdown knowledge bases: folders, possibly nested, of files with the `.md` extension
4 | that start with YAML front matter like:
5 |
6 | ---
7 | title: My blog post
8 | tags:
9 | - python
10 | - programming
11 | url: /my-blog-post
12 | ---
13 |
14 | # My blog post
15 | Yesterday I was dreaming about...
16 |
17 | The typical workflow in the current version is to
18 | retrieve recent content for a given tag, and then
19 | discuss using that content:
20 |
21 | Get the next 50 posts with tag "executive",
22 | then tell me what I should do about this problem
23 | I am running into: ...
24 |
25 | You can also do the same by date range:
26 |
27 | Summarize the blog posts I wrote in the past year.
28 |
29 | You might reasonably ask "why not just upload your entire blog
30 | into the context window?" and there are two places where this library
31 | outperforms that approach:
32 |
33 | 1. My blog corpus is much larger than most models' context windows today.
34 | Further, even if context windows became exhaustively large, I wrote a lot
35 | of mediocre stuff in the past, so omitting it is arguably a feature.
36 | 2. I have a number of distinct Markdown knowledge bases, and this lets me
37 | operate across them in tandem.
38 |
39 | Finally, this is a hobby project, intended for running locally on your
40 | laptop. No humans have been harmed using this software, but it does
41 | work pretty well!
42 |
43 |
44 | # Tools
45 |
46 | This MCP server exposes these tools.
47 |
48 | ### Content Search Tools
49 |
50 | Tools for retrieving content into your context window:
51 |
52 | * `get_by_tag` - Retrieves content by tag
53 | * `get_by_text` - Searches content for specific text
54 | * `get_by_slug_or_url` - Finds posts by slug or URL
55 | * `get_by_date_range` - Gets posts published within a date range
56 |
57 | ### Tag Management Tools
58 |
59 | Tools for navigating your knowledge base:
60 |
61 | * `search_tags` - Searches for tags matching a query
62 | * `list_all_tags` - Lists all tags sorted by post count and recency
63 |
64 | ### Maintenance Tools
65 |
66 | Tools for maintaining the running server:
67 |
68 | * `rebuild` - Rebuilds the content index;
69 | useful if you have added more content,
70 | edited existing content, etc.
71 |
72 |
73 | # Setup / Installation
74 |
75 | These instructions describe installation for [Claude Desktop](https://claude.ai/download) on macOS.
76 | It should work similarly on other platforms.
77 |
78 | 1. Install [Claude Desktop](https://claude.ai/download).
79 | 2. Clone [library-mcp](https://github.com/lethain/library-mcp) into
80 | a convenient location; these instructions assume `/Users/will/library-mcp`.
81 | 3. Make sure you have `uv` installed; you can [follow these instructions](https://modelcontextprotocol.io/quickstart/server).
82 | 4. Go to Claude Desktop, Settings, Developer, and have it create your MCP config file.
83 | Then update your `claude_desktop_config.json`.
84 | (Note that you should replace `will` with your username, e.g. the output of `whoami`.)
85 |
86 | cd /Users/will/Library/Application Support/Claude
87 | vi claude_desktop_config.json
88 |
89 | Then add this section:
90 |
91 | {
92 | "mcpServers": {
93 | "library": {
94 | "command": "uv",
95 | "args": [
96 | "--directory",
97 | "/Users/will/library-mcp",
98 | "run",
99 | "main.py",
100 | "/Users/will/irrational_hugo/content"
101 | ]
102 | }
103 | }
104 | }
105 |
106 | 5. Close Claude and reopen it.
107 | 6. The `library` tools should now be available.
108 |
```
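As a sanity check on the front-matter format described above, here is a minimal standalone sketch of how `main.py` separates front matter from the body, using the same regex as its `_parse_markdown`; the sample document is invented for illustration:

```python
import re

# A sample document in the front-matter format the README describes (invented).
content = """---
title: My blog post
url: /my-blog-post
---

# My blog post
Yesterday I was dreaming about...
"""

# The same pattern _parse_markdown uses to split front matter from body.
pattern = r"^---\s*\n(.*?)\n---\s*\n"
match = re.search(pattern, content, re.DOTALL)
front_matter = match.group(1)   # the YAML between the --- fences
body = content[match.end():]    # everything after the closing ---
print(front_matter)
print(body.splitlines()[0])
```

Note that `re.DOTALL` lets `.*?` span multiple lines, and the non-greedy match stops at the first closing `---`.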
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [project]
2 | name = "library-mcp"
3 | version = "0.1.0"
4 | description = "MCP server for Markdown knowledge bases"
5 | readme = "README.md"
6 | requires-python = ">=3.13"
7 | dependencies = [
8 | "httpx>=0.28.1",
9 | "mcp[cli]>=1.6.0",
10 | "pyyaml>=6.0.2",
11 | ]
12 |
```
--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------
```python
1 | import os
2 | import re
3 | import sys
4 | from typing import Any, Dict, List, Optional, Union, Tuple
5 | from dataclasses import dataclass
6 | from datetime import datetime
7 |
8 | import yaml
9 | from mcp.server.fastmcp import FastMCP
10 |
11 |
12 | # Redirect all debug prints to stderr
13 | def debug_print(*args, **kwargs):
14 | print(*args, file=sys.stderr, **kwargs)
15 |
16 |
17 | @dataclass
18 | class ContentFile:
19 | path: str
20 | meta: Dict[str, Any]
21 | data: str
22 |
23 | @property
24 | def date(self) -> Optional[datetime]:
25 | """Extract date from metadata or fallback to file modification time"""
26 | if 'date' in self.meta:
27 | try:
28 | date_str = str(self.meta['date'])
29 | if 'T' in date_str and not date_str.endswith('Z') and '+' not in date_str:
30 | date_str += 'Z' # Add UTC indicator if missing
31 | return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
32 | except (ValueError, TypeError) as e:
33 | debug_print(f"Error parsing date: {e} for {self.path}")
34 | pass
35 |
36 | try:
37 |             # Fall back to the file modification time (naive, local time)
38 | mod_time = datetime.fromtimestamp(os.path.getmtime(self.path))
39 | # Return as naive datetime for consistent comparison
40 | return mod_time
41 | except OSError:
42 | return None
43 |
44 | @property
45 | def slug(self) -> str:
46 | """Extract slug from metadata or from filename"""
47 | if 'slug' in self.meta:
48 | return str(self.meta['slug'])
49 |
50 | # Extract from filename (basename without extension)
51 | filename = os.path.basename(self.path)
52 | return os.path.splitext(filename)[0]
53 |
54 | @property
55 | def url(self) -> Optional[str]:
56 | """Extract URL from metadata if available"""
57 | if 'url' in self.meta:
58 | return str(self.meta['url'])
59 | return None
60 |
61 |
62 | class HugoContentManager:
63 | def __init__(self, content_dirs: List[str]):
64 | self.content_dirs = content_dirs
65 | self.dir_to_files: Dict[str, List[str]] = {}
66 | self.path_to_content: Dict[str, ContentFile] = {}
67 | self.load_content()
68 |
69 | def load_content(self) -> None:
70 | """Load all content from the specified directories"""
71 | self.dir_to_files = {}
72 | self.path_to_content = {}
73 |
74 | for content_dir in self.content_dirs:
75 | if not os.path.isdir(content_dir):
76 | debug_print(f"Warning: {content_dir} is not a valid directory, skipping")
77 | continue
78 |
79 | md_files = []
80 | for root, _, files in os.walk(content_dir):
81 | for file in files:
82 | if file.endswith('.md'):
83 | full_path = os.path.join(root, file)
84 | md_files.append(full_path)
85 |
86 | self.dir_to_files[content_dir] = md_files
87 | debug_print(f"Found {len(md_files)} markdown files in {content_dir}")
88 |
89 | for file_path in md_files:
90 | try:
91 | with open(file_path, 'r', encoding='utf-8') as f:
92 | content = f.read()
93 |
94 | meta, data = self._parse_markdown(content)
95 | self.path_to_content[file_path] = ContentFile(
96 | path=file_path,
97 | meta=meta,
98 | data=data
99 | )
100 | except Exception as e:
101 | debug_print(f"Error processing {file_path}: {e}")
102 |
103 | debug_print(f"Total files processed: {len(self.path_to_content)}")
104 |
105 | def _normalize_tags(self, tags: Union[str, List, None]) -> List[str]:
106 | """Normalize tags to a list format regardless of input type"""
107 | if tags is None:
108 | return []
109 |
110 | if isinstance(tags, list):
111 | return [str(tag).strip() for tag in tags]
112 |
113 | if isinstance(tags, str):
114 | # If it looks like a YAML list
115 | if tags.startswith('[') and tags.endswith(']'):
116 | inner = tags[1:-1].strip()
117 | if not inner:
118 | return []
119 | return [tag.strip().strip('\'"') for tag in inner.split(',')]
120 |
121 | # If it's a comma-separated string
122 | if ',' in tags:
123 | return [tag.strip() for tag in tags.split(',')]
124 |
125 | # Single tag
126 | return [tags.strip()]
127 |
128 | # Any other type, convert to string and return as single item
129 | return [str(tags)]
130 |
131 | def _parse_markdown(self, content: str) -> Tuple[Dict[str, Any], str]:
132 | """Parse markdown content, separating front matter from content"""
133 | front_matter_pattern = r"^---\s*\n(.*?)\n---\s*\n"
134 | match = re.search(front_matter_pattern, content, re.DOTALL)
135 |
136 | if not match:
137 | return {}, content
138 |
139 | front_matter_text = match.group(1)
140 |
141 | # Use PyYAML to properly parse the front matter
142 | try:
143 | meta = yaml.safe_load(front_matter_text) or {}
144 | except Exception as e:
145 | debug_print(f"YAML parsing error: {e}")
146 | meta = {}
147 |
148 | # Ensure meta is a dictionary
149 | if not isinstance(meta, dict):
150 | debug_print(f"Front matter did not parse as a dictionary: {type(meta)}")
151 | meta = {}
152 |
153 | # Ensure tags are always in list format
154 | if 'tags' in meta and meta['tags'] is not None:
155 | if not isinstance(meta['tags'], list):
156 | meta['tags'] = [meta['tags']]
157 |
158 | # Extract the actual content (everything after front matter)
159 | data = content[match.end():]
160 |
161 | return meta, data
162 |
163 | def get_by_tag(self, tag: str, limit: int = 50) -> List[ContentFile]:
164 | """Find all files with a given tag"""
165 | matches = []
166 | tag_lower = tag.lower()
167 |
168 | debug_print(f"Searching for tag: '{tag_lower}'")
169 | for file_path, content_file in self.path_to_content.items():
170 | raw_tags = content_file.meta.get('tags', [])
171 | tags = self._normalize_tags(raw_tags)
172 |
173 | # Debug
174 | if tags:
175 | debug_print(f"File: {os.path.basename(file_path)} - Tags: {tags}")
176 |
177 | # Check for exact tag match (case insensitive)
178 | if any(tag_lower == t.lower() for t in tags):
179 | debug_print(f"Found exact tag match in {os.path.basename(file_path)}")
180 | matches.append(content_file)
181 | continue
182 |
183 |             # Fall back to partial matches: query contained within a tag
184 | for t in tags:
185 | if tag_lower in t.lower():
186 | debug_print(f"Found partial tag match in {os.path.basename(file_path)}: '{t}'")
187 | matches.append(content_file)
188 | break
189 |
190 | debug_print(f"Found {len(matches)} files with tag '{tag}'")
191 |
192 | # Sort by date (most recent first)
193 | def get_sort_key(content_file):
194 | date = content_file.date
195 | if date is None:
196 | return datetime.min
197 | # Make date naive if it has timezone info
198 | if hasattr(date, 'tzinfo') and date.tzinfo is not None:
199 | date = date.replace(tzinfo=None)
200 | return date
201 |
202 | matches.sort(key=get_sort_key, reverse=True)
203 |
204 | return matches[:limit]
205 |
206 | def get_by_text(self, query: str, limit: int = 50) -> List[ContentFile]:
207 | """Find all files containing the specified text"""
208 | matches = []
209 | query_lower = query.lower()
210 |
211 | debug_print(f"Searching for text: '{query}'")
212 | for file_path, content_file in self.path_to_content.items():
213 | if query_lower in content_file.data.lower():
214 | matches.append(content_file)
215 |
216 | debug_print(f"Found {len(matches)} files containing '{query}'")
217 |
218 | # Sort by date (most recent first)
219 | def get_sort_key(content_file):
220 | date = content_file.date
221 | if date is None:
222 | return datetime.min
223 | # Make date naive if it has timezone info
224 | if hasattr(date, 'tzinfo') and date.tzinfo is not None:
225 | date = date.replace(tzinfo=None)
226 | return date
227 |
228 | matches.sort(key=get_sort_key, reverse=True)
229 |
230 | return matches[:limit]
231 |
232 | def search_tags(self, tag_query: str, limit: int = 20) -> List[str]:
233 | """Search for tags matching the provided query"""
234 | all_tags = set()
235 | tag_query_lower = tag_query.lower()
236 |
237 | debug_print(f"Searching for tags containing: '{tag_query_lower}'")
238 | for _, content_file in self.path_to_content.items():
239 | raw_tags = content_file.meta.get('tags', [])
240 | tags = self._normalize_tags(raw_tags)
241 |
242 | # Add tags that match the query
243 | for tag in tags:
244 | if tag_query_lower in tag.lower():
245 | all_tags.add(tag)
246 |
247 | # Convert to list and sort alphabetically
248 | tag_list = sorted(list(all_tags))
249 | debug_print(f"Found {len(tag_list)} tags matching '{tag_query_lower}'")
250 |
251 | return tag_list[:limit]
252 |
253 | def list_all_tags(self) -> List[Tuple[str, int, Optional[datetime]]]:
254 | """List all tags with their post count and most recent post date"""
255 | tag_info: Dict[str, Tuple[int, Optional[datetime]]] = {}
256 |
257 | debug_print("Collecting tag statistics...")
258 | for _, content_file in self.path_to_content.items():
259 | raw_tags = content_file.meta.get('tags', [])
260 | tags = self._normalize_tags(raw_tags)
261 | post_date = content_file.date
262 |
263 | for tag in tags:
264 | if tag in tag_info:
265 | count, latest_date = tag_info[tag]
266 | # Handle the case where either date might be None
267 | if latest_date is None:
268 | new_latest = post_date
269 | elif post_date is None:
270 | new_latest = latest_date
271 | else:
272 | # Make both dates naive if they're not already
273 | if hasattr(latest_date, 'tzinfo') and latest_date.tzinfo is not None:
274 | latest_date = latest_date.replace(tzinfo=None)
275 | if hasattr(post_date, 'tzinfo') and post_date.tzinfo is not None:
276 | post_date = post_date.replace(tzinfo=None)
277 | new_latest = max(latest_date, post_date)
278 | tag_info[tag] = (count + 1, new_latest)
279 | else:
280 | tag_info[tag] = (1, post_date)
281 |
282 | # Convert to list of tuples (tag, count, latest_date)
283 | result = [(tag, count, date) for tag, (count, date) in tag_info.items()]
284 |
285 | # Sort by count (descending) and then by date (most recent first)
286 | # Make all dates naive for comparison
287 | def get_sort_key(item):
288 | count = item[1]
289 | date = item[2]
290 | if date is None:
291 | return (-count, datetime.min)
292 | # Make date naive if it has timezone info
293 | if hasattr(date, 'tzinfo') and date.tzinfo is not None:
294 | date = date.replace(tzinfo=None)
295 | return (-count, date)
296 |
297 | result.sort(key=get_sort_key, reverse=True)
298 |
299 | debug_print(f"Collected statistics for {len(result)} tags")
300 | return result
301 |
302 | def get_by_slug_or_url(self, identifier: str) -> Optional[ContentFile]:
303 | """Find a post by its slug or URL"""
304 | identifier_lower = identifier.lower()
305 |
306 | debug_print(f"Searching for post with slug or URL: '{identifier}'")
307 |
308 | # First check for exact URL match (case insensitive)
309 | for _, content_file in self.path_to_content.items():
310 | url = content_file.url
311 | if url and url.lower() == identifier_lower:
312 | debug_print(f"Found exact URL match: {url}")
313 | return content_file
314 |
315 | # Then check for exact slug match (case insensitive)
316 | for _, content_file in self.path_to_content.items():
317 | slug = content_file.slug
318 | if slug.lower() == identifier_lower:
319 | debug_print(f"Found exact slug match: {slug}")
320 | return content_file
321 |
322 | # Try partial path match if no exact matches found
323 | for path, content_file in self.path_to_content.items():
324 | if identifier_lower in path.lower():
325 | debug_print(f"Found partial path match: {path}")
326 | return content_file
327 |
328 | debug_print(f"No post found for '{identifier}'")
329 | return None
330 |
331 | def get_by_date_range(self, start_date: datetime, end_date: datetime, limit: int = 50) -> List[ContentFile]:
332 | """Find all posts within a date range"""
333 | matches = []
334 |
335 | debug_print(f"Searching for posts between {start_date} and {end_date}")
336 | for _, content_file in self.path_to_content.items():
337 | post_date = content_file.date
338 | if post_date:
339 | # Make date naive for comparison if it has timezone info
340 | if hasattr(post_date, 'tzinfo') and post_date.tzinfo is not None:
341 | post_date = post_date.replace(tzinfo=None)
342 |
343 | # Make start and end dates naive for comparison
344 | start_naive = start_date
345 | if hasattr(start_naive, 'tzinfo') and start_naive.tzinfo is not None:
346 | start_naive = start_naive.replace(tzinfo=None)
347 |
348 | end_naive = end_date
349 | if hasattr(end_naive, 'tzinfo') and end_naive.tzinfo is not None:
350 | end_naive = end_naive.replace(tzinfo=None)
351 |
352 | if start_naive <= post_date <= end_naive:
353 | matches.append(content_file)
354 |
355 | debug_print(f"Found {len(matches)} posts within date range")
356 |
357 | # Sort by date (most recent first)
358 | def get_sort_key(content_file):
359 | date = content_file.date
360 | if date is None:
361 | return datetime.min
362 | # Make date naive if it has timezone info
363 | if hasattr(date, 'tzinfo') and date.tzinfo is not None:
364 | date = date.replace(tzinfo=None)
365 | return date
366 |
367 | matches.sort(key=get_sort_key, reverse=True)
368 |
369 | return matches[:limit]
370 |
371 |
372 | def format_content_for_output(content_files: List[ContentFile]) -> str:
373 | """Format the content files for output"""
374 | if not content_files:
375 | return "No matching content found."
376 |
377 | result = []
378 |
379 | for i, file in enumerate(content_files):
380 | result.append(f"File: {file.path}")
381 | result.append("Metadata:")
382 | for key, value in file.meta.items():
383 | result.append(f" {key}: {value}")
384 |
385 | # Include the full content
386 | result.append("Content:")
387 | result.append(file.data.strip())
388 |
389 | # Add separator between entries, but not after the last one
390 | if i < len(content_files) - 1:
391 | result.append("-" * 50)
392 |
393 | return "\n".join(result)
394 |
395 |
396 | def format_tags_for_output(tags: List[Tuple[str, int, Optional[datetime]]]) -> str:
397 | """Format tag information for output"""
398 | if not tags:
399 | return "No tags found."
400 |
401 | result = []
402 | result.append("Tags (by post count and most recent post):")
403 |
404 | for tag, count, date in tags:
405 | if date is None:
406 | date_str = "Unknown"
407 | else:
408 | # Strip timezone info for display if present
409 | if hasattr(date, 'tzinfo') and date.tzinfo is not None:
410 | date = date.replace(tzinfo=None)
411 |
412 | # Only use date part for display
413 | if date != datetime.min:
414 | date_str = date.strftime("%Y-%m-%d")
415 | else:
416 | date_str = "Unknown"
417 |
418 | result.append(f"- {tag}: {count} posts, most recent: {date_str}")
419 |
420 | return "\n".join(result)
421 |
422 |
423 | # Create MCP server
424 | mcp = FastMCP("hugo_content")
425 | content_manager = None
426 |
427 |
428 | @mcp.tool()
429 | async def get_by_tag(tag: str, limit: int = 50) -> str:
430 | """Get blog content by its tag.
431 |
432 | Args:
433 | tag: the tag associated with content
434 | limit: the number of results to include
435 | """
436 | if content_manager is None:
437 | return "Content has not been loaded. Please ensure the server is properly initialized."
438 |
439 | matching_content = content_manager.get_by_tag(tag, limit)
440 | return format_content_for_output(matching_content)
441 |
442 |
443 | @mcp.tool()
444 | async def get_by_text(query: str, limit: int = 50) -> str:
445 |     """Get blog content containing the given text.
446 |
447 |     Args:
448 |         query: text to search for (case-insensitive substring match)
449 |         limit: the number of results to include
450 | """
451 | if content_manager is None:
452 | return "Content has not been loaded. Please ensure the server is properly initialized."
453 |
454 | matching_content = content_manager.get_by_text(query, limit)
455 | return format_content_for_output(matching_content)
456 |
457 |
458 | @mcp.tool()
459 | async def rebuild() -> bool:
460 | """Rebuild text index. Useful for when contents have changed on disk"""
461 | if content_manager is None:
462 | return False
463 |
464 | debug_print("Rebuilding content index...")
465 | content_manager.load_content()
466 | debug_print("Content index rebuilt successfully")
467 | return True
468 |
469 |
470 | @mcp.tool()
471 | async def search_tags(tag_query: str, limit: int = 20) -> str:
472 | """Search for tags matching the provided query.
473 |
474 | Args:
475 | tag_query: partial or full tag name to search for
476 | limit: the maximum number of tags to return
477 | """
478 | if content_manager is None:
479 | return "Content has not been loaded. Please ensure the server is properly initialized."
480 |
481 | matching_tags = content_manager.search_tags(tag_query, limit)
482 |
483 | if not matching_tags:
484 | return f"No tags found matching '{tag_query}'."
485 |
486 | result = [f"Tags matching '{tag_query}':"]
487 | for tag in matching_tags:
488 | result.append(f"- {tag}")
489 |
490 | return "\n".join(result)
491 |
492 |
493 | @mcp.tool()
494 | async def list_all_tags() -> str:
495 | """List all tags sorted by number of posts and most recent post."""
496 | if content_manager is None:
497 | return "Content has not been loaded. Please ensure the server is properly initialized."
498 |
499 | tag_info = content_manager.list_all_tags()
500 | return format_tags_for_output(tag_info)
501 |
502 |
503 | @mcp.tool()
504 | async def get_by_slug_or_url(identifier: str) -> str:
505 | """Get a post by its slug or URL.
506 |
507 | Args:
508 | identifier: the slug, URL, or path fragment to search for
509 | """
510 | if content_manager is None:
511 | return "Content has not been loaded. Please ensure the server is properly initialized."
512 |
513 | post = content_manager.get_by_slug_or_url(identifier)
514 |
515 | if post is None:
516 | return f"No post found with slug or URL matching '{identifier}'."
517 |
518 | # Format as a list to reuse format_content_for_output
519 | return format_content_for_output([post])
520 |
521 |
522 | @mcp.tool()
523 | async def get_by_date_range(start_date: str, end_date: str, limit: int = 50) -> str:
524 | """Get posts published within a date range.
525 |
526 | Args:
527 | start_date: the start date in ISO format (YYYY-MM-DD)
528 | end_date: the end date in ISO format (YYYY-MM-DD)
529 | limit: the maximum number of posts to return
530 | """
531 | if content_manager is None:
532 | return "Content has not been loaded. Please ensure the server is properly initialized."
533 |
534 | try:
535 | # Parse dates with time set to beginning/end of day
536 | # Always create naive datetimes for consistent comparison
537 | start = datetime.fromisoformat(f"{start_date}T00:00:00")
538 | end = datetime.fromisoformat(f"{end_date}T23:59:59")
539 | except ValueError as e:
540 | return f"Error parsing dates: {e}. Please use ISO format (YYYY-MM-DD)."
541 |
542 | posts = content_manager.get_by_date_range(start, end, limit)
543 | return format_content_for_output(posts)
544 |
545 |
546 | if __name__ == "__main__":
547 | if len(sys.argv) < 2:
548 |         debug_print("Usage: python main.py <content_dir1> [<content_dir2> ...]")
549 | sys.exit(1)
550 |
551 | content_dirs = sys.argv[1:]
552 | debug_print(f"Loading content from directories: {', '.join(content_dirs)}")
553 |
554 | content_manager = HugoContentManager(content_dirs)
555 | debug_print(f"Loaded {len(content_manager.path_to_content)} markdown files")
556 |
557 | mcp.run(transport='stdio')
```
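For reference, the tag handling above accepts several front-matter shapes: YAML lists, bracketed strings, comma-separated strings, and single tags. A self-contained sketch mirroring `_normalize_tags`'s behavior, reimplemented here for illustration:

```python
def normalize_tags(tags):
    """Mirror of HugoContentManager._normalize_tags, for illustration."""
    if tags is None:
        return []
    if isinstance(tags, list):
        # Already a list: stringify and trim each entry
        return [str(tag).strip() for tag in tags]
    if isinstance(tags, str):
        # YAML-style bracketed string, e.g. "[a, 'b']"
        if tags.startswith('[') and tags.endswith(']'):
            inner = tags[1:-1].strip()
            if not inner:
                return []
            return [tag.strip().strip('\'"') for tag in inner.split(',')]
        # Comma-separated string, e.g. "a, b"
        if ',' in tags:
            return [tag.strip() for tag in tags.split(',')]
        # Single tag
        return [tags.strip()]
    # Any other type: stringify as a single tag
    return [str(tags)]

print(normalize_tags("[python, 'programming']"))  # ['python', 'programming']
print(normalize_tags("executive"))                # ['executive']
```

This is why tags written in any of these styles across different knowledge bases all match the same `get_by_tag` query.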