#
tokens: 46266/50000 3/79 files (page 4/4)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 4 of 4. Use http://codebase.md/utensils/mcp-nixos?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── mcp-server-architect.md
│   │   ├── nix-expert.md
│   │   └── python-expert.md
│   ├── commands
│   │   └── release.md
│   └── settings.json
├── .dockerignore
├── .envrc
├── .github
│   └── workflows
│       ├── ci.yml
│       ├── claude-code-review.yml
│       ├── claude.yml
│       ├── deploy-flakehub.yml
│       ├── deploy-website.yml
│       └── publish.yml
├── .gitignore
├── .mcp.json
├── .pre-commit-config.yaml
├── CLAUDE.md
├── Dockerfile
├── flake.lock
├── flake.nix
├── LICENSE
├── MANIFEST.in
├── mcp_nixos
│   ├── __init__.py
│   └── server.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── RELEASE_NOTES.md
├── RELEASE_WORKFLOW.md
├── smithery.yaml
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_channels.py
│   ├── test_edge_cases.py
│   ├── test_evals.py
│   ├── test_flakes.py
│   ├── test_integration.py
│   ├── test_main.py
│   ├── test_mcp_behavior.py
│   ├── test_mcp_tools.py
│   ├── test_nixhub.py
│   ├── test_nixos_stats.py
│   ├── test_options.py
│   ├── test_plain_text_output.py
│   ├── test_real_world_scenarios.py
│   ├── test_regression.py
│   └── test_server.py
├── uv.lock
└── website
    ├── .eslintignore
    ├── .eslintrc.json
    ├── .gitignore
    ├── .prettierignore
    ├── .prettierrc
    ├── .vscode
    │   └── settings.json
    ├── app
    │   ├── about
    │   │   └── page.tsx
    │   ├── docs
    │   │   └── claude.html
    │   ├── globals.css
    │   ├── layout.tsx
    │   ├── page.tsx
    │   ├── test-code-block
    │   │   └── page.tsx
    │   └── usage
    │       └── page.tsx
    ├── components
    │   ├── AnchorHeading.tsx
    │   ├── ClientFooter.tsx
    │   ├── ClientNavbar.tsx
    │   ├── CodeBlock.tsx
    │   ├── CollapsibleSection.tsx
    │   ├── FeatureCard.tsx
    │   ├── Footer.tsx
    │   └── Navbar.tsx
    ├── metadata-checker.html
    ├── netlify.toml
    ├── next.config.js
    ├── package-lock.json
    ├── package.json
    ├── postcss.config.js
    ├── public
    │   ├── favicon
    │   │   ├── android-chrome-192x192.png
    │   │   ├── android-chrome-512x512.png
    │   │   ├── apple-touch-icon.png
    │   │   ├── browserconfig.xml
    │   │   ├── favicon-16x16.png
    │   │   ├── favicon-32x32.png
    │   │   ├── favicon.ico
    │   │   ├── mstile-150x150.png
    │   │   ├── README.md
    │   │   └── site.webmanifest
    │   ├── images
    │   │   ├── .gitkeep
    │   │   ├── attribution.md
    │   │   ├── claude-logo.png
    │   │   ├── JamesBrink.jpeg
    │   │   ├── mcp-nixos.png
    │   │   ├── nixos-snowflake-colour.svg
    │   │   ├── og-image.png
    │   │   ├── sean-callan.png
    │   │   └── utensils-logo.png
    │   ├── robots.txt
    │   └── sitemap.xml
    ├── README.md
    ├── tailwind.config.js
    ├── tsconfig.json
    └── windsurf_deployment.yaml
```

# Files

--------------------------------------------------------------------------------
/tests/test_mcp_behavior.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """MCP behavior evaluation tests for real-world usage scenarios."""
  3 | 
  4 | from unittest.mock import Mock, patch
  5 | 
  6 | import pytest
  7 | from mcp_nixos import server
  8 | 
  9 | 
 10 | def get_tool_function(tool_name: str):
 11 |     """Get the underlying function from a FastMCP tool."""
 12 |     tool = getattr(server, tool_name)
 13 |     if hasattr(tool, "fn"):
 14 |         return tool.fn
 15 |     return tool
 16 | 
 17 | 
 18 | # Get the underlying functions for direct use
 19 | darwin_info = get_tool_function("darwin_info")
 20 | darwin_options_by_prefix = get_tool_function("darwin_options_by_prefix")
 21 | darwin_search = get_tool_function("darwin_search")
 22 | darwin_stats = get_tool_function("darwin_stats")
 23 | home_manager_info = get_tool_function("home_manager_info")
 24 | home_manager_list_options = get_tool_function("home_manager_list_options")
 25 | home_manager_options_by_prefix = get_tool_function("home_manager_options_by_prefix")
 26 | home_manager_search = get_tool_function("home_manager_search")
 27 | home_manager_stats = get_tool_function("home_manager_stats")
 28 | nixos_channels = get_tool_function("nixos_channels")
 29 | nixos_info = get_tool_function("nixos_info")
 30 | nixos_search = get_tool_function("nixos_search")
 31 | nixos_stats = get_tool_function("nixos_stats")
 32 | 
 33 | 
 34 | class MockAssistant:
 35 |     """Mock AI assistant for testing MCP tool usage patterns."""
 36 | 
 37 |     def __init__(self):
 38 |         self.tool_calls = []
 39 |         self.responses = []
 40 | 
 41 |     async def use_tool(self, tool_name: str, **kwargs) -> str:
 42 |         """Simulate using an MCP tool."""
 43 |         from mcp_nixos import server
 44 | 
 45 |         self.tool_calls.append({"tool": tool_name, "args": kwargs})
 46 | 
 47 |         # Call the actual tool - get the underlying function from FastMCP tool
 48 |         tool_func = getattr(server, tool_name)
 49 |         if hasattr(tool_func, "fn"):
 50 |             # FastMCP wrapped function - use the underlying function
 51 |             result = await tool_func.fn(**kwargs)
 52 |         else:
 53 |             # Direct function call
 54 |             result = await tool_func(**kwargs)
 55 |         self.responses.append(result)
 56 |         return result
 57 | 
 58 |     def analyze_response(self, response: str) -> dict[str, bool | int]:
 59 |         """Analyze tool response for key information."""
 60 |         analysis = {
 61 |             "has_results": "Found" in response or ":" in response,
 62 |             "is_error": "Error" in response,
 63 |             "has_bullet_points": "•" in response,
 64 |             "line_count": len(response.strip().split("\n")),
 65 |             "mentions_not_found": "not found" in response.lower(),
 66 |         }
 67 |         return analysis
 68 | 
 69 | 
 70 | @pytest.mark.evals
 71 | class TestMCPBehaviorEvals:
 72 |     """Test MCP tool behavior in realistic scenarios."""
 73 | 
 74 |     @pytest.mark.asyncio
 75 |     async def test_scenario_install_package(self):
 76 |         """User wants to install a specific package."""
 77 |         assistant = MockAssistant()
 78 | 
 79 |         # Step 1: Search for the package
 80 |         response1 = await assistant.use_tool("nixos_search", query="neovim", search_type="packages", limit=5)
 81 |         analysis1 = assistant.analyze_response(response1)
 82 | 
 83 |         assert analysis1["has_results"] or analysis1["mentions_not_found"]
 84 |         assert not analysis1["is_error"]
 85 | 
 86 |         # Step 2: Get detailed info if found
 87 |         if analysis1["has_results"]:
 88 |             response2 = await assistant.use_tool("nixos_info", name="neovim", type="package")
 89 |             analysis2 = assistant.analyze_response(response2)
 90 | 
 91 |             assert "Package:" in response2
 92 |             assert "Version:" in response2
 93 |             assert not analysis2["is_error"]
 94 | 
 95 |         # Verify tool usage pattern
 96 |         assert len(assistant.tool_calls) >= 1
 97 |         assert assistant.tool_calls[0]["tool"] == "nixos_search"
 98 | 
 99 |     @pytest.mark.asyncio
100 |     async def test_scenario_configure_service(self):
101 |         """User wants to configure a NixOS service."""
102 |         assistant = MockAssistant()
103 | 
104 |         # Step 1: Search for service options
105 |         response1 = await assistant.use_tool("nixos_search", query="nginx", search_type="options", limit=10)
106 | 
107 |         # Step 2: Get specific option details
108 |         if "services.nginx.enable" in response1:
109 |             response2 = await assistant.use_tool("nixos_info", name="services.nginx.enable", type="option")
110 | 
111 |             assert "Type: boolean" in response2
112 |             assert "Default:" in response2
113 | 
114 |     @pytest.mark.asyncio
115 |     async def test_scenario_explore_home_manager(self):
116 |         """User wants to explore Home Manager configuration."""
117 |         assistant = MockAssistant()
118 | 
119 |         # Step 1: List categories
120 |         response1 = await assistant.use_tool("home_manager_list_options")
121 |         assert "programs" in response1
122 |         assert "services" in response1
123 | 
124 |         # Step 2: Explore programs category
125 |         await assistant.use_tool("home_manager_options_by_prefix", option_prefix="programs")
126 | 
127 |         # Step 3: Search for specific program
128 |         response3 = await assistant.use_tool("home_manager_search", query="firefox", limit=5)
129 | 
130 |         # Step 4: Get details on specific option
131 |         if "programs.firefox.enable" in response3:
132 |             response4 = await assistant.use_tool("home_manager_info", name="programs.firefox.enable")
133 |             assert "Option:" in response4
134 | 
135 |     @pytest.mark.asyncio
136 |     async def test_scenario_macos_configuration(self):
137 |         """User wants to configure macOS with nix-darwin."""
138 |         assistant = MockAssistant()
139 | 
140 |         # Step 1: Search for Homebrew integration
141 |         await assistant.use_tool("darwin_search", query="homebrew", limit=10)
142 | 
143 |         # Step 2: Explore system defaults
144 |         response2 = await assistant.use_tool("darwin_options_by_prefix", option_prefix="system.defaults")
145 | 
146 |         # Step 3: Get specific dock settings
147 |         if "system.defaults.dock" in response2:
148 |             response3 = await assistant.use_tool("darwin_options_by_prefix", option_prefix="system.defaults.dock")
149 | 
150 |             # Check for autohide option
151 |             if "autohide" in response3:
152 |                 response4 = await assistant.use_tool("darwin_info", name="system.defaults.dock.autohide")
153 |                 assert "Option:" in response4
154 | 
155 |     @pytest.mark.asyncio
156 |     async def test_scenario_compare_channels(self):
157 |         """User wants to compare packages across channels."""
158 |         assistant = MockAssistant()
159 | 
160 |         package = "postgresql"
161 |         channels = ["unstable", "stable"]
162 | 
163 |         results = {}
164 |         for channel in channels:
165 |             response = await assistant.use_tool("nixos_info", name=package, type="package", channel=channel)
166 |             if "Version:" in response:
167 |                 # Extract version
168 |                 for line in response.split("\n"):
169 |                     if line.startswith("Version:"):
170 |                         results[channel] = line.split("Version:")[1].strip()
171 | 
172 |         # User can now compare versions across channels
173 |         assert len(assistant.tool_calls) == len(channels)
174 | 
175 |     @pytest.mark.asyncio
176 |     async def test_scenario_find_package_by_program(self):
177 |         """User wants to find which package provides a specific program."""
178 |         assistant = MockAssistant()
179 | 
180 |         # Search for package that provides 'gcc'
181 |         response = await assistant.use_tool("nixos_search", query="gcc", search_type="programs", limit=10)
182 | 
183 |         analysis = assistant.analyze_response(response)
184 |         if analysis["has_results"]:
185 |             assert "provided by" in response
186 |             assert "gcc" in response.lower()
187 | 
188 |     @pytest.mark.asyncio
189 |     async def test_scenario_complex_option_exploration(self):
190 |         """User wants to understand complex NixOS options."""
191 |         assistant = MockAssistant()
192 | 
193 |         # Look for virtualisation options
194 |         response1 = await assistant.use_tool(
195 |             "nixos_search", query="virtualisation.docker", search_type="options", limit=20
196 |         )
197 | 
198 |         if "virtualisation.docker.enable" in response1:
199 |             # Get details on enable option
200 |             await assistant.use_tool("nixos_info", name="virtualisation.docker.enable", type="option")
201 | 
202 |             # Search for related options
203 |             await assistant.use_tool("nixos_search", query="docker", search_type="options", limit=10)
204 | 
205 |             # Verify we get comprehensive docker configuration options
206 |             assert any(r for r in assistant.responses if "docker" in r.lower())
207 | 
208 |     @pytest.mark.asyncio
209 |     async def test_scenario_git_configuration(self):
210 |         """User wants to configure git with Home Manager."""
211 |         assistant = MockAssistant()
212 | 
213 |         # Explore git options
214 |         response1 = await assistant.use_tool("home_manager_options_by_prefix", option_prefix="programs.git")
215 | 
216 |         # Count git-related options
217 |         git_options = response1.count("programs.git")
218 |         assert git_options > 10  # Git should have many options
219 | 
220 |         # Look for specific features
221 |         features = ["delta", "lfs", "signing", "aliases"]
222 |         found_features = sum(1 for f in features if f in response1)
223 |         assert found_features >= 2  # Should find at least some features
224 | 
225 |     @pytest.mark.asyncio
226 |     async def test_scenario_error_recovery(self):
227 |         """Test how tools handle errors and guide users."""
228 |         assistant = MockAssistant()
229 | 
230 |         # Try invalid channel
231 |         response1 = await assistant.use_tool("nixos_search", query="test", channel="invalid-channel")
232 |         assert "Error" in response1
233 |         assert "Invalid channel" in response1
234 | 
235 |         # Try non-existent package
236 |         response2 = await assistant.use_tool("nixos_info", name="definitely-not-a-real-package-12345", type="package")
237 |         assert "not found" in response2.lower()
238 | 
239 |         # Try invalid type
240 |         response3 = await assistant.use_tool("nixos_search", query="test", search_type="invalid-type")
241 |         assert "Error" in response3
242 |         assert "Invalid type" in response3
243 | 
244 |     @pytest.mark.asyncio
245 |     async def test_scenario_bulk_option_discovery(self):
246 |         """User wants to discover all options for a service."""
247 |         assistant = MockAssistant()
248 | 
249 |         # Search for all nginx options
250 |         response1 = await assistant.use_tool("nixos_search", query="services.nginx", search_type="options", limit=50)
251 | 
252 |         if "Found" in response1:
253 |             # Count unique option types
254 |             option_types = set()
255 |             for line in response1.split("\n"):
256 |                 if "Type:" in line:
257 |                     option_type = line.split("Type:")[1].strip()
258 |                     option_types.add(option_type)
259 | 
260 |             # nginx should have various option types
261 |             assert len(option_types) >= 2
262 | 
263 |     @pytest.mark.asyncio
264 |     async def test_scenario_multi_tool_workflow(self):
265 |         """Test realistic multi-step workflows."""
266 |         assistant = MockAssistant()
267 | 
268 |         # Workflow: Set up a development environment
269 | 
270 |         # 1. Check statistics
271 |         stats = await assistant.use_tool("nixos_stats")
272 |         assert "Packages:" in stats
273 | 
274 |         # 2. Search for development tools
275 |         dev_tools = ["vscode", "git", "docker", "nodejs"]
276 |         for tool in dev_tools[:2]:  # Test first two to save time
277 |             response = await assistant.use_tool("nixos_search", query=tool, search_type="packages", limit=3)
278 |             if "Found" in response:
279 |                 # Get info on first result
280 |                 package_name = None
281 |                 for line in response.split("\n"):
282 |                     if line.startswith("•"):
283 |                         # Extract package name
284 |                         package_name = line.split("•")[1].split("(")[0].strip()
285 |                         break
286 | 
287 |                 if package_name:
288 |                     info = await assistant.use_tool("nixos_info", name=package_name, type="package")
289 |                     assert "Package:" in info
290 | 
291 |         # 3. Configure git in Home Manager
292 |         await assistant.use_tool("home_manager_search", query="git", limit=10)
293 | 
294 |         # Verify workflow completed
295 |         assert len(assistant.tool_calls) >= 4
296 |         assert not any("Error" in r for r in assistant.responses[:3])  # First 3 should succeed
297 | 
298 |     @pytest.mark.asyncio
299 |     async def test_scenario_performance_monitoring(self):
300 |         """Monitor performance characteristics of tool calls."""
301 |         import time
302 | 
303 |         assistant = MockAssistant()
304 |         timings = {}
305 | 
306 |         # Time different operations
307 |         operations = [
308 |             ("nixos_stats", {}),
309 |             ("nixos_search", {"query": "python", "limit": 20}),
310 |             ("home_manager_list_options", {}),
311 |             ("darwin_search", {"query": "system", "limit": 10}),
312 |         ]
313 | 
314 |         for op_name, op_args in operations:
315 |             start = time.time()
316 |             try:
317 |                 await assistant.use_tool(op_name, **op_args)
318 |                 elapsed = time.time() - start
319 |                 timings[op_name] = elapsed
320 |             except Exception:
321 |                 timings[op_name] = -1
322 | 
323 |         # All operations should complete reasonably quickly
324 |         for op, timing in timings.items():
325 |             if timing > 0:
326 |                 assert timing < 30, f"{op} took too long: {timing}s"
327 | 
328 |     @pytest.mark.asyncio
329 |     async def test_scenario_option_value_types(self):
330 |         """Test understanding different option value types."""
331 |         assistant = MockAssistant()
332 | 
333 |         # Search for options with different types
334 |         type_examples = {
335 |             "boolean": "enable",
336 |             "string": "description",
337 |             "list": "allowedTCPPorts",
338 |             "attribute set": "extraConfig",
339 |         }
340 | 
341 |         found_types = {}
342 |         for type_name, search_term in type_examples.items():
343 |             response = await assistant.use_tool("nixos_search", query=search_term, search_type="options", limit=5)
344 |             if "Type:" in response:
345 |                 found_types[type_name] = response
346 | 
347 |         # Should find at least some different types
348 |         assert len(found_types) >= 2
349 | 
350 | 
351 | # ===== Content from test_mcp_behavior_comprehensive.py =====
352 | class TestMCPBehaviorComprehensive:
353 |     """Test real-world usage patterns based on actual tool testing results."""
354 | 
355 |     @pytest.mark.asyncio
356 |     async def test_nixos_package_discovery_flow(self):
357 |         """Test typical package discovery workflow."""
358 |         # 1. Search for packages
359 |         with patch("mcp_nixos.server.es_query") as mock_es:
360 |             mock_es.return_value = [
361 |                 {
362 |                     "_source": {
363 |                         "type": "package",
364 |                         "package_pname": "git",
365 |                         "package_pversion": "2.49.0",
366 |                         "package_description": "Distributed version control system",
367 |                     }
368 |                 },
369 |                 {
370 |                     "_source": {
371 |                         "type": "package",
372 |                         "package_pname": "gitoxide",
373 |                         "package_pversion": "0.40.0",
374 |                         "package_description": "Rust implementation of Git",
375 |                     }
376 |                 },
377 |             ]
378 | 
379 |             result = await nixos_search("git", limit=5)
380 |             assert "git (2.49.0)" in result
381 |             assert "Distributed version control system" in result
382 |             assert "gitoxide" in result
383 | 
384 |         # 2. Get detailed info about a specific package
385 |         with patch("mcp_nixos.server.es_query") as mock_es:
386 |             mock_es.return_value = [
387 |                 {
388 |                     "_source": {
389 |                         "type": "package",
390 |                         "package_pname": "git",
391 |                         "package_pversion": "2.49.0",
392 |                         "package_description": "Distributed version control system",
393 |                         "package_homepage": ["https://git-scm.com/"],
394 |                         "package_license_set": ["GNU General Public License v2.0"],
395 |                     }
396 |                 }
397 |             ]
398 | 
399 |             result = await nixos_info("git")
400 |             assert "Package: git" in result
401 |             assert "Version: 2.49.0" in result
402 |             assert "Homepage: https://git-scm.com/" in result
403 |             assert "License: GNU General Public License v2.0" in result
404 | 
405 |     @pytest.mark.asyncio
406 |     async def test_nixos_channel_awareness(self):
407 |         """Test channel discovery and usage."""
408 |         # 1. List available channels
409 |         with patch("mcp_nixos.server.channel_cache.get_available") as mock_discover:
410 |             with patch("mcp_nixos.server.channel_cache.get_resolved") as mock_resolved:
411 |                 mock_discover.return_value = {
412 |                     "latest-43-nixos-unstable": "151,798 documents",
413 |                     "latest-43-nixos-25.05": "151,698 documents",
414 |                     "latest-43-nixos-25.11": "152,000 documents",
415 |                 }
416 |                 mock_resolved.return_value = {
417 |                     "unstable": "latest-43-nixos-unstable",
418 |                     "stable": "latest-43-nixos-25.11",
419 |                     "25.05": "latest-43-nixos-25.05",
420 |                     "25.11": "latest-43-nixos-25.11",
421 |                     "beta": "latest-43-nixos-25.11",
422 |                 }
423 | 
424 |                 # Mock that we're not using fallback
425 |                 from mcp_nixos.server import channel_cache
426 | 
427 |                 channel_cache.using_fallback = False
428 | 
429 |                 result = await nixos_channels()
430 |                 assert "NixOS Channels" in result
431 |                 assert "stable (current: 25.11)" in result
432 |                 assert "unstable" in result
433 |                 assert "✓ Available" in result
434 | 
435 |         # 2. Get stats for a channel
436 |         with patch("requests.post") as mock_post:
437 |             mock_resp = Mock()
438 |             mock_resp.status_code = 200
439 |             mock_resp.json.side_effect = [
440 |                 {"count": 129865},  # packages
441 |                 {"count": 21933},  # options
442 |             ]
443 |             mock_resp.raise_for_status.return_value = None
444 |             mock_post.return_value = mock_resp
445 | 
446 |             result = await nixos_stats()
447 |             assert "NixOS Statistics" in result
448 |             assert "129,865" in result
449 |             assert "21,933" in result
450 | 
451 |     @pytest.mark.asyncio
452 |     async def test_home_manager_option_discovery_flow(self):
453 |         """Test typical Home Manager option discovery workflow."""
454 |         # 1. Search for options
455 |         with patch("mcp_nixos.server.parse_html_options") as mock_parse:
456 |             mock_parse.return_value = [
457 |                 {
458 |                     "name": "programs.git.enable",
459 |                     "type": "boolean",
460 |                     "description": "Whether to enable Git",
461 |                 },
462 |                 {
463 |                     "name": "programs.git.userName",
464 |                     "type": "string",
465 |                     "description": "Default Git username",
466 |                 },
467 |                 {
468 |                     "name": "programs.git.userEmail",
469 |                     "type": "string",
470 |                     "description": "Default Git email",
471 |                 },
472 |             ]
473 | 
474 |             result = await home_manager_search("git", limit=3)
475 |             assert "programs.git.enable" in result
476 |             assert "programs.git.userName" in result
477 |             assert "programs.git.userEmail" in result
478 | 
479 |         # 2. Browse by prefix to find exact option names
480 |         with patch("mcp_nixos.server.parse_html_options") as mock_parse:
481 |             mock_parse.return_value = [
482 |                 {
483 |                     "name": "programs.git.enable",
484 |                     "type": "boolean",
485 |                     "description": "Whether to enable Git",
486 |                 },
487 |                 {
488 |                     "name": "programs.git.aliases",
489 |                     "type": "attribute set of string",
490 |                     "description": "Git aliases",
491 |                 },
492 |                 {
493 |                     "name": "programs.git.delta.enable",
494 |                     "type": "boolean",
495 |                     "description": "Whether to enable delta syntax highlighting",
496 |                 },
497 |             ]
498 | 
499 |             result = await home_manager_options_by_prefix("programs.git")
500 |             assert "programs.git.enable" in result
501 |             assert "programs.git.aliases" in result
502 |             assert "programs.git.delta.enable" in result
503 | 
504 |         # 3. Get specific option info (requires exact name)
505 |         with patch("mcp_nixos.server.parse_html_options") as mock_parse:
506 |             mock_parse.return_value = [
507 |                 {
508 |                     "name": "programs.git.enable",
509 |                     "type": "boolean",
510 |                     "description": "Whether to enable Git",
511 |                 }
512 |             ]
513 | 
514 |             result = await home_manager_info("programs.git.enable")
515 |             assert "Option: programs.git.enable" in result
516 |             assert "Type: boolean" in result
517 |             assert "Whether to enable Git" in result
518 | 
519 |     @pytest.mark.asyncio
520 |     async def test_home_manager_category_exploration(self):
521 |         """Test exploring Home Manager categories."""
522 |         with patch("mcp_nixos.server.parse_html_options") as mock_parse:
523 |             # Simulate real category distribution
524 |             mock_parse.return_value = [
525 |                 {"name": "programs.git.enable", "type": "", "description": ""},
526 |                 {"name": "programs.vim.enable", "type": "", "description": ""},
527 |                 {"name": "services.gpg-agent.enable", "type": "", "description": ""},
528 |                 {"name": "home.packages", "type": "", "description": ""},
529 |                 {"name": "accounts.email.accounts", "type": "", "description": ""},
530 |             ]
531 | 
532 |             result = await home_manager_list_options()
533 |             assert "Home Manager option categories" in result
534 |             assert "programs (2 options)" in result
535 |             assert "services (1 options)" in result
536 |             assert "home (1 options)" in result
537 |             assert "accounts (1 options)" in result
538 | 
539 |     @pytest.mark.asyncio
540 |     async def test_darwin_system_configuration_flow(self):
541 |         """Test typical Darwin configuration workflow."""
542 |         # 1. Search for system options
543 |         with patch("mcp_nixos.server.parse_html_options") as mock_parse:
544 |             mock_parse.return_value = [
545 |                 {
546 |                     "name": "system.defaults.dock.autohide",
547 |                     "type": "boolean",
548 |                     "description": "Whether to automatically hide the dock",
549 |                 },
550 |                 {
551 |                     "name": "system.defaults.NSGlobalDomain.AppleInterfaceStyle",
552 |                     "type": "string",
553 |                     "description": "Set to 'Dark' to enable dark mode",
554 |                 },
555 |                 {
556 |                     "name": "system.stateVersion",
557 |                     "type": "string",
558 |                     "description": "The nix-darwin state version",
559 |                 },
560 |             ]
561 | 
562 |             result = await darwin_search("system", limit=3)
563 |             assert "system.defaults.dock.autohide" in result
564 |             assert "system.defaults.NSGlobalDomain.AppleInterfaceStyle" in result
565 |             assert "system.stateVersion" in result
566 | 
567 |         # 2. Browse system options by prefix
568 |         with patch("mcp_nixos.server.parse_html_options") as mock_parse:
569 |             mock_parse.return_value = [
570 |                 {
571 |                     "name": "system.defaults.dock.autohide",
572 |                     "type": "boolean",
573 |                     "description": "Whether to automatically hide the dock",
574 |                 },
575 |                 {
576 |                     "name": "system.defaults.dock.autohide-delay",
577 |                     "type": "float",
578 |                     "description": "Dock autohide delay",
579 |                 },
580 |                 {
581 |                     "name": "system.defaults.dock.orientation",
582 |                     "type": "string",
583 |                     "description": "Dock position on screen",
584 |                 },
585 |             ]
586 | 
587 |             result = await darwin_options_by_prefix("system.defaults.dock")
588 |             assert "system.defaults.dock.autohide" in result
589 |             assert "system.defaults.dock.autohide-delay" in result
590 |             assert "system.defaults.dock.orientation" in result
591 | 
592 |     @pytest.mark.asyncio
593 |     async def test_error_handling_with_suggestions(self):
594 |         """Test error handling provides helpful suggestions."""
595 |         # Invalid channel
596 |         with patch("mcp_nixos.server.get_channels") as mock_get:
597 |             mock_get.return_value = {
598 |                 "stable": "latest-43-nixos-25.05",
599 |                 "unstable": "latest-43-nixos-unstable",
600 |                 "25.05": "latest-43-nixos-25.05",
601 |                 "24.11": "latest-43-nixos-24.11",
602 |             }
603 | 
604 |             result = await nixos_search("test", channel="24.05")
605 |             assert "Invalid channel" in result
606 |             assert "Available channels:" in result
607 |             assert "24.11" in result or "25.05" in result
608 | 
609 |     @pytest.mark.asyncio
610 |     async def test_cross_tool_consistency(self):
611 |         """Test that different tools provide consistent information."""
612 |         # Channel consistency
613 |         with patch("mcp_nixos.server.get_channels") as mock_get:
614 |             channels = {
615 |                 "stable": "latest-43-nixos-25.05",
616 |                 "unstable": "latest-43-nixos-unstable",
617 |                 "25.05": "latest-43-nixos-25.05",
618 |                 "beta": "latest-43-nixos-25.05",
619 |             }
620 |             mock_get.return_value = channels
621 | 
622 |             # All tools should accept the same channels
623 |             for channel in ["stable", "unstable", "25.05", "beta"]:
624 |                 with patch("mcp_nixos.server.es_query") as mock_es:
625 |                     mock_es.return_value = []
626 |                     result = await nixos_search("test", channel=channel)
627 |                     assert "Error" not in result or "Invalid channel" not in result
628 | 
629 |     @pytest.mark.asyncio
630 |     async def test_real_world_git_configuration_scenario(self):
631 |         """Test a complete Git configuration discovery scenario."""
632 |         # User wants to configure Git in Home Manager
633 | 
634 |         # Step 1: Search for git-related options
635 |         with patch("mcp_nixos.server.parse_html_options") as mock_parse:
636 |             mock_parse.return_value = [
637 |                 {
638 |                     "name": "programs.git.enable",
639 |                     "type": "boolean",
640 |                     "description": "Whether to enable Git",
641 |                 },
642 |                 {
643 |                     "name": "programs.git.userName",
644 |                     "type": "string",
645 |                     "description": "Default Git username",
646 |                 },
647 |             ]
648 | 
649 |             result = await home_manager_search("git user")
650 |             assert "programs.git.userName" in result
651 | 
652 |         # Step 2: Browse all git options
653 |         with patch("mcp_nixos.server.parse_html_options") as mock_parse:
654 |             mock_parse.return_value = [
655 |                 {"name": "programs.git.enable", "type": "boolean", "description": "Whether to enable Git"},
656 |                 {"name": "programs.git.userName", "type": "string", "description": "Default Git username"},
657 |                 {"name": "programs.git.userEmail", "type": "string", "description": "Default Git email"},
658 |                 {"name": "programs.git.signing.key", "type": "string", "description": "GPG signing key"},
659 |                 {
660 |                     "name": "programs.git.signing.signByDefault",
661 |                     "type": "boolean",
662 |                     "description": "Sign commits by default",
663 |                 },
664 |             ]
665 | 
666 |             result = await home_manager_options_by_prefix("programs.git")
667 |             assert "programs.git.userName" in result
668 |             assert "programs.git.userEmail" in result
669 |             assert "programs.git.signing.key" in result
670 | 
671 |         # Step 3: Get details for specific options
672 |         with patch("mcp_nixos.server.parse_html_options") as mock_parse:
673 |             mock_parse.return_value = [
674 |                 {
675 |                     "name": "programs.git.signing.signByDefault",
676 |                     "type": "boolean",
677 |                     "description": "Whether to sign commits by default",
678 |                 }
679 |             ]
680 | 
681 |             result = await home_manager_info("programs.git.signing.signByDefault")
682 |             assert "Type: boolean" in result
683 |             assert "sign commits by default" in result
684 | 
685 |     @pytest.mark.asyncio
686 |     async def test_performance_with_large_result_sets(self):
687 |         """Test handling of large result sets efficiently."""
688 |         # Home Manager has 2000+ options
689 |         with patch("mcp_nixos.server.parse_html_options") as mock_parse:
690 |             # Simulate large option set
691 |             mock_options = []
692 |             for i in range(2129):  # Actual count from testing
693 |                 mock_options.append(
694 |                     {
695 |                         "name": f"programs.option{i}",
696 |                         "type": "string",
697 |                         "description": f"Option {i}",
698 |                     }
699 |                 )
700 |             mock_parse.return_value = mock_options
701 | 
702 |             result = await home_manager_list_options()
703 |             assert "2129 options" in result or "programs (" in result
704 | 
705 |     @pytest.mark.asyncio
706 |     async def test_package_not_found_behavior(self):
707 |         """Test behavior when packages/options are not found."""
708 |         # Package not found
709 |         with patch("mcp_nixos.server.es_query") as mock_es:
710 |             mock_es.return_value = []
711 | 
712 |             result = await nixos_info("nonexistent-package")
713 |             assert "not found" in result.lower()
714 | 
715 |         # Option not found
716 |         with patch("mcp_nixos.server.parse_html_options") as mock_parse:
717 |             mock_parse.return_value = []
718 | 
719 |             result = await home_manager_info("nonexistent.option")
720 |             assert "not found" in result.lower()
721 | 
722 |     @pytest.mark.asyncio
723 |     async def test_channel_migration_scenario(self):
724 |         """Test that users can migrate from old to new channels."""
725 |         # User on 24.11 wants to upgrade to 25.05
726 |         with patch("mcp_nixos.server.get_channels") as mock_get:
727 |             mock_get.return_value = {
728 |                 "stable": "latest-43-nixos-25.05",
729 |                 "25.05": "latest-43-nixos-25.05",
730 |                 "24.11": "latest-43-nixos-24.11",
731 |                 "unstable": "latest-43-nixos-unstable",
732 |             }
733 | 
734 |             # Can still query old channel
735 |             with patch("mcp_nixos.server.es_query") as mock_es:
736 |                 mock_es.return_value = []
737 |                 result = await nixos_search("test", channel="24.11")
738 |                 assert "Error" not in result or "Invalid channel" not in result
739 | 
740 |             # Can query new stable
741 |             with patch("mcp_nixos.server.es_query") as mock_es:
742 |                 mock_es.return_value = []
743 |                 result = await nixos_search("test", channel="stable")
744 |                 assert "Error" not in result or "Invalid channel" not in result
745 | 
746 |     @pytest.mark.asyncio
747 |     async def test_option_type_information(self):
748 |         """Test that option type information is properly displayed."""
749 |         test_cases = [
750 |             ("boolean option", "boolean", "programs.git.enable"),
751 |             ("string option", "string", "programs.git.userName"),
752 |             ("attribute set", "attribute set of string", "programs.git.aliases"),
753 |             ("list option", "list of string", "home.packages"),
754 |             ("complex type", "null or string or signed integer", "services.dunst.settings.global.offset"),
755 |         ]
756 | 
757 |         for desc, type_str, option_name in test_cases:
758 |             with patch("mcp_nixos.server.parse_html_options") as mock_parse:
759 |                 mock_parse.return_value = [
760 |                     {
761 |                         "name": option_name,
762 |                         "type": type_str,
763 |                         "description": f"Test {desc}",
764 |                     }
765 |                 ]
766 | 
767 |                 result = await home_manager_info(option_name)
768 |                 assert f"Type: {type_str}" in result
769 | 
770 |     @pytest.mark.asyncio
771 |     @patch("mcp_nixos.server.parse_html_options")
772 |     async def test_stats_functions_limitations(self, mock_parse):
773 |         """Test that stats functions return actual statistics now."""
774 |         # Mock parsed options for Home Manager
775 |         mock_parse.return_value = [
776 |             {"name": "programs.git.enable", "type": "boolean", "description": "Enable git"},
777 |             {"name": "programs.zsh.enable", "type": "boolean", "description": "Enable zsh"},
778 |             {"name": "services.gpg-agent.enable", "type": "boolean", "description": "Enable GPG agent"},
779 |             {"name": "home.packages", "type": "list", "description": "Packages to install"},
780 |             {"name": "wayland.windowManager.sway.enable", "type": "boolean", "description": "Enable Sway"},
781 |             {"name": "xsession.enable", "type": "boolean", "description": "Enable X session"},
782 |         ]
783 | 
784 |         # Home Manager stats now return actual statistics
785 |         result = await home_manager_stats()
786 |         assert "Home Manager Statistics:" in result
787 |         assert "Total options:" in result
788 |         assert "Categories:" in result
789 |         assert "Top categories:" in result
790 | 
791 |         # Mock parsed options for Darwin
792 |         mock_parse.return_value = [
793 |             {"name": "services.nix-daemon.enable", "type": "boolean", "description": "Enable nix-daemon"},
794 |             {"name": "system.defaults.dock.autohide", "type": "boolean", "description": "Auto-hide dock"},
795 |             {"name": "launchd.agents.test", "type": "attribute set", "description": "Launchd agents"},
796 |             {"name": "programs.zsh.enable", "type": "boolean", "description": "Enable zsh"},
797 |             {"name": "homebrew.enable", "type": "boolean", "description": "Enable Homebrew"},
798 |         ]
799 | 
800 |         # Darwin stats now return actual statistics
801 |         result = await darwin_stats()
802 |         assert "nix-darwin Statistics:" in result
803 |         assert "Total options:" in result
804 |         assert "Categories:" in result
805 |         assert "Top categories:" in result
806 | 
```

