# Directory Structure

```
├── .gitignore
├── .python-version
├── pyproject.toml
├── README.md
├── src
│   └── mitmproxy_mcp
│       ├── __init__.py
│       ├── flow_utils.py
│       ├── json_utils.py
│       ├── protection_analysis.py
│       └── server.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.11
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv

# mitmdumps
dumps

playground*
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

````markdown
# mitmproxy-mcp MCP server

An MCP server for inspecting and analyzing HTTP traffic captured with mitmproxy.

## Components

### Tools

The server implements four tools:

- `list_flows`: Lists HTTP requests/responses from a mitmproxy capture session, showing method, URL, and status code
  - Takes `session_id` as a required string argument
- `get_flow_details`: Retrieves detailed request/response data including headers, content (or a structure preview for large JSON bodies), and metadata for the specified flows
  - Takes `session_id` and `flow_indexes` as required arguments; optional `include_content` (default: true)
- `extract_json_fields`: Extracts specific fields from JSON content in a flow using JSONPath expressions
  - Takes `session_id`, `flow_index`, `content_type` ("request" or "response"), and `json_paths` as required arguments
- `analyze_protection`: Analyzes a flow for bot protection mechanisms (Cloudflare, Akamai Bot Manager, PerimeterX, DataDome, CAPTCHAs) and extracts challenge details
  - Takes `session_id` and `flow_index` as required arguments; optional `extract_scripts` (default: true)

## Configuration

Capture sessions are mitmproxy dump files named `<session_id>.dump`, read from the directory set by `DUMP_DIR` in `src/mitmproxy_mcp/flow_utils.py`.

## Quickstart

### Install

#### Claude Desktop

On macOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json`
On Windows: `%APPDATA%/Claude/claude_desktop_config.json`

<details>
  <summary>Development/Unpublished Servers Configuration</summary>

  ```
  "mcpServers": {
    "mitmproxy-mcp": {
      "command": "uv",
      "args": [
        "--directory",
        "/Users/lucas/Coding/mitmproxy-mcp",
        "run",
        "mitmproxy-mcp"
      ]
    }
  }
  ```
</details>

<details>
  <summary>Published Servers Configuration</summary>

  ```
  "mcpServers": {
    "mitmproxy-mcp": {
      "command": "uvx",
      "args": [
        "mitmproxy-mcp"
      ]
    }
  }
  ```
</details>

## Development

### Building and Publishing

To prepare the package for distribution:

1. Sync dependencies and update the lockfile:
```bash
uv sync
```

2. Build package distributions:
```bash
uv build
```

This will create source and wheel distributions in the `dist/` directory.

3. Publish to PyPI:
```bash
uv publish
```

Note: You'll need to set PyPI credentials via environment variables or command flags:
- Token: `--token` or `UV_PUBLISH_TOKEN`
- Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD`

### Debugging

Since MCP servers run over stdio, debugging can be challenging. For the best debugging
experience, we strongly recommend using the [MCP Inspector](https://github.com/modelcontextprotocol/inspector).

You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command:

```bash
npx @modelcontextprotocol/inspector uv --directory /Users/lucas/Coding/mitmproxy-mcp run mitmproxy-mcp
```

Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging.
````
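
The tool handlers can also be driven programmatically over stdio. Below is a minimal sketch using the official `mcp` Python client SDK; the project path mirrors the config above, and the session id `example` is a placeholder that assumes a capture exists at `dumps/example.dump`:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main():
    # Launch the server the same way the Claude Desktop config does
    params = StdioServerParameters(
        command="uv",
        args=["--directory", "/Users/lucas/Coding/mitmproxy-mcp", "run", "mitmproxy-mcp"],
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])
            # "example" is a placeholder session id
            result = await session.call_tool("list_flows", {"session_id": "example"})
            print(result.content[0].text)

asyncio.run(main())
```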

--------------------------------------------------------------------------------
/src/mitmproxy_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
from . import server
import asyncio

def main():
    """Main entry point for the package."""
    asyncio.run(server.main())

# Optionally expose other important items at package level
__all__ = ['main', 'server']
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mitmproxy-mcp"
version = "0.1.0"
description = "An MCP server project"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "mcp>=1.3.0",
    "mitmproxy>=11.0.2",
]

[[project.authors]]
name = "Lucas Soeth"
email = "[email protected]"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project.scripts]
mitmproxy-mcp = "mitmproxy_mcp:main"
```

--------------------------------------------------------------------------------
/src/mitmproxy_mcp/flow_utils.py:
--------------------------------------------------------------------------------

```python
import os
import json
from typing import Union
from mitmproxy import io

# Directory where mitmproxy dump files are stored
DUMP_DIR = "/Users/lucas/Coding/mitmproxy-mcp/dumps"

# Cache for storing flows per session
FLOW_CACHE = {}

async def get_flows_from_dump(session_id: str) -> list:
    """
    Retrieves flows from the dump file, using the cache if available.
    """
    dump_file = os.path.join(DUMP_DIR, f"{session_id}.dump")
    if not os.path.exists(dump_file):
        raise FileNotFoundError("Session not found")

    if session_id in FLOW_CACHE:
        return FLOW_CACHE[session_id]

    with open(dump_file, "rb") as f:
        reader = io.FlowReader(f)
        flows = list(reader.stream())
    FLOW_CACHE[session_id] = flows
    return flows

def parse_json_content(content: bytes, headers: dict) -> Union[dict, list, str]:
    """
    Attempts to parse content as JSON if the content type indicates JSON.
    Returns the parsed JSON, or the decoded text if parsing fails or the
    content is not JSON.
    """
    # Header names can differ in case (HTTP/2 uses lowercase), so match
    # the Content-Type header case-insensitively.
    content_type = ""
    if headers:
        for key, value in headers.items():
            if key.lower() == "content-type":
                content_type = value.lower()
                break

    if "application/json" in content_type or "text/json" in content_type:
        try:
            return json.loads(content.decode(errors="ignore"))
        except json.JSONDecodeError:
            return content.decode(errors="ignore")
    return content.decode(errors="ignore")
```
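
Dump files for this module are ordinary mitmproxy captures, e.g. recorded with `mitmdump -w dumps/example.dump`. A minimal sketch of the loading path (the session id `example` is a placeholder for such a capture):

```python
import asyncio

from mitmproxy_mcp.flow_utils import get_flows_from_dump, parse_json_content

async def main():
    # Resolves to DUMP_DIR/example.dump; raises FileNotFoundError if missing
    flows = await get_flows_from_dump("example")
    for flow in flows:
        if flow.type == "http":
            print(flow.request.method, flow.request.url)
            if flow.response:
                # Returns a dict/list for JSON bodies, decoded text otherwise
                body = parse_json_content(flow.response.content, dict(flow.response.headers))
                print(type(body).__name__)

asyncio.run(main())
```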

--------------------------------------------------------------------------------
/src/mitmproxy_mcp/json_utils.py:
--------------------------------------------------------------------------------

```python
from typing import Any

def generate_json_structure(json_data: Any, max_depth: int = 2, current_depth: int = 0) -> Any:
    """
    Generate a simplified structure of JSON content, showing keys and types
    but replacing actual values with type indicators after a certain depth.
    """
    if current_depth >= max_depth:
        if isinstance(json_data, dict):
            return {"...": f"{len(json_data)} keys"}
        elif isinstance(json_data, list):
            return f"[{len(json_data)} items]"
        else:
            return f"({type(json_data).__name__})"

    if isinstance(json_data, dict):
        result = {}
        for key, value in json_data.items():
            result[key] = generate_json_structure(value, max_depth, current_depth + 1)
        return result
    elif isinstance(json_data, list):
        if not json_data:
            return []
        # For lists, show structure of first item and count
        sample = generate_json_structure(json_data[0], max_depth, current_depth + 1)
        return [sample, f"... ({len(json_data)-1} more items)"] if len(json_data) > 1 else [sample]
    else:
        return f"({type(json_data).__name__})"

def extract_with_jsonpath(json_data: Any, path: str) -> Any:
    """
    Basic implementation of JSONPath extraction.
    Supports simple dot notation and array indexing.
    For more complex cases, consider using a full JSONPath library.
    """
    # Handle root object reference
    if path == "$":
        return json_data

    # Strip leading $ if present
    if path.startswith("$"):
        path = path[1:]
    if path.startswith("."):
        path = path[1:]

    parts = []
    # Parse the path - handle both dot notation and brackets
    current = ""
    in_brackets = False
    for char in path:
        if char == "[":
            if current:
                parts.append(current)
                current = ""
            in_brackets = True
        elif char == "]":
            if in_brackets:
                try:
                    # Handle array index
                    parts.append(int(current.strip()))
                except ValueError:
                    # Handle quoted key
                    quoted = current.strip()
                    if (quoted.startswith("'") and quoted.endswith("'")) or \
                       (quoted.startswith('"') and quoted.endswith('"')):
                        parts.append(quoted[1:-1])
                    else:
                        parts.append(quoted)
                current = ""
                in_brackets = False
        elif char == "." and not in_brackets:
            if current:
                parts.append(current)
                current = ""
        else:
            current += char

    if current:
        parts.append(current)

    # Navigate through the data
    result = json_data
    for part in parts:
        try:
            if isinstance(result, dict):
                result = result.get(part)
            elif isinstance(result, list) and isinstance(part, int):
                if 0 <= part < len(result):
                    result = result[part]
                else:
                    return None
            else:
                return None

            if result is None:
                break
        except Exception:
            return None

    return result
```
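
Both helpers operate on plain Python values, so they can be sanity-checked without a capture. A small sketch with made-up data:

```python
from mitmproxy_mcp.json_utils import generate_json_structure, extract_with_jsonpath

data = {
    "data": {"users": [{"name": "ada", "id": 1}, {"name": "bob", "id": 2}]},
    "metadata": {"timestamp": "2025-01-01T00:00:00Z"},
}

# Depth-limited preview: values past max_depth collapse to type/size hints
print(generate_json_structure(data, max_depth=2))
# {'data': {'users': '[2 items]'}, 'metadata': {'timestamp': '(str)'}}

# Dot notation and bracket indexing are both supported
print(extract_with_jsonpath(data, "$.data.users[0].name"))  # ada
print(extract_with_jsonpath(data, "$.metadata.timestamp"))  # 2025-01-01T00:00:00Z
```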

--------------------------------------------------------------------------------
/src/mitmproxy_mcp/protection_analysis.py:
--------------------------------------------------------------------------------

```python
import re
from typing import Any, Dict, List

# Known bot protection systems and their signatures
BOT_PROTECTION_SIGNATURES = {
    "Cloudflare": [
        r"cf-ray",  # Cloudflare Ray ID header
        r"__cf_bm",  # Cloudflare Bot Management cookie
        r"cf_clearance",  # Cloudflare challenge clearance cookie
        r"\"why_captcha\"",  # Common in Cloudflare challenge responses
        r"challenge-platform",  # Used in challenge scripts
        r"turnstile\.js",  # Cloudflare Turnstile
    ],
    "Akamai Bot Manager": [
        r"_abck=",  # Akamai Bot Manager cookie
        r"akam_",  # Akamai cookie prefix
        r"bm_sz",  # Bot Manager cookie
        r"sensor_data",  # Bot detection data
    ],
    "PerimeterX": [
        r"_px\d?=",  # PerimeterX cookies
        r"px\.js",  # PerimeterX script
        r"px-captcha",  # PerimeterX captcha
    ],
    "DataDome": [
        r"datadome=",  # DataDome cookie
        r"datadome\.js",  # DataDome script
        r"_dd_s",  # DataDome session cookie
    ],
    "reCAPTCHA": [
        r"google\.com/recaptcha",
        r"recaptcha\.net",
        r"g-recaptcha",
    ],
    "hCaptcha": [
        r"hcaptcha\.com",
        r"h-captcha",
    ],
    "Generic Bot Detection": [
        r"bot=",  # Generic bot cookie
        r"captcha",  # Generic captcha reference
        r"challenge",  # Generic challenge term
        r"detected automated traffic",  # Common message
        r"verify you are human",  # Common message
    ]
}

def extract_javascript(html_content: str) -> List[Dict[str, Any]]:
    """
    Extract JavaScript from HTML content and provide basic analysis.
    Returns list of dictionaries with script info.
    """
    scripts = []

    # Extract inline scripts
    inline_pattern = r'<script[^>]*>(.*?)</script>'
    inline_scripts = re.findall(inline_pattern, html_content, re.DOTALL)

    for i, script in enumerate(inline_scripts):
        if len(script.strip()) > 0:
            script_info = {
                "type": "inline",
                "index": i,
                "size": len(script),
                "content": script if len(script) < 1000 else script[:1000] + "... [truncated]",
                "summary": analyze_script(script)
            }
            scripts.append(script_info)

    # Extract external script references
    src_pattern = r'<script[^>]*src=[\'"]([^\'"]+)[\'"][^>]*>'
    external_scripts = re.findall(src_pattern, html_content)

    for i, src in enumerate(external_scripts):
        script_info = {
            "type": "external",
            "index": i,
            "src": src,
            "suspicious": any(term in src.lower() for term in [
                "captcha", "challenge", "bot", "protect", "security",
                "verify", "check", "shield", "defend", "guard"
            ])
        }
        scripts.append(script_info)

    return scripts

def analyze_script(script: str) -> Dict[str, Any]:
    """
    Analyze JavaScript content for common protection patterns.
    """
    analysis = {
        "potential_protection": False,
        "fingerprinting_indicators": [],
        "token_generation_indicators": [],
        "obfuscation_level": "none",
        "key_functions": []
    }

    # Check for fingerprinting techniques
    fingerprinting_patterns = [
        (r'navigator\.', "Browser navigator object"),
        (r'screen\.', "Screen properties"),
        (r'canvas', "Canvas fingerprinting"),
        (r'webgl', "WebGL fingerprinting"),
        (r'font', "Font enumeration"),
        (r'audio', "Audio fingerprinting"),
        (r'plugins', "Plugin enumeration"),
        (r'User-Agent', "User-Agent checking"),
        (r'platform', "Platform detection")
    ]

    for pattern, description in fingerprinting_patterns:
        if re.search(pattern, script, re.IGNORECASE):
            analysis["fingerprinting_indicators"].append(description)

    # Check for token generation
    token_patterns = [
        (r'(token|captcha|challenge|clearance)', "Token/challenge reference"),
        (r'(generate|calculate|compute)', "Computation terms"),
        (r'(Math\.random|crypto)', "Random generation"),
        (r'(cookie|setCookie|document\.cookie)', "Cookie manipulation"),
        (r'(xhr|XMLHttpRequest|fetch)', "Request sending")
    ]

    for pattern, description in token_patterns:
        if re.search(pattern, script, re.IGNORECASE):
            analysis["token_generation_indicators"].append(description)

    # Check for common obfuscation techniques
    if len(re.findall(r'eval\(', script)) > 3:
        analysis["obfuscation_level"] = "high"
    elif len(re.findall(r'\\x[0-9a-f]{2}', script)) > 10:
        analysis["obfuscation_level"] = "high"
    elif len(re.findall(r'String\.fromCharCode', script)) > 3:
        analysis["obfuscation_level"] = "high"
    elif re.search(r'function\(\w{1,2},\w{1,2},\w{1,2}\)\{', script):
        analysis["obfuscation_level"] = "medium"
    elif sum(1 for c in script if c == ';') > len(script) / 10:
        analysis["obfuscation_level"] = "medium"
    elif sum(len(w) > 30 for w in re.findall(r'\w+', script)) > 10:
        analysis["obfuscation_level"] = "medium"

    # Extract potential key function names
    function_pattern = r'function\s+(\w+)\s*\('
    functions = re.findall(function_pattern, script)

    suspicious_terms = ["challenge", "token", "captcha", "verify", "bot", "check", "security"]
    for func in functions:
        if any(term in func.lower() for term in suspicious_terms):
            analysis["key_functions"].append(func)

    # Determine if this is potentially protection-related
    analysis["potential_protection"] = (
        len(analysis["fingerprinting_indicators"]) > 2 or
        len(analysis["token_generation_indicators"]) > 2 or
        analysis["obfuscation_level"] != "none" or
        len(analysis["key_functions"]) > 0
    )

    return analysis

def analyze_cookies(headers: Dict[str, str]) -> List[Dict[str, Any]]:
    """
    Analyze cookies for common protection-related patterns.
    """
    # Header names can differ in case depending on protocol version,
    # so match them case-insensitively.
    lowered = {k.lower(): v for k, v in headers.items()} if headers else {}
    cookie_header = lowered.get("cookie", "") or lowered.get("set-cookie", "")
    if not cookie_header:
        return []

    # Split multiple cookies
    cookies = []
    for cookie_str in cookie_header.split(";"):
        parts = cookie_str.strip().split("=", 1)
        if len(parts) == 2:
            name, value = parts
            cookie = {
                "name": name.strip(),
                "value": value.strip() if len(value.strip()) < 50 else value.strip()[:50] + "... [truncated]",
                "protection_related": False,
                "vendor": "unknown"
            }

            # Check if this is a known protection cookie
            for vendor, signatures in BOT_PROTECTION_SIGNATURES.items():
                for sig in signatures:
                    if re.search(sig, name, re.IGNORECASE):
                        cookie["protection_related"] = True
                        cookie["vendor"] = vendor
                        break
                if cookie["protection_related"]:
                    break

            cookies.append(cookie)

    return cookies

def identify_protection_system(flow) -> List[Dict[str, Any]]:
    """
    Identify potential bot protection systems based on signatures.
    """
    protections = []

    # Combine all searchable content
    searchable_content = ""
    # Add request headers
    for k, v in flow.request.headers.items():
        searchable_content += f"{k}: {v}\n"

    # Check response if available
    if flow.response:
        # Add response headers
        for k, v in flow.response.headers.items():
            searchable_content += f"{k}: {v}\n"

        # Add response content if it's text
        content_type = flow.response.headers.get("Content-Type", "")
        if "text" in content_type or "javascript" in content_type or "json" in content_type:
            try:
                searchable_content += flow.response.content.decode('utf-8', errors='ignore')
            except Exception:
                pass

    # Check for protection signatures
    for vendor, signatures in BOT_PROTECTION_SIGNATURES.items():
        matches = []
        for sig in signatures:
            if re.search(sig, searchable_content, re.IGNORECASE):
                matches.append(sig)

        if matches:
            protections.append({
                "vendor": vendor,
                "confidence": len(matches) / len(signatures) * 100,
                "matching_signatures": matches
            })

    return sorted(protections, key=lambda x: x["confidence"], reverse=True)

def analyze_response_for_challenge(flow) -> Dict[str, Any]:
    """
    Analyze a response to determine if it contains a challenge.
    """
    if not flow.response:
        return {"is_challenge": False}

    result = {
        "is_challenge": False,
        "challenge_indicators": [],
        "status_code": flow.response.status_code,
        "challenge_type": "unknown"
    }

    # Check status code
    if flow.response.status_code in [403, 429, 503]:
        result["challenge_indicators"].append(f"Suspicious status code: {flow.response.status_code}")

    # Check for challenge headers
    challenge_headers = {
        "cf-mitigated": "Cloudflare mitigation",
        "cf-chl-bypass": "Cloudflare challenge bypass",
        "x-datadome": "DataDome protection",
        "x-px": "PerimeterX",
        "x-amz-captcha": "AWS WAF Captcha"
    }

    for header, description in challenge_headers.items():
        if any(h.lower() == header.lower() for h in flow.response.headers.keys()):
            result["challenge_indicators"].append(f"Challenge header: {description}")

    # Check for challenge content patterns
    content = flow.response.content.decode('utf-8', errors='ignore')
    challenge_patterns = [
        (r'captcha', "CAPTCHA"),
        (r'challenge', "Challenge term"),
        (r'blocked', "Blocking message"),
        (r'verify.*human', "Human verification"),
        (r'suspicious.*activity', "Suspicious activity message"),
        (r'security.*check', "Security check message"),
        (r'ddos', "DDoS protection message"),
        (r'automated.*request', "Automated request detection")
    ]

    for pattern, description in challenge_patterns:
        if re.search(pattern, content, re.IGNORECASE):
            result["challenge_indicators"].append(f"Content indicator: {description}")

    # Determine if this is a challenge response
    result["is_challenge"] = len(result["challenge_indicators"]) > 0

    # Determine challenge type
    if "CAPTCHA" in " ".join(result["challenge_indicators"]):
        result["challenge_type"] = "captcha"
    elif "JavaScript" in content and result["is_challenge"]:
        result["challenge_type"] = "javascript"
    elif result["is_challenge"]:
        result["challenge_type"] = "other"

    return result

def generate_suggestions(analysis: Dict[str, Any]) -> List[str]:
    """
    Generate remediation suggestions based on the protection analysis.
    """
    suggestions = []

    # Check if any protection system was detected
    if analysis.get("protection_systems"):
        top_system = analysis["protection_systems"][0]["vendor"]
        confidence = analysis["protection_systems"][0]["confidence"]

        if confidence > 50:
            suggestions.append(f"Detected {top_system} with {confidence:.1f}% confidence.")

            # Add vendor-specific suggestions
            if "Cloudflare" in top_system:
                suggestions.append("Cloudflare often uses JavaScript challenges. Check for cf_clearance cookie.")
                suggestions.append("Consider using proven techniques like cfscrape or cloudscraper libraries.")
            elif "Akamai" in top_system:
                suggestions.append("Akamai uses sensor_data for browser fingerprinting.")
                suggestions.append("Focus on _abck cookie which contains browser verification data.")
            elif "PerimeterX" in top_system:
                suggestions.append("PerimeterX relies on JavaScript execution and browser fingerprinting.")
                suggestions.append("Look for _px cookies which are essential for session validation.")
            elif "DataDome" in top_system:
                suggestions.append("DataDome uses advanced behavioral and fingerprinting techniques.")
                suggestions.append("The datadome cookie is critical for maintaining sessions.")
            elif "CAPTCHA" in top_system:
                suggestions.append("This site uses CAPTCHA challenges which may require manual solving or specialized services.")

    # Add suggestions based on challenge type
    if analysis.get("challenge_analysis", {}).get("is_challenge", False):
        challenge_type = analysis.get("challenge_analysis", {}).get("challenge_type", "unknown")

        if challenge_type == "javascript":
            suggestions.append("This response contains a JavaScript challenge that must be solved.")
            suggestions.append("Consider using a headless browser to execute the challenge JavaScript.")

            # If we have script analysis, add more specific suggestions
            if "scripts" in analysis:
                obfuscated_scripts = [s for s in analysis["scripts"] if s.get("summary", {}).get("obfuscation_level") in ["medium", "high"]]
                if obfuscated_scripts:
                    suggestions.append(f"Found {len(obfuscated_scripts)} obfuscated script(s) that likely contain challenge logic.")

                fingerprinting_scripts = [s for s in analysis["scripts"] if s.get("summary", {}).get("fingerprinting_indicators")]
                if fingerprinting_scripts:
                    techniques = set()
                    for script in fingerprinting_scripts:
                        techniques.update(script.get("summary", {}).get("fingerprinting_indicators", []))
                    suggestions.append(f"Detected browser fingerprinting techniques: {', '.join(techniques)}.")

        elif challenge_type == "captcha":
            suggestions.append("This response contains a CAPTCHA challenge.")
            suggestions.append("Consider using a CAPTCHA solving service or manual intervention.")

    # Check for important cookies
    protection_cookies = [c for c in analysis.get("response_cookies", []) if c.get("protection_related")]
    if protection_cookies:
        cookie_names = [c["name"] for c in protection_cookies]
        suggestions.append(f"Important protection cookies to maintain: {', '.join(cookie_names)}.")

    # General suggestions
    if analysis.get("protection_systems") or analysis.get("challenge_analysis", {}).get("is_challenge", False):
        suggestions.append("General recommendations:")
        suggestions.append("- Maintain consistent User-Agent between requests")
        suggestions.append("- Preserve all cookies from the session")
        suggestions.append("- Add appropriate referer and origin headers")
        suggestions.append("- Consider adding delays between requests to avoid rate limiting")
        suggestions.append("- Use rotating IP addresses if available")

    return suggestions

```
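
The HTML/script analysis works on plain strings, so it can be exercised with fabricated input. A small sketch (the HTML below is invented for illustration):

```python
from mitmproxy_mcp.protection_analysis import extract_javascript

html = """<html><body>
<script>
function verifyToken() {
    var fp = navigator.userAgent + screen.width;
    document.cookie = "cf_clearance=" + btoa(fp);
}
</script>
<script src="https://example.com/challenge-platform/api.js"></script>
</body></html>"""

for script in extract_javascript(html):
    if script["type"] == "inline":
        summary = script["summary"]
        # key_functions picks up verifyToken, so potential_protection is True
        print(summary["potential_protection"], summary["fingerprinting_indicators"])
    else:
        print(script["src"], "suspicious:", script["suspicious"])
```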

--------------------------------------------------------------------------------
/src/mitmproxy_mcp/server.py:
--------------------------------------------------------------------------------

```python
import json

import mcp.types as types
import mcp.server.stdio
from mcp.server import Server

from mitmproxy_mcp.flow_utils import get_flows_from_dump, parse_json_content
from mitmproxy_mcp.json_utils import generate_json_structure, extract_with_jsonpath
from mitmproxy_mcp.protection_analysis import (
    analyze_response_for_challenge,
    analyze_cookies,
    extract_javascript,
    generate_suggestions,
    identify_protection_system,
)

# Maximum content size in bytes before switching to structure preview
MAX_CONTENT_SIZE = 2000

server = Server("mitmproxy-mcp")

@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """
    List available tools.
    Each tool specifies its arguments using JSON Schema validation.
    """
    return [
        types.Tool(
            name="list_flows",
            description="Lists HTTP requests/responses from a mitmproxy capture session, showing method, URL, and status codes",
            inputSchema={
                "type": "object",
                "properties": {
                    "session_id": {
                        "type": "string",
                        "description": "The ID of the session to list flows from"
                    }
                },
                "required": ["session_id"]
            }
        ),
        types.Tool(
            name="get_flow_details",
            description="Retrieves detailed HTTP request/response data including headers, content (or structure preview for large JSON), and metadata from specified flows",
            inputSchema={
                "type": "object",
                "properties": {
                    "session_id": {
                        "type": "string",
                        "description": "The ID of the session"
                    },
                    "flow_indexes": {
                        "type": "array",
                        "items": {
                            "type": "integer"
                        },
                        "description": "The indexes of the flows"
                    },
                    "include_content": {
                        "type": "boolean",
                        "description": "Whether to include full content in the response (default: true)",
                        "default": True
                    }
                },
                "required": ["session_id", "flow_indexes"]
            }
        ),
        types.Tool(
            name="extract_json_fields",
            description="Extract specific fields from JSON content in a flow using JSONPath expressions",
            inputSchema={
                "type": "object",
                "properties": {
                    "session_id": {
                        "type": "string",
                        "description": "The ID of the session"
                    },
                    "flow_index": {
                        "type": "integer",
                        "description": "The index of the flow"
                    },
                    "content_type": {
                        "type": "string",
                        "enum": ["request", "response"],
                        "description": "Whether to extract from request or response content"
                    },
                    "json_paths": {
                        "type": "array",
                        "items": {
                            "type": "string"
                        },
                        "description": "JSONPath expressions to extract (e.g. ['$.data.users', '$.metadata.timestamp'])"
                    }
                },
                "required": ["session_id", "flow_index", "content_type", "json_paths"]
            }
        ),
        types.Tool(
            name="analyze_protection",
            description="Analyze flow for bot protection mechanisms and extract challenge details",
            inputSchema={
                "type": "object",
                "properties": {
                    "session_id": {
                        "type": "string",
                        "description": "The ID of the session"
                    },
                    "flow_index": {
                        "type": "integer",
                        "description": "The index of the flow to analyze"
                    },
                    "extract_scripts": {
                        "type": "boolean",
                        "description": "Whether to extract and analyze JavaScript from the response (default: true)",
                        "default": True
                    }
                },
                "required": ["session_id", "flow_index"]
            }
        )
    ]

async def list_flows(arguments: dict) -> list[types.TextContent]:
    """
    Lists HTTP flows from a mitmproxy dump file.
    """
    session_id = arguments.get("session_id")
    if not session_id:
        return [types.TextContent(type="text", text="Error: Missing session_id")]

    try:
        flows = await get_flows_from_dump(session_id)

        flow_list = []
        for i, flow in enumerate(flows):
            if flow.type == "http":
                request = flow.request
                response = flow.response
                flow_info = {
                    "index": i,
                    "method": request.method,
                    "url": request.url,
                    "status": response.status_code if response else None
                }
                flow_list.append(flow_info)

        return [types.TextContent(type="text", text=json.dumps(flow_list, indent=2))]
    except FileNotFoundError:
        return [types.TextContent(type="text", text="Error: Session not found")]
    except Exception as e:
        return [types.TextContent(type="text", text=f"Error reading flows: {str(e)}")]

async def get_flow_details(arguments: dict) -> list[types.TextContent]:
    """
    Gets details of specific flows from a mitmproxy dump file.
    For large JSON content, returns a structure preview instead of full content.
    """
    session_id = arguments.get("session_id")
    flow_indexes = arguments.get("flow_indexes")
    include_content = arguments.get("include_content", True)

    if not session_id:
        return [types.TextContent(type="text", text="Error: Missing session_id")]
    if not flow_indexes:
        return [types.TextContent(type="text", text="Error: Missing flow_indexes")]

    try:
        flows = await get_flows_from_dump(session_id)
        flow_details_list = []

        for flow_index in flow_indexes:
            try:
                flow = flows[flow_index]

                if flow.type == "http":
                    request = flow.request
                    response = flow.response

                    # Parse content
                    request_content = parse_json_content(request.content, dict(request.headers))
                    response_content = None
                    if response:
                        response_content = parse_json_content(response.content, dict(response.headers))

                    # Handle large content
                    request_content_preview = None
                    response_content_preview = None

                    flow_details = {}

                    # Large JSON request bodies (objects or arrays) get a structure preview
                    if include_content and len(request.content) > MAX_CONTENT_SIZE and isinstance(request_content, (dict, list)):
                        request_content_preview = generate_json_structure(request_content)
                        request_content = None  # Don't include full content
                    elif include_content and len(request.content) > MAX_CONTENT_SIZE:
                        # parse_json_content returns decoded text for non-JSON bodies
                        request_content = request_content[:MAX_CONTENT_SIZE] + " ...[truncated]"
                        flow_details["request_content_note"] = f"Request content truncated to {MAX_CONTENT_SIZE} bytes."

                    # Large JSON response bodies (objects or arrays) get a structure preview
                    if response and include_content and len(response.content) > MAX_CONTENT_SIZE and isinstance(response_content, (dict, list)):
                        response_content_preview = generate_json_structure(response_content)
                        response_content = None  # Don't include full content
                    elif response and include_content and len(response.content) > MAX_CONTENT_SIZE:
                        response_content = response_content[:MAX_CONTENT_SIZE] + " ...[truncated]"
                        flow_details["response_content_note"] = f"Response content truncated to {MAX_CONTENT_SIZE} bytes."

                    # Build flow details
                    flow_details.update({
                        "index": flow_index,
                        "method": request.method,
                        "url": request.url,
                        "request_headers": dict(request.headers),
                        "status": response.status_code if response else None,
                        "response_headers": dict(response.headers) if response else None,
                    })

                    # Add content or previews based on size
                    if include_content:
                        if request_content is not None:
                            flow_details["request_content"] = request_content
                        if request_content_preview is not None:
                            flow_details["request_content_preview"] = request_content_preview
                            flow_details["request_content_size"] = len(request.content)
                            flow_details["request_content_note"] = "Content too large to display. Use extract_json_fields tool to get specific values."

                        if response_content is not None:
                            flow_details["response_content"] = response_content
                        if response_content_preview is not None:
                            flow_details["response_content_preview"] = response_content_preview
                            flow_details["response_content_size"] = len(response.content) if response else 0
                            flow_details["response_content_note"] = "Content too large to display. Use extract_json_fields tool to get specific values."

                    flow_details_list.append(flow_details)
                else:
                    flow_details_list.append({"error": f"Flow {flow_index} is not an HTTP flow"})

            except IndexError:
                flow_details_list.append({"error": f"Flow index {flow_index} out of range"})

        return [types.TextContent(type="text", text=json.dumps(flow_details_list, indent=2))]

    except FileNotFoundError:
        return [types.TextContent(type="text", text="Error: Session not found")]
    except Exception as e:
        return [types.TextContent(type="text", text=f"Error reading flow details: {str(e)}")]

async def extract_json_fields(arguments: dict) -> list[types.TextContent]:
    """
    Extract specific fields from JSON content in a flow using JSONPath expressions.
    """
    session_id = arguments.get("session_id")
    flow_index = arguments.get("flow_index")
    content_type = arguments.get("content_type")
    json_paths = arguments.get("json_paths")

    if not session_id:
        return [types.TextContent(type="text", text="Error: Missing session_id")]
    if flow_index is None:
        return [types.TextContent(type="text", text="Error: Missing flow_index")]
    if not content_type:
        return [types.TextContent(type="text", text="Error: Missing content_type")]
    if not json_paths:
        return [types.TextContent(type="text", text="Error: Missing json_paths")]

    try:
        flows = await get_flows_from_dump(session_id)

        try:
            flow = flows[flow_index]

            if flow.type != "http":
                return [types.TextContent(type="text", text=f"Error: Flow {flow_index} is not an HTTP flow")]

            request = flow.request
            response = flow.response

            # Determine which content to extract from
            content = None
            headers = None
            if content_type == "request":
                content = request.content
                headers = dict(request.headers)
            elif content_type == "response":
                if not response:
                    return [types.TextContent(type="text", text=f"Error: Flow {flow_index} has no response")]
                content = response.content
                headers = dict(response.headers)
            else:
                return [types.TextContent(type="text", text="Error: Invalid content_type. Must be 'request' or 'response'")]

            # Parse the content
            json_content = parse_json_content(content, headers)

            # Only extract from JSON content
            if not isinstance(json_content, (dict, list)):
                return [types.TextContent(type="text", text=f"Error: The {content_type} content is not valid JSON")]

            # Extract fields
            result = {}
            for path in json_paths:
                try:
                    extracted = extract_with_jsonpath(json_content, path)
                    result[path] = extracted
                except Exception as e:
                    result[path] = f"Error extracting path: {str(e)}"

            return [types.TextContent(type="text", text=json.dumps(result, indent=2))]

        except IndexError:
            return [types.TextContent(type="text", text=f"Error: Flow index {flow_index} out of range")]

    except FileNotFoundError:
        return [types.TextContent(type="text", text="Error: Session not found")]
    except Exception as e:
        return [types.TextContent(type="text", text=f"Error extracting JSON fields: {str(e)}")]

async def analyze_protection(arguments: dict) -> list[types.TextContent]:
    """
    Analyze a flow for bot protection mechanisms and extract challenge details.
    """
    session_id = arguments.get("session_id")
    flow_index = arguments.get("flow_index")
    extract_scripts = arguments.get("extract_scripts", True)

    if not session_id:
        return [types.TextContent(type="text", text="Error: Missing session_id")]
    if flow_index is None:
        return [types.TextContent(type="text", text="Error: Missing flow_index")]

    try:
        flows = await get_flows_from_dump(session_id)

        try:
            flow = flows[flow_index]

            if flow.type != "http":
                return [types.TextContent(type="text", text=f"Error: Flow {flow_index} is not an HTTP flow")]

            # Analyze the flow for protection mechanisms
            analysis = {
                "flow_index": flow_index,
                "method": flow.request.method,
                "url": flow.request.url,
                "protection_systems": identify_protection_system(flow),
                "request_cookies": analyze_cookies(dict(flow.request.headers)),
                "has_response": flow.response is not None,
            }

            if flow.response:
                # Add response analysis
                content_type = flow.response.headers.get("Content-Type", "")
                is_html = "text/html" in content_type

                analysis.update({
                    "status_code": flow.response.status_code,
                    "response_cookies": analyze_cookies(dict(flow.response.headers)),
                    "challenge_analysis": analyze_response_for_challenge(flow),
                    "content_type": content_type,
                    "is_html": is_html,
                })

                # If HTML and script extraction is requested, extract and analyze JavaScript
                if is_html and extract_scripts:
                    try:
                        html_content = flow.response.content.decode('utf-8', errors='ignore')
                        analysis["scripts"] = extract_javascript(html_content)
                    except Exception as e:
                        analysis["script_extraction_error"] = str(e)

            # Add remediation suggestions based on findings
            analysis["suggestions"] = generate_suggestions(analysis)

            return [types.TextContent(type="text", text=json.dumps(analysis, indent=2))]

        except IndexError:
            return [types.TextContent(type="text", text=f"Error: Flow index {flow_index} out of range")]

    except FileNotFoundError:
        return [types.TextContent(type="text", text="Error: Session not found")]
    except Exception as e:
        return [types.TextContent(type="text", text=f"Error analyzing protection: {str(e)}")]

@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """
    Handle tool execution requests.
    Delegates to specific functions based on the tool name.
    """
    if not arguments:
        raise ValueError("Missing arguments")

    if name == "list_flows":
        return await list_flows(arguments)
    elif name == "get_flow_details":
        return await get_flow_details(arguments)
    elif name == "extract_json_fields":
        return await extract_json_fields(arguments)
    elif name == "analyze_protection":
        return await analyze_protection(arguments)
    else:
        raise ValueError(f"Unknown tool: {name}")

async def main():
    # Run the server using stdin/stdout streams
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options(),
        )
```
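
Because each tool handler is a plain async function returning `types.TextContent`, the server logic can be smoke-tested without any MCP transport. A minimal sketch (the session id `example` is a placeholder for a capture in `DUMP_DIR`):

```python
import asyncio

from mitmproxy_mcp.server import get_flow_details, list_flows

async def main():
    # Each call returns a list of TextContent with a JSON payload in .text
    print((await list_flows({"session_id": "example"}))[0].text)
    details = await get_flow_details({
        "session_id": "example",
        "flow_indexes": [0],
        "include_content": False,
    })
    print(details[0].text)

asyncio.run(main())
```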