# Directory Structure
```
├── .gitignore
├── .python-version
├── main.py
├── mcpserver-config.json
├── MemMCP-n8n.json
├── pyproject.toml
├── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
1 | 3.12
2 |
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # Python-generated files
2 | __pycache__/
3 | *.py[oc]
4 | build/
5 | dist/
6 | wheels/
7 | *.egg-info
8 |
9 | # Virtual environments
10 | .venv
11 | .qodo
12 |
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # MemMCP
2 | A Cheat Engine-like memory scanner and editor, exposed as an MCP server. (This is a proof of concept of what the library can do, provided as-is.)
3 |
4 | ## Demo
5 | https://youtu.be/NlY_R0lm5-M <br>
6 | https://youtu.be/ivDTdxdTAQo
7 |
8 | ## Note
9 | Don't forget to change the `--directory` path inside `mcpserver-config.json` so that it points to this project's location in your environment.
10 |
```
--------------------------------------------------------------------------------
/mcpserver-config.json:
--------------------------------------------------------------------------------
```json
1 | {
2 | "mcpServers": {
3 | "mem-mcp": {
4 | "command": "uv",
5 | "args": [
6 | "--directory",
7 | "D:\\Projects\\MCP\\mem-mcp",
8 | "run",
9 | "main.py"
10 | ]
11 | }
12 | }
13 | }
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [project]
2 | name = "mem-mcp"
3 | version = "0.1.0"
4 | description = "Cheat Engine-like but MCP"
5 | readme = "README.md"
6 | requires-python = ">=3.12"
7 | dependencies = [
8 | "mcp[cli]>=1.6.0",
9 | "psutil>=7.0.0",
10 | "pymem>=1.14.0",
11 | ]
12 |
```
--------------------------------------------------------------------------------
/MemMCP-n8n.json:
--------------------------------------------------------------------------------
```json
1 | {
2 | "name": "MemMCP",
3 | "nodes": [
4 | {
5 | "parameters": {
6 | "options": {}
7 | },
8 | "type": "@n8n/n8n-nodes-langchain.chatTrigger",
9 | "typeVersion": 1.1,
10 | "position": [
11 | 0,
12 | 0
13 | ],
14 | "id": "ee713ce3-45dd-48a5-9d1e-d59178b2a7a5",
15 | "name": "When chat message received",
16 | "webhookId": "2c8f5251-b0ba-4811-b3ea-ab5b83508952"
17 | },
18 | {
19 | "parameters": {
20 | "options": {}
21 | },
22 | "type": "@n8n/n8n-nodes-langchain.agent",
23 | "typeVersion": 1.8,
24 | "position": [
25 | 252,
26 | 0
27 | ],
28 | "id": "d5fae538-7e95-4fbe-8088-1b0c2dc4c58e",
29 | "name": "AI Agent"
30 | },
31 | {
32 | "parameters": {
33 | "sseEndpoint": "http://127.0.0.1:8000/sse"
34 | },
35 | "type": "@n8n/n8n-nodes-langchain.mcpClientTool",
36 | "typeVersion": 1,
37 | "position": [
38 | 460,
39 | 220
40 | ],
41 | "id": "8fbde586-7ad6-45b2-a93f-d88b04402157",
42 | "name": "MCP Client"
43 | },
44 | {
45 | "parameters": {
46 | "model": {
47 | "__rl": true,
48 | "value": "gpt-4o",
49 | "mode": "list",
50 | "cachedResultName": "gpt-4o"
51 | },
52 | "options": {}
53 | },
54 | "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
55 | "typeVersion": 1.2,
56 | "position": [
57 | 180,
58 | 220
59 | ],
60 | "id": "ddcd9764-cce9-4161-822f-b44f3bc11cac",
61 | "name": "OpenAI Chat Model",
62 | "credentials": {
63 | "openAiApi": {
64 | "id": "kfRgY3O5fofzXymr",
65 | "name": "OpenAi account"
66 | }
67 | }
68 | },
69 | {
70 | "parameters": {},
71 | "type": "@n8n/n8n-nodes-langchain.memoryBufferWindow",
72 | "typeVersion": 1.3,
73 | "position": [
74 | 340,
75 | 220
76 | ],
77 | "id": "aa327afe-dce6-4313-8a95-4bc0afdbfefb",
78 | "name": "Simple Memory"
79 | }
80 | ],
81 | "pinData": {},
82 | "connections": {
83 | "When chat message received": {
84 | "main": [
85 | [
86 | {
87 | "node": "AI Agent",
88 | "type": "main",
89 | "index": 0
90 | }
91 | ]
92 | ]
93 | },
94 | "MCP Client": {
95 | "ai_tool": [
96 | [
97 | {
98 | "node": "AI Agent",
99 | "type": "ai_tool",
100 | "index": 0
101 | }
102 | ]
103 | ]
104 | },
105 | "OpenAI Chat Model": {
106 | "ai_languageModel": [
107 | [
108 | {
109 | "node": "AI Agent",
110 | "type": "ai_languageModel",
111 | "index": 0
112 | }
113 | ]
114 | ]
115 | },
116 | "Simple Memory": {
117 | "ai_memory": [
118 | [
119 | {
120 | "node": "AI Agent",
121 | "type": "ai_memory",
122 | "index": 0
123 | }
124 | ]
125 | ]
126 | }
127 | },
128 | "active": false,
129 | "settings": {
130 | "executionOrder": "v1"
131 | },
132 | "versionId": "dd40b2fc-fa4c-4c6f-a34c-24c609b67cc2",
133 | "meta": {
134 | "templateCredsSetupCompleted": true,
135 | "instanceId": "65b136f90644bb1f1f1f931d45eaf81ad7e00af2501d345c233d0fbea670b7ab"
136 | },
137 | "id": "jy7YfdtliTUui5Ls",
138 | "tags": []
139 | }
```
--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------
```python
1 | from mcp.server.fastmcp import FastMCP
2 | import pymem
3 | import ctypes
4 | from pymem.ressources.structure import MEMORY_BASIC_INFORMATION
5 | import struct
6 |
# Create the MCP server instance that the tool functions in this file register with.
mcp = FastMCP("MemMCP")

# Global session state shared by all tool calls.
state = {
    "pm": None,                # attached pymem.Pymem handle, or None when detached
    "process_name": None,      # name of the process the handle is attached to
    "addresses": [],           # candidate addresses from the latest scan/filter
    "data_type": None,         # data type used in the last scan
    "last_value_bytes": None,  # encoded value of the last scan; read_value() uses its length for "bytes" reads
}
17 |
def get_value_bytes(value, data_type):
    """Convert a value to its little-endian byte pattern for the given data type.

    Args:
        value: Value to encode. For "int", "float" and "double" anything that
            int()/float() accepts; for "bytes" a hex string (e.g. "DEADBEEF",
            spaces allowed) or comma-separated decimal bytes
            (e.g. "222,173,190,239", each masked to 8 bits).
        data_type: One of "int" (4-byte signed), "float" (4-byte),
            "double" (8-byte) or "bytes".

    Returns:
        The encoded value as a non-empty bytes object.

    Raises:
        ValueError: If the data type is unsupported, the value cannot be
            parsed, the value does not fit the type, or a "bytes" value
            encodes to an empty pattern.
    """
    bytes_help = ("Invalid bytes format. Use hex string (e.g., 'DEADBEEF') or "
                  "comma-separated bytes (e.g., '222,173,190,239').")

    if data_type in ("int", "float", "double"):
        try:
            if data_type == "int":
                return int(value).to_bytes(4, byteorder='little', signed=True)
            if data_type == "float":
                return struct.pack('<f', float(value))  # Little-endian float
            return struct.pack('<d', float(value))  # Little-endian double
        except (OverflowError, TypeError) as exc:
            # Callers only catch ValueError, so range/type failures are
            # reported through the same exception type as parse failures.
            raise ValueError(
                f"Value {value!r} is not valid for data type '{data_type}'."
            ) from exc

    if data_type == "bytes":
        try:
            if isinstance(value, str) and ',' in value:
                encoded = bytes([int(part.strip()) & 0xFF for part in value.split(',')])
            else:
                encoded = bytes.fromhex(value.replace(' ', ''))
        except (ValueError, TypeError, AttributeError) as exc:
            raise ValueError(bytes_help) from exc
        if not encoded:
            # An empty pattern would "match" at every offset of a scan.
            raise ValueError(bytes_help)
        return encoded

    raise ValueError("Unsupported data type. Use 'int', 'float', 'double', or 'bytes'.")
37 |
38 | def read_value(pm, address, data_type):
39 | """Read a value from memory based on the specified data type."""
40 | if data_type == "int":
41 | return pm.read_int(address)
42 | elif data_type == "float":
43 | return pm.read_float(address)
44 | elif data_type == "double":
45 | return pm.read_double(address)
46 | elif data_type == "bytes":
47 | byte_length = len(state["last_value_bytes"]) # Use length from initial scan
48 | return pm.read_bytes(address, byte_length)
49 | else:
50 | raise ValueError("Unsupported data type.")
51 |
def write_value(pm, address, value, data_type):
    """Store ``value`` at ``address``, encoded according to ``data_type``.

    Args:
        pm: Object exposing write_int/write_float/write_double/write_bytes.
        address: Memory address to write to.
        value: Value to write; for "bytes" it must be a hex string or a
            comma-separated list of decimal byte values.
        data_type: "int", "float", "double" or "bytes".

    Raises:
        ValueError: For an unsupported data type, a non-string "bytes"
            value, or a value that cannot be parsed.
    """
    if data_type == "int":
        pm.write_int(address, int(value))
        return
    if data_type == "float":
        pm.write_float(address, float(value))
        return
    if data_type == "double":
        pm.write_double(address, float(value))
        return
    if data_type != "bytes":
        raise ValueError("Unsupported data type.")

    if not isinstance(value, str):
        raise ValueError("Value for bytes must be a hex string or comma-separated bytes.")

    if ',' in value:
        # Comma-separated decimal bytes; each one is masked to 8 bits.
        payload = bytes([int(token.strip()) & 0xFF for token in value.split(',')])
    else:
        # Hex string, with embedded spaces ignored.
        payload = bytes.fromhex(value.replace(' ', ''))
    pm.write_bytes(address, payload)
71 |
def _find_all_offsets(haystack, needle):
    """Return every offset (overlapping ones included) where needle occurs in haystack."""
    offsets = []
    position = haystack.find(needle)
    while position != -1:
        offsets.append(position)
        position = haystack.find(needle, position + 1)
    return offsets


@mcp.tool(
    name="scan",
    description="Scans the process memory for a specified value of a given data type."
)
def scan(process_name: str, value: str, data_type: str = "int") -> str:
    """
    Scans the process memory for a value of the specified data type.

    Attaches to the process if needed (closing a handle to a different
    process first), encodes the value, then walks the address range
    0x00000000-0x7FFFFFFF region by region and records every address whose
    bytes equal the encoded value. The matches replace state["addresses"].

    Args:
        process_name: Name of the process (e.g., "popcapgame1.exe")
        value: Value to search for (e.g., "25" for int, "3.14" for float, "DEADBEEF" for bytes)
        data_type: Type of value ("int", "float", "double", "bytes") [default: "int"]
    Returns:
        A string with the number of found addresses and instructions, or an
        error message if attaching or encoding the value failed.
    """
    global state

    # Switching targets: drop the handle to the previously attached process.
    if state["pm"] and state["process_name"] != process_name:
        state["pm"].close_process()
        state["pm"] = None

    if not state["pm"]:
        try:
            state["pm"] = pymem.Pymem(process_name)
            state["process_name"] = process_name
            state["addresses"] = []
            result = f"Successfully attached to {process_name}."
        except pymem.exception.ProcessNotFound:
            return f"Process {process_name} not found. Ensure it’s running."
        except Exception as e:
            return f"Error attaching to {process_name}: {str(e)}"
    else:
        result = f"Using existing attachment to {process_name}."

    try:
        value_bytes = get_value_bytes(value, data_type)
    except ValueError as e:
        return f"{result}\nError: {str(e)}"
    if not value_bytes:
        # An empty pattern would match at every offset of every region.
        return f"{result}\nError: Value encodes to an empty byte pattern."
    state["data_type"] = data_type
    state["last_value_bytes"] = value_bytes  # Store for bytes length reference

    mem_commit = 0x1000  # MEM_COMMIT
    # PAGE_READWRITE, PAGE_READONLY, PAGE_EXECUTE_READ, PAGE_EXECUTE_READWRITE
    readable_protections = (0x04, 0x02, 0x20, 0x40)
    page_size = 0x1000  # step used when a query yields no usable region size

    matches = []
    pm = state["pm"]
    process_handle = pm.process_handle
    address = 0x00000000
    max_address = 0x7FFFFFFF  # NOTE: addresses at or above this are not scanned

    mbi = MEMORY_BASIC_INFORMATION()
    while address < max_address:
        try:
            queried = ctypes.windll.kernel32.VirtualQueryEx(
                process_handle, ctypes.c_void_p(address),
                ctypes.byref(mbi), ctypes.sizeof(mbi))
        except Exception:
            queried = 0

        # VirtualQueryEx returns 0 on failure and leaves mbi untouched; never
        # advance by a stale or zero RegionSize (that could loop forever).
        if not queried or mbi.RegionSize <= 0:
            address += page_size
            continue

        if mbi.State == mem_commit and mbi.Protect in readable_protections:
            try:
                region = pm.read_bytes(address, mbi.RegionSize)
            except Exception:
                # Best effort: a region that cannot be read is skipped.
                region = b""
            matches.extend(address + offset
                           for offset in _find_all_offsets(region, value_bytes))
        address += mbi.RegionSize

    state["addresses"] = matches
    count = len(matches)
    if count == 0:
        return f"{result}\nNo addresses found with {data_type} value {value}."
    return f"{result}\nFound {count} matching addresses with {data_type} value {value}. Call filter() with the new value after changing it in-game."
140 |
@mcp.tool(
    name="filter",
    description="Filters previously found addresses based on a new value of the same data type."
)
def filter(new_value: str) -> str:
    """
    Filters previously found addresses based on a new value of the same data type.

    Each address in state["addresses"] is re-read and kept only if it now
    holds ``new_value``; the surviving addresses replace the list. Addresses
    that can no longer be read are dropped.

    Args:
        new_value: New value to filter by (e.g., "125" for int, "6.28" for float, "CAFEBABE" for bytes)
    Returns:
        A string with the number of remaining addresses and their details
        (details are listed when five or fewer remain), or an error message.
    """
    global state

    if not state["pm"]:
        return "No process attached. Run scan() first with a process name and initial value."
    if not state["addresses"]:
        return "No addresses to filter. Run scan() first with an initial value."
    if not state["data_type"]:
        return "No data type set. Run scan() first to specify a data type."

    pm = state["pm"]
    data_type = state["data_type"]

    try:
        value_bytes = get_value_bytes(new_value, data_type)
    except ValueError as e:
        return f"Error: {str(e)}"
    if not value_bytes:
        return "Error: Value encodes to an empty byte pattern."

    # The expected value does not depend on the address, so compute it once.
    if data_type == "float":
        expected_value = struct.unpack('<f', value_bytes)[0]
    elif data_type == "double":
        expected_value = struct.unpack('<d', value_bytes)[0]
    elif data_type == "bytes":
        expected_value = value_bytes
        # read_value() sizes "bytes" reads from this entry; keep it in step
        # with the new pattern so a pattern of a different length can match.
        state["last_value_bytes"] = value_bytes
    else:
        expected_value = int(new_value)

    filtered = []
    for addr in state["addresses"]:
        try:
            if read_value(pm, addr, data_type) == expected_value:
                filtered.append(addr)
        except Exception:
            # Address is no longer readable: treat it as a non-match.
            continue

    state["addresses"] = filtered
    count = len(filtered)

    def describe(addr):
        """Format one candidate line, reading the address once and tolerating read failures."""
        try:
            current = read_value(pm, addr, data_type)
        except Exception:
            current = None
        shown = current if current is not None else "<unreadable>"
        return f" 0x{addr:X} -> {shown}"

    if count == 0:
        return f"No addresses match the new {data_type} value {new_value}. Try scanning again with scan()."
    elif count <= 5:
        details = "\nFinal candidates:\n" + "\n".join(describe(addr) for addr in filtered)
        return f"Filtered to {count} addresses with {data_type} value {new_value}.{details}\nCall edit() to modify these values or filter() again."
    else:
        return f"Filtered to {count} addresses with {data_type} value {new_value}. Change the value in-game and call filter() again."
193 |
@mcp.tool(
    name="edit",
    description="Edits the values at the current list of addresses with the specified data type."
)
def edit(new_value: str) -> str:
    """
    Writes ``new_value`` to every address currently held in the state.

    The data type recorded by the last scan decides how the value is encoded.
    A failure on one address is reported on its own line and does not stop
    the remaining writes.

    Args:
        new_value: New value to write (e.g., "999" for int, "9.99" for float, "DEADBEEF" for bytes)
    Returns:
        A string with one result line per address, or a message explaining
        which prerequisite is missing.
    """
    global state

    # Each prerequisite is checked in order; the first missing one wins.
    prerequisites = (
        ("pm", "No process attached. Run scan() first."),
        ("addresses", "No addresses to edit. Run scan() and filter() first."),
        ("data_type", "No data type set. Run scan() first to specify a data type."),
    )
    for key, complaint in prerequisites:
        if not state[key]:
            return complaint

    def attempt(target):
        """Write to a single address and return its report line."""
        try:
            write_value(state["pm"], target, new_value, state["data_type"])
            return f" [✔] 0x{target:X} updated to {new_value}"
        except Exception as err:
            return f" [✘] Failed to write to 0x{target:X}: {str(err)}"

    report = [attempt(target) for target in state["addresses"]]
    return "Edit results:\n" + "\n".join(report)
224 |
@mcp.tool(
    name="reset",
    description="Resets the state and closes the process handle."
)
def reset() -> str:
    """
    Closes the attached process handle, if any, and restores a blank state.

    Returns:
        A confirmation message.
    """
    global state
    attached = state["pm"]
    if attached:
        attached.close_process()
    # Rebind the global to a fresh dict rather than clearing the old one.
    state = dict(pm=None, process_name=None, addresses=[], data_type=None)
    return "State reset and process handle closed."
238 |
@mcp.tool(
    name="get_addresses",
    description="Returns a specified number of addresses from the current list."
)
def get_addresses(count: int) -> str:
    """
    Lists up to ``count`` addresses from the front of the current candidate list.

    Args:
        count: Number of addresses to return (e.g., 5)
    Returns:
        A string with the requested addresses in hex format, one per line,
        or a message when the list is empty or ``count`` is not positive.
    """
    global state

    known = state["addresses"]
    if not known:
        return "No addresses available. Run scan() and filter() first."
    if count <= 0:
        return "Count must be greater than 0."

    total = len(known)
    # Never report more than are actually stored.
    shown = count if count < total else total
    listing = "\n".join(f"0x{addr:X}" for addr in known[:shown])
    return f"Returning {shown} of {total} addresses:\n" + listing
264 |
# Start the server only when this file is executed directly.
if __name__ == "__main__":
    mcp.run()
    # To serve over SSE instead (endpoint: http://127.0.0.1:8000/sse), use:
    # mcp.run(transport="sse")
270 |
```