--------------------------------------------------------------------------------
/tests/test_flakes.py:
--------------------------------------------------------------------------------

```python
   1 | """Evaluation tests for flake search and improved stats functionality."""
   2 | 
   3 | from unittest.mock import MagicMock, Mock, patch
   4 | 
   5 | import pytest
   6 | import requests
   7 | from mcp_nixos import server
   8 | 
   9 | 
  10 | def get_tool_function(tool_name: str):
  11 |     """Get the underlying function from a FastMCP tool."""
  12 |     tool = getattr(server, tool_name)
  13 |     if hasattr(tool, "fn"):
  14 |         return tool.fn
  15 |     return tool
  16 | 
  17 | 
  18 | # Get the underlying functions for direct use
  19 | darwin_stats = get_tool_function("darwin_stats")
  20 | home_manager_stats = get_tool_function("home_manager_stats")
  21 | nixos_flakes_search = get_tool_function("nixos_flakes_search")
  22 | nixos_flakes_stats = get_tool_function("nixos_flakes_stats")
  23 | nixos_search = get_tool_function("nixos_search")
  24 | 
  25 | 
  26 | class TestFlakeSearchEvals:
  27 |     """Test flake search functionality with real-world scenarios."""
  28 | 
  29 |     @pytest.fixture(autouse=True)
  30 |     def mock_channel_validation(self):
  31 |         """Mock channel validation to always pass for 'unstable'."""
  32 |         with patch("mcp_nixos.server.channel_cache") as mock_cache:
  33 |             mock_cache.get_available.return_value = {"unstable": "latest-45-nixos-unstable"}
  34 |             mock_cache.get_resolved.return_value = {"unstable": "latest-45-nixos-unstable"}
  35 |             with patch("mcp_nixos.server.validate_channel") as mock_validate:
  36 |                 mock_validate.return_value = True
  37 |                 yield mock_cache
  38 | 
  39 |     @pytest.fixture
  40 |     def mock_flake_response(self):
  41 |         """Mock response for flake search results."""
  42 |         return {
  43 |             "hits": {
  44 |                 "total": {"value": 3},
  45 |                 "hits": [
  46 |                     {
  47 |                         "_source": {
  48 |                             "flake_attr_name": "neovim",
  49 |                             "flake_name": "nixpkgs",
  50 |                             "flake_url": "github:NixOS/nixpkgs",
  51 |                             "flake_description": "Vim-fork focused on extensibility and usability",
  52 |                             "flake_platforms": ["x86_64-linux", "aarch64-linux", "x86_64-darwin", "aarch64-darwin"],
  53 |                         }
  54 |                     },
  55 |                     {
  56 |                         "_source": {
  57 |                             "flake_attr_name": "packages.x86_64-linux.neovim",
  58 |                             "flake_name": "neovim-nightly",
  59 |                             "flake_url": "github:nix-community/neovim-nightly-overlay",
  60 |                             "flake_description": "Neovim nightly builds",
  61 |                             "flake_platforms": ["x86_64-linux"],
  62 |                         }
  63 |                     },
  64 |                     {
  65 |                         "_source": {
  66 |                             "flake_attr_name": "packages.aarch64-darwin.neovim",
  67 |                             "flake_name": "neovim-nightly",
  68 |                             "flake_url": "github:nix-community/neovim-nightly-overlay",
  69 |                             "flake_description": "Neovim nightly builds",
  70 |                             "flake_platforms": ["aarch64-darwin"],
  71 |                         }
  72 |                     },
  73 |                 ],
  74 |             }
  75 |         }
  76 | 
  77 |     @pytest.fixture
  78 |     def mock_popular_flakes_response(self):
  79 |         """Mock response for popular flakes."""
  80 |         return {
  81 |             "hits": {
  82 |                 "total": {"value": 5},
  83 |                 "hits": [
  84 |                     {
  85 |                         "_source": {
  86 |                             "flake_attr_name": "homeConfigurations.example",
  87 |                             "flake_name": "home-manager",
  88 |                             "flake_url": "github:nix-community/home-manager",
  89 |                             "flake_description": "Manage a user environment using Nix",
  90 |                             "flake_platforms": ["x86_64-linux", "aarch64-linux", "x86_64-darwin", "aarch64-darwin"],
  91 |                         }
  92 |                     },
  93 |                     {
  94 |                         "_source": {
  95 |                             "flake_attr_name": "nixosConfigurations.example",
  96 |                             "flake_name": "nixos-hardware",
  97 |                             "flake_url": "github:NixOS/nixos-hardware",
  98 |                             "flake_description": "NixOS modules to support various hardware",
  99 |                             "flake_platforms": ["x86_64-linux", "aarch64-linux"],
 100 |                         }
 101 |                     },
 102 |                     {
 103 |                         "_source": {
 104 |                             "flake_attr_name": "devShells.x86_64-linux.default",
 105 |                             "flake_name": "devenv",
 106 |                             "flake_url": "github:cachix/devenv",
 107 |                             "flake_description": (
 108 |                                 "Fast, Declarative, Reproducible, and Composable Developer Environments"
 109 |                             ),
 110 |                             "flake_platforms": ["x86_64-linux", "x86_64-darwin"],
 111 |                         }
 112 |                     },
 113 |                     {
 114 |                         "_source": {
 115 |                             "flake_attr_name": "packages.x86_64-linux.agenix",
 116 |                             "flake_name": "agenix",
 117 |                             "flake_url": "github:ryantm/agenix",
 118 |                             "flake_description": "age-encrypted secrets for NixOS",
 119 |                             "flake_platforms": ["x86_64-linux", "aarch64-linux", "x86_64-darwin", "aarch64-darwin"],
 120 |                         }
 121 |                     },
 122 |                     {
 123 |                         "_source": {
 124 |                             "flake_attr_name": "packages.x86_64-darwin.agenix",
 125 |                             "flake_name": "agenix",
 126 |                             "flake_url": "github:ryantm/agenix",
 127 |                             "flake_description": "age-encrypted secrets for NixOS",
 128 |                             "flake_platforms": ["x86_64-darwin", "aarch64-darwin"],
 129 |                         }
 130 |                     },
 131 |                 ],
 132 |             }
 133 |         }
 134 | 
 135 |     @pytest.fixture
 136 |     def mock_empty_response(self):
 137 |         """Mock empty response."""
 138 |         return {"hits": {"total": {"value": 0}, "hits": []}}
 139 | 
 140 |     @patch("requests.post")
 141 |     @pytest.mark.asyncio
 142 |     async def test_flake_search_basic(self, mock_post, mock_flake_response):
 143 |         """Test basic flake search functionality."""
 144 |         mock_post.return_value.status_code = 200
 145 |         mock_post.return_value.json.return_value = mock_flake_response
 146 | 
 147 |         result = await nixos_search("neovim", search_type="flakes")
 148 | 
 149 |         # Verify API call
 150 |         mock_post.assert_called_once()
 151 |         call_args = mock_post.call_args
 152 |         assert "_search" in call_args[0][0]
 153 | 
 154 |         # Check query structure - now using json parameter instead of data
 155 |         query_data = call_args[1]["json"]
 156 |         # The query now uses bool->filter->term for type filtering
 157 |         assert "query" in query_data
 158 |         assert "size" in query_data
 159 | 
 160 |         # Verify output format
 161 |         assert "unique flakes" in result
 162 |         assert "• nixpkgs" in result or "• neovim" in result
 163 |         assert "• neovim-nightly" in result
 164 | 
 165 |     @patch("requests.post")
 166 |     @pytest.mark.asyncio
 167 |     async def test_flake_search_deduplication(self, mock_post, mock_flake_response):
 168 |         """Test that flake deduplication works correctly."""
 169 |         mock_post.return_value.status_code = 200
 170 |         mock_post.return_value.json.return_value = mock_flake_response
 171 | 
 172 |         result = await nixos_search("neovim", search_type="flakes")
 173 | 
 174 |         # Should deduplicate neovim-nightly entries
 175 |         assert result.count("neovim-nightly") == 1
 176 |         # But should show it has multiple packages
 177 |         assert "Neovim nightly builds" in result
 178 | 
 179 |     @patch("requests.post")
 180 |     @pytest.mark.asyncio
 181 |     async def test_flake_search_popular(self, mock_post, mock_popular_flakes_response):
 182 |         """Test searching for popular flakes."""
 183 |         mock_post.return_value.status_code = 200
 184 |         mock_post.return_value.json.return_value = mock_popular_flakes_response
 185 | 
 186 |         result = await nixos_search("home-manager devenv agenix", search_type="flakes")
 187 | 
 188 |         assert "Found 5 total matches (4 unique flakes)" in result or "Found 4 unique flakes" in result
 189 |         assert "• home-manager" in result
 190 |         assert "• devenv" in result
 191 |         assert "• agenix" in result
 192 |         assert "Manage a user environment using Nix" in result
 193 |         assert "Fast, Declarative, Reproducible, and Composable Developer Environments" in result
 194 |         assert "age-encrypted secrets for NixOS" in result
 195 | 
 196 |     @patch("requests.post")
 197 |     @pytest.mark.asyncio
 198 |     async def test_flake_search_no_results(self, mock_post, mock_empty_response):
 199 |         """Test flake search with no results."""
 200 |         mock_post.return_value.status_code = 200
 201 |         mock_post.return_value.json.return_value = mock_empty_response
 202 | 
 203 |         result = await nixos_search("nonexistentflake123", search_type="flakes")
 204 | 
 205 |         assert "No flakes found" in result
 206 | 
 207 |     @patch("requests.post")
 208 |     @pytest.mark.asyncio
 209 |     async def test_flake_search_wildcard(self, mock_post):
 210 |         """Test flake search with wildcard patterns."""
 211 |         mock_response = {
 212 |             "hits": {
 213 |                 "total": {"value": 2},
 214 |                 "hits": [
 215 |                     {
 216 |                         "_source": {
 217 |                             "flake_attr_name": "nixvim",
 218 |                             "flake_name": "nixvim",
 219 |                             "flake_url": "github:nix-community/nixvim",
 220 |                             "flake_description": "Configure Neovim with Nix",
 221 |                             "flake_platforms": ["x86_64-linux", "x86_64-darwin"],
 222 |                         }
 223 |                     },
 224 |                     {
 225 |                         "_source": {
 226 |                             "flake_attr_name": "vim-startify",
 227 |                             "flake_name": "vim-plugins",
 228 |                             "flake_url": "github:m15a/nixpkgs-vim-extra-plugins",
 229 |                             "flake_description": "Extra Vim plugins for Nix",
 230 |                             "flake_platforms": ["x86_64-linux"],
 231 |                         }
 232 |                     },
 233 |                 ],
 234 |             }
 235 |         }
 236 | 
 237 |         mock_post.return_value.status_code = 200
 238 |         mock_post.return_value.json.return_value = mock_response
 239 | 
 240 |         result = await nixos_search("*vim*", search_type="flakes")
 241 | 
 242 |         assert "Found 2 unique flakes" in result
 243 |         assert "• nixvim" in result
 244 |         assert "• vim-plugins" in result
 245 | 
 246 |     @patch("requests.post")
 247 |     @pytest.mark.asyncio
 248 |     async def test_flake_search_error_handling(self, mock_post):
 249 |         """Test flake search error handling."""
 250 |         mock_response = MagicMock()
 251 |         mock_response.status_code = 500
 252 |         mock_response.content = b"Internal Server Error"
 253 | 
 254 |         # Create an HTTPError with a response attribute
 255 |         http_error = requests.HTTPError("500 Server Error")
 256 |         http_error.response = mock_response
 257 |         mock_response.raise_for_status.side_effect = http_error
 258 | 
 259 |         mock_post.return_value = mock_response
 260 | 
 261 |         result = await nixos_search("test", search_type="flakes")
 262 | 
 263 |         assert "Error" in result
 264 |         # The actual error message will be the exception string
 265 |         assert "'NoneType' object has no attribute 'status_code'" not in result
 266 | 
 267 |     @patch("requests.post")
 268 |     @pytest.mark.asyncio
 269 |     async def test_flake_search_malformed_response(self, mock_post):
 270 |         """Test handling of malformed flake responses."""
 271 |         mock_response = {
 272 |             "hits": {
 273 |                 "total": {"value": 1},
 274 |                 "hits": [
 275 |                     {
 276 |                         "_source": {
 277 |                             "flake_attr_name": "broken",
 278 |                             # Missing required fields
 279 |                         }
 280 |                     }
 281 |                 ],
 282 |             }
 283 |         }
 284 | 
 285 |         mock_post.return_value.status_code = 200
 286 |         mock_post.return_value.json.return_value = mock_response
 287 | 
 288 |         result = await nixos_search("broken", search_type="flakes")
 289 | 
 290 |         # Should handle gracefully - with missing fields, no flakes will be created
 291 |         assert "Found 1 total matches (0 unique flakes)" in result
 292 | 
 293 | 
 294 | class TestImprovedStatsEvals:
 295 |     """Test improved stats functionality."""
 296 | 
 297 |     @patch("requests.get")
 298 |     @pytest.mark.asyncio
 299 |     async def test_home_manager_stats_with_data(self, mock_get):
 300 |         """Test home_manager_stats returns actual statistics."""
 301 |         mock_html = """
 302 |         <html>
 303 |         <body>
 304 |             <dl class="variablelist">
 305 |                 <dt id="opt-programs.git.enable">programs.git.enable</dt>
 306 |                 <dd>Enable git</dd>
 307 |                 <dt id="opt-programs.vim.enable">programs.vim.enable</dt>
 308 |                 <dd>Enable vim</dd>
 309 |                 <dt id="opt-services.gpg-agent.enable">services.gpg-agent.enable</dt>
 310 |                 <dd>Enable gpg-agent</dd>
 311 |             </dl>
 312 |         </body>
 313 |         </html>
 314 |         """
 315 | 
 316 |         mock_get.return_value.status_code = 200
 317 |         mock_get.return_value.content = mock_html.encode("utf-8")
 318 | 
 319 |         result = await home_manager_stats()
 320 | 
 321 |         assert "Home Manager Statistics:" in result
 322 |         assert "Total options: 3" in result
 323 |         assert "Categories:" in result
 324 |         assert "- programs: 2 options" in result
 325 |         assert "- services: 1 options" in result
 326 | 
 327 |     @patch("requests.get")
 328 |     @pytest.mark.asyncio
 329 |     async def test_home_manager_stats_error_handling(self, mock_get):
 330 |         """Test home_manager_stats error handling."""
 331 |         mock_get.return_value.status_code = 404
 332 |         mock_get.return_value.content = b"Not Found"
 333 | 
 334 |         result = await home_manager_stats()
 335 | 
 336 |         assert "Error" in result
 337 | 
 338 |     @patch("requests.get")
 339 |     @pytest.mark.asyncio
 340 |     async def test_darwin_stats_with_data(self, mock_get):
 341 |         """Test darwin_stats returns actual statistics."""
 342 |         mock_html = """
 343 |         <html>
 344 |         <body>
 345 |             <div id="toc">
 346 |                 <dl>
 347 |                     <dt><a href="#opt-system.defaults.dock.autohide">system.defaults.dock.autohide</a></dt>
 348 |                     <dd>Auto-hide the dock</dd>
 349 |                     <dt><a href="#opt-system.defaults.finder.ShowPathbar">system.defaults.finder.ShowPathbar</a></dt>
 350 |                     <dd>Show path bar in Finder</dd>
 351 |                     <dt><a href="#opt-homebrew.enable">homebrew.enable</a></dt>
 352 |                     <dd>Enable Homebrew</dd>
 353 |                     <dt><a href="#opt-homebrew.casks">homebrew.casks</a></dt>
 354 |                     <dd>List of Homebrew casks to install</dd>
 355 |                 </dl>
 356 |             </div>
 357 |         </body>
 358 |         </html>
 359 |         """
 360 | 
 361 |         mock_get.return_value.status_code = 200
 362 |         mock_get.return_value.content = mock_html.encode("utf-8")
 363 | 
 364 |         result = await darwin_stats()
 365 | 
 366 |         assert "nix-darwin Statistics:" in result
 367 |         assert "Total options: 4" in result
 368 |         assert "Categories:" in result
 369 |         assert "- system: 2 options" in result
 370 |         assert "- homebrew: 2 options" in result
 371 | 
 372 |     @patch("requests.get")
 373 |     @pytest.mark.asyncio
 374 |     async def test_darwin_stats_error_handling(self, mock_get):
 375 |         """Test darwin_stats error handling."""
 376 |         mock_get.return_value.status_code = 500
 377 |         mock_get.return_value.content = b"Server Error"
 378 | 
 379 |         result = await darwin_stats()
 380 | 
 381 |         assert "Error" in result
 382 | 
 383 |     @patch("requests.get")
 384 |     @pytest.mark.asyncio
 385 |     async def test_stats_with_complex_categories(self, mock_get):
 386 |         """Test stats functions with complex nested categories."""
 387 |         mock_html = """
 388 |         <html>
 389 |         <body>
 390 |             <dl class="variablelist">
 391 |                 <dt id="opt-programs.git.enable">programs.git.enable</dt>
 392 |                 <dd>Enable git</dd>
 393 |                 <dt id="opt-programs.git.signing.key">programs.git.signing.key</dt>
 394 |                 <dd>GPG signing key</dd>
 395 |                 <dt id="opt-services.xserver.displayManager.gdm.enable">services.xserver.displayManager.gdm.enable</dt>
 396 |                 <dd>Enable GDM</dd>
 397 |                 <dt id="opt-home.packages">home.packages</dt>
 398 |                 <dd>List of packages</dd>
 399 |             </dl>
 400 |         </body>
 401 |         </html>
 402 |         """
 403 | 
 404 |         mock_get.return_value.status_code = 200
 405 |         mock_get.return_value.content = mock_html.encode("utf-8")
 406 | 
 407 |         result = await home_manager_stats()
 408 | 
 409 |         assert "Total options: 4" in result
 410 |         assert "- programs: 2 options" in result
 411 |         assert "- services: 1 options" in result
 412 |         assert "- home: 1 options" in result
 413 | 
 414 |     @patch("requests.get")
 415 |     @pytest.mark.asyncio
 416 |     async def test_stats_with_empty_html(self, mock_get):
 417 |         """Test stats functions with empty HTML."""
 418 |         mock_get.return_value.status_code = 200
 419 |         mock_get.return_value.content = b"<html><body></body></html>"
 420 | 
 421 |         result = await home_manager_stats()
 422 | 
 423 |         # When no options are found, the function returns an error
 424 |         assert "Error" in result
 425 |         assert "Failed to fetch Home Manager statistics" in result
 426 | 
 427 | 
 428 | class TestRealWorldScenarios:
 429 |     """Test real-world usage scenarios for flake search and stats."""
 430 | 
 431 |     @pytest.fixture(autouse=True)
 432 |     def mock_channel_validation(self):
 433 |         """Mock channel validation to always pass for 'unstable'."""
 434 |         with patch("mcp_nixos.server.channel_cache") as mock_cache:
 435 |             mock_cache.get_available.return_value = {"unstable": "latest-45-nixos-unstable"}
 436 |             mock_cache.get_resolved.return_value = {"unstable": "latest-45-nixos-unstable"}
 437 |             with patch("mcp_nixos.server.validate_channel") as mock_validate:
 438 |                 mock_validate.return_value = True
 439 |                 yield mock_cache
 440 | 
 441 |     @patch("requests.post")
 442 |     @pytest.mark.asyncio
 443 |     async def test_developer_workflow_flake_search(self, mock_post):
 444 |         """Test a developer searching for development environment flakes."""
 445 |         # First search for devenv
 446 |         devenv_response = {
 447 |             "hits": {
 448 |                 "total": {"value": 1},
 449 |                 "hits": [
 450 |                     {
 451 |                         "_source": {
 452 |                             "flake_attr_name": "devShells.x86_64-linux.default",
 453 |                             "flake_name": "devenv",
 454 |                             "flake_url": "github:cachix/devenv",
 455 |                             "flake_description": (
 456 |                                 "Fast, Declarative, Reproducible, and Composable Developer Environments"
 457 |                             ),
 458 |                             "flake_platforms": ["x86_64-linux", "x86_64-darwin"],
 459 |                         }
 460 |                     }
 461 |                 ],
 462 |             }
 463 |         }
 464 | 
 465 |         mock_post.return_value.status_code = 200
 466 |         mock_post.return_value.json.return_value = devenv_response
 467 | 
 468 |         result = await nixos_search("devenv", search_type="flakes")
 469 | 
 470 |         assert "• devenv" in result
 471 |         assert "Fast, Declarative, Reproducible, and Composable Developer Environments" in result
 472 |         assert "Developer Environments" in result
 473 | 
 474 |     @patch("requests.post")
 475 |     @pytest.mark.asyncio
 476 |     async def test_system_configuration_flake_search(self, mock_post):
 477 |         """Test searching for system configuration flakes."""
 478 |         config_response = {
 479 |             "hits": {
 480 |                 "total": {"value": 3},
 481 |                 "hits": [
 482 |                     {
 483 |                         "_source": {
 484 |                             "flake_attr_name": "nixosModules.default",
 485 |                             "flake_name": "impermanence",
 486 |                             "flake_url": "github:nix-community/impermanence",
 487 |                             "flake_description": (
 488 |                                 "Modules to help you handle persistent state on systems with ephemeral root storage"
 489 |                             ),
 490 |                             "flake_platforms": ["x86_64-linux", "aarch64-linux"],
 491 |                         }
 492 |                     },
 493 |                     {
 494 |                         "_source": {
 495 |                             "flake_attr_name": "nixosModules.home-manager",
 496 |                             "flake_name": "home-manager",
 497 |                             "flake_url": "github:nix-community/home-manager",
 498 |                             "flake_description": "Manage a user environment using Nix",
 499 |                             "flake_platforms": ["x86_64-linux", "aarch64-linux", "x86_64-darwin", "aarch64-darwin"],
 500 |                         }
 501 |                     },
 502 |                     {
 503 |                         "_source": {
 504 |                             "flake_attr_name": "nixosModules.sops",
 505 |                             "flake_name": "sops-nix",
 506 |                             "flake_url": "github:Mic92/sops-nix",
 507 |                             "flake_description": "Atomic secret provisioning for NixOS based on sops",
 508 |                             "flake_platforms": ["x86_64-linux", "aarch64-linux"],
 509 |                         }
 510 |                     },
 511 |                 ],
 512 |             }
 513 |         }
 514 | 
 515 |         mock_post.return_value.status_code = 200
 516 |         mock_post.return_value.json.return_value = config_response
 517 | 
 518 |         result = await nixos_search("nixosModules", search_type="flakes")
 519 | 
 520 |         assert "Found 3 unique flakes" in result
 521 |         assert "• impermanence" in result
 522 |         assert "• home-manager" in result
 523 |         assert "• sops-nix" in result
 524 |         assert "ephemeral root storage" in result
 525 |         assert "secret provisioning" in result
 526 | 
 527 |     @patch("requests.get")
 528 |     @patch("requests.post")
 529 |     @pytest.mark.asyncio
 530 |     async def test_combined_workflow_stats_and_search(self, mock_post, mock_get):
 531 |         """Test a workflow combining stats check and targeted search."""
 532 |         # First, check Home Manager stats
 533 |         stats_html = """
 534 |         <html>
 535 |         <body>
 536 |             <dl class="variablelist">
 537 |                 <dt id="opt-programs.neovim.enable">programs.neovim.enable</dt>
 538 |                 <dd>Enable neovim</dd>
 539 |                 <dt id="opt-programs.neovim.plugins">programs.neovim.plugins</dt>
 540 |                 <dd>List of vim plugins</dd>
 541 |                 <dt id="opt-programs.vim.enable">programs.vim.enable</dt>
 542 |                 <dd>Enable vim</dd>
 543 |             </dl>
 544 |         </body>
 545 |         </html>
 546 |         """
 547 | 
 548 |         mock_get.return_value.status_code = 200
 549 |         mock_get.return_value.content = stats_html.encode("utf-8")
 550 | 
 551 |         stats_result = await home_manager_stats()
 552 | 
 553 |         assert "Total options: 3" in stats_result
 554 |         assert "- programs: 3 options" in stats_result
 555 | 
 556 |         # Then search for related flakes
 557 |         flake_response = {
 558 |             "hits": {
 559 |                 "total": {"value": 1},
 560 |                 "hits": [
 561 |                     {
 562 |                         "_source": {
 563 |                             "flake_attr_name": "homeManagerModules.nixvim",
 564 |                             "flake_name": "nixvim",
 565 |                             "flake_url": "github:nix-community/nixvim",
 566 |                             "flake_description": "Configure Neovim with Nix",
 567 |                             "flake_platforms": ["x86_64-linux", "x86_64-darwin"],
 568 |                         }
 569 |                     }
 570 |                 ],
 571 |             }
 572 |         }
 573 | 
 574 |         mock_post.return_value.status_code = 200
 575 |         mock_post.return_value.json.return_value = flake_response
 576 | 
 577 |         search_result = await nixos_search("nixvim", search_type="flakes")
 578 | 
 579 |         assert "• nixvim" in search_result
 580 |         assert "Configure Neovim with Nix" in search_result
 581 | 
 582 | 
 583 | if __name__ == "__main__":
 584 |     pytest.main([__file__, "-v"])
 585 | 
 586 | 
 587 | # ===== Content from test_flake_search_improved.py =====
 588 | class TestImprovedFlakeSearch:
 589 |     """Test improved flake search functionality."""
 590 | 
 591 |     @pytest.fixture
 592 |     def mock_empty_flake_response(self):
 593 |         """Mock response for empty query with various flake types."""
 594 |         return {
 595 |             "hits": {
 596 |                 "total": {"value": 894},
 597 |                 "hits": [
 598 |                     {
 599 |                         "_source": {
 600 |                             "flake_name": "",
 601 |                             "flake_description": "Home Manager for Nix",
 602 |                             "package_pname": "home-manager",
 603 |                             "package_attr_name": "docs-json",
 604 |                             "flake_source": {"type": "github", "owner": "nix-community", "repo": "home-manager"},
 605 |                             "flake_resolved": {"type": "github", "owner": "nix-community", "repo": "home-manager"},
 606 |                         }
 607 |                     },
 608 |                     {
 609 |                         "_source": {
 610 |                             "flake_name": "haskell.nix",
 611 |                             "flake_description": "Alternative Haskell Infrastructure for Nixpkgs",
 612 |                             "package_pname": "hix",
 613 |                             "package_attr_name": "hix",
 614 |                             "flake_source": {"type": "github", "owner": "input-output-hk", "repo": "haskell.nix"},
 615 |                             "flake_resolved": {"type": "github", "owner": "input-output-hk", "repo": "haskell.nix"},
 616 |                         }
 617 |                     },
 618 |                     {
 619 |                         "_source": {
 620 |                             "flake_name": "nix-vscode-extensions",
 621 |                             "flake_description": (
 622 |                                 "VS Code Marketplace (~40K) and Open VSX (~3K) extensions as Nix expressions."
 623 |                             ),
 624 |                             "package_pname": "updateExtensions",
 625 |                             "package_attr_name": "updateExtensions",
 626 |                             "flake_source": {
 627 |                                 "type": "github",
 628 |                                 "owner": "nix-community",
 629 |                                 "repo": "nix-vscode-extensions",
 630 |                             },
 631 |                             "flake_resolved": {
 632 |                                 "type": "github",
 633 |                                 "owner": "nix-community",
 634 |                                 "repo": "nix-vscode-extensions",
 635 |                             },
 636 |                         }
 637 |                     },
 638 |                     {
 639 |                         "_source": {
 640 |                             "flake_name": "",
 641 |                             "flake_description": "A Python wrapper for the Trovo API",
 642 |                             "package_pname": "python3.11-python-trovo-0.1.7",
 643 |                             "package_attr_name": "default",
 644 |                             "flake_source": {"type": "git", "url": "https://codeberg.org/wolfangaukang/python-trovo"},
 645 |                             "flake_resolved": {"type": "git", "url": "https://codeberg.org/wolfangaukang/python-trovo"},
 646 |                         }
 647 |                     },
 648 |                 ],
 649 |             }
 650 |         }
 651 | 
 652 |     @patch("requests.post")
 653 |     @pytest.mark.asyncio
 654 |     async def test_empty_query_returns_all_flakes(self, mock_post, mock_empty_flake_response):
 655 |         """Test that empty query returns all flakes."""
 656 |         mock_post.return_value.status_code = 200
 657 |         mock_post.return_value.json.return_value = mock_empty_flake_response
 658 | 
 659 |         result = await nixos_flakes_search("", limit=50)
 660 | 
 661 |         # Should use match_all query for empty search
 662 |         call_args = mock_post.call_args
 663 |         query_data = call_args[1]["json"]
 664 |         # The query is wrapped in bool->filter->must structure
 665 |         assert "match_all" in str(query_data["query"])
 666 | 
 667 |         # Should show results
 668 |         assert "4 unique flakes" in result
 669 |         assert "home-manager" in result
 670 |         assert "haskell.nix" in result
 671 |         assert "nix-vscode-extensions" in result
 672 | 
 673 |     @patch("requests.post")
 674 |     @pytest.mark.asyncio
 675 |     async def test_wildcard_query_returns_all_flakes(self, mock_post, mock_empty_flake_response):
 676 |         """Test that * query returns all flakes."""
 677 |         mock_post.return_value.status_code = 200
 678 |         mock_post.return_value.json.return_value = mock_empty_flake_response
 679 | 
 680 |         await nixos_flakes_search("*", limit=50)  # Result not used in this test
 681 | 
 682 |         # Should use match_all query for wildcard
 683 |         call_args = mock_post.call_args
 684 |         query_data = call_args[1]["json"]
 685 |         # The query is wrapped in bool->filter->must structure
 686 |         assert "match_all" in str(query_data["query"])
 687 | 
 688 |     @patch("requests.post")
 689 |     @pytest.mark.asyncio
 690 |     async def test_search_by_owner(self, mock_post):
 691 |         """Test searching by owner like nix-community."""
 692 |         mock_response = {
 693 |             "hits": {
 694 |                 "total": {"value": 2},
 695 |                 "hits": [
 696 |                     {
 697 |                         "_source": {
 698 |                             "flake_name": "home-manager",
 699 |                             "flake_description": "Home Manager for Nix",
 700 |                             "package_pname": "home-manager",
 701 |                             "flake_resolved": {"type": "github", "owner": "nix-community", "repo": "home-manager"},
 702 |                         }
 703 |                     }
 704 |                 ],
 705 |             }
 706 |         }
 707 |         mock_post.return_value.status_code = 200
 708 |         mock_post.return_value.json.return_value = mock_response
 709 | 
 710 |         await nixos_flakes_search("nix-community", limit=20)  # Result tested via assertions
 711 | 
 712 |         # Should search in owner field
 713 |         call_args = mock_post.call_args
 714 |         query_data = call_args[1]["json"]
 715 |         # The query structure has bool->filter and bool->must
 716 |         assert "nix-community" in str(query_data["query"])
 717 | 
 718 |     @patch("requests.post")
 719 |     @pytest.mark.asyncio
 720 |     async def test_deduplication_by_repo(self, mock_post):
 721 |         """Test that multiple packages from same repo are deduplicated."""
 722 |         mock_response = {
 723 |             "hits": {
 724 |                 "total": {"value": 4},
 725 |                 "hits": [
 726 |                     {
 727 |                         "_source": {
 728 |                             "flake_name": "",
 729 |                             "package_pname": "hix",
 730 |                             "package_attr_name": "hix",
 731 |                             "flake_resolved": {"owner": "input-output-hk", "repo": "haskell.nix"},
 732 |                         }
 733 |                     },
 734 |                     {
 735 |                         "_source": {
 736 |                             "flake_name": "",
 737 |                             "package_pname": "hix-build",
 738 |                             "package_attr_name": "hix-build",
 739 |                             "flake_resolved": {"owner": "input-output-hk", "repo": "haskell.nix"},
 740 |                         }
 741 |                     },
 742 |                     {
 743 |                         "_source": {
 744 |                             "flake_name": "",
 745 |                             "package_pname": "hix-env",
 746 |                             "package_attr_name": "hix-env",
 747 |                             "flake_resolved": {"owner": "input-output-hk", "repo": "haskell.nix"},
 748 |                         }
 749 |                     },
 750 |                 ],
 751 |             }
 752 |         }
 753 |         mock_post.return_value.status_code = 200
 754 |         mock_post.return_value.json.return_value = mock_response
 755 | 
 756 |         result = await nixos_flakes_search("haskell", limit=20)
 757 | 
 758 |         # Should show only one flake with multiple packages
 759 |         assert "1 unique flakes" in result
 760 |         assert "input-output-hk/haskell.nix" in result
 761 |         assert "Packages: hix, hix-build, hix-env" in result
 762 | 
 763 |     @patch("requests.post")
 764 |     @pytest.mark.asyncio
 765 |     async def test_handles_flakes_without_name(self, mock_post):
 766 |         """Test handling flakes with empty flake_name."""
 767 |         mock_response = {
 768 |             "hits": {
 769 |                 "total": {"value": 1},
 770 |                 "hits": [
 771 |                     {
 772 |                         "_source": {
 773 |                             "flake_name": "",
 774 |                             "flake_description": "Home Manager for Nix",
 775 |                             "package_pname": "home-manager",
 776 |                             "flake_resolved": {"owner": "nix-community", "repo": "home-manager"},
 777 |                         }
 778 |                     }
 779 |                 ],
 780 |             }
 781 |         }
 782 |         mock_post.return_value.status_code = 200
 783 |         mock_post.return_value.json.return_value = mock_response
 784 | 
 785 |         result = await nixos_flakes_search("home-manager", limit=20)
 786 | 
 787 |         # Should use repo name when flake_name is empty
 788 |         assert "home-manager" in result
 789 |         assert "nix-community/home-manager" in result
 790 | 
 791 |     @patch("requests.post")
 792 |     @pytest.mark.asyncio
 793 |     async def test_no_results_shows_suggestions(self, mock_post):
 794 |         """Test that no results shows helpful suggestions."""
 795 |         mock_response = {"hits": {"total": {"value": 0}, "hits": []}}
 796 |         mock_post.return_value.status_code = 200
 797 |         mock_post.return_value.json.return_value = mock_response
 798 | 
 799 |         result = await nixos_flakes_search("nonexistent", limit=20)
 800 | 
 801 |         assert "No flakes found" in result
 802 |         assert "Popular flakes: nixpkgs, home-manager, flake-utils, devenv" in result
 803 |         assert "By owner: nix-community, numtide, cachix" in result
 804 |         assert "GitHub: https://github.com/topics/nix-flakes" in result
 805 |         assert "FlakeHub: https://flakehub.com/" in result
 806 | 
 807 |     @patch("requests.post")
 808 |     @pytest.mark.asyncio
 809 |     async def test_handles_git_urls(self, mock_post):
 810 |         """Test handling of non-GitHub Git URLs."""
 811 |         mock_response = {
 812 |             "hits": {
 813 |                 "total": {"value": 1},
 814 |                 "hits": [
 815 |                     {
 816 |                         "_source": {
 817 |                             "flake_name": "",
 818 |                             "package_pname": "python-trovo",
 819 |                             "flake_resolved": {"type": "git", "url": "https://codeberg.org/wolfangaukang/python-trovo"},
 820 |                         }
 821 |                     }
 822 |                 ],
 823 |             }
 824 |         }
 825 |         mock_post.return_value.status_code = 200
 826 |         mock_post.return_value.json.return_value = mock_response
 827 | 
 828 |         result = await nixos_flakes_search("python", limit=20)
 829 | 
 830 |         assert "python-trovo" in result
 831 | 
 832 |     @patch("requests.post")
 833 |     @pytest.mark.asyncio
 834 |     async def test_search_tracks_total_hits(self, mock_post):
 835 |         """Test that search tracks total hits."""
 836 |         mock_response = {"hits": {"total": {"value": 894}, "hits": []}}
 837 |         mock_post.return_value.status_code = 200
 838 |         mock_post.return_value.json.return_value = mock_response
 839 | 
 840 |         # Make the call
 841 |         await nixos_flakes_search("", limit=20)
 842 | 
 843 |         # Check that track_total_hits was set
 844 |         call_args = mock_post.call_args
 845 |         query_data = call_args[1]["json"]
 846 |         assert query_data.get("track_total_hits") is True
 847 | 
 848 |     @patch("requests.post")
 849 |     @pytest.mark.asyncio
 850 |     async def test_increased_size_multiplier(self, mock_post):
 851 |         """Test that we request more results to account for duplicates."""
 852 |         mock_response = {"hits": {"total": {"value": 0}, "hits": []}}
 853 |         mock_post.return_value.status_code = 200
 854 |         mock_post.return_value.json.return_value = mock_response
 855 | 
 856 |         await nixos_flakes_search("test", limit=20)
 857 | 
 858 |         # Should request more than limit to account for duplicates
 859 |         call_args = mock_post.call_args
 860 |         query_data = call_args[1]["json"]
 861 |         assert query_data["size"] > 20  # Should be limit * 5 = 100
 862 | 
 863 | 
 864 | # ===== Content from test_flake_search.py =====
 865 | class TestFlakeSearch:
 866 |     """Test flake search functionality."""
 867 | 
 868 |     @pytest.mark.asyncio
 869 |     @patch("mcp_nixos.server.requests.post")
 870 |     async def test_flakes_search_empty_query(self, mock_post):
 871 |         """Test flake search with empty query returns all flakes."""
 872 |         # Mock response
 873 |         mock_response = Mock()
 874 |         mock_response.status_code = 200
 875 |         mock_response.json.return_value = {
 876 |             "hits": {
 877 |                 "total": {"value": 100},
 878 |                 "hits": [
 879 |                     {
 880 |                         "_source": {
 881 |                             "flake_name": "home-manager",
 882 |                             "flake_description": "Home Manager for Nix",
 883 |                             "flake_resolved": {
 884 |                                 "type": "github",
 885 |                                 "owner": "nix-community",
 886 |                                 "repo": "home-manager",
 887 |                             },
 888 |                             "package_pname": "home-manager",
 889 |                             "package_attr_name": "default",
 890 |                         }
 891 |                     }
 892 |                 ],
 893 |             }
 894 |         }
 895 |         mock_post.return_value = mock_response
 896 | 
 897 |         result = await nixos_flakes_search("", limit=10)
 898 | 
 899 |         assert "Found 100 total matches" in result
 900 |         assert "home-manager" in result
 901 |         assert "nix-community/home-manager" in result
 902 |         assert "Home Manager for Nix" in result
 903 | 
 904 |         # Verify the query structure
 905 |         call_args = mock_post.call_args
 906 |         query_data = call_args[1]["json"]["query"]
 907 |         # Should have a bool query with filter and must
 908 |         assert "bool" in query_data
 909 |         assert "filter" in query_data["bool"]
 910 |         assert "must" in query_data["bool"]
 911 | 
 912 |     @pytest.mark.asyncio
 913 |     @patch("mcp_nixos.server.requests.post")
 914 |     async def test_flakes_search_with_query(self, mock_post):
 915 |         """Test flake search with specific query."""
 916 |         # Mock response
 917 |         mock_response = Mock()
 918 |         mock_response.status_code = 200
 919 |         mock_response.json.return_value = {
 920 |             "hits": {
 921 |                 "total": {"value": 5},
 922 |                 "hits": [
 923 |                     {
 924 |                         "_source": {
 925 |                             "flake_name": "devenv",
 926 |                             "flake_description": "Fast, Declarative, Reproducible Developer Environments",
 927 |                             "flake_resolved": {
 928 |                                 "type": "github",
 929 |                                 "owner": "cachix",
 930 |                                 "repo": "devenv",
 931 |                             },
 932 |                             "package_pname": "devenv",
 933 |                             "package_attr_name": "default",
 934 |                         }
 935 |                     }
 936 |                 ],
 937 |             }
 938 |         }
 939 |         mock_post.return_value = mock_response
 940 | 
 941 |         result = await nixos_flakes_search("devenv", limit=10)
 942 | 
 943 |         assert "Found 5" in result
 944 |         assert "devenv" in result
 945 |         assert "cachix/devenv" in result
 946 |         assert "Fast, Declarative" in result
 947 | 
 948 |         # Verify the query structure has filter and inner bool
 949 |         call_args = mock_post.call_args
 950 |         query_data = call_args[1]["json"]["query"]
 951 |         assert "bool" in query_data
 952 |         assert "filter" in query_data["bool"]
 953 |         assert "must" in query_data["bool"]
 954 |         # The actual search query is inside must
 955 |         inner_query = query_data["bool"]["must"][0]
 956 |         assert "bool" in inner_query
 957 |         assert "should" in inner_query["bool"]
 958 | 
 959 |     @pytest.mark.asyncio
 960 |     @patch("mcp_nixos.server.requests.post")
 961 |     async def test_flakes_search_no_results(self, mock_post):
 962 |         """Test flake search with no results."""
 963 |         # Mock response
 964 |         mock_response = Mock()
 965 |         mock_response.status_code = 200
 966 |         mock_response.json.return_value = {"hits": {"total": {"value": 0}, "hits": []}}
 967 |         mock_post.return_value = mock_response
 968 | 
 969 |         result = await nixos_flakes_search("nonexistent", limit=10)
 970 | 
 971 |         assert "No flakes found matching 'nonexistent'" in result
 972 |         assert "Try searching for:" in result
 973 |         assert "Popular flakes:" in result
 974 | 
 975 |     @pytest.mark.asyncio
 976 |     @patch("mcp_nixos.server.requests.post")
 977 |     async def test_flakes_search_deduplication(self, mock_post):
 978 |         """Test flake search properly deduplicates flakes."""
 979 |         # Mock response with duplicate flakes
 980 |         mock_response = Mock()
 981 |         mock_response.status_code = 200
 982 |         mock_response.json.return_value = {
 983 |             "hits": {
 984 |                 "total": {"value": 4},
 985 |                 "hits": [
 986 |                     {
 987 |                         "_source": {
 988 |                             "flake_name": "nixpkgs",
 989 |                             "flake_resolved": {"type": "github", "owner": "NixOS", "repo": "nixpkgs"},
 990 |                             "package_pname": "hello",
 991 |                             "package_attr_name": "hello",
 992 |                         }
 993 |                     },
 994 |                     {
 995 |                         "_source": {
 996 |                             "flake_name": "nixpkgs",
 997 |                             "flake_resolved": {"type": "github", "owner": "NixOS", "repo": "nixpkgs"},
 998 |                             "package_pname": "git",
 999 |                             "package_attr_name": "git",
1000 |                         }
1001 |                     },
1002 |                 ],
1003 |             }
1004 |         }
1005 |         mock_post.return_value = mock_response
1006 | 
1007 |         result = await nixos_flakes_search("nixpkgs", limit=10)
1008 | 
1009 |         # Should show 1 unique flake with 2 packages
1010 |         assert "Found 4 total matches (1 unique flakes)" in result
1011 |         assert "nixpkgs" in result
1012 |         assert "NixOS/nixpkgs" in result
1013 |         assert "Packages: git, hello" in result
1014 | 
1015 |     @pytest.mark.asyncio
1016 |     @patch("mcp_nixos.server.requests.post")
1017 |     async def test_flakes_stats(self, mock_post):
1018 |         """Test flake statistics."""
1019 |         # Mock responses
1020 |         mock_count_response = Mock()
1021 |         mock_count_response.status_code = 200
1022 |         mock_count_response.json.return_value = {"count": 452176}
1023 | 
1024 |         # Mock search response for sampling
1025 |         mock_search_response = Mock()
1026 |         mock_search_response.status_code = 200
1027 |         mock_search_response.json.return_value = {
1028 |             "hits": {
1029 |                 "hits": [
1030 |                     {
1031 |                         "_source": {
1032 |                             "flake_resolved": {
1033 |                                 "url": "https://github.com/nix-community/home-manager",
1034 |                                 "type": "github",
1035 |                             },
1036 |                             "package_pname": "home-manager",
1037 |                         }
1038 |                     },
1039 |                     {
1040 |                         "_source": {
1041 |                             "flake_resolved": {"url": "https://github.com/NixOS/nixpkgs", "type": "github"},
1042 |                             "package_pname": "hello",
1043 |                         }
1044 |                     },
1045 |                 ]
1046 |             }
1047 |         }
1048 | 
1049 |         mock_post.side_effect = [mock_count_response, mock_search_response]
1050 | 
1051 |         result = await nixos_flakes_stats()
1052 | 
1053 |         assert "Available flakes: 452,176" in result
1054 |         # Stats now samples documents, not using aggregations
1055 |         # So we won't see the mocked aggregation values
1056 | 
1057 |     @pytest.mark.asyncio
1058 |     @patch("mcp_nixos.server.requests.post")
1059 |     async def test_flakes_search_error_handling(self, mock_post):
1060 |         """Test flake search error handling."""
1061 |         # Mock 404 response with HTTPError
1062 |         from requests import HTTPError
1063 | 
1064 |         mock_response = Mock()
1065 |         mock_response.status_code = 404
1066 |         error = HTTPError()
1067 |         error.response = mock_response
1068 |         mock_response.raise_for_status.side_effect = error
1069 |         mock_post.return_value = mock_response
1070 | 
1071 |         result = await nixos_flakes_search("test", limit=10)
1072 | 
1073 |         assert "Error" in result
1074 |         assert "Flake indices not found" in result
1075 | 
1076 | 
1077 | # ===== Content from test_flakes_stats_eval.py =====
1078 | class TestFlakesStatsEval:
1079 |     """Test evaluations for flakes statistics and counting."""
1080 | 
1081 |     @pytest.mark.asyncio
1082 |     @patch("mcp_nixos.server.requests.post")
1083 |     async def test_get_total_flakes_count(self, mock_post):
1084 |         """Eval: User asks 'how many flakes are there?'"""
1085 | 
1086 |         # Mock flakes stats responses
1087 |         def side_effect(*args, **kwargs):
1088 |             url = args[0]
1089 |             if "/_count" in url:
1090 |                 # Count request
1091 |                 mock_response = Mock()
1092 |                 mock_response.status_code = 200
1093 |                 mock_response.json.return_value = {"count": 4500}
1094 |                 return mock_response
1095 |             # Regular search request
1096 |             # Search request to get sample documents
1097 |             mock_response = Mock()
1098 |             mock_response.status_code = 200
1099 |             mock_response.json.return_value = {
1100 |                 "hits": {
1101 |                     "total": {"value": 4500},
1102 |                     "hits": [
1103 |                         {
1104 |                             "_source": {
1105 |                                 "flake_resolved": {"url": "https://github.com/NixOS/nixpkgs", "type": "github"},
1106 |                                 "package_pname": "hello",
1107 |                             }
1108 |                         },
1109 |                         {
1110 |                             "_source": {
1111 |                                 "flake_resolved": {
1112 |                                     "url": "https://github.com/nix-community/home-manager",
1113 |                                     "type": "github",
1114 |                                 },
1115 |                                 "package_pname": "home-manager",
1116 |                             }
1117 |                         },
1118 |                     ]
1119 |                     * 10,  # Simulate more hits
1120 |                 }
1121 |             }
1122 |             return mock_response
1123 | 
1124 |         mock_post.side_effect = side_effect
1125 | 
1126 |         # Get flakes stats
1127 |         result = await nixos_flakes_stats()
1128 | 
1129 |         # Should show available flakes count (formatted with comma)
1130 |         assert "Available flakes:" in result
1131 |         assert "4,500" in result  # Matches our mock data
1132 | 
1133 |         # Should show unique repositories count
1134 |         assert "Unique repositories:" in result
1135 |         # The actual count depends on unique URLs in mock data
1136 | 
1137 |         # Should show breakdown by type
1138 |         assert "Flake types:" in result
1139 |         assert "github:" in result  # Our mock data only has github type
1140 | 
1141 |         # Should show top contributors
1142 |         assert "Top contributors:" in result
1143 |         assert "NixOS:" in result
1144 |         assert "nix-community:" in result
1145 | 
1146 |     @pytest.mark.asyncio
1147 |     @patch("mcp_nixos.server.requests.post")
1148 |     async def test_flakes_search_shows_total_count(self, mock_post):
1149 |         """Eval: Flakes search should show total matching flakes."""
1150 |         # Mock search response with multiple hits
1151 |         mock_response = Mock()
1152 |         mock_response.status_code = 200
1153 |         mock_response.json.return_value = {
1154 |             "hits": {
1155 |                 "total": {"value": 156},
1156 |                 "hits": [
1157 |                     {
1158 |                         "_source": {
1159 |                             "flake_name": "nixpkgs",
1160 |                             "flake_description": "Nix Packages collection",
1161 |                             "flake_resolved": {
1162 |                                 "owner": "NixOS",
1163 |                                 "repo": "nixpkgs",
1164 |                             },
1165 |                             "package_attr_name": "packages.x86_64-linux.hello",
1166 |                         }
1167 |                     },
1168 |                     {
1169 |                         "_source": {
1170 |                             "flake_name": "nixpkgs",
1171 |                             "flake_description": "Nix Packages collection",
1172 |                             "flake_resolved": {
1173 |                                 "owner": "NixOS",
1174 |                                 "repo": "nixpkgs",
1175 |                             },
1176 |                             "package_attr_name": "packages.x86_64-linux.git",
1177 |                         }
1178 |                     },
1179 |                 ],
1180 |             }
1181 |         }
1182 |         mock_post.return_value = mock_response
1183 | 
1184 |         # Search for nix
1185 |         result = await nixos_flakes_search("nix", limit=2)
1186 | 
1187 |         # Should show both total matches and unique flakes count
1188 |         assert "total matches" in result
1189 |         assert "unique flakes" in result
1190 |         assert "nixpkgs" in result
1191 | 
1192 |     @pytest.mark.asyncio
1193 |     @patch("mcp_nixos.server.requests.post")
1194 |     async def test_flakes_wildcard_search_shows_all(self, mock_post):
1195 |         """Eval: User searches with '*' to see all flakes."""
1196 |         # Mock response with many flakes
1197 |         mock_response = Mock()
1198 |         mock_response.status_code = 200
1199 |         mock_response.json.return_value = {
1200 |             "hits": {
1201 |                 "total": {"value": 4500},
1202 |                 "hits": [
1203 |                     {
1204 |                         "_source": {
1205 |                             "flake_name": "devenv",
1206 |                             "flake_description": "Development environments",
1207 |                             "flake_resolved": {"owner": "cachix", "repo": "devenv"},
1208 |                             "package_attr_name": "packages.x86_64-linux.devenv",
1209 |                         }
1210 |                     },
1211 |                     {
1212 |                         "_source": {
1213 |                             "flake_name": "home-manager",
1214 |                             "flake_description": "Manage user configuration",
1215 |                             "flake_resolved": {"owner": "nix-community", "repo": "home-manager"},
1216 |                             "package_attr_name": "packages.x86_64-linux.home-manager",
1217 |                         }
1218 |                     },
1219 |                     {
1220 |                         "_source": {
1221 |                             "flake_name": "",
1222 |                             "flake_description": "Flake utilities",
1223 |                             "flake_resolved": {"owner": "numtide", "repo": "flake-utils"},
1224 |                             "package_attr_name": "lib.eachDefaultSystem",
1225 |                         }
1226 |                     },
1227 |                 ],
1228 |             }
1229 |         }
1230 |         mock_post.return_value = mock_response
1231 | 
1232 |         # Wildcard search
1233 |         result = await nixos_flakes_search("*", limit=10)
1234 | 
1235 |         # Should show total count
1236 |         assert "total matches" in result
1237 | 
1238 |         # Should list some flakes
1239 |         assert "devenv" in result
1240 |         assert "home-manager" in result
1241 | 
1242 |     @pytest.mark.asyncio
1243 |     @patch("mcp_nixos.server.requests.post")
1244 |     async def test_flakes_stats_with_no_flakes(self, mock_post):
1245 |         """Eval: Flakes stats when no flakes are indexed."""
1246 | 
1247 |         # Mock empty response
1248 |         def side_effect(*args, **kwargs):
1249 |             url = args[0]
1250 |             mock_response = Mock()
1251 |             mock_response.status_code = 200
1252 | 
1253 |             if "/_count" in url:
1254 |                 # Count request
1255 |                 mock_response.json.return_value = {"count": 0}
1256 |             else:
1257 |                 # Search with aggregations
1258 |                 mock_response.json.return_value = {
1259 |                     "hits": {"total": {"value": 0}},
1260 |                     "aggregations": {
1261 |                         "unique_flakes": {"value": 0},
1262 |                         "flake_types": {"buckets": []},
1263 |                         "top_owners": {"buckets": []},
1264 |                     },
1265 |                 }
1266 |             return mock_response
1267 | 
1268 |         mock_post.side_effect = side_effect
1269 | 
1270 |         result = await nixos_flakes_stats()
1271 | 
1272 |         # Should handle empty case gracefully
1273 |         assert "Available flakes: 0" in result
1274 | 
1275 |     @pytest.mark.asyncio
1276 |     @patch("mcp_nixos.server.requests.post")
1277 |     async def test_flakes_stats_error_handling(self, mock_post):
1278 |         """Eval: Flakes stats handles API errors gracefully."""
1279 |         # Mock 404 error
1280 |         mock_response = Mock()
1281 |         mock_response.status_code = 404
1282 |         mock_response.raise_for_status.side_effect = Exception("Not found")
1283 |         mock_post.return_value = mock_response
1284 | 
1285 |         result = await nixos_flakes_stats()
1286 | 
1287 |         # Should return error message
1288 |         assert "Error" in result
1289 |         assert "Flake indices not found" in result or "Not found" in result
1290 | 
1291 |     @pytest.mark.asyncio
1292 |     @patch("mcp_nixos.server.requests.post")
1293 |     async def test_compare_flakes_vs_packages(self, mock_post):
1294 |         """Eval: User wants to understand flakes vs packages relationship."""
1295 |         # First call: flakes stats
1296 |         mock_flakes_response = Mock()
1297 |         mock_flakes_response.status_code = 200
1298 |         mock_flakes_response.json.return_value = {
1299 |             "hits": {"total": {"value": 4500}},
1300 |             "aggregations": {
1301 |                 "unique_flakes": {"value": 894},
1302 |                 "flake_types": {
1303 |                     "buckets": [
1304 |                         {"key": "github", "doc_count": 3800},
1305 |                     ]
1306 |                 },
1307 |                 "top_contributors": {
1308 |                     "buckets": [
1309 |                         {"key": "NixOS", "doc_count": 450},
1310 |                     ]
1311 |                 },
1312 |             },
1313 |         }
1314 | 
1315 |         # Second call: regular packages stats (for comparison)
1316 |         mock_packages_response = Mock()
1317 |         mock_packages_response.json.return_value = {
1318 |             "aggregations": {
1319 |                 "attr_count": {"value": 151798},
1320 |                 "option_count": {"value": 20156},
1321 |                 "program_count": {"value": 3421},
1322 |                 "license_count": {"value": 125},
1323 |                 "maintainer_count": {"value": 3254},
1324 |                 "platform_counts": {"buckets": []},
1325 |             }
1326 |         }
1327 | 
1328 |         def side_effect(*args, **kwargs):
1329 |             url = args[0]
1330 |             if "latest-43-group-manual" in url:
1331 |                 if "/_count" in url:
1332 |                     # Count request
1333 |                     mock_response = Mock()
1334 |                     mock_response.status_code = 200
1335 |                     mock_response.json.return_value = {"count": 4500}
1336 |                     return mock_response
1337 |                 # Search request - return sample hits
1338 |                 mock_response = Mock()
1339 |                 mock_response.status_code = 200
1340 |                 mock_response.json.return_value = {
1341 |                     "hits": {
1342 |                         "hits": [
1343 |                             {
1344 |                                 "_source": {
1345 |                                     "flake_resolved": {"url": "https://github.com/NixOS/nixpkgs", "type": "github"}
1346 |                                 }
1347 |                             }
1348 |                         ]
1349 |                         * 5
1350 |                     }
1351 |                 }
1352 |                 return mock_response
1353 |             return mock_packages_response
1354 | 
1355 |         mock_post.side_effect = side_effect
1356 | 
1357 |         # Get flakes stats
1358 |         flakes_result = await nixos_flakes_stats()
1359 |         assert "Available flakes:" in flakes_result
1360 |         assert "4,500" in flakes_result  # From our mock
1361 | 
1362 |         # Should also show unique repositories
1363 |         assert "Unique repositories:" in flakes_result
1364 | 
```

