# Directory Structure ``` ├── .gitignore ├── .python-version ├── autocad_manager.py ├── CAD-mpc简单实现.mp4 ├── db_manager.py ├── main.py ├── pyproject.toml ├── README (2).md ├── README.md ├── requirements.txt ├── server.py ├── server.spec └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.13 ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` Python pycache/ *.py[cod] *$py.class *.so .Python env/ build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ *.egg-info/ .installed.cfg *.egg .env .venv venv/ ENV/ PyInstaller dist/ build/ Database *.db *.sqlite *.sqlite3 AutoCAD files *.dwg *.dxf *.bak IDE files .idea/ .vscode/ *.swp *.swo OS specific .DS_Store Thumbs.db ``` -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` mcp>=1.2.0 pywin32 comtypes pyautocad sqlalchemy pyinstaller ``` -------------------------------------------------------------------------------- /main.py: -------------------------------------------------------------------------------- ```python def main(): print("Hello from autocad-mcp-server!") if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "autocad-mcp-server" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.13" dependencies = [ "comtypes>=1.4.10", "mcp[cli]>=1.4.1", "psycopg2-binary>=2.9.10", "pyautocad>=0.2.0", "pyinstaller>=6.12.0", "pywin32>=309", "sqlalchemy>=2.0.39", ] ``` -------------------------------------------------------------------------------- /db_manager.py: -------------------------------------------------------------------------------- ```python from sqlalchemy import create_engine, MetaData, Table, Column, inspect from sqlalchemy.exc import SQLAlchemyError class DatabaseManager: def __init__(self, connection_string): """初始化数据库连接管理器""" self.connection_string = connection_string self.engine = None self.metadata = None self.inspector = None def connect(self): """连接到数据库""" try: self.engine = create_engine(self.connection_string) self.metadata = MetaData() self.metadata.reflect(bind=self.engine) self.inspector = inspect(self.engine) return True except Exception as e: print(f"数据库连接失败: {str(e)}") return False def disconnect(self): """断开数据库连接""" if self.engine: self.engine.dispose() def get_all_tables(self): """获取所有表名""" try: return self.inspector.get_table_names() except SQLAlchemyError as e: return f"获取表列表失败: {str(e)}" def get_table_schema(self, table_name): """获取指定表的结构信息""" try: if table_name not in self.metadata.tables: return f"表 '{table_name}' 不存在" columns = [] for column in self.inspector.get_columns(table_name): columns.append({ "name": column['name'], "type": str(column['type']), "nullable": column['nullable'], "default": str(column['default']) if column['default'] else None }) # 获取主键 primary_keys = self.inspector.get_pk_constraint(table_name) # 获取外键 foreign_keys = [] for fk in self.inspector.get_foreign_keys(table_name): foreign_keys.append({ "name": fk['name'], "referred_table": fk['referred_table'], "referred_columns": fk['referred_columns'], "constrained_columns": fk['constrained_columns'] }) # 获取索引 indices = [] for index in self.inspector.get_indexes(table_name): indices.append({ "name": index['name'], "columns": index['column_names'], "unique": index['unique'] }) return { "table_name": table_name, "columns": columns, "primary_key": primary_keys, "foreign_keys": foreign_keys, "indices": indices } except SQLAlchemyError as e: return f"获取表 '{table_name}' 结构失败: {str(e)}" def execute_query(self, query, params=None): """执行自定义查询""" try: with self.engine.connect() as connection: if params: result = connection.execute(query, params) else: result = connection.execute(query) # 检查是否是SELECT查询 if result.returns_rows: columns = result.keys() rows = [dict(zip(columns, row)) for row in result] return {"columns": columns, "rows": rows} else: return {"affected_rows": result.rowcount} except SQLAlchemyError as e: return f"执行查询失败: {str(e)}" ```