#
tokens: 3954/50000 7/7 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── .python-version
├── main.py
├── mcpserver-config.json
├── MemMCP-n8n.json
├── pyproject.toml
├── README.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
.qodo

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# MemMCP
Cheat Engine-like but MCP (This is a PoC of the feature that this library provides, as-is.)

## Demo
https://youtu.be/NlY_R0lm5-M <br>
https://youtu.be/ivDTdxdTAQo

## Note
Don't forget to change the value inside `mcpserver-config.json` to match your environment

```

--------------------------------------------------------------------------------
/mcpserver-config.json:
--------------------------------------------------------------------------------

```json
{
    "mcpServers": {
      "mem-mcp": {
        "command": "uv",
        "args": [
          "--directory",
          "D:\\Projects\\MCP\\mem-mcp",
          "run",
          "main.py"
        ]
      }
    }
  }
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mem-mcp"
version = "0.1.0"
description = "Cheat Engine-like but MCP"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "mcp[cli]>=1.6.0",
    "psutil>=7.0.0",
    "pymem>=1.14.0",
]

```

--------------------------------------------------------------------------------
/MemMCP-n8n.json:
--------------------------------------------------------------------------------

```json
{
  "name": "MemMCP",
  "nodes": [
    {
      "parameters": {
        "options": {}
      },
      "type": "@n8n/n8n-nodes-langchain.chatTrigger",
      "typeVersion": 1.1,
      "position": [
        0,
        0
      ],
      "id": "ee713ce3-45dd-48a5-9d1e-d59178b2a7a5",
      "name": "When chat message received",
      "webhookId": "2c8f5251-b0ba-4811-b3ea-ab5b83508952"
    },
    {
      "parameters": {
        "options": {}
      },
      "type": "@n8n/n8n-nodes-langchain.agent",
      "typeVersion": 1.8,
      "position": [
        252,
        0
      ],
      "id": "d5fae538-7e95-4fbe-8088-1b0c2dc4c58e",
      "name": "AI Agent"
    },
    {
      "parameters": {
        "sseEndpoint": "http://127.0.0.1:8000/sse"
      },
      "type": "@n8n/n8n-nodes-langchain.mcpClientTool",
      "typeVersion": 1,
      "position": [
        460,
        220
      ],
      "id": "8fbde586-7ad6-45b2-a93f-d88b04402157",
      "name": "MCP Client"
    },
    {
      "parameters": {
        "model": {
          "__rl": true,
          "value": "gpt-4o",
          "mode": "list",
          "cachedResultName": "gpt-4o"
        },
        "options": {}
      },
      "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
      "typeVersion": 1.2,
      "position": [
        180,
        220
      ],
      "id": "ddcd9764-cce9-4161-822f-b44f3bc11cac",
      "name": "OpenAI Chat Model",
      "credentials": {
        "openAiApi": {
          "id": "kfRgY3O5fofzXymr",
          "name": "OpenAi account"
        }
      }
    },
    {
      "parameters": {},
      "type": "@n8n/n8n-nodes-langchain.memoryBufferWindow",
      "typeVersion": 1.3,
      "position": [
        340,
        220
      ],
      "id": "aa327afe-dce6-4313-8a95-4bc0afdbfefb",
      "name": "Simple Memory"
    }
  ],
  "pinData": {},
  "connections": {
    "When chat message received": {
      "main": [
        [
          {
            "node": "AI Agent",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "MCP Client": {
      "ai_tool": [
        [
          {
            "node": "AI Agent",
            "type": "ai_tool",
            "index": 0
          }
        ]
      ]
    },
    "OpenAI Chat Model": {
      "ai_languageModel": [
        [
          {
            "node": "AI Agent",
            "type": "ai_languageModel",
            "index": 0
          }
        ]
      ]
    },
    "Simple Memory": {
      "ai_memory": [
        [
          {
            "node": "AI Agent",
            "type": "ai_memory",
            "index": 0
          }
        ]
      ]
    }
  },
  "active": false,
  "settings": {
    "executionOrder": "v1"
  },
  "versionId": "dd40b2fc-fa4c-4c6f-a34c-24c609b67cc2",
  "meta": {
    "templateCredsSetupCompleted": true,
    "instanceId": "65b136f90644bb1f1f1f931d45eaf81ad7e00af2501d345c233d0fbea670b7ab"
  },
  "id": "jy7YfdtliTUui5Ls",
  "tags": []
}
```

--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP
import pymem
import ctypes
from pymem.ressources.structure import MEMORY_BASIC_INFORMATION
import struct

# Create an MCP server instance
mcp = FastMCP("MemMCP")

# Global state to store process handle, addresses, and last scanned data type
state = {
    "pm": None,
    "process_name": None,
    "addresses": [],
    "data_type": None  # Track the data type used in the last scan
}

def get_value_bytes(value, data_type):
    """Convert a value to bytes based on the specified data type."""
    if data_type == "int":
        return int(value).to_bytes(4, byteorder='little', signed=True)
    elif data_type == "float":
        return struct.pack('<f', float(value))  # Little-endian float
    elif data_type == "double":
        return struct.pack('<d', float(value))  # Little-endian double
    elif data_type == "bytes":
        try:
            # Expect value as a hex string (e.g., "DEADBEEF") or comma-separated bytes (e.g., "222,173,190,239")
            if isinstance(value, str) and ',' in value:
                byte_list = [int(b.strip()) & 0xFF for b in value.split(',')]
                return bytes(byte_list)
            return bytes.fromhex(value.replace(' ', ''))  # Convert hex string to bytes
        except:
            raise ValueError("Invalid bytes format. Use hex string (e.g., 'DEADBEEF') or comma-separated bytes (e.g., '222,173,190,239').")
    else:
        raise ValueError("Unsupported data type. Use 'int', 'float', 'double', or 'bytes'.")

def read_value(pm, address, data_type):
    """Read a value from memory based on the specified data type."""
    if data_type == "int":
        return pm.read_int(address)
    elif data_type == "float":
        return pm.read_float(address)
    elif data_type == "double":
        return pm.read_double(address)
    elif data_type == "bytes":
        byte_length = len(state["last_value_bytes"])  # Use length from initial scan
        return pm.read_bytes(address, byte_length)
    else:
        raise ValueError("Unsupported data type.")

def write_value(pm, address, value, data_type):
    """Write a value to memory based on the specified data type."""
    if data_type == "int":
        pm.write_int(address, int(value))
    elif data_type == "float":
        pm.write_float(address, float(value))
    elif data_type == "double":
        pm.write_double(address, float(value))
    elif data_type == "bytes":
        if isinstance(value, str):
            if ',' in value:
                byte_list = [int(b.strip()) & 0xFF for b in value.split(',')]
                pm.write_bytes(address, bytes(byte_list))
            else:
                pm.write_bytes(address, bytes.fromhex(value.replace(' ', '')))
        else:
            raise ValueError("Value for bytes must be a hex string or comma-separated bytes.")
    else:
        raise ValueError("Unsupported data type.")

@mcp.tool(
    name="scan",
    description="Scans the process memory for a specified value of a given data type."
)
def scan(process_name: str, value: str, data_type: str = "int") -> str:
    """
    Scans the process memory for a value of the specified data type.
    Args:
        process_name: Name of the process (e.g., "popcapgame1.exe")
        value: Value to search for (e.g., "25" for int, "3.14" for float, "DEADBEEF" for bytes)
        data_type: Type of value ("int", "float", "double", "bytes") [default: "int"]
    Returns:
        A string with the number of found addresses and instructions.
    """
    global state
    
    if state["pm"] and state["process_name"] != process_name:
        state["pm"].close_process()
        state["pm"] = None
    
    if not state["pm"]:
        try:
            state["pm"] = pymem.Pymem(process_name)
            state["process_name"] = process_name
            state["addresses"] = []
            result = f"Successfully attached to {process_name}."
        except pymem.exception.ProcessNotFound:
            return f"Process {process_name} not found. Ensure it’s running."
        except Exception as e:
            return f"Error attaching to {process_name}: {str(e)}"
    else:
        result = f"Using existing attachment to {process_name}."

    try:
        value_bytes = get_value_bytes(value, data_type)
        state["data_type"] = data_type
        state["last_value_bytes"] = value_bytes  # Store for bytes length reference
    except ValueError as e:
        return f"{result}\nError: {str(e)}"

    matches = []
    process_handle = state["pm"].process_handle
    address = 0x00000000
    max_address = 0x7FFFFFFF
    
    mbi = MEMORY_BASIC_INFORMATION()
    while address < max_address:
        try:
            ctypes.windll.kernel32.VirtualQueryEx(process_handle, ctypes.c_void_p(address),
                                                  ctypes.byref(mbi), ctypes.sizeof(mbi))
            if mbi.State == 0x1000 and mbi.Protect in (0x04, 0x02, 0x20, 0x40):
                try:
                    region = state["pm"].read_bytes(address, mbi.RegionSize)
                    byte_length = len(value_bytes)
                    for i in range(len(region) - byte_length + 1):
                        if region[i:i+byte_length] == value_bytes:
                            matches.append(address + i)
                except:
                    pass
            address += mbi.RegionSize
        except:
            address += 0x1000
    
    state["addresses"] = matches
    count = len(matches)
    if count == 0:
        return f"{result}\nNo addresses found with {data_type} value {value}."
    return f"{result}\nFound {count} matching addresses with {data_type} value {value}. Call filter() with the new value after changing it in-game."

@mcp.tool(
    name="filter",
    description="Filters previously found addresses based on a new value of the same data type."
)
def filter(new_value: str) -> str:
    """
    Filters previously found addresses based on a new value of the same data type.
    Args:
        new_value: New value to filter by (e.g., "125" for int, "6.28" for float, "CAFEBABE" for bytes)
    Returns:
        A string with the number of remaining addresses and their details.
    """
    global state
    
    if not state["pm"]:
        return "No process attached. Run scan() first with a process name and initial value."
    if not state["addresses"]:
        return "No addresses to filter. Run scan() first with an initial value."
    if not state["data_type"]:
        return "No data type set. Run scan() first to specify a data type."
    
    try:
        value_bytes = get_value_bytes(new_value, state["data_type"])
    except ValueError as e:
        return f"Error: {str(e)}"
    
    filtered = []
    for addr in state["addresses"]:
        try:
            current_value = read_value(state["pm"], addr, state["data_type"])
            expected_value = struct.unpack('<f', value_bytes)[0] if state["data_type"] == "float" else \
                            struct.unpack('<d', value_bytes)[0] if state["data_type"] == "double" else \
                            value_bytes if state["data_type"] == "bytes" else int(new_value)
            if current_value == expected_value:
                filtered.append(addr)
        except:
            continue
    
    state["addresses"] = filtered
    count = len(filtered)
    
    if count == 0:
        return f"No addresses match the new {state['data_type']} value {new_value}. Try scanning again with scan()."
    elif count <= 5:
        details = "\nFinal candidates:\n" + "\n".join(
            [f"  0x{addr:X} -> {read_value(state['pm'], addr, state['data_type'])}"
             if read_value(state['pm'], addr, state['data_type']) is not None else f"  0x{addr:X} -> <unreadable>"
             for addr in filtered]
        )
        return f"Filtered to {count} addresses with {state['data_type']} value {new_value}.{details}\nCall edit() to modify these values or filter() again."
    else:
        return f"Filtered to {count} addresses with {state['data_type']} value {new_value}. Change the value in-game and call filter() again."

@mcp.tool(
    name="edit",
    description="Edits the values at the current list of addresses with the specified data type."
)
def edit(new_value: str) -> str:
    """
    Edits the values at the current list of addresses with the specified data type.
    Args:
        new_value: New value to write (e.g., "999" for int, "9.99" for float, "DEADBEEF" for bytes)
    Returns:
        A string with the results of the edit operation.
    """
    global state
    
    if not state["pm"]:
        return "No process attached. Run scan() first."
    if not state["addresses"]:
        return "No addresses to edit. Run scan() and filter() first."
    if not state["data_type"]:
        return "No data type set. Run scan() first to specify a data type."
    
    results = []
    for addr in state["addresses"]:
        try:
            write_value(state["pm"], addr, new_value, state["data_type"])
            results.append(f"  [✔] 0x{addr:X} updated to {new_value}")
        except Exception as e:
            results.append(f"  [✘] Failed to write to 0x{addr:X}: {str(e)}")
    
    return f"Edit results:\n" + "\n".join(results)

@mcp.tool(
    name="reset",
    description="Resets the state and closes the process handle."
)
def reset() -> str:
    """
    Resets the state and closes the process handle.
    """
    global state
    if state["pm"]:
        state["pm"].close_process()
    state = {"pm": None, "process_name": None, "addresses": [], "data_type": None}
    return "State reset and process handle closed."

@mcp.tool(
    name="get_addresses",
    description="Returns a specified number of addresses from the current list."
)
def get_addresses(count: int) -> str:
    """
    Returns a specified number of addresses from the current list.
    Args:
        count: Number of addresses to return (e.g., 5)
    Returns:
        A string with the requested number of addresses in hex format.
    """
    global state
    
    if not state["addresses"]:
        return "No addresses available. Run scan() and filter() first."
    
    available_count = len(state["addresses"])
    if count <= 0:
        return "Count must be greater than 0."
    
    return_count = min(count, available_count)
    addresses = state["addresses"][:return_count]
    formatted_addresses = [f"0x{addr:X}" for addr in addresses]
    return f"Returning {return_count} of {available_count} addresses:\n" + "\n".join(formatted_addresses)

# Run the server
if __name__ == "__main__":
    mcp.run()
    # for sse (endpoint: http://127.0.0.1:8000/sse)
    # mcp.run(transport="sse") 

```