--------------------------------------------------------------------------------
/mcp_nixos/server.py:
--------------------------------------------------------------------------------

```python
   1 | #!/usr/bin/env python3
   2 | """MCP-NixOS Server - Model Context Protocol tools for NixOS, Home Manager, and nix-darwin.
   3 | 
   4 | Provides search and query capabilities for:
   5 | - NixOS packages, options, and programs via Elasticsearch API
   6 | - Home Manager configuration options via HTML documentation parsing
   7 | - nix-darwin (macOS) configuration options via HTML documentation parsing
   8 | 
   9 | All responses are formatted as human-readable plain text for optimal LLM interaction.
  10 | """
  11 | 
  12 | import re
  13 | from typing import Any
  14 | 
  15 | import requests
  16 | from bs4 import BeautifulSoup
  17 | from fastmcp import FastMCP
  18 | 
  19 | 
  20 | class APIError(Exception):
  21 |     """Custom exception for API-related errors."""
  22 | 
  23 | 
  24 | class DocumentParseError(Exception):
  25 |     """Custom exception for document parsing errors."""
  26 | 
  27 | 
  28 | mcp = FastMCP("mcp-nixos")
  29 | 
  30 | # API Configuration
  31 | NIXOS_API = "https://search.nixos.org/backend"
  32 | NIXOS_AUTH = ("aWVSALXpZv", "X8gPHnzL52wFEekuxsfQ9cSh")
  33 | 
  34 | # Base channel patterns - these are dynamic and auto-discovered
  35 | BASE_CHANNELS = {
  36 |     "unstable": "nixos-unstable",
  37 |     "24.11": "nixos-24.11",
  38 |     "25.05": "nixos-25.05",
  39 | }
  40 | 
  41 | # Fallback channels when API discovery fails
  42 | # These are static mappings based on most recent known patterns
  43 | FALLBACK_CHANNELS = {
  44 |     "unstable": "latest-44-nixos-unstable",
  45 |     "stable": "latest-44-nixos-25.05",
  46 |     "25.05": "latest-44-nixos-25.05",
  47 |     "25.11": "latest-44-nixos-25.11",  # For when 25.11 is released
  48 |     "beta": "latest-44-nixos-25.05",
  49 | }
  50 | 
  51 | HOME_MANAGER_URL = "https://nix-community.github.io/home-manager/options.xhtml"
  52 | DARWIN_URL = "https://nix-darwin.github.io/nix-darwin/manual/index.html"
  53 | 
  54 | 
  55 | class ChannelCache:
  56 |     """Cache for discovered channels and resolved mappings."""
  57 | 
  58 |     def __init__(self) -> None:
  59 |         """Initialize empty cache."""
  60 |         self.available_channels: dict[str, str] | None = None
  61 |         self.resolved_channels: dict[str, str] | None = None
  62 |         self.using_fallback: bool = False
  63 | 
  64 |     def get_available(self) -> dict[str, str]:
  65 |         """Get available channels, discovering if needed."""
  66 |         if self.available_channels is None:
  67 |             self.available_channels = self._discover_available_channels()
  68 |         return self.available_channels if self.available_channels is not None else {}
  69 | 
  70 |     def get_resolved(self) -> dict[str, str]:
  71 |         """Get resolved channel mappings, resolving if needed."""
  72 |         if self.resolved_channels is None:
  73 |             self.resolved_channels = self._resolve_channels()
  74 |         return self.resolved_channels if self.resolved_channels is not None else {}
  75 | 
  76 |     def _discover_available_channels(self) -> dict[str, str]:
  77 |         """Discover available NixOS channels by testing API patterns."""
  78 |         # Test multiple generation patterns (43, 44, 45) and versions
  79 |         generations = [43, 44, 45, 46]  # Future-proof
  80 |         # Removed deprecated versions (20.09, 24.11 - EOL June 2025)
  81 |         versions = ["unstable", "25.05", "25.11", "26.05", "30.05"]  # Current and future
  82 | 
  83 |         available = {}
  84 |         for gen in generations:
  85 |             for version in versions:
  86 |                 pattern = f"latest-{gen}-nixos-{version}"
  87 |                 try:
  88 |                     resp = requests.post(
  89 |                         f"{NIXOS_API}/{pattern}/_count",
  90 |                         json={"query": {"match_all": {}}},
  91 |                         auth=NIXOS_AUTH,
  92 |                         timeout=10,  # Increased from 5s to 10s for slow connections
  93 |                     )
  94 |                     if resp.status_code == 200:
  95 |                         count = resp.json().get("count", 0)
  96 |                         if count > 0:
  97 |                             available[pattern] = f"{count:,} documents"
  98 |                 except Exception:
  99 |                     continue
 100 | 
 101 |         return available
 102 | 
 103 |     def _resolve_channels(self) -> dict[str, str]:
 104 |         """Resolve user-friendly channel names to actual indices."""
 105 |         available = self.get_available()
 106 | 
 107 |         # If no channels were discovered, use fallback channels
 108 |         if not available:
 109 |             self.using_fallback = True
 110 |             return FALLBACK_CHANNELS.copy()
 111 | 
 112 |         resolved = {}
 113 | 
 114 |         # Find unstable (should be consistent)
 115 |         unstable_pattern = None
 116 |         for pattern in available:
 117 |             if "unstable" in pattern:
 118 |                 unstable_pattern = pattern
 119 |                 break
 120 | 
 121 |         if unstable_pattern:
 122 |             resolved["unstable"] = unstable_pattern
 123 | 
 124 |         # Find stable release (highest version number with most documents)
 125 |         stable_candidates = []
 126 |         for pattern, count_str in available.items():
 127 |             if "unstable" not in pattern:
 128 |                 # Extract version (e.g., "25.05" from "latest-43-nixos-25.05")
 129 |                 parts = pattern.split("-")
 130 |                 if len(parts) >= 4:
 131 |                     version = parts[3]  # "25.05"
 132 |                     try:
 133 |                         # Parse version for comparison (25.05 -> 25.05)
 134 |                         major, minor = map(int, version.split("."))
 135 |                         count = int(count_str.replace(",", "").replace(" documents", ""))
 136 |                         stable_candidates.append((major, minor, version, pattern, count))
 137 |                     except (ValueError, IndexError):
 138 |                         continue
 139 | 
 140 |         if stable_candidates:
 141 |             # Sort by version (descending), then by document count (descending) as tiebreaker
 142 |             stable_candidates.sort(key=lambda x: (x[0], x[1], x[4]), reverse=True)
 143 |             current_stable = stable_candidates[0]
 144 | 
 145 |             resolved["stable"] = current_stable[3]  # pattern
 146 |             resolved[current_stable[2]] = current_stable[3]  # version -> pattern
 147 | 
 148 |             # Add other version mappings (prefer higher generation/count for same version)
 149 |             version_patterns: dict[str, tuple[str, int]] = {}
 150 |             for _major, _minor, version, pattern, count in stable_candidates:
 151 |                 if version not in version_patterns or count > version_patterns[version][1]:
 152 |                     version_patterns[version] = (pattern, count)
 153 | 
 154 |             for version, (pattern, _count) in version_patterns.items():
 155 |                 resolved[version] = pattern
 156 | 
 157 |         # Add beta (alias for stable)
 158 |         if "stable" in resolved:
 159 |             resolved["beta"] = resolved["stable"]
 160 | 
 161 |         # If we still have no channels after all that, use fallback
 162 |         if not resolved:
 163 |             self.using_fallback = True
 164 |             return FALLBACK_CHANNELS.copy()
 165 | 
 166 |         return resolved
 167 | 
 168 | 
 169 | # Create a single instance of the cache
 170 | channel_cache = ChannelCache()
 171 | 
 172 | 
 173 | def error(msg: str, code: str = "ERROR") -> str:
 174 |     """Format error as plain text."""
 175 |     # Ensure msg is always a string, even if empty
 176 |     msg = str(msg) if msg is not None else ""
 177 |     return f"Error ({code}): {msg}"
 178 | 
 179 | 
 180 | def get_channels() -> dict[str, str]:
 181 |     """Get current channel mappings (cached and resolved)."""
 182 |     return channel_cache.get_resolved()
 183 | 
 184 | 
 185 | def validate_channel(channel: str) -> bool:
 186 |     """Validate if a channel exists and is accessible."""
 187 |     channels = get_channels()
 188 |     if channel in channels:
 189 |         index = channels[channel]
 190 |         try:
 191 |             resp = requests.post(
 192 |                 f"{NIXOS_API}/{index}/_count", json={"query": {"match_all": {}}}, auth=NIXOS_AUTH, timeout=5
 193 |             )
 194 |             return resp.status_code == 200 and resp.json().get("count", 0) > 0
 195 |         except Exception:
 196 |             return False
 197 |     return False
 198 | 
 199 | 
 200 | def get_channel_suggestions(invalid_channel: str) -> str:
 201 |     """Get helpful suggestions for invalid channels."""
 202 |     channels = get_channels()
 203 |     available = list(channels.keys())
 204 |     suggestions = []
 205 | 
 206 |     # Find similar channel names
 207 |     invalid_lower = invalid_channel.lower()
 208 |     for channel in available:
 209 |         if invalid_lower in channel.lower() or channel.lower() in invalid_lower:
 210 |             suggestions.append(channel)
 211 | 
 212 |     if not suggestions:
 213 |         # Fallback to most common channels
 214 |         common = ["unstable", "stable", "beta"]
 215 |         # Also include version numbers
 216 |         version_channels = [ch for ch in available if "." in ch and ch.replace(".", "").isdigit()]
 217 |         common.extend(version_channels[:2])  # Add up to 2 version channels
 218 |         suggestions = [ch for ch in common if ch in available]
 219 |         if not suggestions:
 220 |             suggestions = available[:4]  # First 4 available
 221 | 
 222 |     return f"Available channels: {', '.join(suggestions)}"
 223 | 
 224 | 
 225 | def es_query(index: str, query: dict[str, Any], size: int = 20) -> list[dict[str, Any]]:
 226 |     """Execute Elasticsearch query."""
 227 |     try:
 228 |         resp = requests.post(
 229 |             f"{NIXOS_API}/{index}/_search", json={"query": query, "size": size}, auth=NIXOS_AUTH, timeout=10
 230 |         )
 231 |         resp.raise_for_status()
 232 |         data = resp.json()
 233 |         # Handle malformed responses gracefully
 234 |         if isinstance(data, dict) and "hits" in data:
 235 |             hits = data.get("hits", {})
 236 |             if isinstance(hits, dict) and "hits" in hits:
 237 |                 return list(hits.get("hits", []))
 238 |         return []
 239 |     except requests.Timeout as exc:
 240 |         raise APIError("API error: Connection timed out") from exc
 241 |     except requests.HTTPError as exc:
 242 |         raise APIError(f"API error: {str(exc)}") from exc
 243 |     except Exception as exc:
 244 |         raise APIError(f"API error: {str(exc)}") from exc
 245 | 
 246 | 
 247 | def parse_html_options(url: str, query: str = "", prefix: str = "", limit: int = 100) -> list[dict[str, str]]:
 248 |     """Parse options from HTML documentation."""
 249 |     try:
 250 |         resp = requests.get(url, timeout=30)  # Increase timeout for large docs
 251 |         resp.raise_for_status()
 252 |         # Use resp.content to let BeautifulSoup handle encoding detection
 253 |         # This prevents encoding errors like "unknown encoding: windows-1252"
 254 |         soup = BeautifulSoup(resp.content, "html.parser")
 255 |         options = []
 256 | 
 257 |         # Get all dt elements
 258 |         dts = soup.find_all("dt")
 259 | 
 260 |         for dt in dts:
 261 |             # Get option name
 262 |             name = ""
 263 |             if "home-manager" in url:
 264 |                 # Home Manager uses anchor IDs like "opt-programs.git.enable"
 265 |                 anchor = dt.find("a", id=True)
 266 |                 if anchor:
 267 |                     anchor_id = anchor.get("id", "")
 268 |                     # Remove "opt-" prefix and convert underscores
 269 |                     if anchor_id.startswith("opt-"):
 270 |                         name = anchor_id[4:]  # Remove "opt-" prefix
 271 |                         # Convert _name_ placeholders back to <name>
 272 |                         name = name.replace("_name_", "<name>")
 273 |                 else:
 274 |                     # Fallback to text content
 275 |                     name_elem = dt.find(string=True, recursive=False)
 276 |                     if name_elem:
 277 |                         name = name_elem.strip()
 278 |                     else:
 279 |                         name = dt.get_text(strip=True)
 280 |             else:
 281 |                 # Darwin and fallback - use text content
 282 |                 name = dt.get_text(strip=True)
 283 | 
 284 |             # Skip if it doesn't look like an option (must contain a dot)
 285 |             # But allow single-word options in some cases
 286 |             if "." not in name and len(name.split()) > 1:
 287 |                 continue
 288 | 
 289 |             # Filter by query or prefix
 290 |             if query and query.lower() not in name.lower():
 291 |                 continue
 292 |             if prefix and not (name.startswith(prefix + ".") or name == prefix):
 293 |                 continue
 294 | 
 295 |             # Find the corresponding dd element
 296 |             dd = dt.find_next_sibling("dd")
 297 |             if dd:
 298 |                 # Extract description (first p tag or direct text)
 299 |                 desc_elem = dd.find("p")
 300 |                 if desc_elem:
 301 |                     description = desc_elem.get_text(strip=True)
 302 |                 else:
 303 |                     # Get first text node, handle None case
 304 |                     text = dd.get_text(strip=True)
 305 |                     description = text.split("\n")[0] if text else ""
 306 | 
 307 |                 # Extract type info - look for various patterns
 308 |                 type_info = ""
 309 |                 # Pattern 1: <span class="term">Type: ...</span>
 310 |                 type_elem = dd.find("span", class_="term")
 311 |                 if type_elem and "Type:" in type_elem.get_text():
 312 |                     type_info = type_elem.get_text(strip=True).replace("Type:", "").strip()
 313 |                 # Pattern 2: Look for "Type:" in text
 314 |                 elif "Type:" in dd.get_text():
 315 |                     text = dd.get_text()
 316 |                     type_start = text.find("Type:") + 5
 317 |                     type_end = text.find("\n", type_start)
 318 |                     if type_end == -1:
 319 |                         type_end = len(text)
 320 |                     type_info = text[type_start:type_end].strip()
 321 | 
 322 |                 options.append(
 323 |                     {
 324 |                         "name": name,
 325 |                         "description": description[:200] if len(description) > 200 else description,
 326 |                         "type": type_info,
 327 |                     }
 328 |                 )
 329 | 
 330 |                 if len(options) >= limit:
 331 |                     break
 332 | 
 333 |         return options
 334 |     except Exception as exc:
 335 |         raise DocumentParseError(f"Failed to fetch docs: {str(exc)}") from exc
 336 | 
 337 | 
 338 | @mcp.tool()
 339 | async def nixos_search(query: str, search_type: str = "packages", limit: int = 20, channel: str = "unstable") -> str:
 340 |     """Search NixOS packages, options, or programs.
 341 | 
 342 |     Args:
 343 |         query: Search term to look for
 344 |         search_type: Type of search - "packages", "options", "programs", or "flakes"
 345 |         limit: Maximum number of results to return (1-100)
 346 |         channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05")
 347 | 
 348 |     Returns:
 349 |         Plain text results with bullet points or error message
 350 |     """
 351 |     if search_type not in ["packages", "options", "programs", "flakes"]:
 352 |         return error(f"Invalid type '{search_type}'")
 353 |     channels = get_channels()
 354 |     if channel not in channels:
 355 |         suggestions = get_channel_suggestions(channel)
 356 |         return error(f"Invalid channel '{channel}'. {suggestions}")
 357 |     if not 1 <= limit <= 100:
 358 |         return error("Limit must be 1-100")
 359 | 
 360 |     # Redirect flakes to dedicated function
 361 |     if search_type == "flakes":
 362 |         return await _nixos_flakes_search_impl(query, limit)
 363 | 
 364 |     try:
 365 |         # Build query with correct field names
 366 |         if search_type == "packages":
 367 |             q = {
 368 |                 "bool": {
 369 |                     "must": [{"term": {"type": "package"}}],
 370 |                     "should": [
 371 |                         {"match": {"package_pname": {"query": query, "boost": 3}}},
 372 |                         {"match": {"package_description": query}},
 373 |                     ],
 374 |                     "minimum_should_match": 1,
 375 |                 }
 376 |             }
 377 |         elif search_type == "options":
 378 |             # Use wildcard for option names to handle hierarchical names like services.nginx.enable
 379 |             q = {
 380 |                 "bool": {
 381 |                     "must": [{"term": {"type": "option"}}],
 382 |                     "should": [
 383 |                         {"wildcard": {"option_name": f"*{query}*"}},
 384 |                         {"match": {"option_description": query}},
 385 |                     ],
 386 |                     "minimum_should_match": 1,
 387 |                 }
 388 |             }
 389 |         else:  # programs
 390 |             q = {
 391 |                 "bool": {
 392 |                     "must": [{"term": {"type": "package"}}],
 393 |                     "should": [
 394 |                         {"match": {"package_programs": {"query": query, "boost": 2}}},
 395 |                         {"match": {"package_pname": query}},
 396 |                     ],
 397 |                     "minimum_should_match": 1,
 398 |                 }
 399 |             }
 400 | 
 401 |         hits = es_query(channels[channel], q, limit)
 402 | 
 403 |         # Format results as plain text
 404 |         if not hits:
 405 |             return f"No {search_type} found matching '{query}'"
 406 | 
 407 |         results = []
 408 |         results.append(f"Found {len(hits)} {search_type} matching '{query}':\n")
 409 | 
 410 |         for hit in hits:
 411 |             src = hit.get("_source", {})
 412 |             if search_type == "packages":
 413 |                 name = src.get("package_pname", "")
 414 |                 version = src.get("package_pversion", "")
 415 |                 desc = src.get("package_description", "")
 416 |                 results.append(f"• {name} ({version})")
 417 |                 if desc:
 418 |                     results.append(f"  {desc}")
 419 |                 results.append("")
 420 |             elif search_type == "options":
 421 |                 name = src.get("option_name", "")
 422 |                 opt_type = src.get("option_type", "")
 423 |                 desc = src.get("option_description", "")
 424 |                 # Strip HTML tags from description
 425 |                 if desc and "<rendered-html>" in desc:
 426 |                     # Remove outer rendered-html tags
 427 |                     desc = desc.replace("<rendered-html>", "").replace("</rendered-html>", "")
 428 |                     # Remove common HTML tags
 429 |                     desc = re.sub(r"<[^>]+>", "", desc)
 430 |                     desc = desc.strip()
 431 |                 results.append(f"• {name}")
 432 |                 if opt_type:
 433 |                     results.append(f"  Type: {opt_type}")
 434 |                 if desc:
 435 |                     results.append(f"  {desc}")
 436 |                 results.append("")
 437 |             else:  # programs
 438 |                 programs = src.get("package_programs", [])
 439 |                 pkg_name = src.get("package_pname", "")
 440 | 
 441 |                 # Check if query matches any program exactly (case-insensitive)
 442 |                 query_lower = query.lower()
 443 |                 matched_programs = [p for p in programs if p.lower() == query_lower]
 444 | 
 445 |                 for prog in matched_programs:
 446 |                     results.append(f"• {prog} (provided by {pkg_name})")
 447 |                     results.append("")
 448 | 
 449 |         return "\n".join(results).strip()
 450 | 
 451 |     except Exception as e:
 452 |         return error(str(e))
 453 | 
 454 | 
 455 | @mcp.tool()
 456 | async def nixos_info(name: str, type: str = "package", channel: str = "unstable") -> str:  # pylint: disable=redefined-builtin
 457 |     """Get detailed info about a NixOS package or option.
 458 | 
 459 |     Args:
 460 |         name: Name of the package or option to look up
 461 |         type: Type of lookup - "package" or "option"
 462 |         channel: NixOS channel to search in (e.g., "unstable", "stable", "25.05")
 463 | 
 464 |     Returns:
 465 |         Plain text details about the package/option or error message
 466 |     """
 467 |     info_type = type  # Avoid shadowing built-in
 468 |     if info_type not in ["package", "option"]:
 469 |         return error("Type must be 'package' or 'option'")
 470 |     channels = get_channels()
 471 |     if channel not in channels:
 472 |         suggestions = get_channel_suggestions(channel)
 473 |         return error(f"Invalid channel '{channel}'. {suggestions}")
 474 | 
 475 |     try:
 476 |         # Exact match query with correct field names
 477 |         field = "package_pname" if info_type == "package" else "option_name"
 478 |         query = {"bool": {"must": [{"term": {"type": info_type}}, {"term": {field: name}}]}}
 479 |         hits = es_query(channels[channel], query, 1)
 480 | 
 481 |         if not hits:
 482 |             return error(f"{info_type.capitalize()} '{name}' not found", "NOT_FOUND")
 483 | 
 484 |         src = hits[0].get("_source", {})
 485 | 
 486 |         if info_type == "package":
 487 |             info = []
 488 |             info.append(f"Package: {src.get('package_pname', '')}")
 489 |             info.append(f"Version: {src.get('package_pversion', '')}")
 490 | 
 491 |             desc = src.get("package_description", "")
 492 |             if desc:
 493 |                 info.append(f"Description: {desc}")
 494 | 
 495 |             homepage = src.get("package_homepage", [])
 496 |             if homepage:
 497 |                 if isinstance(homepage, list):
 498 |                     homepage = homepage[0] if homepage else ""
 499 |                 info.append(f"Homepage: {homepage}")
 500 | 
 501 |             licenses = src.get("package_license_set", [])
 502 |             if licenses:
 503 |                 info.append(f"License: {', '.join(licenses)}")
 504 | 
 505 |             return "\n".join(info)
 506 | 
 507 |         # Option type
 508 |         info = []
 509 |         info.append(f"Option: {src.get('option_name', '')}")
 510 | 
 511 |         opt_type = src.get("option_type", "")
 512 |         if opt_type:
 513 |             info.append(f"Type: {opt_type}")
 514 | 
 515 |         desc = src.get("option_description", "")
 516 |         if desc:
 517 |             # Strip HTML tags from description
 518 |             if "<rendered-html>" in desc:
 519 |                 desc = desc.replace("<rendered-html>", "").replace("</rendered-html>", "")
 520 |                 desc = re.sub(r"<[^>]+>", "", desc)
 521 |                 desc = desc.strip()
 522 |             info.append(f"Description: {desc}")
 523 | 
 524 |         default = src.get("option_default", "")
 525 |         if default:
 526 |             info.append(f"Default: {default}")
 527 | 
 528 |         example = src.get("option_example", "")
 529 |         if example:
 530 |             info.append(f"Example: {example}")
 531 | 
 532 |         return "\n".join(info)
 533 | 
 534 |     except Exception as e:
 535 |         return error(str(e))
 536 | 
 537 | 
 538 | @mcp.tool()
 539 | async def nixos_channels() -> str:
 540 |     """List available NixOS channels with their status.
 541 | 
 542 |     Returns:
 543 |         Plain text list showing channel names, versions, and availability
 544 |     """
 545 |     try:
 546 |         # Get resolved channels and available raw data
 547 |         configured = get_channels()
 548 |         available = channel_cache.get_available()
 549 | 
 550 |         results = []
 551 | 
 552 |         # Show warning if using fallback channels
 553 |         if channel_cache.using_fallback:
 554 |             results.append("⚠️  WARNING: Using fallback channels (API discovery failed)")
 555 |             results.append("    Check network connectivity to search.nixos.org")
 556 |             results.append("")
 557 |             results.append("NixOS Channels (fallback mode):\n")
 558 |         else:
 559 |             results.append("NixOS Channels (auto-discovered):\n")
 560 | 
 561 |         # Show user-friendly channel names
 562 |         for name, index in sorted(configured.items()):
 563 |             status = "✓ Available" if index in available else "✗ Unavailable"
 564 |             doc_count = available.get(index, "Unknown")
 565 | 
 566 |             # Mark stable channel clearly
 567 |             label = f"• {name}"
 568 |             if name == "stable":
 569 |                 # Extract version from index
 570 |                 parts = index.split("-")
 571 |                 if len(parts) >= 4:
 572 |                     version = parts[3]
 573 |                     label = f"• {name} (current: {version})"
 574 | 
 575 |             results.append(f"{label} → {index}")
 576 |             if index in available:
 577 |                 results.append(f"  Status: {status} ({doc_count})")
 578 |             else:
 579 |                 if channel_cache.using_fallback:
 580 |                     results.append("  Status: Fallback (may not be current)")
 581 |                 else:
 582 |                     results.append(f"  Status: {status}")
 583 |             results.append("")
 584 | 
 585 |         # Show additional discovered channels not in our mapping
 586 |         if not channel_cache.using_fallback:
 587 |             discovered_only = set(available.keys()) - set(configured.values())
 588 |             if discovered_only:
 589 |                 results.append("Additional available channels:")
 590 |                 for index in sorted(discovered_only):
 591 |                     results.append(f"• {index} ({available[index]})")
 592 | 
 593 |         # Add deprecation warnings
 594 |         results.append("\nNote: Channels are dynamically discovered.")
 595 |         results.append("'stable' always points to the current stable release.")
 596 |         if channel_cache.using_fallback:
 597 |             results.append("\n⚠️  Fallback channels may not reflect the latest available versions.")
 598 |             results.append("   Please check your network connection to search.nixos.org.")
 599 | 
 600 |         return "\n".join(results).strip()
 601 |     except Exception as e:
 602 |         return error(str(e))
 603 | 
 604 | 
 605 | @mcp.tool()
 606 | async def nixos_stats(channel: str = "unstable") -> str:
 607 |     """Get NixOS statistics for a channel.
 608 | 
 609 |     Args:
 610 |         channel: NixOS channel to get stats for (e.g., "unstable", "stable", "25.05")
 611 | 
 612 |     Returns:
 613 |         Plain text statistics including package/option counts
 614 |     """
 615 |     channels = get_channels()
 616 |     if channel not in channels:
 617 |         suggestions = get_channel_suggestions(channel)
 618 |         return error(f"Invalid channel '{channel}'. {suggestions}")
 619 | 
 620 |     try:
 621 |         index = channels[channel]
 622 |         url = f"{NIXOS_API}/{index}/_count"
 623 | 
 624 |         # Get counts with error handling
 625 |         try:
 626 |             pkg_resp = requests.post(url, json={"query": {"term": {"type": "package"}}}, auth=NIXOS_AUTH, timeout=10)
 627 |             pkg_resp.raise_for_status()
 628 |             pkg_count = pkg_resp.json().get("count", 0)
 629 |         except Exception:
 630 |             pkg_count = 0
 631 | 
 632 |         try:
 633 |             opt_resp = requests.post(url, json={"query": {"term": {"type": "option"}}}, auth=NIXOS_AUTH, timeout=10)
 634 |             opt_resp.raise_for_status()
 635 |             opt_count = opt_resp.json().get("count", 0)
 636 |         except Exception:
 637 |             opt_count = 0
 638 | 
 639 |         if pkg_count == 0 and opt_count == 0:
 640 |             return error("Failed to retrieve statistics")
 641 | 
 642 |         return f"""NixOS Statistics for {channel} channel:
 643 | • Packages: {pkg_count:,}
 644 | • Options: {opt_count:,}"""
 645 | 
 646 |     except Exception as e:
 647 |         return error(str(e))
 648 | 
 649 | 
 650 | @mcp.tool()
 651 | async def home_manager_search(query: str, limit: int = 20) -> str:
 652 |     """Search Home Manager configuration options.
 653 | 
 654 |     Searches through available Home Manager options by name and description.
 655 | 
 656 |     Args:
 657 |         query: The search query string to match against option names and descriptions
 658 |         limit: Maximum number of results to return (default: 20, max: 100)
 659 | 
 660 |     Returns:
 661 |         Plain text list of matching options with name, type, and description
 662 |     """
 663 |     if not 1 <= limit <= 100:
 664 |         return error("Limit must be 1-100")
 665 | 
 666 |     try:
 667 |         options = parse_html_options(HOME_MANAGER_URL, query, "", limit)
 668 | 
 669 |         if not options:
 670 |             return f"No Home Manager options found matching '{query}'"
 671 | 
 672 |         results = []
 673 |         results.append(f"Found {len(options)} Home Manager options matching '{query}':\n")
 674 | 
 675 |         for opt in options:
 676 |             results.append(f"• {opt['name']}")
 677 |             if opt["type"]:
 678 |                 results.append(f"  Type: {opt['type']}")
 679 |             if opt["description"]:
 680 |                 results.append(f"  {opt['description']}")
 681 |             results.append("")
 682 | 
 683 |         return "\n".join(results).strip()
 684 | 
 685 |     except Exception as e:
 686 |         return error(str(e))
 687 | 
 688 | 
 689 | @mcp.tool()
 690 | async def home_manager_info(name: str) -> str:
 691 |     """Get detailed information about a specific Home Manager option.
 692 | 
 693 |     Requires an exact option name match. If not found, suggests similar options.
 694 | 
 695 |     Args:
 696 |         name: The exact option name (e.g., 'programs.git.enable')
 697 | 
 698 |     Returns:
 699 |         Plain text with option details (name, type, description) or error with suggestions
 700 |     """
 701 |     try:
 702 |         # Search more broadly first
 703 |         options = parse_html_options(HOME_MANAGER_URL, name, "", 100)
 704 | 
 705 |         # Look for exact match
 706 |         for opt in options:
 707 |             if opt["name"] == name:
 708 |                 info = []
 709 |                 info.append(f"Option: {name}")
 710 |                 if opt["type"]:
 711 |                     info.append(f"Type: {opt['type']}")
 712 |                 if opt["description"]:
 713 |                     info.append(f"Description: {opt['description']}")
 714 |                 return "\n".join(info)
 715 | 
 716 |         # If not found, check if there are similar options to suggest
 717 |         if options:
 718 |             suggestions = []
 719 |             for opt in options[:5]:  # Show up to 5 suggestions
 720 |                 if name in opt["name"] or opt["name"].startswith(name + "."):
 721 |                     suggestions.append(opt["name"])
 722 | 
 723 |             if suggestions:
 724 |                 return error(
 725 |                     f"Option '{name}' not found. Did you mean one of these?\n"
 726 |                     + "\n".join(f"  • {s}" for s in suggestions)
 727 |                     + f"\n\nTip: Use home_manager_options_by_prefix('{name}') to browse all options with this prefix.",
 728 |                     "NOT_FOUND",
 729 |                 )
 730 | 
 731 |         return error(
 732 |             f"Option '{name}' not found.\n"
 733 |             + f"Tip: Use home_manager_options_by_prefix('{name}') to browse available options.",
 734 |             "NOT_FOUND",
 735 |         )
 736 | 
 737 |     except Exception as e:
 738 |         return error(str(e))
 739 | 
 740 | 
 741 | @mcp.tool()
 742 | async def home_manager_stats() -> str:
 743 |     """Get statistics about Home Manager options.
 744 | 
 745 |     Retrieves overall statistics including total options, categories, and top categories.
 746 | 
 747 |     Returns:
 748 |         Plain text summary with total options, category count, and top 5 categories
 749 |     """
 750 |     try:
 751 |         # Parse all options to get statistics
 752 |         options = parse_html_options(HOME_MANAGER_URL, limit=5000)
 753 | 
 754 |         if not options:
 755 |             return error("Failed to fetch Home Manager statistics")
 756 | 
 757 |         # Count categories
 758 |         categories: dict[str, int] = {}
 759 |         for opt in options:
 760 |             cat = opt["name"].split(".")[0]
 761 |             categories[cat] = categories.get(cat, 0) + 1
 762 | 
 763 |         # Count types
 764 |         types: dict[str, int] = {}
 765 |         for opt in options:
 766 |             opt_type = opt.get("type", "unknown")
 767 |             if opt_type:
 768 |                 # Simplify complex types
 769 |                 if "null or" in opt_type:
 770 |                     opt_type = "nullable"
 771 |                 elif "list of" in opt_type:
 772 |                     opt_type = "list"
 773 |                 elif "attribute set" in opt_type:
 774 |                     opt_type = "attribute set"
 775 |                 types[opt_type] = types.get(opt_type, 0) + 1
 776 | 
 777 |         # Build statistics
 778 |         return f"""Home Manager Statistics:
 779 | • Total options: {len(options):,}
 780 | • Categories: {len(categories)}
 781 | • Top categories:
 782 |   - programs: {categories.get("programs", 0):,} options
 783 |   - services: {categories.get("services", 0):,} options
 784 |   - home: {categories.get("home", 0):,} options
 785 |   - wayland: {categories.get("wayland", 0):,} options
 786 |   - xsession: {categories.get("xsession", 0):,} options"""
 787 |     except Exception as e:
 788 |         return error(str(e))
 789 | 
 790 | 
 791 | @mcp.tool()
 792 | async def home_manager_list_options() -> str:
 793 |     """List all Home Manager option categories.
 794 | 
 795 |     Enumerates all top-level categories with their option counts.
 796 | 
 797 |     Returns:
 798 |         Plain text list of categories sorted alphabetically with option counts
 799 |     """
 800 |     try:
 801 |         # Get more options to see all categories (default 100 is too few)
 802 |         options = parse_html_options(HOME_MANAGER_URL, limit=5000)
 803 |         categories: dict[str, int] = {}
 804 | 
 805 |         for opt in options:
 806 |             name = opt["name"]
 807 |             # Process option names
 808 |             if name and not name.startswith("."):
 809 |                 if "." in name:
 810 |                     cat = name.split(".")[0]
 811 |                 else:
 812 |                     cat = name  # Option without dot is its own category
 813 |                 # Valid categories should:
 814 |                 # - Be more than 1 character
 815 |                 # - Be a valid identifier (allows underscores)
 816 |                 # - Not be common value words
 817 |                 # - Match typical nix option category patterns
 818 |                 if (
 819 |                     len(cat) > 1 and cat.isidentifier() and (cat.islower() or cat.startswith("_"))
 820 |                 ):  # This ensures valid identifier
 821 |                     # Additional filtering for known valid categories
 822 |                     valid_categories = {
 823 |                         "accounts",
 824 |                         "dconf",
 825 |                         "editorconfig",
 826 |                         "fonts",
 827 |                         "gtk",
 828 |                         "home",
 829 |                         "i18n",
 830 |                         "launchd",
 831 |                         "lib",
 832 |                         "manual",
 833 |                         "news",
 834 |                         "nix",
 835 |                         "nixgl",
 836 |                         "nixpkgs",
 837 |                         "pam",
 838 |                         "programs",
 839 |                         "qt",
 840 |                         "services",
 841 |                         "specialisation",
 842 |                         "systemd",
 843 |                         "targets",
 844 |                         "wayland",
 845 |                         "xdg",
 846 |                         "xresources",
 847 |                         "xsession",
 848 |                     }
 849 |                     # Only include if it's in the known valid list or looks like a typical category
 850 |                     if cat in valid_categories or (len(cat) >= 3 and not any(char.isdigit() for char in cat)):
 851 |                         categories[cat] = categories.get(cat, 0) + 1
 852 | 
 853 |         results = []
 854 |         results.append(f"Home Manager option categories ({len(categories)} total):\n")
 855 | 
 856 |         # Sort by count descending, then alphabetically
 857 |         sorted_cats = sorted(categories.items(), key=lambda x: (-x[1], x[0]))
 858 | 
 859 |         for cat, count in sorted_cats:
 860 |             results.append(f"• {cat} ({count} options)")
 861 | 
 862 |         return "\n".join(results)
 863 | 
 864 |     except Exception as e:
 865 |         return error(str(e))
 866 | 
 867 | 
 868 | @mcp.tool()
 869 | async def home_manager_options_by_prefix(option_prefix: str) -> str:
 870 |     """Get Home Manager options matching a specific prefix.
 871 | 
 872 |     Useful for browsing options under a category or finding exact option names.
 873 | 
 874 |     Args:
 875 |         option_prefix: The prefix to match (e.g., 'programs.git' or 'services')
 876 | 
 877 |     Returns:
 878 |         Plain text list of options with the given prefix, including descriptions
 879 |     """
 880 |     try:
 881 |         options = parse_html_options(HOME_MANAGER_URL, "", option_prefix)
 882 | 
 883 |         if not options:
 884 |             return f"No Home Manager options found with prefix '{option_prefix}'"
 885 | 
 886 |         results = []
 887 |         results.append(f"Home Manager options with prefix '{option_prefix}' ({len(options)} found):\n")
 888 | 
 889 |         for opt in sorted(options, key=lambda x: x["name"]):
 890 |             results.append(f"• {opt['name']}")
 891 |             if opt["description"]:
 892 |                 results.append(f"  {opt['description']}")
 893 |             results.append("")
 894 | 
 895 |         return "\n".join(results).strip()
 896 | 
 897 |     except Exception as e:
 898 |         return error(str(e))
 899 | 
 900 | 
 901 | @mcp.tool()
 902 | async def darwin_search(query: str, limit: int = 20) -> str:
 903 |     """Search nix-darwin (macOS) configuration options.
 904 | 
 905 |     Searches through available nix-darwin options by name and description.
 906 | 
 907 |     Args:
 908 |         query: The search query string to match against option names and descriptions
 909 |         limit: Maximum number of results to return (default: 20, max: 100)
 910 | 
 911 |     Returns:
 912 |         Plain text list of matching options with name, type, and description
 913 |     """
 914 |     if not 1 <= limit <= 100:
 915 |         return error("Limit must be 1-100")
 916 | 
 917 |     try:
 918 |         options = parse_html_options(DARWIN_URL, query, "", limit)
 919 | 
 920 |         if not options:
 921 |             return f"No nix-darwin options found matching '{query}'"
 922 | 
 923 |         results = []
 924 |         results.append(f"Found {len(options)} nix-darwin options matching '{query}':\n")
 925 | 
 926 |         for opt in options:
 927 |             results.append(f"• {opt['name']}")
 928 |             if opt["type"]:
 929 |                 results.append(f"  Type: {opt['type']}")
 930 |             if opt["description"]:
 931 |                 results.append(f"  {opt['description']}")
 932 |             results.append("")
 933 | 
 934 |         return "\n".join(results).strip()
 935 | 
 936 |     except Exception as e:
 937 |         return error(str(e))
 938 | 
 939 | 
 940 | @mcp.tool()
 941 | async def darwin_info(name: str) -> str:
 942 |     """Get detailed information about a specific nix-darwin option.
 943 | 
 944 |     Requires an exact option name match. If not found, suggests similar options.
 945 | 
 946 |     Args:
 947 |         name: The exact option name (e.g., 'system.defaults.dock.autohide')
 948 | 
 949 |     Returns:
 950 |         Plain text with option details (name, type, description) or error with suggestions
 951 |     """
 952 |     try:
 953 |         # Search more broadly first
 954 |         options = parse_html_options(DARWIN_URL, name, "", 100)
 955 | 
 956 |         # Look for exact match
 957 |         for opt in options:
 958 |             if opt["name"] == name:
 959 |                 info = []
 960 |                 info.append(f"Option: {name}")
 961 |                 if opt["type"]:
 962 |                     info.append(f"Type: {opt['type']}")
 963 |                 if opt["description"]:
 964 |                     info.append(f"Description: {opt['description']}")
 965 |                 return "\n".join(info)
 966 | 
 967 |         # If not found, check if there are similar options to suggest
 968 |         if options:
 969 |             suggestions = []
 970 |             for opt in options[:5]:  # Show up to 5 suggestions
 971 |                 if name in opt["name"] or opt["name"].startswith(name + "."):
 972 |                     suggestions.append(opt["name"])
 973 | 
 974 |             if suggestions:
 975 |                 return error(
 976 |                     f"Option '{name}' not found. Did you mean one of these?\n"
 977 |                     + "\n".join(f"  • {s}" for s in suggestions)
 978 |                     + f"\n\nTip: Use darwin_options_by_prefix('{name}') to browse all options with this prefix.",
 979 |                     "NOT_FOUND",
 980 |                 )
 981 | 
 982 |         return error(
 983 |             f"Option '{name}' not found.\n"
 984 |             + f"Tip: Use darwin_options_by_prefix('{name}') to browse available options.",
 985 |             "NOT_FOUND",
 986 |         )
 987 | 
 988 |     except Exception as e:
 989 |         return error(str(e))
 990 | 
 991 | 
 992 | @mcp.tool()
 993 | async def darwin_stats() -> str:
 994 |     """Get statistics about nix-darwin options.
 995 | 
 996 |     Retrieves overall statistics including total options, categories, and top categories.
 997 | 
 998 |     Returns:
 999 |         Plain text summary with total options, category count, and top 5 categories
1000 |     """
1001 |     try:
1002 |         # Parse all options to get statistics
1003 |         options = parse_html_options(DARWIN_URL, limit=3000)
1004 | 
1005 |         if not options:
1006 |             return error("Failed to fetch nix-darwin statistics")
1007 | 
1008 |         # Count categories
1009 |         categories: dict[str, int] = {}
1010 |         for opt in options:
1011 |             cat = opt["name"].split(".")[0]
1012 |             categories[cat] = categories.get(cat, 0) + 1
1013 | 
1014 |         # Count types
1015 |         types: dict[str, int] = {}
1016 |         for opt in options:
1017 |             opt_type = opt.get("type", "unknown")
1018 |             if opt_type:
1019 |                 # Simplify complex types
1020 |                 if "null or" in opt_type:
1021 |                     opt_type = "nullable"
1022 |                 elif "list of" in opt_type:
1023 |                     opt_type = "list"
1024 |                 elif "attribute set" in opt_type:
1025 |                     opt_type = "attribute set"
1026 |                 types[opt_type] = types.get(opt_type, 0) + 1
1027 | 
1028 |         # Build statistics
1029 |         return f"""nix-darwin Statistics:
1030 | • Total options: {len(options):,}
1031 | • Categories: {len(categories)}
1032 | • Top categories:
1033 |   - services: {categories.get("services", 0):,} options
1034 |   - system: {categories.get("system", 0):,} options
1035 |   - launchd: {categories.get("launchd", 0):,} options
1036 |   - programs: {categories.get("programs", 0):,} options
1037 |   - homebrew: {categories.get("homebrew", 0):,} options"""
1038 |     except Exception as e:
1039 |         return error(str(e))
1040 | 
1041 | 
1042 | @mcp.tool()
1043 | async def darwin_list_options() -> str:
1044 |     """List all nix-darwin option categories.
1045 | 
1046 |     Enumerates all top-level categories with their option counts.
1047 | 
1048 |     Returns:
1049 |         Plain text list of categories sorted alphabetically with option counts
1050 |     """
1051 |     try:
1052 |         # Get more options to see all categories (default 100 is too few)
1053 |         options = parse_html_options(DARWIN_URL, limit=2000)
1054 |         categories: dict[str, int] = {}
1055 | 
1056 |         for opt in options:
1057 |             name = opt["name"]
1058 |             # Process option names
1059 |             if name and not name.startswith("."):
1060 |                 if "." in name:
1061 |                     cat = name.split(".")[0]
1062 |                 else:
1063 |                     cat = name  # Option without dot is its own category
1064 |                 # Valid categories should:
1065 |                 # - Be more than 1 character
1066 |                 # - Be a valid identifier (allows underscores)
1067 |                 # - Not be common value words
1068 |                 # - Match typical nix option category patterns
1069 |                 if (
1070 |                     len(cat) > 1 and cat.isidentifier() and (cat.islower() or cat.startswith("_"))
1071 |                 ):  # This ensures valid identifier
1072 |                     # Additional filtering for known valid Darwin categories
1073 |                     valid_categories = {
1074 |                         "documentation",
1075 |                         "environment",
1076 |                         "fonts",
1077 |                         "homebrew",
1078 |                         "ids",
1079 |                         "launchd",
1080 |                         "networking",
1081 |                         "nix",
1082 |                         "nixpkgs",
1083 |                         "power",
1084 |                         "programs",
1085 |                         "security",
1086 |                         "services",
1087 |                         "system",
1088 |                         "targets",
1089 |                         "time",
1090 |                         "users",
1091 |                     }
1092 |                     # Only include if it's in the known valid list or looks like a typical category
1093 |                     if cat in valid_categories or (len(cat) >= 3 and not any(char.isdigit() for char in cat)):
1094 |                         categories[cat] = categories.get(cat, 0) + 1
1095 | 
1096 |         results = []
1097 |         results.append(f"nix-darwin option categories ({len(categories)} total):\n")
1098 | 
1099 |         # Sort by count descending, then alphabetically
1100 |         sorted_cats = sorted(categories.items(), key=lambda x: (-x[1], x[0]))
1101 | 
1102 |         for cat, count in sorted_cats:
1103 |             results.append(f"• {cat} ({count} options)")
1104 | 
1105 |         return "\n".join(results)
1106 | 
1107 |     except Exception as e:
1108 |         return error(str(e))
1109 | 
1110 | 
1111 | @mcp.tool()
1112 | async def darwin_options_by_prefix(option_prefix: str) -> str:
1113 |     """Get nix-darwin options matching a specific prefix.
1114 | 
1115 |     Useful for browsing options under a category or finding exact option names.
1116 | 
1117 |     Args:
1118 |         option_prefix: The prefix to match (e.g., 'system.defaults' or 'services')
1119 | 
1120 |     Returns:
1121 |         Plain text list of options with the given prefix, including descriptions
1122 |     """
1123 |     try:
1124 |         options = parse_html_options(DARWIN_URL, "", option_prefix)
1125 | 
1126 |         if not options:
1127 |             return f"No nix-darwin options found with prefix '{option_prefix}'"
1128 | 
1129 |         results = []
1130 |         results.append(f"nix-darwin options with prefix '{option_prefix}' ({len(options)} found):\n")
1131 | 
1132 |         for opt in sorted(options, key=lambda x: x["name"]):
1133 |             results.append(f"• {opt['name']}")
1134 |             if opt["description"]:
1135 |                 results.append(f"  {opt['description']}")
1136 |             results.append("")
1137 | 
1138 |         return "\n".join(results).strip()
1139 | 
1140 |     except Exception as e:
1141 |         return error(str(e))
1142 | 
1143 | 
1144 | @mcp.tool()
1145 | async def nixos_flakes_stats() -> str:
1146 |     """Get statistics about available NixOS flakes.
1147 | 
1148 |     Retrieves statistics from the flake search index including total packages,
1149 |     unique repositories, flake types, and top contributors.
1150 | 
1151 |     Returns:
1152 |         Plain text summary with flake statistics and top contributors
1153 |     """
1154 |     try:
1155 |         # Use the same alias as the web UI for accurate counts
1156 |         flake_index = "latest-43-group-manual"
1157 | 
1158 |         # Get total count of flake packages (not options or apps)
1159 |         try:
1160 |             resp = requests.post(
1161 |                 f"{NIXOS_API}/{flake_index}/_count",
1162 |                 json={"query": {"term": {"type": "package"}}},
1163 |                 auth=NIXOS_AUTH,
1164 |                 timeout=10,
1165 |             )
1166 |             resp.raise_for_status()
1167 |             total_packages = resp.json().get("count", 0)
1168 |         except requests.HTTPError as e:
1169 |             if e.response.status_code == 404:
1170 |                 return error("Flake indices not found. Flake search may be temporarily unavailable.")
1171 |             raise
1172 | 
1173 |         # Get unique flakes by sampling documents
1174 |         # Since aggregations on text fields don't work, we'll sample and count manually
1175 |         unique_urls = set()
1176 |         type_counts: dict[str, int] = {}
1177 |         contributor_counts: dict[str, int] = {}
1178 | 
1179 |         try:
1180 |             # Get a large sample of documents to count unique flakes
1181 |             resp = requests.post(
1182 |                 f"{NIXOS_API}/{flake_index}/_search",
1183 |                 json={
1184 |                     "size": 10000,  # Get a large sample
1185 |                     "query": {"term": {"type": "package"}},  # Only packages
1186 |                     "_source": ["flake_resolved", "flake_name", "package_pname"],
1187 |                 },
1188 |                 auth=NIXOS_AUTH,
1189 |                 timeout=10,
1190 |             )
1191 |             resp.raise_for_status()
1192 |             data = resp.json()
1193 |             hits = data.get("hits", {}).get("hits", [])
1194 | 
1195 |             # Process hits to extract unique URLs
1196 |             for hit in hits:
1197 |                 src = hit.get("_source", {})
1198 |                 resolved = src.get("flake_resolved", {})
1199 | 
1200 |                 if isinstance(resolved, dict) and "url" in resolved:
1201 |                     url = resolved["url"]
1202 |                     unique_urls.add(url)
1203 | 
1204 |                     # Count types
1205 |                     flake_type = resolved.get("type", "unknown")
1206 |                     type_counts[flake_type] = type_counts.get(flake_type, 0) + 1
1207 | 
1208 |                     # Extract contributor from URL
1209 |                     contributor = None
1210 |                     if "github.com/" in url:
1211 |                         parts = url.split("github.com/")[1].split("/")
1212 |                         if parts:
1213 |                             contributor = parts[0]
1214 |                     elif "codeberg.org/" in url:
1215 |                         parts = url.split("codeberg.org/")[1].split("/")
1216 |                         if parts:
1217 |                             contributor = parts[0]
1218 |                     elif "sr.ht/~" in url:
1219 |                         parts = url.split("sr.ht/~")[1].split("/")
1220 |                         if parts:
1221 |                             contributor = parts[0]
1222 | 
1223 |                     if contributor:
1224 |                         contributor_counts[contributor] = contributor_counts.get(contributor, 0) + 1
1225 | 
1226 |             unique_count = len(unique_urls)
1227 | 
1228 |             # Format type info
1229 |             type_info = []
1230 |             for type_name, count in sorted(type_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
1231 |                 if type_name:
1232 |                     type_info.append(f"  - {type_name}: {count:,}")
1233 | 
1234 |             # Format contributor info
1235 |             owner_info = []
1236 |             for contributor, count in sorted(contributor_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
1237 |                 owner_info.append(f"  - {contributor}: {count:,} packages")
1238 | 
1239 |         except Exception:
1240 |             # Fallback if query fails
1241 |             unique_count = 0
1242 |             type_info = []
1243 |             owner_info = []
1244 | 
1245 |         # Build statistics
1246 |         results = []
1247 |         results.append("NixOS Flakes Statistics:")
1248 |         results.append(f"• Available flakes: {total_packages:,}")
1249 |         if unique_count > 0:
1250 |             results.append(f"• Unique repositories: {unique_count:,}")
1251 | 
1252 |         if type_info:
1253 |             results.append("• Flake types:")
1254 |             results.extend(type_info)
1255 | 
1256 |         if owner_info:
1257 |             results.append("• Top contributors:")
1258 |             results.extend(owner_info)
1259 | 
1260 |         results.append("\nNote: Flakes are community-contributed and indexed separately from official packages.")
1261 | 
1262 |         return "\n".join(results)
1263 | 
1264 |     except Exception as e:
1265 |         return error(str(e))
1266 | 
1267 | 
1268 | async def _nixos_flakes_search_impl(query: str, limit: int = 20, channel: str = "unstable") -> str:
1269 |     """Internal implementation for flakes search."""
1270 |     if not 1 <= limit <= 100:
1271 |         return error("Limit must be 1-100")
1272 | 
1273 |     try:
1274 |         # Use the same alias as the web UI to get only flake packages
1275 |         flake_index = "latest-43-group-manual"
1276 | 
1277 |         # Build query for flakes
1278 |         if query.strip() == "" or query == "*":
1279 |             # Empty or wildcard query - get all flakes
1280 |             q: dict[str, Any] = {"match_all": {}}
1281 |         else:
1282 |             # Search query with multiple fields, including nested queries for flake_resolved
1283 |             q = {
1284 |                 "bool": {
1285 |                     "should": [
1286 |                         {"match": {"flake_name": {"query": query, "boost": 3}}},
1287 |                         {"match": {"flake_description": {"query": query, "boost": 2}}},
1288 |                         {"match": {"package_pname": {"query": query, "boost": 1.5}}},
1289 |                         {"match": {"package_description": query}},
1290 |                         {"wildcard": {"flake_name": {"value": f"*{query}*", "boost": 2.5}}},
1291 |                         {"wildcard": {"package_pname": {"value": f"*{query}*", "boost": 1}}},
1292 |                         {"prefix": {"flake_name": {"value": query, "boost": 2}}},
1293 |                         # Nested queries for flake_resolved fields
1294 |                         {
1295 |                             "nested": {
1296 |                                 "path": "flake_resolved",
1297 |                                 "query": {"term": {"flake_resolved.owner": query.lower()}},
1298 |                                 "boost": 2,
1299 |                             }
1300 |                         },
1301 |                         {
1302 |                             "nested": {
1303 |                                 "path": "flake_resolved",
1304 |                                 "query": {"term": {"flake_resolved.repo": query.lower()}},
1305 |                                 "boost": 2,
1306 |                             }
1307 |                         },
1308 |                     ],
1309 |                     "minimum_should_match": 1,
1310 |                 }
1311 |             }
1312 | 
1313 |         # Execute search with package filter to match web UI
1314 |         search_query = {"bool": {"filter": [{"term": {"type": "package"}}], "must": [q]}}
1315 | 
1316 |         try:
1317 |             resp = requests.post(
1318 |                 f"{NIXOS_API}/{flake_index}/_search",
1319 |                 json={"query": search_query, "size": limit * 5, "track_total_hits": True},  # Get more results
1320 |                 auth=NIXOS_AUTH,
1321 |                 timeout=10,
1322 |             )
1323 |             resp.raise_for_status()
1324 |             data = resp.json()
1325 |             hits = data.get("hits", {}).get("hits", [])
1326 |             total = data.get("hits", {}).get("total", {}).get("value", 0)
1327 |         except requests.HTTPError as e:
1328 |             if e.response and e.response.status_code == 404:
1329 |                 # No flake indices found
1330 |                 return error("Flake indices not found. Flake search may be temporarily unavailable.")
1331 |             raise
1332 | 
1333 |         # Format results as plain text
1334 |         if not hits:
1335 |             return f"""No flakes found matching '{query}'.
1336 | 
1337 | Try searching for:
1338 | • Popular flakes: nixpkgs, home-manager, flake-utils, devenv
1339 | • By owner: nix-community, numtide, cachix
1340 | • By topic: python, rust, nodejs, devops
1341 | 
1342 | Browse flakes at:
1343 | • GitHub: https://github.com/topics/nix-flakes
1344 | • FlakeHub: https://flakehub.com/"""
1345 | 
1346 |         # Group hits by flake to avoid duplicates
1347 |         flakes = {}
1348 |         packages_only = []  # For entries without flake metadata
1349 | 
1350 |         for hit in hits:
1351 |             src = hit.get("_source", {})
1352 | 
1353 |             # Get flake information
1354 |             flake_name = src.get("flake_name", "").strip()
1355 |             package_pname = src.get("package_pname", "")
1356 |             resolved = src.get("flake_resolved", {})
1357 | 
1358 |             # Skip entries without any useful name
1359 |             if not flake_name and not package_pname:
1360 |                 continue
1361 | 
1362 |             # If we have flake metadata (resolved), use it to create unique key
1363 |             if isinstance(resolved, dict) and (resolved.get("owner") or resolved.get("repo") or resolved.get("url")):
1364 |                 owner = resolved.get("owner", "")
1365 |                 repo = resolved.get("repo", "")
1366 |                 url = resolved.get("url", "")
1367 | 
1368 |                 # Create a unique key based on available info
1369 |                 if owner and repo:
1370 |                     flake_key = f"{owner}/{repo}"
1371 |                     display_name = flake_name or repo or package_pname
1372 |                 elif url:
1373 |                     # Extract name from URL for git repos
1374 |                     flake_key = url
1375 |                     if "/" in url:
1376 |                         display_name = flake_name or url.rstrip("/").split("/")[-1].replace(".git", "") or package_pname
1377 |                     else:
1378 |                         display_name = flake_name or package_pname
1379 |                 else:
1380 |                     flake_key = flake_name or package_pname
1381 |                     display_name = flake_key
1382 | 
1383 |                 # Initialize flake entry if not seen
1384 |                 if flake_key not in flakes:
1385 |                     flakes[flake_key] = {
1386 |                         "name": display_name,
1387 |                         "description": src.get("flake_description") or src.get("package_description", ""),
1388 |                         "owner": owner,
1389 |                         "repo": repo,
1390 |                         "url": url,
1391 |                         "type": resolved.get("type", ""),
1392 |                         "packages": set(),  # Use set to avoid duplicates
1393 |                     }
1394 | 
1395 |                 # Add package if available
1396 |                 attr_name = src.get("package_attr_name", "")
1397 |                 if attr_name:
1398 |                     flakes[flake_key]["packages"].add(attr_name)
1399 | 
1400 |             elif flake_name:
1401 |                 # Has flake_name but no resolved metadata
1402 |                 flake_key = flake_name
1403 | 
1404 |                 if flake_key not in flakes:
1405 |                     flakes[flake_key] = {
1406 |                         "name": flake_name,
1407 |                         "description": src.get("flake_description") or src.get("package_description", ""),
1408 |                         "owner": "",
1409 |                         "repo": "",
1410 |                         "type": "",
1411 |                         "packages": set(),
1412 |                     }
1413 | 
1414 |                 # Add package if available
1415 |                 attr_name = src.get("package_attr_name", "")
1416 |                 if attr_name:
1417 |                     flakes[flake_key]["packages"].add(attr_name)
1418 | 
1419 |             else:
1420 |                 # Package without flake metadata - might still be relevant
1421 |                 packages_only.append(
1422 |                     {
1423 |                         "name": package_pname,
1424 |                         "description": src.get("package_description", ""),
1425 |                         "attr_name": src.get("package_attr_name", ""),
1426 |                     }
1427 |                 )
1428 | 
1429 |         # Build results
1430 |         results = []
1431 |         # Show both total hits and unique flakes
1432 |         if total > len(flakes):
1433 |             results.append(f"Found {total:,} total matches ({len(flakes)} unique flakes) matching '{query}':\n")
1434 |         else:
1435 |             results.append(f"Found {len(flakes)} unique flakes matching '{query}':\n")
1436 | 
1437 |         for flake in flakes.values():
1438 |             results.append(f"• {flake['name']}")
1439 |             if flake.get("owner") and flake.get("repo"):
1440 |                 results.append(
1441 |                     f"  Repository: {flake['owner']}/{flake['repo']}"
1442 |                     + (f" ({flake['type']})" if flake.get("type") else "")
1443 |                 )
1444 |             elif flake.get("url"):
1445 |                 results.append(f"  URL: {flake['url']}")
1446 |             if flake.get("description"):
1447 |                 desc = flake["description"]
1448 |                 if len(desc) > 200:
1449 |                     desc = desc[:200] + "..."
1450 |                 results.append(f"  {desc}")
1451 |             if flake["packages"]:
1452 |                 # Show max 5 packages, sorted
1453 |                 packages = sorted(flake["packages"])[:5]
1454 |                 if len(flake["packages"]) > 5:
1455 |                     results.append(f"  Packages: {', '.join(packages)}, ... ({len(flake['packages'])} total)")
1456 |                 else:
1457 |                     results.append(f"  Packages: {', '.join(packages)}")
1458 |             results.append("")
1459 | 
1460 |         return "\n".join(results).strip()
1461 | 
1462 |     except Exception as e:
1463 |         return error(str(e))
1464 | 
1465 | 
1466 | def _version_key(version_str: str) -> tuple[int, int, int]:
1467 |     """Convert version string to tuple for proper sorting."""
1468 |     try:
1469 |         parts = version_str.split(".")
1470 |         # Handle versions like "3.9.9" or "3.10.0-rc1"
1471 |         numeric_parts = []
1472 |         for part in parts[:3]:  # Major.Minor.Patch
1473 |             # Extract numeric part
1474 |             numeric = ""
1475 |             for char in part:
1476 |                 if char.isdigit():
1477 |                     numeric += char
1478 |                 else:
1479 |                     break
1480 |             if numeric:
1481 |                 numeric_parts.append(int(numeric))
1482 |             else:
1483 |                 numeric_parts.append(0)
1484 |         # Pad with zeros if needed
1485 |         while len(numeric_parts) < 3:
1486 |             numeric_parts.append(0)
1487 |         return (numeric_parts[0], numeric_parts[1], numeric_parts[2])
1488 |     except Exception:
1489 |         return (0, 0, 0)
1490 | 
1491 | 
1492 | def _format_nixhub_found_version(package_name: str, version: str, found_version: dict[str, Any]) -> str:
1493 |     """Format a found version for display."""
1494 |     results = []
1495 |     results.append(f"✓ Found {package_name} version {version}\n")
1496 | 
1497 |     last_updated = found_version.get("last_updated", "")
1498 |     if last_updated:
1499 |         try:
1500 |             from datetime import datetime
1501 | 
1502 |             dt = datetime.fromisoformat(last_updated.replace("Z", "+00:00"))
1503 |             formatted_date = dt.strftime("%Y-%m-%d %H:%M UTC")
1504 |             results.append(f"Last updated: {formatted_date}")
1505 |         except Exception:
1506 |             results.append(f"Last updated: {last_updated}")
1507 | 
1508 |     platforms_summary = found_version.get("platforms_summary", "")
1509 |     if platforms_summary:
1510 |         results.append(f"Platforms: {platforms_summary}")
1511 | 
1512 |     # Show commit hashes
1513 |     platforms = found_version.get("platforms", [])
1514 |     if platforms:
1515 |         results.append("\nNixpkgs commits:")
1516 |         seen_commits = set()
1517 | 
1518 |         for platform in platforms:
1519 |             attr_path = platform.get("attribute_path", "")
1520 |             commit_hash = platform.get("commit_hash", "")
1521 | 
1522 |             if commit_hash and commit_hash not in seen_commits:
1523 |                 seen_commits.add(commit_hash)
1524 |                 if re.match(r"^[a-fA-F0-9]{40}$", commit_hash):
1525 |                     results.append(f"• {commit_hash}")
1526 |                     if attr_path:
1527 |                         results.append(f"  Attribute: {attr_path}")
1528 | 
1529 |     results.append("\nTo use this version:")
1530 |     results.append("1. Pin nixpkgs to one of the commit hashes above")
1531 |     results.append("2. Install using the attribute path")
1532 | 
1533 |     return "\n".join(results)
1534 | 
1535 | 
1536 | def _format_nixhub_release(release: dict[str, Any], package_name: str | None = None) -> list[str]:
1537 |     """Format a single NixHub release for display."""
1538 |     results = []
1539 |     version = release.get("version", "unknown")
1540 |     last_updated = release.get("last_updated", "")
1541 |     platforms_summary = release.get("platforms_summary", "")
1542 |     platforms = release.get("platforms", [])
1543 | 
1544 |     results.append(f"• Version {version}")
1545 | 
1546 |     if last_updated:
1547 |         # Format date nicely
1548 |         try:
1549 |             from datetime import datetime
1550 | 
1551 |             dt = datetime.fromisoformat(last_updated.replace("Z", "+00:00"))
1552 |             formatted_date = dt.strftime("%Y-%m-%d %H:%M UTC")
1553 |             results.append(f"  Last updated: {formatted_date}")
1554 |         except Exception:
1555 |             results.append(f"  Last updated: {last_updated}")
1556 | 
1557 |     if platforms_summary:
1558 |         results.append(f"  Platforms: {platforms_summary}")
1559 | 
1560 |     # Show commit hashes and attribute paths for each platform (avoid duplicates)
1561 |     if platforms:
1562 |         seen_commits = set()
1563 |         for platform in platforms:
1564 |             commit_hash = platform.get("commit_hash", "")
1565 |             attr_path = platform.get("attribute_path", "")
1566 | 
1567 |             if commit_hash and commit_hash not in seen_commits:
1568 |                 seen_commits.add(commit_hash)
1569 |                 # Validate commit hash format (40 hex chars)
1570 |                 if re.match(r"^[a-fA-F0-9]{40}$", commit_hash):
1571 |                     results.append(f"  Nixpkgs commit: {commit_hash}")
1572 |                 else:
1573 |                     results.append(f"  Nixpkgs commit: {commit_hash} (warning: invalid format)")
1574 | 
1575 |                 # Show attribute path if different from package name
1576 |                 if attr_path and package_name and attr_path != package_name:
1577 |                     results.append(f"  Attribute: {attr_path}")
1578 | 
1579 |     return results
1580 | 
1581 | 
1582 | @mcp.tool()
1583 | async def nixos_flakes_search(query: str, limit: int = 20, channel: str = "unstable") -> str:
1584 |     """Search NixOS flakes by name, description, owner, or repository.
1585 | 
1586 |     Searches the flake index for community-contributed packages and configurations.
1587 |     Flakes are indexed separately from official packages.
1588 | 
1589 |     Args:
1590 |         query: The search query (flake name, description, owner, or repository)
1591 |         limit: Maximum number of results to return (default: 20, max: 100)
1592 |         channel: Ignored - flakes use a separate indexing system
1593 | 
1594 |     Returns:
1595 |         Plain text list of unique flakes with their packages and metadata
1596 |     """
1597 |     return await _nixos_flakes_search_impl(query, limit, channel)
1598 | 
1599 | 
1600 | @mcp.tool()
1601 | async def nixhub_package_versions(package_name: str, limit: int = 10) -> str:
1602 |     """Get version history and nixpkgs commit hashes for a specific package from NixHub.io.
1603 | 
1604 |     Use this tool when users need specific package versions or commit hashes for reproducible builds.
1605 | 
1606 |     Args:
1607 |         package_name: Name of the package to query (e.g., "firefox", "python")
1608 |         limit: Maximum number of versions to return (default: 10, max: 50)
1609 | 
1610 |     Returns:
1611 |         Plain text with package info and version history including commit hashes
1612 |     """
1613 |     # Validate inputs
1614 |     if not package_name or not package_name.strip():
1615 |         return error("Package name is required")
1616 | 
1617 |     # Sanitize package name - only allow alphanumeric, hyphens, underscores, dots
1618 |     if not re.match(r"^[a-zA-Z0-9\-_.]+$", package_name):
1619 |         return error("Invalid package name. Only letters, numbers, hyphens, underscores, and dots are allowed")
1620 | 
1621 |     if not 1 <= limit <= 50:
1622 |         return error("Limit must be between 1 and 50")
1623 | 
1624 |     try:
1625 |         # Construct NixHub API URL with the _data parameter
1626 |         url = f"https://www.nixhub.io/packages/{package_name}?_data=routes%2F_nixhub.packages.%24pkg._index"
1627 | 
1628 |         # Make request with timeout and proper headers
1629 |         headers = {"Accept": "application/json", "User-Agent": "mcp-nixos/1.0.0"}  # Identify ourselves
1630 | 
1631 |         resp = requests.get(url, headers=headers, timeout=15)
1632 | 
1633 |         # Handle different HTTP status codes
1634 |         if resp.status_code == 404:
1635 |             return error(f"Package '{package_name}' not found in NixHub", "NOT_FOUND")
1636 |         if resp.status_code >= 500:
1637 |             # NixHub returns 500 for non-existent packages with unusual names
1638 |             # Check if the package name looks suspicious
1639 |             if len(package_name) > 30 or package_name.count("-") > 5:
1640 |                 return error(f"Package '{package_name}' not found in NixHub", "NOT_FOUND")
1641 |             return error("NixHub service temporarily unavailable", "SERVICE_ERROR")
1642 | 
1643 |         resp.raise_for_status()
1644 | 
1645 |         # Parse JSON response
1646 |         data = resp.json()
1647 | 
1648 |         # Validate response structure
1649 |         if not isinstance(data, dict):
1650 |             return error("Invalid response format from NixHub")
1651 | 
1652 |         # Extract package info
1653 |         # Use the requested package name, not what API returns (e.g., user asks for python3, API returns python)
1654 |         name = package_name
1655 |         summary = data.get("summary", "")
1656 |         releases = data.get("releases", [])
1657 | 
1658 |         if not releases:
1659 |             return f"Package: {name}\nNo version history available in NixHub"
1660 | 
1661 |         # Build results
1662 |         results = []
1663 |         results.append(f"Package: {name}")
1664 |         if summary:
1665 |             results.append(f"Description: {summary}")
1666 |         results.append(f"Total versions: {len(releases)}")
1667 |         results.append("")
1668 | 
1669 |         # Limit results
1670 |         shown_releases = releases[:limit]
1671 | 
1672 |         results.append(f"Version history (showing {len(shown_releases)} of {len(releases)}):\n")
1673 | 
1674 |         for release in shown_releases:
1675 |             results.extend(_format_nixhub_release(release, name))
1676 |             results.append("")
1677 | 
1678 |         # Add usage hint
1679 |         if shown_releases and any(r.get("platforms", [{}])[0].get("commit_hash") for r in shown_releases):
1680 |             results.append("To use a specific version in your Nix configuration:")
1681 |             results.append("1. Pin nixpkgs to the commit hash")
1682 |             results.append("2. Use the attribute path to install the package")
1683 | 
1684 |         return "\n".join(results).strip()
1685 | 
1686 |     except requests.Timeout:
1687 |         return error("Request to NixHub timed out", "TIMEOUT")
1688 |     except requests.RequestException as e:
1689 |         return error(f"Network error accessing NixHub: {str(e)}", "NETWORK_ERROR")
1690 |     except ValueError as e:
1691 |         return error(f"Failed to parse NixHub response: {str(e)}", "PARSE_ERROR")
1692 |     except Exception as e:
1693 |         return error(f"Unexpected error: {str(e)}")
1694 | 
1695 | 
1696 | @mcp.tool()
1697 | async def nixhub_find_version(package_name: str, version: str) -> str:
1698 |     """Find a specific version of a package in NixHub with smart search.
1699 | 
1700 |     Automatically searches with increasing limits to find the requested version.
1701 | 
1702 |     Args:
1703 |         package_name: Name of the package to query (e.g., "ruby", "python")
1704 |         version: Specific version to find (e.g., "2.6.7", "3.5.9")
1705 | 
1706 |     Returns:
1707 |         Plain text with version info and commit hash if found, or helpful message if not
1708 |     """
1709 |     # Validate inputs
1710 |     if not package_name or not package_name.strip():
1711 |         return error("Package name is required")
1712 | 
1713 |     if not version or not version.strip():
1714 |         return error("Version is required")
1715 | 
1716 |     # Sanitize inputs
1717 |     if not re.match(r"^[a-zA-Z0-9\-_.]+$", package_name):
1718 |         return error("Invalid package name. Only letters, numbers, hyphens, underscores, and dots are allowed")
1719 | 
1720 |     # Try with incremental limits
1721 |     limits_to_try = [10, 25, 50]
1722 |     found_version = None
1723 |     all_versions: list[dict[str, Any]] = []
1724 | 
1725 |     for limit in limits_to_try:
1726 |         try:
1727 |             # Make request - handle special cases for package names
1728 |             nixhub_name = package_name
1729 |             # Common package name mappings
1730 |             if package_name == "python":
1731 |                 nixhub_name = "python3"
1732 |             elif package_name == "python2":
1733 |                 nixhub_name = "python"
1734 | 
1735 |             url = f"https://www.nixhub.io/packages/{nixhub_name}?_data=routes%2F_nixhub.packages.%24pkg._index"
1736 |             headers = {"Accept": "application/json", "User-Agent": "mcp-nixos/1.0.0"}
1737 | 
1738 |             resp = requests.get(url, headers=headers, timeout=15)
1739 | 
1740 |             if resp.status_code == 404:
1741 |                 return error(f"Package '{package_name}' not found in NixHub", "NOT_FOUND")
1742 |             if resp.status_code >= 500:
1743 |                 return error("NixHub service temporarily unavailable", "SERVICE_ERROR")
1744 | 
1745 |             resp.raise_for_status()
1746 |             data = resp.json()
1747 | 
1748 |             if not isinstance(data, dict):
1749 |                 return error("Invalid response format from NixHub")
1750 | 
1751 |             releases = data.get("releases", [])
1752 | 
1753 |             # Collect all versions seen
1754 |             for release in releases[:limit]:
1755 |                 release_version = release.get("version", "")
1756 |                 if release_version and release_version not in [v["version"] for v in all_versions]:
1757 |                     all_versions.append({"version": release_version, "release": release})
1758 | 
1759 |                 # Check if this is the version we're looking for
1760 |                 if release_version == version:
1761 |                     found_version = release
1762 |                     break
1763 | 
1764 |             if found_version:
1765 |                 break
1766 | 
1767 |         except requests.Timeout:
1768 |             return error("Request to NixHub timed out", "TIMEOUT")
1769 |         except requests.RequestException as e:
1770 |             return error(f"Network error accessing NixHub: {str(e)}", "NETWORK_ERROR")
1771 |         except Exception as e:
1772 |             return error(f"Unexpected error: {str(e)}")
1773 | 
1774 |     # Format response
1775 |     if found_version:
1776 |         return _format_nixhub_found_version(package_name, version, found_version)
1777 | 
1778 |     # Version not found - provide helpful information
1779 |     results = []
1780 |     results.append(f"✗ {package_name} version {version} not found in NixHub\n")
1781 | 
1782 |     # Show available versions
1783 |     if all_versions:
1784 |         results.append(f"Available versions (checked {len(all_versions)} total):")
1785 | 
1786 |         # Sort versions properly using version comparison
1787 |         sorted_versions = sorted(all_versions, key=lambda x: _version_key(x["version"]), reverse=True)
1788 | 
1789 |         # Find newest and oldest
1790 |         newest = sorted_versions[0]["version"]
1791 |         oldest = sorted_versions[-1]["version"]
1792 | 
1793 |         results.append(f"• Newest: {newest}")
1794 |         results.append(f"• Oldest: {oldest}")
1795 | 
1796 |         # Show version range summary
1797 |         major_versions = set()
1798 |         for v in all_versions:
1799 |             parts = v["version"].split(".")
1800 |             if parts:
1801 |                 major_versions.add(parts[0])
1802 | 
1803 |         if major_versions:
1804 |             results.append(f"• Major versions available: {', '.join(sorted(major_versions, reverse=True))}")
1805 | 
1806 |         # Check if requested version is older than available
1807 |         try:
1808 |             requested_parts = version.split(".")
1809 |             oldest_parts = oldest.split(".")
1810 | 
1811 |             if len(requested_parts) >= 2 and len(oldest_parts) >= 2:
1812 |                 req_major = int(requested_parts[0])
1813 |                 req_minor = int(requested_parts[1])
1814 |                 old_major = int(oldest_parts[0])
1815 |                 old_minor = int(oldest_parts[1])
1816 | 
1817 |                 if req_major < old_major or (req_major == old_major and req_minor < old_minor):
1818 |                     results.append(f"\nVersion {version} is older than the oldest available ({oldest})")
1819 |                     results.append("This version may have been removed after reaching end-of-life.")
1820 |         except (ValueError, IndexError):
1821 |             pass
1822 | 
1823 |         results.append("\nAlternatives:")
1824 |         results.append("• Use a newer version if possible")
1825 |         results.append("• Build from source with a custom derivation")
1826 |         results.append("• Use Docker/containers with the specific version")
1827 |         results.append("• Find an old nixpkgs commit from before the version was removed")
1828 | 
1829 |     return "\n".join(results)
1830 | 
1831 | 
1832 | def main() -> None:
1833 |     """Run the MCP server."""
1834 |     mcp.run()
1835 | 
1836 | 
1837 | if __name__ == "__main__":
1838 |     main()
1839 | 
```
Page 4/4FirstPrevNextLast