# Directory Structure
```
├── .gitignore
├── LICENSE
├── pyproject.toml
├── README.md
├── requirements.txt
├── src
│ └── mcp_rocq
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli.py
│ ├── handlers
│ │ ├── __init__.py
│ │ ├── coq_session.py
│ │ ├── inductive_types.py
│ │ ├── prover.py
│ │ └── type_checker.py
│ ├── parsers
│ │ └── xml_parser.py
│ ├── patch.py
│ └── server.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python virtual environments
.venv/
venv/
ENV/
# Python cache files
__pycache__/
*.py[cod]
*$py.class
# Distribution / packaging
dist/
build/
*.egg-info/
*.egg
# Downloaded websites and their assets (project-specific)
downloads/
website_library/
# Test cache
.pytest_cache/
.coverage
htmlcov/
# IDE settings
.vscode/
.idea/
*.swp
*.swo
# Environment variables
.env
.env.local
# Operating System
.DS_Store
Thumbs.db
# uv package manager
.uv/
tests/
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# MCP-RoCQ (Coq Reasoning Server)
> **Status:** The server currently lists its tools, but Claude cannot use them properly yet. Invalid syntax generally seems to be the issue, but there could be something else.
> There may be a better way to set this up, for example with the Coq CLI.
> If you know what you are doing and want to try to fix it, that would be great.
MCP-RoCQ is a Model Context Protocol server that provides advanced logical reasoning capabilities through integration with the Coq proof assistant. It enables automated dependent type checking, inductive type definitions, and property proving with both custom tactics and automation.
## Features
- **Automated Dependent Type Checking**: Verify terms against complex dependent types
- **Inductive Type Definition**: Define and automatically verify custom inductive data types
- **Property Proving**: Prove logical properties using custom tactics and automation
- **XML Protocol Integration**: Reliable structured communication with Coq
- **Rich Error Handling**: Detailed feedback for type errors and failed proofs
## Installation
1. Install the Coq Platform 8.19 (2024.10)
Coq is a formal proof management system. It provides a formal language to write mathematical definitions, executable algorithms and theorems together with an environment for semi-interactive development of machine-checked proofs.
[https://github.com/coq/platform](https://github.com/coq/platform)
2. Clone this repository:
```bash
git clone https://github.com/angrysky56/mcp-rocq.git
```
cd to the repo
```bash
uv venv
.venv/Scripts/activate
uv pip install -e .
```
# JSON for the Claude App or mcphost config- set your paths according to how you installed coq and the repository.
```json
"mcp-rocq": {
"command": "uv",
"args": [
"--directory",
"F:/GithubRepos/mcp-rocq",
"run",
"mcp_rocq",
"--coq-path",
"F:/Coq-Platform~8.19~2024.10/bin/coqtop.exe",
"--lib-path",
"F:/Coq-Platform~8.19~2024.10/lib/coq"
]
},
```
# Alternative install with pip. This might work, but I only got the server running with uv, and most of the remaining instructions could be hallucinated:
3. Install dependencies:
```bash
pip install -r requirements.txt
```
## Usage
The server provides three main capabilities:
### 1. Type Checking
```python
{
"tool": "type_check",
"args": {
"term": "<term to check>",
"expected_type": "<type>",
"context": ["relevant", "modules"]
}
}
```
### 2. Inductive Types
```python
{
"tool": "define_inductive",
"args": {
"name": "Tree",
"constructors": [
"Leaf : Tree",
"Node : Tree -> Tree -> Tree"
],
"verify": true
}
}
```
### 3. Property Proving
```python
{
"tool": "prove_property",
"args": {
"property_stmt": "<statement>",
"tactics": ["<tactic sequence>"],
"use_automation": true
}
}
```
## License
This project is licensed under the MIT License - see the LICENSE file for details.
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
pytest==8.0.0
pytest-asyncio==0.23.5
black==24.1.0
mypy==1.8.0
```
--------------------------------------------------------------------------------
/src/mcp_rocq/__init__.py:
--------------------------------------------------------------------------------
```python
"""MCP server for advanced logical reasoning using Coq"""
__version__ = "0.1.0"
```
--------------------------------------------------------------------------------
/src/mcp_rocq/cli.py:
--------------------------------------------------------------------------------
```python
"""
Command-line interface for MCP RoCQ server
"""
from pathlib import Path
from .__main__ import run
def main():
    """Console-script entry point.

    Delegates to ``run`` (imported from ``.__main__``), which parses the
    command line and starts the server.
    """
    run()


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/src/mcp_rocq/handlers/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Handler modules for Coq integration
"""
from .coq_session import CoqSession
from .type_checker import TypeChecker
from .inductive_types import InductiveTypeHandler
from .prover import ProofHandler
__all__ = [
'CoqSession',
'TypeChecker',
'InductiveTypeHandler',
'ProofHandler'
]
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "mcp_rocq"
version = "0.1.0"
description = "MCP server for advanced logical reasoning using Coq"
requires-python = ">=3.12"
dependencies = [
"mcp>=1.0.0",
"pydantic>=2.0.0"
]
[project.scripts]
mcp_rocq = "mcp_rocq.server:cli"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/mcp_rocq"]
[tool.pytest.ini_options]
testpaths = ["tests"]
```
--------------------------------------------------------------------------------
/src/mcp_rocq/__main__.py:
--------------------------------------------------------------------------------
```python
"""
Entry point for MCP RoCQ server
"""
import asyncio
import argparse
from pathlib import Path
# server.py defines its coroutine entry point as ``main``; it has no ``serve``
# symbol, so importing ``serve`` directly raised ImportError. Alias ``main`` so
# the local name used below stays the same.
from .server import main as serve


def run():
    """Parse the command line and run the MCP RoCQ server until it exits.

    Both ``--coq-path`` and ``--lib-path`` are mandatory. They have no
    defaults here, and a missing value would otherwise reach ``Path(None)``
    and crash with a TypeError instead of a usage message.
    """
    parser = argparse.ArgumentParser(description='MCP RoCQ Server')
    parser.add_argument('--coq-path', type=str, required=True,
                        # example "F:/Coq-Platform~8.19~2024.10/bin/coqtop.exe",
                        help='Path to coqtop executable')
    parser.add_argument('--lib-path', type=str, required=True,
                        # example "F:/Coq-Platform~8.19~2024.10/lib/coq",
                        help='Path to Coq library directory')
    args = parser.parse_args()
    asyncio.run(serve(Path(args.coq_path), Path(args.lib_path)))


if __name__ == '__main__':
    run()
```
--------------------------------------------------------------------------------
/src/mcp_rocq/parsers/xml_parser.py:
--------------------------------------------------------------------------------
```python
"""
XML Parser for Coq responses.
Handles structured communication with Coq's XML protocol.
"""
from typing import Dict
from xml.etree import ElementTree as ET
class CoqXMLParser:
    """Stateless helper that turns one Coq XML reply into a plain dict."""

    @staticmethod
    def _message_text(root) -> str:
        """Return the text of ``root``'s ``<message>`` child, or ``""`` if absent/empty."""
        node = root.find("message")
        if node is None or node.text is None:
            return ""
        return node.text

    @staticmethod
    def parse_response(xml_str: str) -> Dict:
        """
        Parse Coq's XML protocol response into structured data

        Args:
            xml_str: Raw XML response from Coq

        Returns:
            Dict with at least ``status`` and ``message``. ``status`` is one of
            ``"success"``, ``"error"``, ``"feedback"`` or ``"unknown"``.
            Malformed XML yields an ``"error"`` dict rather than an exception.
        """
        try:
            root = ET.fromstring(xml_str)
        except ET.ParseError as e:
            return {"status": "error", "message": f"XML parse error: {e}"}

        if root.tag == "value":
            outcome = root.get("val")
            if outcome == "fail":
                # A failed call is still wrapped in <value>; its text lives in
                # child elements, so collect all nested text as the message.
                return {
                    "status": "error",
                    "message": "".join(root.itertext()).strip(),
                    "response_type": outcome
                }
            return {
                "status": "success",
                "message": root.text,
                "response_type": outcome
            }
        if root.tag == "feedback":
            return {
                "status": "feedback",
                # A missing <message> child must not raise AttributeError.
                "message": CoqXMLParser._message_text(root),
                "level": root.get("object")
            }
        if root.tag == "error":
            return {
                "status": "error",
                "message": CoqXMLParser._message_text(root)
            }
        return {"status": "unknown", "message": xml_str}
```
--------------------------------------------------------------------------------
/src/mcp_rocq/handlers/type_checker.py:
--------------------------------------------------------------------------------
```python
"""
Type checking handler for Coq integration
Supports dependent type checking and verification
"""
from typing import Optional, Dict, List
import logging
from .coq_session import CoqSession
logger = logging.getLogger(__name__)
class TypeChecker:
    """Runs ``Check`` commands against a shared Coq session."""

    def __init__(self, coq: CoqSession):
        self.coq = coq

    async def check_type(self, term: str, expected_type: Optional[str] = None,
                         context: Optional[List[str]] = None) -> Dict:
        """
        Type check a term, optionally against an expected type.

        Args:
            term: Term to type check
            expected_type: Optional expected type to verify against
            context: Optional list of modules to ``Require Import`` first

        Returns:
            The parsed response of the first command that fails, otherwise
            the parsed response of the last ``Check`` issued. Any exception
            is logged and reported as an error dict.
        """
        try:
            # Bring requested modules into scope; stop at the first failure.
            for module in context or ():
                imported = await self.coq.send_command(f'Require Import {module}.')
                if not imported:
                    return imported.parsed

            # The term must be well-formed before its type is compared.
            outcome = await self.coq.send_command(f'Check ({term}).')
            if outcome and expected_type:
                outcome = await self.coq.send_command(
                    f'Check ({term}) : {expected_type}.'
                )
            return outcome.parsed
        except Exception as e:
            logger.error(f"Type check error: {e}")
            return {"status": "error", "message": str(e)}
```
--------------------------------------------------------------------------------
/src/mcp_rocq/handlers/inductive_types.py:
--------------------------------------------------------------------------------
```python
"""
Inductive type definition and verification handler
Supports creating and verifying inductive data types in Coq
"""
from typing import Dict, List
import logging
from .coq_session import CoqSession
logger = logging.getLogger(__name__)
class InductiveTypeHandler:
    """Defines inductive types in a Coq session and optionally checks injectivity lemmas."""

    def __init__(self, coq: CoqSession):
        self.coq = coq

    async def define_inductive(self, name: str, constructors: List[str],
                               verify: bool = False) -> Dict:
        """
        Define and optionally verify an inductive type

        Args:
            name: Name of the inductive type
            constructors: Constructor definitions written as ``Name : type``
            verify: Whether to also prove an injectivity lemma per constructor

        Returns:
            A success dict, or the parsed response of the first Coq command
            that failed. Exceptions are logged and reported as an error dict.
        """
        try:
            # Build inductive definition
            def_str = f"Inductive {name} : Type :=\n" + " | ".join(constructors) + "."
            result = await self.coq.send_command(def_str)
            if not result:
                return result.parsed

            if not verify:
                # Nothing was verified, so the message must not claim it was.
                return {"status": "success", "message": f"Defined {name}"}

            for cons in constructors:
                cons_name, _, cons_type = cons.partition(":")
                cons_name = cons_name.strip()
                # The lemma applies the constructor to an argument, so a
                # constant constructor (no arrow or binder in its type) would
                # yield an ill-typed statement; there is nothing to check.
                if "->" not in cons_type and "forall" not in cons_type:
                    continue
                failure = await self._prove_injective(cons_name)
                if failure is not None:
                    return failure

            # Could add more verification here like:
            # - Discriminative properties between constructors
            # - Structural induction principles
            return {
                "status": "success",
                "message": f"Defined and verified {name}"
            }
        except Exception as e:
            logger.error(f"Inductive definition error: {e}")
            return {"status": "error", "message": str(e)}

    async def _prove_injective(self, cons_name: str) -> "Dict | None":
        """State and prove ``<cons_name>_injective``; return the failing response or None.

        NOTE(review): the lemma is stated for a single argument
        (``C x = C y -> x = y``); constructors taking several arguments
        probably need a different statement -- confirm against Coq.
        """
        statement = (f"Lemma {cons_name}_injective: "
                     f"forall x y, {cons_name} x = {cons_name} y -> x = y.")
        result = await self.coq.send_command(statement)
        if not result:
            # The statement was rejected, so no proof is open to abort.
            return result.parsed

        for step in ("intros; injection H; auto.", "Qed."):
            result = await self.coq.send_command(step)
            if not result:
                # Close the half-finished proof so later commands are not
                # interpreted inside it.
                await self.coq.send_command("Abort.")
                return result.parsed
        return None
```
--------------------------------------------------------------------------------
/src/mcp_rocq/handlers/prover.py:
--------------------------------------------------------------------------------
```python
"""
Property proving handler with custom tactics support
Provides automated and custom tactic-based proving capabilities
"""
from typing import Dict, List, Optional
import logging
from .coq_session import CoqSession
logger = logging.getLogger(__name__)
class ProofHandler:
    """Proves user-supplied properties in a Coq session with custom and automatic tactics."""

    def __init__(self, coq: CoqSession):
        self.coq = coq
        # Used to give each theorem a distinct name; reusing one fixed name
        # makes every proof after the first successful one fail on a clash.
        self._theorem_count = 0

    async def prove_property(self, property_stmt: str,
                             tactics: Optional[List[str]] = None,
                             use_automation: bool = True) -> Dict:
        """
        Prove properties using custom tactics and automation

        Args:
            property_stmt: Property to prove
            tactics: Optional list of custom tactics, applied in order
            use_automation: Whether to try the built-in automation tactics

        Returns:
            The parsed response of ``Qed.`` when no subgoals remain, the
            parsed response of the first failing command, or an error dict
            when the proof could not be completed.
        """
        try:
            # Start theorem under a fresh name
            self._theorem_count += 1
            theorem_name = f"property_{self._theorem_count}"
            result = await self.coq.send_command(
                f'Theorem {theorem_name}: {property_stmt}.'
            )
            if not result:
                return result.parsed

            # Apply custom tactics if provided
            for tactic in tactics or ():
                result = await self.coq.send_command(tactic)
                if not result:
                    await self.coq.send_command("Abort.")
                    return result.parsed

            if use_automation:
                # Try automated proving strategies; a failing tactic is
                # skipped and the next one is attempted.
                auto_tactics = [
                    "auto with *.",
                    "firstorder.",
                    "tauto.",
                    "intuition auto.",
                    "congruence."
                ]
                for tactic in auto_tactics:
                    result = await self.coq.send_command(tactic)
                    # The message can be None when the reply has no text.
                    if "No more subgoals" in (result.message or ""):
                        break

            # get_proof_state() returns a plain dict, so read the message
            # by key; attribute access on it would raise AttributeError.
            state = await self.get_proof_state()
            if "no more subgoals" in (state.get("message") or "").lower():
                # Complete proof
                result = await self.coq.send_command("Qed.")
                return result.parsed

            # Abort incomplete proof
            await self.coq.send_command("Abort.")
            return {
                "status": "error",
                "message": "Could not complete proof automatically"
            }
        except Exception as e:
            logger.error(f"Proof error: {e}")
            await self.coq.send_command("Abort.")  # Clean up
            return {"status": "error", "message": str(e)}

    async def get_proof_state(self) -> Dict:
        """Send ``Show.`` and return its parsed response (an error dict on exception)."""
        try:
            result = await self.coq.send_command("Show.")
            return result.parsed
        except Exception as e:
            logger.error(f"Get proof state error: {e}")
            return {"status": "error", "message": str(e)}
```
--------------------------------------------------------------------------------
/src/mcp_rocq/handlers/coq_session.py:
--------------------------------------------------------------------------------
```python
"""Manages a Coq session and XML protocol communication"""
import asyncio
import logging
from pathlib import Path
import subprocess
from typing import Optional, Dict
from xml.etree import ElementTree as ET
logger = logging.getLogger(__name__)
class CoqResponse:
    """Structured response from Coq.

    ``raw`` keeps the XML text as received and ``parsed`` holds the dict
    built from it. The object is truthy only when ``status`` is "success".
    """

    def __init__(self, xml_str: str):
        self.raw = xml_str
        self.parsed = self._parse_xml(xml_str)

    def _parse_xml(self, xml_str: str) -> Dict:
        """Parse Coq's XML response into a dict with ``status`` and ``message``.

        Malformed XML is reported as an error dict instead of raising.
        """
        try:
            root = ET.fromstring(xml_str)
        except ET.ParseError as e:
            return {"status": "error", "message": f"XML parse error: {e}"}

        if root.tag == "value":
            outcome = root.get("val")
            if outcome == "fail":
                # A failed call is still a <value>; treating it as success
                # would make bool(response) True for errors.
                return {
                    "status": "error",
                    "message": "".join(root.itertext()).strip(),
                    "response_type": outcome
                }
            return {
                "status": "success",
                "message": root.text,
                "response_type": outcome
            }

        if root.tag in ("feedback", "error"):
            # Tolerate a missing or empty <message> child.
            node = root.find("message")
            text = (node.text or "") if node is not None else ""
            if root.tag == "feedback":
                return {
                    "status": "feedback",
                    "message": text,
                    "level": root.get("object")
                }
            return {"status": "error", "message": text}

        return {"status": "unknown", "message": xml_str}

    @property
    def status(self) -> str:
        return self.parsed["status"]

    @property
    def message(self) -> str:
        # A <value> with no text yields None; callers do substring checks on
        # the message, so always hand back a string.
        return self.parsed["message"] or ""

    def __bool__(self) -> bool:
        return self.status == "success"
class CoqSession:
    """Manages interaction with a Coq process.

    The process is spawned in ``__init__`` and is driven through text pipes
    on stdin/stdout.
    """

    def __init__(self, coq_path: Path, lib_path: Path):
        self.coq_path = coq_path
        self.lib_path = lib_path
        self.process: Optional[subprocess.Popen] = None
        self._start_coq()

    def _start_coq(self):
        """Start Coq process with XML protocol.

        Raises:
            Exception: whatever ``subprocess.Popen`` raises, after logging it.
        """
        try:
            cmd = [
                str(self.coq_path),
                "-xml",  # Use XML protocol
                "-Q", str(self.lib_path), "Coq"
            ]
            self.process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            logger.info("Coq process started successfully")
        except Exception as e:
            logger.error(f"Failed to start Coq: {e}")
            raise

    async def send_command(self, cmd: str) -> CoqResponse:
        """Send command to Coq and get response.

        Writes ``cmd`` plus a newline to the process, then reads stdout line
        by line until a line contains ``</value>`` or the stream ends.

        Raises:
            RuntimeError: if no process is attached to this session.
        """
        if not self.process:
            raise RuntimeError("Coq process not running")
        try:
            # Send command
            self.process.stdin.write(cmd + "\n")
            self.process.stdin.flush()

            # Get response until </value> tag
            response = []
            while True:
                line = self.process.stdout.readline()
                if not line:
                    break
                response.append(line)
                if "</value>" in line:
                    break
            return CoqResponse("".join(response))
        except Exception as e:
            from xml.sax.saxutils import escape

            logger.error(f"Command error: {e}")
            # Escape the text: an exception message containing '<' or '&'
            # would otherwise make this synthetic reply unparsable and the
            # real error would be lost.
            return CoqResponse(f'<error><message>{escape(str(e))}</message></error>')

    async def close(self):
        """Clean up Coq process.

        Asks the process to terminate and waits up to five seconds; if that
        fails the process is killed so it is not left running.
        """
        if not self.process:
            return
        try:
            self.process.terminate()
            self.process.wait(timeout=5)
        except Exception as e:
            logger.error(f"Error closing Coq process: {e}")
            self.process.kill()  # Force kill if needed
```
--------------------------------------------------------------------------------
/src/mcp_rocq/patch.py:
--------------------------------------------------------------------------------
```python
"""Manages a Coq session and XML protocol communication"""
import asyncio
import logging
from pathlib import Path
import subprocess
import xml.etree.ElementTree as ET
from typing import Optional, Dict, Union, List
logger = logging.getLogger(__name__)
class CoqCommand:
    """Builders for XML formatted Coq commands."""

    @staticmethod
    def init() -> str:
        """Return the XML ``init`` call."""
        return '<call val="init"><option val="none"/></call>'

    @staticmethod
    def interp(cmd: str) -> str:
        """Wrap a Coq sentence in an XML ``interp`` call.

        The sentence is XML-escaped first: Coq input often contains ``<``
        (``<->``, ``x < y``) or ``&``, which would produce malformed XML if
        inserted verbatim.
        """
        from xml.sax.saxutils import escape

        escaped_cmd = escape(cmd)
        return (
            '<call val="interp">\n'
            '<pair>\n'
            f'<string>{escaped_cmd}</string>\n'
            '<union val="in_script"><unit/></union>\n'
            '</pair>\n'
            '</call>'
        )

    @staticmethod
    def check(term: str) -> str:
        """Return an ``interp`` call for ``Check <term>.``."""
        return CoqCommand.interp(f"Check {term}.")

    @staticmethod
    def require(module: str) -> str:
        """Return an ``interp`` call for ``Require Import <module>.``."""
        return CoqCommand.interp(f"Require Import {module}.")
class CoqResponse:
    """Structured response from Coq's XML protocol.

    ``raw`` keeps the XML text as received and ``parsed`` holds the dict
    built from it. The object is truthy only when ``status`` is "success".
    """

    def __init__(self, xml_str: str):
        self.raw = xml_str
        self.parsed = self._parse_xml(xml_str)

    def _parse_xml(self, xml_str: str) -> Dict:
        """Turn one XML reply into a dict with ``status`` and ``message``.

        Unparsable XML becomes an error dict; unrecognised elements (or a
        ``<value>`` whose ``val`` is neither good nor fail) become "unknown".
        """
        try:
            root = ET.fromstring(xml_str)
        except ET.ParseError as e:
            return {"status": "error", "message": f"XML parse error: {e}"}

        # Handle different response types
        if root.tag == "value":
            val_type = root.get("val", "")
            if val_type == "good":
                # Success response
                return {
                    "status": "success",
                    "message": self._extract_message(root),
                    "response_type": val_type
                }
            if val_type == "fail":
                # Error response; a failure without any <string> child must
                # not crash on a None lookup.
                detail = root.find(".//string")
                error_type = detail.get("val", "unknown") if detail is not None else "unknown"
                return {
                    "status": "error",
                    "message": self._extract_message(root),
                    "error_type": error_type
                }
        elif root.tag == "feedback":
            # Feedback message
            return {
                "status": "feedback",
                "level": root.get("object", "info"),
                "message": self._extract_message(root.find("message"))
            }
        return {"status": "unknown", "message": xml_str}

    def _extract_message(self, elem: Optional[ET.Element]) -> str:
        """Extract message content from XML element.

        Prefers the element's own text and falls back to the first nested
        ``<string>``. Whitespace-only text (indentation between child tags)
        does not count as a message.
        """
        if elem is None:
            return ""
        own_text = (elem.text or "").strip()
        if own_text:
            return own_text
        msg_elem = elem.find(".//string")
        if msg_elem is not None and msg_elem.text:
            return msg_elem.text.strip()
        return ""

    @property
    def status(self) -> str:
        return self.parsed["status"]

    @property
    def message(self) -> str:
        return self.parsed["message"]

    def __bool__(self) -> bool:
        return self.status == "success"
class CoqSession:
    """Manages interaction with a Coq process using XML protocol.

    The process is spawned and the protocol ``init`` call is sent from
    ``__init__``; a failure of either raises.
    """

    def __init__(self, coq_path: Path, lib_path: Path):
        self.coq_path = coq_path
        self.lib_path = lib_path
        self.process: Optional[subprocess.Popen] = None
        self._start_coq()
        self._init_session()

    def _start_coq(self):
        """Start Coq process with XML protocol and consume its first output."""
        try:
            cmd = [
                str(self.coq_path),
                "-xml",  # Use XML protocol
                "-Q", str(self.lib_path), "Coq"
            ]
            self.process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            logger.info("Coq process started successfully")
            # Read initial banner
            # NOTE(review): this blocks until Coq prints at least one line --
            # confirm the process emits a banner in this mode.
            self._read_until_prompt()
        except Exception as e:
            logger.error(f"Failed to start Coq: {e}")
            raise

    def _init_session(self):
        """Initialize Coq session with XML protocol.

        Raises:
            RuntimeError: if the ``init`` call does not report success.
        """
        init_cmd = CoqCommand.init()
        response = self._send_raw(init_cmd)
        if not response:
            raise RuntimeError(f"Failed to initialize Coq session: {response.message}")

    def _read_until_prompt(self) -> List[str]:
        """Read output lines until the XML elements seen so far are balanced.

        Returns the lines read; stops early at end of stream.
        """
        response = []
        depth = 0
        while True:
            line = self.process.stdout.readline()
            if not line:
                break
            response.append(line)
            # Track XML element depth. Every '<' starts a tag, but closing
            # tags ('</') must lower the depth and self-closing tags ('/>')
            # and '<?...?>' declarations must leave it unchanged. Counting
            # only "'<' minus '</'" never decreases, so the loop would run
            # until end of stream.
            closing = line.count("</")
            depth += (line.count("<") - 2 * closing
                      - line.count("/>") - line.count("<?"))
            if depth <= 0:
                break
        return response

    async def send_command(self, cmd: str) -> CoqResponse:
        """Send command to Coq and get response.

        Raises:
            RuntimeError: if no process is attached to this session.
        """
        if not self.process:
            raise RuntimeError("Coq process not running")
        xml_cmd = CoqCommand.interp(cmd)
        return self._send_raw(xml_cmd)

    def _send_raw(self, xml: str) -> CoqResponse:
        """Send raw XML command and get response.

        Any exception is logged and converted into a failure response.
        """
        try:
            self.process.stdin.write(xml + "\n")
            self.process.stdin.flush()
            response = self._read_until_prompt()
            return CoqResponse("".join(response))
        except Exception as e:
            from xml.sax.saxutils import escape

            logger.error(f"Command error: {e}")
            # Escape the text so an exception message containing '<' or '&'
            # still yields parsable XML.
            return CoqResponse(
                f'<value val="fail"><string>Error: {escape(str(e))}</string></value>'
            )

    async def close(self):
        """Clean up Coq process: terminate, wait briefly, kill on failure."""
        if self.process:
            try:
                self.process.terminate()
                await asyncio.sleep(0.1)  # Give process time to terminate
                self.process.wait(timeout=5)
            except Exception as e:
                logger.error(f"Error closing Coq process: {e}")
                if self.process:
                    self.process.kill()  # Force kill if needed
```
--------------------------------------------------------------------------------
/src/mcp_rocq/server.py:
--------------------------------------------------------------------------------
```python
"""
MCP Server for Coq Integration
Provides advanced logical reasoning capabilities through Coq
"""
import asyncio
import logging
import argparse
from pathlib import Path
from typing import Any, List, Dict
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.types as types
import mcp.server.stdio
from .handlers.coq_session import CoqSession
from .handlers.type_checker import TypeChecker
from .handlers.inductive_types import InductiveTypeHandler
from .handlers.prover import ProofHandler
# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('mcp_rocq')
class RoCQServer(Server):
    """MCP server named "rocq" that owns one Coq session and its handlers."""

    def __init__(self, coq_path: Path, lib_path: Path):
        super().__init__("rocq")
        # Initialize Coq session
        self.coq = CoqSession(coq_path, lib_path)
        # Initialize handlers; all three share the same session
        self.type_checker = TypeChecker(self.coq)
        self.inductive_handler = InductiveTypeHandler(self.coq)
        self.proof_handler = ProofHandler(self.coq)

    async def cleanup(self):
        """Clean up resources: close the Coq session, then defer to the base class."""
        if self.coq:
            await self.coq.close()
        # NOTE(review): the mcp ``Server`` base class may not define
        # ``cleanup`` -- confirm. Looking it up defensively keeps this method
        # from raising AttributeError after the session is already closed.
        parent_cleanup = getattr(super(), "cleanup", None)
        if parent_cleanup is not None:
            await parent_cleanup()
async def main(coq_path: Path, lib_path: Path):
    """Start the RoCQ server and serve MCP requests over stdio.

    Creates the server (which spawns the Coq process), registers the
    tool-listing and tool-calling handlers, and runs until the stdio
    transport ends. The Coq process is closed on the way out, including when
    the run is cancelled or fails.

    Args:
        coq_path: Path to the coqtop executable
        lib_path: Path to the Coq library directory
    """
    logger.info(f"Starting RoCQ Server with Coq at: {coq_path}")
    server = RoCQServer(coq_path, lib_path)

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List available tools"""
        return [
            types.Tool(
                name="type_check",
                description="Check type of a term with optional expected type",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "term": {
                            "type": "string",
                            "description": "Term to type check"
                        },
                        "expected_type": {
                            "type": "string",
                            "description": "Optional expected type"
                        },
                        "context": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Optional list of modules to import"
                        }
                    },
                    "required": ["term"]
                }
            ),
            types.Tool(
                name="define_inductive",
                description="Define and verify an inductive type",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "name": {
                            "type": "string",
                            "description": "Name of the inductive type"
                        },
                        "constructors": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "List of constructor definitions"
                        },
                        "verify": {
                            "type": "boolean",
                            "description": "Whether to verify key properties",
                            "default": False
                        }
                    },
                    "required": ["name", "constructors"]
                }
            ),
            types.Tool(
                name="prove_property",
                description="Prove a logical property",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "property_stmt": {
                            "type": "string",
                            "description": "Property to prove"
                        },
                        "tactics": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Optional list of tactics to apply"
                        },
                        "use_automation": {
                            "type": "boolean",
                            "description": "Whether to try automated proving",
                            "default": True
                        }
                    },
                    "required": ["property_stmt"]
                }
            )
        ]

    @server.call_tool()
    async def handle_call_tool(name: str,
                               arguments: dict[str, Any] | None
                               ) -> list[types.TextContent | types.ImageContent]:
        """Handle tool execution requests.

        Every outcome, including errors, is returned as a single text item.
        """
        # ``arguments`` may be None; use an empty dict so a missing required
        # key is reported by name rather than as a NoneType subscript error.
        args = arguments or {}
        try:
            if name == "type_check":
                result = await server.type_checker.check_type(
                    term=args["term"],
                    expected_type=args.get("expected_type"),
                    context=args.get("context")
                )
                return [types.TextContent(type="text", text=str(result))]
            elif name == "define_inductive":
                result = await server.inductive_handler.define_inductive(
                    name=args["name"],
                    constructors=args["constructors"],
                    verify=args.get("verify", False)
                )
                return [types.TextContent(type="text", text=str(result))]
            elif name == "prove_property":
                result = await server.proof_handler.prove_property(
                    property_stmt=args["property_stmt"],
                    tactics=args.get("tactics"),
                    use_automation=args.get("use_automation", True)
                )
                return [types.TextContent(type="text", text=str(result))]
            else:
                raise ValueError(f"Unknown tool: {name}")
        except Exception as e:
            logger.error(f"Tool error: {e}")
            return [types.TextContent(type="text", text=f"Error: {str(e)}")]

    try:
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            logger.info("Server running with stdio transport")
            await server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="rocq",
                    server_version="0.1.0",
                    capabilities=server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={}
                    ),
                ),
            )
    finally:
        # The Coq subprocess was spawned by RoCQServer; close it here so it
        # does not outlive the server.
        await server.coq.close()
def cli():
    """Console entry point: read the two path options and run ``main``.

    Both options are optional strings with built-in defaults. A
    KeyboardInterrupt during the run is logged rather than propagated.
    """
    # (flag, default, help) for each command-line option
    option_specs = (
        ('--coq-path',
         "F:/Coq-Platform~8.19~2024.10/bin/coqtop.exe",
         'Path to coqtop executable'),
        ('--lib-path',
         "F:/Coq-Platform~8.19~2024.10/lib/coq",
         'Path to Coq library directory'),
    )
    parser = argparse.ArgumentParser(description='MCP RoCQ Server')
    for flag, fallback, help_text in option_specs:
        parser.add_argument(flag, type=str, default=fallback, help=help_text)
    options = parser.parse_args()

    try:
        coq_executable = Path(options.coq_path)
        library_dir = Path(options.lib_path)
        asyncio.run(main(coq_executable, library_dir))
    except KeyboardInterrupt:
        logger.info("Server stopped")


if __name__ == "__main__":
    cli()
```