# Directory Structure
```
├── .gitignore
├── LICENSE
├── pyproject.toml
├── README.md
├── requirements.txt
├── src
│   └── mcp_rocq
│       ├── __init__.py
│       ├── __main__.py
│       ├── cli.py
│       ├── handlers
│       │   ├── __init__.py
│       │   ├── coq_session.py
│       │   ├── inductive_types.py
│       │   ├── prover.py
│       │   └── type_checker.py
│       ├── parsers
│       │   └── xml_parser.py
│       ├── patch.py
│       └── server.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
 1 | # Python virtual environments
 2 | .venv/
 3 | venv/
 4 | ENV/
 5 | 
 6 | # Python cache files
 7 | __pycache__/
 8 | *.py[cod]
 9 | *$py.class
10 | 
11 | # Distribution / packaging
12 | dist/
13 | build/
14 | *.egg-info/
15 | *.egg
16 | 
17 | # Downloaded websites and their assets (project-specific)
18 | downloads/
19 | website_library/
20 | 
21 | # Test cache
22 | .pytest_cache/
23 | .coverage
24 | htmlcov/
25 | 
26 | # IDE settings
27 | .vscode/
28 | .idea/
29 | *.swp
30 | *.swo
31 | 
32 | # Environment variables
33 | .env
34 | .env.local
35 | 
36 | # Operating System
37 | .DS_Store
38 | Thumbs.db
39 | 
40 | # uv package manager
41 | .uv/
42 | 
43 | tests/
44 | 
45 | 
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
  1 | # MCP-RoCQ (Coq Reasoning Server)
  2 | 
  3 | # Status: the server currently lists its tools, but Claude cannot use them properly. Invalid syntax generally seems to be the issue, but there could be something else.
  4 | 
  5 | There may be a better way to set this up, for example with the Coq CLI.
  6 | If you know Coq well and want to try fixing it, help would be welcome.
  7 | 
  8 | MCP-RoCQ is a Model Context Protocol server that provides advanced logical reasoning capabilities through integration with the Coq proof assistant. It enables automated dependent type checking, inductive type definitions, and property proving with both custom tactics and automation.
  9 | 
 10 | ## Features
 11 | 
 12 | - **Automated Dependent Type Checking**: Verify terms against complex dependent types
 13 | - **Inductive Type Definition**: Define and automatically verify custom inductive data types
 14 | - **Property Proving**: Prove logical properties using custom tactics and automation
 15 | - **XML Protocol Integration**: Reliable structured communication with Coq
 16 | - **Rich Error Handling**: Detailed feedback for type errors and failed proofs
 17 | 
 18 | ## Installation
 19 | 
 20 | 1. Install the Coq Platform 8.19 (2024.10)
 21 | 
 22 | Coq is a formal proof management system. It provides a formal language to write mathematical definitions, executable algorithms and theorems together with an environment for semi-interactive development of machine-checked proofs.
 23 | 
 24 | [https://github.com/coq/platform](https://github.com/coq/platform)
 25 | 
 26 | 2. Clone this repository:
 27 | 
 28 | ```bash
 29 | git clone https://github.com/angrysky56/mcp-rocq.git
 30 | ```
 31 | 
 32 | Change into the repository directory (`cd mcp-rocq`), then create and activate a virtual environment:
 33 | 
 34 | ```bash
 35 | uv venv
 36 | .venv/Scripts/activate   # uv creates .venv by default; on Linux/macOS use: source .venv/bin/activate
 37 | uv pip install -e .
 38 | ```
 39 | 
 40 | # JSON for the Claude app or mcphost config. Set the paths according to where you installed Coq and cloned this repository.
 41 | 
 42 | ```json
 43 |     "mcp-rocq": {
 44 |       "command": "uv",
 45 |       "args": [
 46 |         "--directory",
 47 |         "F:/GithubRepos/mcp-rocq",
 48 |         "run",
 49 |         "mcp_rocq",
 50 |         "--coq-path",
 51 |         "F:/Coq-Platform~8.19~2024.10/bin/coqtop.exe",
 52 |         "--lib-path",
 53 |         "F:/Coq-Platform~8.19~2024.10/lib/coq"
 54 |       ]
 55 |     },
 56 | ```
 57 | 
 58 | 
 59 | # This might also work. I got it running with uv, so most of the following is untested and may be inaccurate:
 60 | 
 61 | 3. Install dependencies:
 62 | 
 63 | ```bash
 64 | pip install -r requirements.txt
 65 | ```
 66 | 
 67 | ## Usage
 68 | 
 69 | The server provides three main capabilities:
 70 | 
 71 | ### 1. Type Checking
 72 | 
 73 | ```json
 74 | {
 75 |     "tool": "type_check",
 76 |     "args": {
 77 |         "term": "<term to check>",
 78 |         "expected_type": "<type>",
 79 |         "context": ["relevant", "modules"] 
 80 |     }
 81 | }
 82 | ```
 83 | 
 84 | ### 2. Inductive Types
 85 | 
 86 | ```json
 87 | {
 88 |     "tool": "define_inductive",
 89 |     "args": {
 90 |         "name": "Tree",
 91 |         "constructors": [
 92 |             "Leaf : Tree",
 93 |             "Node : Tree -> Tree -> Tree"
 94 |         ],
 95 |         "verify": true
 96 |     }
 97 | }
 98 | ```
 99 | 
100 | ### 3. Property Proving
101 | 
102 | ```json
103 | {
104 |     "tool": "prove_property",
105 |     "args": {
106 |         "property": "<statement>",
107 |         "tactics": ["<tactic sequence>"],
108 |         "use_automation": true
109 |     }
110 | }
111 | ```
112 | 
113 | ## License
114 | 
115 | This project is licensed under the MIT License - see the LICENSE file for details.
116 | 
```
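The three request shapes in the README's Usage section are plain JSON objects. As a minimal sketch, they can be built and serialized from Python as below. `make_request` is a hypothetical helper, not part of the repository, and how an MCP client actually wraps these arguments is outside what the README shows.

```python
import json


def make_request(tool: str, **args) -> str:
    """Serialize a request in the README's {"tool": ..., "args": ...} shape.

    Hypothetical helper for illustration only.
    """
    return json.dumps({"tool": tool, "args": args})


tree_request = make_request(
    "define_inductive",
    name="Tree",
    constructors=["Leaf : Tree", "Node : Tree -> Tree -> Tree"],
    verify=True,  # Python True is serialized as JSON true
)
print(tree_request)
```

Note that the README examples use JSON literals such as `true`, so they are JSON payloads rather than Python source.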
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
1 | pytest==8.0.0
2 | pytest-asyncio==0.23.5
3 | black==24.1.0
4 | mypy==1.8.0
5 | 
```
--------------------------------------------------------------------------------
/src/mcp_rocq/__init__.py:
--------------------------------------------------------------------------------
```python
1 | """MCP server for advanced logical reasoning using Coq"""
2 | __version__ = "0.1.0"
```
--------------------------------------------------------------------------------
/src/mcp_rocq/cli.py:
--------------------------------------------------------------------------------
```python
 1 | """
 2 | Command-line interface for MCP RoCQ server
 3 | """
 4 | from pathlib import Path
 5 | from .__main__ import run
 6 | 
 7 | def main():
 8 |     """Entry point for console script"""
 9 |     run()
10 | 
11 | if __name__ == '__main__':
12 |     main()
```
--------------------------------------------------------------------------------
/src/mcp_rocq/handlers/__init__.py:
--------------------------------------------------------------------------------
```python
 1 | """
 2 | Handler modules for Coq integration
 3 | """
 4 | from .coq_session import CoqSession
 5 | from .type_checker import TypeChecker
 6 | from .inductive_types import InductiveTypeHandler
 7 | from .prover import ProofHandler
 8 | 
 9 | __all__ = [
10 |     'CoqSession',
11 |     'TypeChecker',
12 |     'InductiveTypeHandler', 
13 |     'ProofHandler'
14 | ]
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
 1 | [project]
 2 | name = "mcp_rocq"  
 3 | version = "0.1.0"
 4 | description = "MCP server for advanced logical reasoning using Coq"
 5 | requires-python = ">=3.12"
 6 | dependencies = [
 7 |     "mcp>=1.0.0",
 8 |     "pydantic>=2.0.0"
 9 | ]
10 | 
11 | [project.scripts]
12 | mcp_rocq = "mcp_rocq.server:cli"
13 | 
14 | [build-system]
15 | requires = ["hatchling"]
16 | build-backend = "hatchling.build"
17 | 
18 | [tool.hatch.build.targets.wheel]
19 | packages = ["src/mcp_rocq"]
20 | 
21 | [tool.pytest.ini_options]
22 | testpaths = ["tests"]
```
--------------------------------------------------------------------------------
/src/mcp_rocq/__main__.py:
--------------------------------------------------------------------------------
```python
 1 | """
 2 | Entry point for MCP RoCQ server
 3 | """
 4 | import asyncio
 5 | import argparse
 6 | from pathlib import Path
 7 | from .server import serve
 8 | 
 9 | def run():
10 |     parser = argparse.ArgumentParser(description='MCP RoCQ Server')
11 |     parser.add_argument('--coq-path', type=str, required=True,
12 |                     # example "F:/Coq-Platform~8.19~2024.10/bin/coqtop.exe",
13 |                       help='Path to coqtop executable')
14 |     parser.add_argument('--lib-path', type=str, required=True,
15 |                     # example "F:/Coq-Platform~8.19~2024.10/lib/coq", 
16 |                       help='Path to Coq library directory')
17 |     
18 |     args = parser.parse_args()
19 |     asyncio.run(serve(Path(args.coq_path), Path(args.lib_path)))
20 | 
21 | if __name__ == '__main__':
22 |     run()
```
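Both options are needed before the server can start, because each value is passed to `Path(...)`. A minimal sketch of a parser that declares them with `required=True` and converts them to `Path` directly; `build_parser` is a hypothetical name and the paths are placeholders, not real install locations.

```python
import argparse
from pathlib import Path


def build_parser() -> argparse.ArgumentParser:
    """Build a parser that rejects a missing --coq-path or --lib-path."""
    parser = argparse.ArgumentParser(description="MCP RoCQ Server")
    parser.add_argument("--coq-path", type=Path, required=True,
                        help="Path to coqtop executable")
    parser.add_argument("--lib-path", type=Path, required=True,
                        help="Path to Coq library directory")
    return parser


# Placeholder paths, used only to show the parsed result.
args = build_parser().parse_args(
    ["--coq-path", "/opt/coq/bin/coqtop", "--lib-path", "/opt/coq/lib/coq"]
)
print(args.coq_path.name, args.lib_path.name)
```

With `required=True`, argparse prints a usage message and exits when an option is missing, instead of handing `None` to `Path`.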
--------------------------------------------------------------------------------
/src/mcp_rocq/parsers/xml_parser.py:
--------------------------------------------------------------------------------
```python
 1 | """
 2 | XML Parser for Coq responses.
 3 | Handles structured communication with Coq's XML protocol.
 4 | """
 5 | from typing import Dict
 6 | from xml.etree import ElementTree as ET
 7 | 
 8 | class CoqXMLParser:
 9 |     @staticmethod
10 |     def parse_response(xml_str: str) -> Dict:
11 |         """
12 |         Parse Coq's XML protocol response into structured data
13 |         
14 |         Args:
15 |             xml_str: Raw XML response from Coq
16 |             
17 |         Returns:
18 |             Dict containing parsed response with status and message
19 |         """
20 |         try:
21 |             root = ET.fromstring(xml_str)
22 |             if root.tag == "value":
23 |                 return {
24 |                     "status": "success",
25 |                     "message": root.text,
26 |                     "response_type": root.get("val")
27 |                 }
28 |             elif root.tag == "feedback":
29 |                 return {
30 |                     "status": "feedback",
31 |                     "message": root.findtext("message", default=""),
32 |                     "level": root.get("object")
33 |                 }
34 |             elif root.tag == "error":
35 |                 return {
36 |                     "status": "error", 
37 |                     "message": root.findtext("message", default="")
38 |                 }
39 |             
40 |             return {"status": "unknown", "message": xml_str}
41 |             
42 |         except ET.ParseError as e:
43 |             return {"status": "error", "message": f"XML parse error: {e}"}
44 | 
```
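A short sketch of what this parsing logic returns for a few inputs. The XML strings are hand-written samples, not captured Coq output, and `parse_response` here is a standalone copy of the branching above that uses `findtext` so a missing `<message>` child yields an empty string.

```python
from typing import Dict
from xml.etree import ElementTree as ET


def parse_response(xml_str: str) -> Dict:
    """Map a single XML document to a status/message dictionary."""
    try:
        root = ET.fromstring(xml_str)
    except ET.ParseError as e:
        return {"status": "error", "message": f"XML parse error: {e}"}
    if root.tag == "value":
        return {"status": "success", "message": root.text,
                "response_type": root.get("val")}
    if root.tag == "feedback":
        return {"status": "feedback",
                "message": root.findtext("message", default=""),
                "level": root.get("object")}
    if root.tag == "error":
        return {"status": "error",
                "message": root.findtext("message", default="")}
    return {"status": "unknown", "message": xml_str}


ok = parse_response('<value val="good">done</value>')
err = parse_response("<error><message>boom</message></error>")
empty = parse_response("<error/>")   # no <message> child
broken = parse_response("<value>")   # not well-formed XML
print(ok, err, empty, broken, sep="\n")
```

As in the original, any `<value>` root is reported as `success`, whatever its `val` attribute says.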
--------------------------------------------------------------------------------
/src/mcp_rocq/handlers/type_checker.py:
--------------------------------------------------------------------------------
```python
 1 | """
 2 | Type checking handler for Coq integration
 3 | Supports dependent type checking and verification
 4 | """
 5 | from typing import Optional, Dict, List
 6 | import logging
 7 | 
 8 | from .coq_session import CoqSession
 9 | 
10 | logger = logging.getLogger(__name__)
11 | 
12 | class TypeChecker:
13 |     def __init__(self, coq: CoqSession):
14 |         self.coq = coq
15 |         
16 |     async def check_type(self, term: str, expected_type: Optional[str] = None,
17 |                         context: Optional[List[str]] = None) -> Dict:
18 |         """
19 |         Check types using Coq's dependent type system
20 |         
21 |         Args:
22 |             term: Term to type check
23 |             expected_type: Optional expected type to verify against
24 |             context: Optional list of modules to import
25 |             
26 |         Returns:
27 |             Dict containing type checking result
28 |         """
29 |         try:
30 |             # Import context modules if provided
31 |             if context:
32 |                 for module in context:
33 |                     result = await self.coq.send_command(f'Require Import {module}.')
34 |                     if not result:
35 |                         return result.parsed
36 |             
37 |             # First check if term is well-formed
38 |             check_cmd = f'Check ({term}).'
39 |             result = await self.coq.send_command(check_cmd)
40 |             if not result:
41 |                 return result.parsed
42 |                 
43 |             # Verify against expected type if provided
44 |             if expected_type:
45 |                 verify_cmd = f'Check ({term}) : {expected_type}.'
46 |                 result = await self.coq.send_command(verify_cmd)
47 |                 
48 |             return result.parsed
49 |             
50 |         except Exception as e:
51 |             logger.error(f"Type check error: {e}")
52 |             return {"status": "error", "message": str(e)}
```
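To make the command sequence in `check_type` easier to see, here is a small sketch that only builds the strings it would send, without talking to Coq. `build_check_commands` is a hypothetical helper, and the term, type, and module in the example are illustrative inputs.

```python
from typing import List, Optional


def build_check_commands(term: str,
                         expected_type: Optional[str] = None,
                         context: Optional[List[str]] = None) -> List[str]:
    """Return the Coq commands in the order check_type issues them."""
    commands = [f"Require Import {module}." for module in (context or [])]
    commands.append(f"Check ({term}).")  # is the term well-formed?
    if expected_type:
        # Second pass: ask Coq to check the term against the expected type.
        commands.append(f"Check ({term}) : {expected_type}.")
    return commands


commands = build_check_commands("S O", expected_type="nat", context=["Arith"])
for command in commands:
    print(command)
```

Separating string construction from I/O like this also makes the generated commands easy to log when debugging syntax problems.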
--------------------------------------------------------------------------------
/src/mcp_rocq/handlers/inductive_types.py:
--------------------------------------------------------------------------------
```python
 1 | """
 2 | Inductive type definition and verification handler
 3 | Supports creating and verifying inductive data types in Coq
 4 | """
 5 | from typing import Dict, List
 6 | import logging
 7 | 
 8 | from .coq_session import CoqSession
 9 | 
10 | logger = logging.getLogger(__name__)
11 | 
12 | class InductiveTypeHandler:
13 |     def __init__(self, coq: CoqSession):
14 |         self.coq = coq
15 |         
16 |     async def define_inductive(self, name: str, constructors: List[str],
17 |                              verify: bool = False) -> Dict:
18 |         """
19 |         Define and optionally verify an inductive type
20 |         
21 |         Args:
22 |             name: Name of the inductive type
23 |             constructors: List of constructor definitions
24 |             verify: Whether to verify key properties
25 |             
26 |         Returns:
27 |             Dict containing definition result
28 |         """
29 |         try:
30 |             # Build inductive definition
31 |             def_str = f"Inductive {name} : Type :=\n"
32 |             def_str += " | ".join(constructors) + "."
33 |             
34 |             result = await self.coq.send_command(def_str)
35 |             if not result:
36 |                 return result.parsed
37 |                 
38 |             if verify:
39 |                 # Verify constructor properties
40 |                 for cons in constructors:
41 |                     cons_name = cons.split(":")[0].strip()
42 |                     if "->" not in cons: continue  # nullary constructor (e.g. "Leaf : Tree"): nothing to apply
43 |                     # Check constructor is injective
44 |                     verify_cmd = (f"Lemma {cons_name}_injective: "
45 |                                 f"forall x y, {cons_name} x = {cons_name} y -> x = y.")
46 |                     result = await self.coq.send_command(verify_cmd)
47 |                     if not result:
48 |                         return result.parsed
49 |                         
50 |                     result = await self.coq.send_command("intros; injection H; auto.")
51 |                     if not result:
52 |                         return result.parsed
53 |                         
54 |                     result = await self.coq.send_command("Qed.")
55 |                     if not result:
56 |                         return result.parsed
57 |                     
58 |                     # Could add more verification here like:
59 |                     # - Discriminative properties between constructors 
60 |                     # - Structural induction principles
61 |                     
62 |             return {
63 |                 "status": "success",
64 |                 "message": f"Defined and verified {name}"
65 |             }
66 |             
67 |         except Exception as e:
68 |             logger.error(f"Inductive definition error: {e}")
69 |             return {"status": "error", "message": str(e)}
```
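The definition text sent to Coq can be previewed without a running session. The sketch below assumes every constructor is written in the `Name : type` form used in the README; `build_inductive` and `takes_arguments` are hypothetical helpers. The second helper shows one way to spot nullary constructors such as `Leaf`, for which a lemma of the form `Leaf x = Leaf y -> x = y` is not well-typed.

```python
from typing import List


def build_inductive(name: str, constructors: List[str]) -> str:
    """Render an Inductive definition with one constructor per line."""
    body = "\n".join(f"  | {cons}" for cons in constructors)
    return f"Inductive {name} : Type :=\n{body}."


def takes_arguments(constructor: str) -> bool:
    """True when the constructor's type contains an arrow (assumes 'Name : type')."""
    return "->" in constructor.split(":", 1)[1]


tree = ["Leaf : Tree", "Node : Tree -> Tree -> Tree"]
definition = build_inductive("Tree", tree)
print(definition)
print([c.split(":")[0].strip() for c in tree if takes_arguments(c)])
```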
--------------------------------------------------------------------------------
/src/mcp_rocq/handlers/prover.py:
--------------------------------------------------------------------------------
```python
 1 | """
 2 | Property proving handler with custom tactics support
 3 | Provides automated and custom tactic-based proving capabilities
 4 | """
 5 | from typing import Dict, List, Optional
 6 | import logging
 7 | 
 8 | from .coq_session import CoqSession
 9 | 
10 | logger = logging.getLogger(__name__)
11 | 
12 | class ProofHandler:
13 |     def __init__(self, coq: CoqSession):
14 |         self.coq = coq
15 |         
16 |     async def prove_property(self, property_stmt: str, 
17 |                            tactics: Optional[List[str]] = None,
18 |                            use_automation: bool = True) -> Dict:
19 |         """
20 |         Prove properties using custom tactics and automation
21 |         
22 |         Args:
23 |             property_stmt: Property to prove
24 |             tactics: Optional list of custom tactics
25 |             use_automation: Whether to try automated proving
26 |             
27 |         Returns:
28 |             Dict containing proof result
29 |         """
30 |         try:
31 |             # Start theorem
32 |             result = await self.coq.send_command(f'Theorem property: {property_stmt}.')
33 |             if not result:
34 |                 return result.parsed
35 |             
36 |             # Apply custom tactics if provided
37 |             if tactics:
38 |                 for tactic in tactics:
39 |                     result = await self.coq.send_command(tactic)
40 |                     if not result:
41 |                         await self.coq.send_command("Abort.")
42 |                         return result.parsed
43 |                     
44 |             if use_automation:
45 |                 # Try automated proving strategies
46 |                 auto_tactics = [
47 |                     "auto with *.",
48 |                     "firstorder.",
49 |                     "tauto.",
50 |                     "intuition auto.",
51 |                     "congruence."
52 |                 ]
53 |                 
54 |                 for tactic in auto_tactics:
55 |                     result = await self.coq.send_command(tactic)
56 |                     if "No more subgoals" in (result.message or ""):
57 |                         break
58 |                         
59 |                     # If automation fails, try next tactic
60 |                     if not result:
61 |                         continue
62 |                         
63 |             # Check if there are remaining subgoals
64 |             result = await self.get_proof_state()
65 |             if "no more subgoals" in (result.get("message") or "").lower():
66 |                 # Complete proof
67 |                 result = await self.coq.send_command("Qed.")
68 |                 return result.parsed
69 |             else:
70 |                 # Abort incomplete proof
71 |                 await self.coq.send_command("Abort.")
72 |                 return {
73 |                     "status": "error", 
74 |                     "message": "Could not complete proof automatically"
75 |                 }
76 |             
77 |         except Exception as e:
78 |             logger.error(f"Proof error: {e}")
79 |             await self.coq.send_command("Abort.")  # Clean up
80 |             return {"status": "error", "message": str(e)}
81 |             
82 |     async def get_proof_state(self) -> Dict:
83 |         """Get current proof state and goals"""
84 |         try:
85 |             result = await self.coq.send_command("Show.")
86 |             return result.parsed
87 |         except Exception as e:
88 |             logger.error(f"Get proof state error: {e}")
89 |             return {"status": "error", "message": str(e)}
```
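The automation loop can be exercised without Coq by substituting a fake session. In this sketch `FakeSession` and `try_automation` are hypothetical names, replies are plain dictionaries, and the "No more subgoals" wording is assumed to match what the handler above looks for.

```python
import asyncio
from typing import Dict, List


class FakeSession:
    """Stand-in for CoqSession: only 'tauto.' closes the single open goal."""

    def __init__(self) -> None:
        self.sent: List[str] = []
        self.open_goals = 1

    async def send_command(self, cmd: str) -> Dict:
        self.sent.append(cmd)
        if cmd == "tauto.":
            self.open_goals = 0
        message = "No more subgoals." if self.open_goals == 0 else "1 subgoal"
        return {"status": "success", "message": message}


async def try_automation(session: FakeSession, tactics: List[str]) -> bool:
    """Send tactics in order and stop at the first one that closes the goal."""
    for tactic in tactics:
        reply = await session.send_command(tactic)
        # Guard against a missing or None message before searching it.
        if "no more subgoals" in (reply.get("message") or "").lower():
            return True
    return False


session = FakeSession()
closed = asyncio.run(try_automation(
    session, ["auto with *.", "firstorder.", "tauto.", "intuition auto."]))
print(closed, session.sent)
```

The loop stops as soon as the goal is closed, so later tactics are never sent to a finished proof.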
--------------------------------------------------------------------------------
/src/mcp_rocq/handlers/coq_session.py:
--------------------------------------------------------------------------------
```python
  1 | """Manages a Coq session and XML protocol communication"""
  2 | import asyncio
  3 | import logging
  4 | from pathlib import Path
  5 | import subprocess
  6 | from typing import Optional, Dict
  7 | from xml.etree import ElementTree as ET
  8 | 
  9 | logger = logging.getLogger(__name__)
 10 | 
 11 | class CoqResponse:
 12 |     """Structured response from Coq"""
 13 |     def __init__(self, xml_str: str):
 14 |         self.raw = xml_str
 15 |         self.parsed = self._parse_xml(xml_str)
 16 |         
 17 |     def _parse_xml(self, xml_str: str) -> Dict:
 18 |         """Parse Coq's XML response"""
 19 |         try:
 20 |             root = ET.fromstring(xml_str)
 21 |             if root.tag == "value":
 22 |                 return {
 23 |                     "status": "success",
 24 |                     "message": root.text,
 25 |                     "response_type": root.get("val")
 26 |                 }
 27 |             elif root.tag == "feedback":
 28 |                 return {
 29 |                     "status": "feedback",
 30 |                     "message": root.findtext("message", default=""),
 31 |                     "level": root.get("object")  
 32 |                 }
 33 |             elif root.tag == "error":
 34 |                 return {
 35 |                     "status": "error",
 36 |                     "message": root.findtext("message", default="")
 37 |                 }
 38 |             return {"status": "unknown", "message": xml_str}
 39 |             
 40 |         except ET.ParseError as e:
 41 |             return {"status": "error", "message": f"XML parse error: {e}"}
 42 | 
 43 |     @property
 44 |     def status(self) -> str:
 45 |         return self.parsed["status"]
 46 |         
 47 |     @property
 48 |     def message(self) -> str:
 49 |         return self.parsed["message"]
 50 |         
 51 |     def __bool__(self) -> bool:
 52 |         return self.status == "success"
 53 | 
 54 | class CoqSession:
 55 |     """Manages interaction with a Coq process"""
 56 |     
 57 |     def __init__(self, coq_path: Path, lib_path: Path):
 58 |         self.coq_path = coq_path
 59 |         self.lib_path = lib_path
 60 |         self.process: Optional[subprocess.Popen] = None
 61 |         self._start_coq()
 62 | 
 63 |     def _start_coq(self):
 64 |         """Start Coq process with XML protocol"""
 65 |         try:
 66 |             cmd = [
 67 |                 str(self.coq_path),
 68 |                 "-xml",  # Use XML protocol
 69 |                 "-Q", str(self.lib_path), "Coq"
 70 |             ]
 71 |             
 72 |             self.process = subprocess.Popen(
 73 |                 cmd,
 74 |                 stdin=subprocess.PIPE,
 75 |                 stdout=subprocess.PIPE,
 76 |                 stderr=subprocess.PIPE,
 77 |                 text=True
 78 |             )
 79 |             logger.info("Coq process started successfully")
 80 |             
 81 |         except Exception as e:
 82 |             logger.error(f"Failed to start Coq: {e}")
 83 |             raise
 84 | 
 85 |     async def send_command(self, cmd: str) -> CoqResponse:
 86 |         """Send command to Coq and get response"""
 87 |         if not self.process:
 88 |             raise RuntimeError("Coq process not running")
 89 | 
 90 |         try:
 91 |             # Send command
 92 |             self.process.stdin.write(cmd + "\n")
 93 |             self.process.stdin.flush()
 94 |             
 95 |             # Get response until </value> tag
 96 |             response = []
 97 |             while True:
 98 |                 line = self.process.stdout.readline()
 99 |                 if not line:
100 |                     break
101 |                 response.append(line)
102 |                 if "</value>" in line:
103 |                     break
104 |                     
105 |             return CoqResponse("".join(response))
106 |             
107 |         except Exception as e:
108 |             logger.error(f"Command error: {e}")
109 |             return CoqResponse(f'<error><message>{str(e)}</message></error>')
110 | 
111 |     async def close(self):
112 |         """Clean up Coq process"""
113 |         if self.process:
114 |             try:
115 |                 self.process.terminate()
116 |                 self.process.wait(timeout=5)
117 |             except Exception as e:
118 |                 logger.error(f"Error closing Coq process: {e}")
119 | 
```
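`send_command` is declared `async`, yet it reads with a blocking `readline`, so the event loop waits for as long as the child process is silent. The sketch below shows the same write-then-read-until-`</value>` loop built on `asyncio.create_subprocess_exec` with a timeout. A tiny Python child process stands in for Coq here; `send_line`, `demo`, and `ECHO_CHILD` are hypothetical names.

```python
import asyncio
import sys

# Child program that wraps each input line in <value> tags (a stand-in for Coq).
ECHO_CHILD = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    print('<value val=\"good\">' + line.strip() + '</value>', flush=True)\n"
)


async def send_line(proc, text: str, timeout: float = 5.0) -> str:
    """Write one line, then read until a line containing </value> or EOF."""
    proc.stdin.write((text + "\n").encode())
    await proc.stdin.drain()
    chunks = []
    while True:
        # wait_for raises a timeout error instead of hanging forever.
        line = await asyncio.wait_for(proc.stdout.readline(), timeout)
        if not line:
            break
        chunks.append(line.decode())
        if "</value>" in chunks[-1]:
            break
    return "".join(chunks)


async def demo() -> str:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", ECHO_CHILD,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    try:
        return await send_line(proc, "Check 0.")
    finally:
        proc.stdin.close()   # EOF lets the child exit
        await proc.wait()


reply = asyncio.run(demo())
print(reply.strip())
```

The timeout matters in practice: if the child never emits the expected marker, a plain `readline` loop would block indefinitely.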
--------------------------------------------------------------------------------
/src/mcp_rocq/patch.py:
--------------------------------------------------------------------------------
```python
  1 | """Manages a Coq session and XML protocol communication"""
  2 | import asyncio
  3 | import logging
  4 | from pathlib import Path
  5 | import subprocess
  6 | import xml.etree.ElementTree as ET
  7 | from typing import Optional, Dict, Union, List
  8 | 
  9 | logger = logging.getLogger(__name__)
 10 | 
 11 | class CoqCommand:
 12 |     """XML formatted Coq command"""
 13 |     @staticmethod
 14 |     def init() -> str:
 15 |         return '<call val="init"><option val="none"/></call>'
 16 |         
 17 |     @staticmethod
 18 |     def interp(cmd: str) -> str:
 19 |         escaped_cmd = cmd.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
 20 |         return f'''
 21 |         <call val="interp">
 22 |             <pair>
 23 |                 <string>{escaped_cmd}</string>
 24 |                 <union val="in_script"><unit/></union>
 25 |             </pair>
 26 |         </call>
 27 |         '''.strip()
 28 |         
 29 |     @staticmethod
 30 |     def check(term: str) -> str:
 31 |         return CoqCommand.interp(f"Check {term}.")
 32 |         
 33 |     @staticmethod
 34 |     def require(module: str) -> str:
 35 |         return CoqCommand.interp(f"Require Import {module}.")
 36 | 
 37 | class CoqResponse:
 38 |     """Structured response from Coq's XML protocol"""
 39 |     def __init__(self, xml_str: str):
 40 |         self.raw = xml_str
 41 |         self.parsed = self._parse_xml(xml_str)
 42 |         
 43 |     def _parse_xml(self, xml_str: str) -> Dict:
 44 |         try:
 45 |             root = ET.fromstring(xml_str)
 46 |             
 47 |             # Handle different response types
 48 |             if root.tag == "value":
 49 |                 val_type = root.get("val", "")
 50 |                 if val_type == "good":
 51 |                     # Success response
 52 |                     return {
 53 |                         "status": "success",
 54 |                         "message": self._extract_message(root),
 55 |                         "response_type": val_type
 56 |                     }
 57 |                 elif val_type == "fail":
 58 |                     # Error response
 59 |                     return {
 60 |                         "status": "error",
 61 |                         "message": self._extract_message(root),
 62 |                         "error_type": root.find(".//string").get("val", "unknown")
 63 |                     }
 64 |                     
 65 |             elif root.tag == "feedback":
 66 |                 # Feedback message
 67 |                 return {
 68 |                     "status": "feedback",
 69 |                     "level": root.get("object", "info"),
 70 |                     "message": self._extract_message(root.find("message"))
 71 |                 }
 72 |                 
 73 |             return {"status": "unknown", "message": xml_str}
 74 |             
 75 |         except ET.ParseError as e:
 76 |             return {"status": "error", "message": f"XML parse error: {e}"}
 77 |             
 78 |     def _extract_message(self, elem: ET.Element) -> str:
 79 |         """Extract message content from XML element"""
 80 |         if elem is None:
 81 |             return ""
 82 |         if elem.text:
 83 |             return elem.text.strip()
 84 |         msg_elem = elem.find(".//string")
 85 |         if msg_elem is not None and msg_elem.text:
 86 |             return msg_elem.text.strip()
 87 |         return ""
 88 | 
 89 |     @property
 90 |     def status(self) -> str:
 91 |         return self.parsed["status"]
 92 |         
 93 |     @property
 94 |     def message(self) -> str:
 95 |         return self.parsed["message"]
 96 |         
 97 |     def __bool__(self) -> bool:
 98 |         return self.status == "success"
 99 | 
100 | class CoqSession:
101 |     """Manages interaction with a Coq process using XML protocol"""
102 |     
103 |     def __init__(self, coq_path: Path, lib_path: Path):
104 |         self.coq_path = coq_path
105 |         self.lib_path = lib_path
106 |         self.process: Optional[subprocess.Popen] = None
107 |         self._start_coq()
108 |         self._init_session()
109 | 
110 |     def _start_coq(self):
111 |         """Start Coq process with XML protocol"""
112 |         try:
113 |             cmd = [
114 |                 str(self.coq_path),
115 |                 "-xml",  # Use XML protocol
116 |                 "-Q", str(self.lib_path), "Coq"
117 |             ]
118 |             
119 |             self.process = subprocess.Popen(
120 |                 cmd,
121 |                 stdin=subprocess.PIPE,
122 |                 stdout=subprocess.PIPE,
123 |                 stderr=subprocess.PIPE,
124 |                 text=True
125 |             )
126 |             logger.info("Coq process started successfully")
127 |             
128 |             # Read initial banner
129 |             self._read_until_prompt()
130 |             
131 |         except Exception as e:
132 |             logger.error(f"Failed to start Coq: {e}")
133 |             raise
134 |             
135 |     def _init_session(self):
136 |         """Initialize Coq session with XML protocol"""
137 |         init_cmd = CoqCommand.init()
138 |         response = self._send_raw(init_cmd)
139 |         if not response:
140 |             raise RuntimeError(f"Failed to initialize Coq session: {response.message}")
141 |             
142 |     def _read_until_prompt(self) -> List[str]:
143 |         """Read output until we get a complete XML response"""
144 |         response = []
145 |         depth = 0
146 |         
147 |         while True:
148 |             line = self.process.stdout.readline()
149 |             if not line:
150 |                 break
151 |                 
152 |             response.append(line)
153 |             
154 |             # Track XML element depth: +1 per opening tag, -1 per closing tag, 0 for self-closing
155 |             depth += line.count("<") - 2 * line.count("</") - line.count("/>")
156 |             if depth == 0 and response:
157 |                 break
158 |                 
159 |         return response
160 | 
161 |     async def send_command(self, cmd: str) -> CoqResponse:
162 |         """Send command to Coq and get response"""
163 |         if not self.process:
164 |             raise RuntimeError("Coq process not running")
165 |             
166 |         xml_cmd = CoqCommand.interp(cmd)
167 |         return self._send_raw(xml_cmd)
168 |         
169 |     def _send_raw(self, xml: str) -> CoqResponse:
170 |         """Send raw XML command and get response"""
171 |         try:
172 |             self.process.stdin.write(xml + "\n")
173 |             self.process.stdin.flush()
174 |             
175 |             response = self._read_until_prompt()
176 |             return CoqResponse("".join(response))
177 |             
178 |         except Exception as e:
179 |             logger.error(f"Command error: {e}")
180 |             return CoqResponse(f'<value val="fail"><string>Error: {str(e)}</string></value>')
181 | 
182 |     async def close(self):
183 |         """Clean up Coq process"""
184 |         if self.process:
185 |             try:
186 |                 self.process.terminate()
187 |                 await asyncio.sleep(0.1)  # Give process time to terminate
188 |                 self.process.wait(timeout=5)
189 |             except Exception as e:
190 |                 logger.error(f"Error closing Coq process: {e}")
191 |                 if self.process:
192 |                     self.process.kill()  # Force kill if needed
```
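A note on the depth tracking in `_read_until_prompt`: `line.count("<")` counts closing tags as well as opening ones, so a line such as `<value val="good"><unit/></value>` only balances to zero if closing and self-closing tags are discounted. The sketch below is one possible way to detect a complete response by matching tags with a regular expression. The names `depth_delta` and `read_complete` are hypothetical helpers, not part of `mcp_rocq`, and the sketch assumes attribute values never contain `<` or `>`.

```python
import re
from collections.abc import Iterable

# Matches opening, closing, and self-closing tags; skips "<?xml ...?>" and comments.
# Assumption: attribute values do not contain "<" or ">".
_TAG = re.compile(r"<(/?)([A-Za-z_][\w.\-]*)[^<>]*?(/?)>")


def depth_delta(line: str) -> int:
    """Net change in XML nesting depth contributed by one line (hypothetical helper)."""
    delta = 0
    for match in _TAG.finditer(line):
        closing, _name, self_closing = match.groups()
        if closing:
            delta -= 1          # </tag>
        elif not self_closing:
            delta += 1          # <tag ...>
        # <tag/> opens and closes at once, so it leaves the depth unchanged
    return delta


def read_complete(lines: Iterable[str]) -> list[str]:
    """Collect lines until the first top-level element is closed (hypothetical helper)."""
    collected: list[str] = []
    depth = 0
    seen_tag = False
    for line in lines:
        collected.append(line)
        if _TAG.search(line):
            seen_tag = True
        depth += depth_delta(line)
        # Stop only after at least one tag has been seen, so leading blank lines are tolerated
        if seen_tag and depth <= 0:
            break
    return collected
```

Any iterable of strings works as input, for example `read_complete(iter(process.stdout.readline, ""))` for a text-mode pipe. That call still blocks if the peer never sends a newline, so it is a sketch of the counting logic only, not a drop-in replacement.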
--------------------------------------------------------------------------------
/src/mcp_rocq/server.py:
--------------------------------------------------------------------------------
```python
  1 | """
  2 | MCP Server for Coq Integration
  3 | Provides advanced logical reasoning capabilities through Coq
  4 | """
  5 | import asyncio
  6 | import logging
  7 | import argparse
  8 | from pathlib import Path
  9 | from typing import Any, List, Dict
 10 | 
 11 | from mcp.server import Server, NotificationOptions
 12 | from mcp.server.models import InitializationOptions
 13 | import mcp.types as types
 14 | import mcp.server.stdio
 15 | 
 16 | from .handlers.coq_session import CoqSession
 17 | from .handlers.type_checker import TypeChecker
 18 | from .handlers.inductive_types import InductiveTypeHandler
 19 | from .handlers.prover import ProofHandler
 20 | 
 21 | # Set up logging
 22 | logging.basicConfig(level=logging.DEBUG)
 23 | logger = logging.getLogger('mcp_rocq')
 24 | 
 25 | class RoCQServer(Server):
 26 |     def __init__(self, coq_path: Path, lib_path: Path):
 27 |         super().__init__("rocq")
 28 |         
 29 |         # Initialize Coq session
 30 |         self.coq = CoqSession(coq_path, lib_path)
 31 |         
 32 |         # Initialize handlers
 33 |         self.type_checker = TypeChecker(self.coq)
 34 |         self.inductive_handler = InductiveTypeHandler(self.coq)
 35 |         self.proof_handler = ProofHandler(self.coq)
 36 | 
 37 |     async def cleanup(self):
 38 |         """Clean up resources"""
 39 |         if self.coq:
 40 |             await self.coq.close()
 41 |         await super().cleanup()
 42 | 
 43 | async def main(coq_path: Path, lib_path: Path):
 44 |     """Start the RoCQ server"""
 45 |     logger.info(f"Starting RoCQ Server with Coq at: {coq_path}")
 46 | 
 47 |     server = RoCQServer(coq_path, lib_path)
 48 | 
 49 |     @server.list_tools()
 50 |     async def handle_list_tools() -> list[types.Tool]:
 51 |         """List available tools"""
 52 |         return [
 53 |             types.Tool(
 54 |                 name="type_check",
 55 |                 description="Check type of a term with optional expected type", 
 56 |                 inputSchema={
 57 |                     "type": "object",
 58 |                     "properties": {
 59 |                         "term": {
 60 |                             "type": "string",
 61 |                             "description": "Term to type check"
 62 |                         },
 63 |                         "expected_type": {
 64 |                             "type": "string",
 65 |                             "description": "Optional expected type"
 66 |                         },
 67 |                         "context": {
 68 |                             "type": "array",
 69 |                             "items": {"type": "string"},
 70 |                             "description": "Optional list of modules to import"
 71 |                         }
 72 |                     },
 73 |                     "required": ["term"]
 74 |                 }
 75 |             ),
 76 |             types.Tool(
 77 |                 name="define_inductive",
 78 |                 description="Define and verify an inductive type",
 79 |                 inputSchema={
 80 |                     "type": "object",
 81 |                     "properties": {
 82 |                         "name": {
 83 |                             "type": "string",
 84 |                             "description": "Name of the inductive type"
 85 |                         },
 86 |                         "constructors": {
 87 |                             "type": "array",
 88 |                             "items": {"type": "string"},
 89 |                             "description": "List of constructor definitions"
 90 |                         },
 91 |                         "verify": {
 92 |                             "type": "boolean",
 93 |                             "description": "Whether to verify key properties",
 94 |                             "default": False
 95 |                         }
 96 |                     },
 97 |                     "required": ["name", "constructors"]
 98 |                 }
 99 |             ),
100 |             types.Tool(
101 |                 name="prove_property",
102 |                 description="Prove a logical property",
103 |                 inputSchema={
104 |                     "type": "object",
105 |                     "properties": {
106 |                         "property_stmt": {
107 |                             "type": "string",
108 |                             "description": "Property to prove"
109 |                         },
110 |                         "tactics": {
111 |                             "type": "array",
112 |                             "items": {"type": "string"},
113 |                             "description": "Optional list of tactics to apply"
114 |                         },
115 |                         "use_automation": {
116 |                             "type": "boolean",
117 |                             "description": "Whether to try automated proving",
118 |                             "default": True
119 |                         }
120 |                     },
121 |                     "required": ["property_stmt"]
122 |                 }
123 |             )
124 |         ]
125 | 
126 |     @server.call_tool()
127 |     async def handle_call_tool(name: str,
128 |                              arguments: dict[str, Any] | None
129 |                              ) -> list[types.TextContent | types.ImageContent]:
130 |         """Handle tool execution requests"""
131 |         try:
132 |             if name == "type_check":
133 |                 result = await server.type_checker.check_type(
134 |                     term=arguments["term"],
135 |                     expected_type=arguments.get("expected_type"),
136 |                     context=arguments.get("context")
137 |                 )
138 |                 return [types.TextContent(type="text", text=str(result))]
139 | 
140 |             elif name == "define_inductive":
141 |                 result = await server.inductive_handler.define_inductive(
142 |                     name=arguments["name"],
143 |                     constructors=arguments["constructors"],
144 |                     verify=arguments.get("verify", False)
145 |                 )
146 |                 return [types.TextContent(type="text", text=str(result))]
147 | 
148 |             elif name == "prove_property":
149 |                 result = await server.proof_handler.prove_property(
150 |                     property_stmt=arguments["property_stmt"],
151 |                     tactics=arguments.get("tactics"),
152 |                     use_automation=arguments.get("use_automation", True)
153 |                 )
154 |                 return [types.TextContent(type="text", text=str(result))]
155 | 
156 |             else:
157 |                 raise ValueError(f"Unknown tool: {name}")
158 | 
159 |         except Exception as e:
160 |             logger.error(f"Tool error: {e}")
161 |             return [types.TextContent(type="text", text=f"Error: {str(e)}")]
162 | 
163 |     async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
164 |         logger.info("Server running with stdio transport")
165 | 
166 |         await server.run(
167 |             read_stream,
168 |             write_stream,
169 |             InitializationOptions(
170 |                 server_name="rocq",
171 |                 server_version="0.1.0",
172 |                 capabilities=server.get_capabilities(
173 |                     notification_options=NotificationOptions(),
174 |                     experimental_capabilities={}
175 |                 ),
176 |             ),
177 |         )
178 | 
179 | def cli():
180 |     """CLI entry point"""
181 |     parser = argparse.ArgumentParser(description='MCP RoCQ Server')
182 |     parser.add_argument('--coq-path', type=str,
183 |                       default="F:/Coq-Platform~8.19~2024.10/bin/coqtop.exe",
184 |                       help='Path to coqtop executable')
185 |     parser.add_argument('--lib-path', type=str,
186 |                       default="F:/Coq-Platform~8.19~2024.10/lib/coq",
187 |                       help='Path to Coq library directory')
188 |     args = parser.parse_args()
189 | 
190 |     try:
191 |         asyncio.run(main(Path(args.coq_path), Path(args.lib_path)))
192 |     except KeyboardInterrupt:
193 |         logger.info("Server stopped")
194 | 
195 | if __name__ == "__main__":
196 |     cli()
```
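In the `main()` shown above, nothing calls `RoCQServer.cleanup()`, so the `coqtop` subprocess may outlive the server if `server.run(...)` returns or raises. A minimal sketch of one way to guarantee the session is closed is a `try`/`finally` around the serving coroutine. `FakeSession` and `run_with_cleanup` are hypothetical names used only for illustration; in the real server the session would be `server.coq` and the serving coroutine would wrap `server.run(...)`.

```python
import asyncio
from collections.abc import Awaitable, Callable


class FakeSession:
    """Stand-in for CoqSession (hypothetical); records whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def run_with_cleanup(session: FakeSession,
                           serve: Callable[[], Awaitable[None]]) -> None:
    """Run the serving coroutine and always close the session afterwards."""
    try:
        await serve()
    finally:
        # Runs on normal return, on exceptions, and on task cancellation
        await session.close()
```

The same shape applies inside `main()`: wrap the `async with mcp.server.stdio.stdio_server()` block in `try:` and close the Coq session in `finally:`.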