# Directory Structure
```
├── .gitignore
├── .python-version
├── autocad_manager.py
├── CAD-mpc简单实现.mp4
├── db_manager.py
├── main.py
├── pyproject.toml
├── README (2).md
├── README.md
├── requirements.txt
├── server.py
├── server.spec
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.13
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
Python
pycache/
*.py[cod]
*$py.class
*.so
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
.env
.venv
venv/
ENV/
PyInstaller
dist/
build/
Database
*.db
*.sqlite
*.sqlite3
AutoCAD files
*.dwg
*.dxf
*.bak
IDE files
.idea/
.vscode/
*.swp
*.swo
OS specific
.DS_Store
Thumbs.db
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
mcp>=1.2.0
pywin32
comtypes
pyautocad
sqlalchemy
pyinstaller
```
--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------
```python
def main() -> None:
    """Write the project's greeting line to standard output."""
    greeting = "Hello from autocad-mcp-server!"
    print(greeting)


# Only greet when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "autocad-mcp-server"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"comtypes>=1.4.10",
"mcp[cli]>=1.4.1",
"psycopg2-binary>=2.9.10",
"pyautocad>=0.2.0",
"pyinstaller>=6.12.0",
"pywin32>=309",
"sqlalchemy>=2.0.39",
]
```
--------------------------------------------------------------------------------
/db_manager.py:
--------------------------------------------------------------------------------
```python
from sqlalchemy import create_engine, MetaData, Table, Column, inspect
from sqlalchemy.exc import SQLAlchemyError
class DatabaseManager:
    """Wrap a SQLAlchemy engine for schema inspection and ad-hoc queries.

    ``connect()`` must succeed before the inspection/query methods can do
    useful work. Those methods share one convention: they return their
    payload on success and a human-readable error *string* on failure
    instead of raising.
    """

    # Returned by methods that need a live connection when connect() has not
    # succeeded, so callers get the usual error string rather than an
    # AttributeError raised on a None attribute.
    _NOT_CONNECTED = "数据库未连接，请先调用 connect()"

    def __init__(self, connection_string):
        """Remember the connection URL; nothing is opened until connect().

        Args:
            connection_string: Value handed to ``create_engine``.
        """
        self.connection_string = connection_string
        self.engine = None
        self.metadata = None
        self.inspector = None

    def connect(self):
        """Create the engine, reflect the schema and build an inspector.

        The three attributes are assigned only after every step succeeded,
        so a failed attempt never leaves the manager half-initialised.

        Returns:
            True on success; False (after printing the error) otherwise.
        """
        engine = None
        try:
            engine = create_engine(self.connection_string)
            metadata = MetaData()
            metadata.reflect(bind=engine)
            inspector = inspect(engine)
        except Exception as e:
            # Release whatever the partially created engine may be holding.
            if engine is not None:
                engine.dispose()
            print(f"数据库连接失败: {str(e)}")
            return False
        self.engine = engine
        self.metadata = metadata
        self.inspector = inspector
        return True

    def disconnect(self):
        """Dispose of the engine if one was created; otherwise do nothing."""
        if self.engine:
            self.engine.dispose()

    def get_all_tables(self):
        """Return the table names reported by the inspector.

        Returns:
            The inspector's table-name list, or an error string when not
            connected or when SQLAlchemy reports a failure.
        """
        if self.inspector is None:
            return self._NOT_CONNECTED
        try:
            return self.inspector.get_table_names()
        except SQLAlchemyError as e:
            return f"获取表列表失败: {str(e)}"

    def get_table_schema(self, table_name):
        """Describe one table: columns, primary key, foreign keys, indexes.

        Args:
            table_name: Name looked up in the metadata reflected by connect().

        Returns:
            A dict with keys ``table_name``, ``columns``, ``primary_key``,
            ``foreign_keys`` and ``indices``; or an error string when not
            connected, when the table is unknown, or on a SQLAlchemy error.
        """
        if self.metadata is None or self.inspector is None:
            return self._NOT_CONNECTED
        try:
            if table_name not in self.metadata.tables:
                return f"表 '{table_name}' 不存在"

            columns = [
                {
                    "name": column['name'],
                    "type": str(column['type']),
                    "nullable": column['nullable'],
                    # Falsy defaults are reported as None, as before.
                    "default": str(column['default']) if column['default'] else None,
                }
                for column in self.inspector.get_columns(table_name)
            ]

            # Primary-key constraint is passed through exactly as the
            # inspector returns it.
            primary_keys = self.inspector.get_pk_constraint(table_name)

            foreign_keys = [
                {
                    "name": fk['name'],
                    "referred_table": fk['referred_table'],
                    "referred_columns": fk['referred_columns'],
                    "constrained_columns": fk['constrained_columns'],
                }
                for fk in self.inspector.get_foreign_keys(table_name)
            ]

            indices = [
                {
                    "name": index['name'],
                    "columns": index['column_names'],
                    "unique": index['unique'],
                }
                for index in self.inspector.get_indexes(table_name)
            ]

            return {
                "table_name": table_name,
                "columns": columns,
                "primary_key": primary_keys,
                "foreign_keys": foreign_keys,
                "indices": indices,
            }
        except SQLAlchemyError as e:
            return f"获取表 '{table_name}' 结构失败: {str(e)}"

    def execute_query(self, query, params=None):
        """Execute a statement and return its rows or affected-row count.

        Args:
            query: SQL as a plain string, or an already-built SQLAlchemy
                executable (which is passed through untouched).
            params: Optional bind parameters forwarded to ``execute``.

        Returns:
            ``{"columns": [...], "rows": [{col: value, ...}, ...]}`` when the
            statement returns rows, ``{"affected_rows": n}`` otherwise, or an
            error string when not connected or on a SQLAlchemy error.
        """
        if self.engine is None:
            return self._NOT_CONNECTED

        # Local import: the module-level import line does not bring in text().
        from sqlalchemy import text

        try:
            # SQLAlchemy 2.x refuses bare strings in Connection.execute();
            # they have to be wrapped in text() first.
            statement = text(query) if isinstance(query, str) else query

            # begin() commits when the block exits normally and rolls back on
            # an exception, so data-modifying statements are persisted.
            with self.engine.begin() as connection:
                if params:
                    result = connection.execute(statement, params)
                else:
                    result = connection.execute(statement)

                if result.returns_rows:
                    # Materialise the key view into a plain list so the
                    # payload contains only built-in containers.
                    columns = list(result.keys())
                    rows = [dict(zip(columns, row)) for row in result]
                    return {"columns": columns, "rows": rows}
                return {"affected_rows": result.rowcount}
        except SQLAlchemyError as e:
            return f"执行查询失败: {str(e)}"
```