# Directory Structure
```
├── .gitignore
├── .python-version
├── pyproject.toml
├── README.md
├── src
│   └── mitmproxy_mcp
│       ├── __init__.py
│       ├── flow_utils.py
│       ├── json_utils.py
│       ├── protection_analysis.py
│       └── server.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
1 | 3.11
2 |
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # Python-generated files
2 | __pycache__/
3 | *.py[oc]
4 | build/
5 | dist/
6 | wheels/
7 | *.egg-info
8 |
9 | # Virtual environments
10 | .venv
11 |
12 | # mitmdumps
13 |
14 | dumps
15 |
16 | playground*
17 |
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # mitmproxy-mcp MCP server
2 |
3 | An MCP server for inspecting and analyzing mitmproxy traffic captures.
4 |
5 | ## Components
6 |
7 | ### Tools
8 |
9 | The server exposes mitmproxy capture data through four tools:
10 |
11 | - list_flows: lists the HTTP requests/responses in a capture session,
12 | showing each flow's index, method, URL, and status code
13 | - get_flow_details: retrieves detailed request/response data for selected
14 | flows, including headers, content (or a structure preview for large JSON
15 | bodies), and metadata
16 | - extract_json_fields: extracts specific fields from JSON request or
17 | response content using JSONPath expressions
18 | - analyze_protection: analyzes a flow for bot protection mechanisms
19 | (e.g. Cloudflare, Akamai Bot Manager, PerimeterX, DataDome, reCAPTCHA,
20 | hCaptcha) and extracts challenge details
21 |
22 | Every tool takes a "session_id" that names a mitmproxy dump file; see
23 | Configuration below.
24 |
25 | The server currently defines no MCP resources or prompts.
26 |
27 | ## Configuration
28 |
29 | Flows are read from mitmproxy dump files in the directory set by `DUMP_DIR` in `src/mitmproxy_mcp/flow_utils.py`. Capture traffic with `mitmdump -w <DUMP_DIR>/<session_id>.dump`, then pass the file name (without the `.dump` extension) as the `session_id` argument to the tools.
30 |
31 | ## Quickstart
32 |
33 | ### Install
34 |
35 | #### Claude Desktop
36 |
37 | On macOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json`
38 | On Windows: `%APPDATA%/Claude/claude_desktop_config.json`
39 |
40 | <details>
41 | <summary>Development/Unpublished Servers Configuration</summary>
42 | ```
43 | "mcpServers": {
44 | "mitmproxy-mcp": {
45 | "command": "uv",
46 | "args": [
47 | "--directory",
48 | "/Users/lucas/Coding/mitmproxy-mcp",
49 | "run",
50 | "mitmproxy-mcp"
51 | ]
52 | }
53 | }
54 | ```
55 | </details>
56 |
57 | <details>
58 | <summary>Published Servers Configuration</summary>
59 | ```
60 | "mcpServers": {
61 | "mitmproxy-mcp": {
62 | "command": "uvx",
63 | "args": [
64 | "mitmproxy-mcp"
65 | ]
66 | }
67 | }
68 | ```
69 | </details>
70 |
71 | ## Development
72 |
73 | ### Building and Publishing
74 |
75 | To prepare the package for distribution:
76 |
77 | 1. Sync dependencies and update lockfile:
78 | ```bash
79 | uv sync
80 | ```
81 |
82 | 2. Build package distributions:
83 | ```bash
84 | uv build
85 | ```
86 |
87 | This will create source and wheel distributions in the `dist/` directory.
88 |
89 | 3. Publish to PyPI:
90 | ```bash
91 | uv publish
92 | ```
93 |
94 | Note: You'll need to set PyPI credentials via environment variables or command flags:
95 | - Token: `--token` or `UV_PUBLISH_TOKEN`
96 | - Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD`
97 |
98 | ### Debugging
99 |
100 | Since MCP servers run over stdio, debugging can be challenging. For the best debugging
101 | experience, we strongly recommend using the [MCP Inspector](https://github.com/modelcontextprotocol/inspector).
102 |
103 |
104 | You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command:
105 |
106 | ```bash
107 | npx @modelcontextprotocol/inspector uv --directory /Users/lucas/Coding/mitmproxy-mcp run mitmproxy-mcp
108 | ```
109 |
110 |
111 | Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging.
```
--------------------------------------------------------------------------------
/src/mitmproxy_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
1 | from . import server
2 | import asyncio
3 |
4 | def main():
5 | """Main entry point for the package."""
6 | asyncio.run(server.main())
7 |
8 | # Optionally expose other important items at package level
9 | __all__ = ['main', 'server']
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [project]
2 | name = "mitmproxy-mcp"
3 | version = "0.1.0"
4 | description = "A MCP server project"
5 | readme = "README.md"
6 | requires-python = ">=3.11"
7 | dependencies = [
8 | "mcp>=1.3.0",
9 | "mitmproxy>=11.0.2",
10 | ]
11 | [[project.authors]]
12 | name = "Lucas Soeth"
13 | email = "[email protected]"
14 |
15 | [build-system]
16 | requires = [ "hatchling",]
17 | build-backend = "hatchling.build"
18 |
19 | [project.scripts]
20 | mitmproxy-mcp = "mitmproxy_mcp:main"
21 |
```
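The `[project.scripts]` entry maps the `mitmproxy-mcp` command to `mitmproxy_mcp:main`, so running the installed script is equivalent to this two-liner:

```python
# Equivalent of the mitmproxy-mcp console script (see [project.scripts])
from mitmproxy_mcp import main

main()  # per __init__.py, this runs asyncio.run(server.main())
```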
--------------------------------------------------------------------------------
/src/mitmproxy_mcp/flow_utils.py:
--------------------------------------------------------------------------------
```python
1 | import os
2 | import json
3 | from typing import Any, Dict, List, Union
4 | from mitmproxy import io
5 |
6 | # Directory where mitmproxy dump files are stored (hardcoded absolute path; adjust for your checkout)
7 | DUMP_DIR = "/Users/lucas/Coding/mitmproxy-mcp/dumps"
8 |
9 | # Cache for storing flows per session
10 | FLOW_CACHE = {}
11 |
12 | async def get_flows_from_dump(session_id: str) -> list:
13 | """
14 | Retrieves flows from the dump file, using the cache if available.
15 | """
16 | dump_file = os.path.join(DUMP_DIR, f"{session_id}.dump")
17 | if not os.path.exists(dump_file):
18 | raise FileNotFoundError("Session not found")
19 |
20 | if session_id in FLOW_CACHE:
21 | return FLOW_CACHE[session_id]
22 | else:
23 | with open(dump_file, "rb") as f:
24 | reader = io.FlowReader(f)
25 | flows = list(reader.stream())
26 | FLOW_CACHE[session_id] = flows
27 | return flows
28 |
29 | def parse_json_content(content: bytes, headers: dict) -> Union[Dict, List, str]:
30 | """
31 | Attempts to parse content as JSON when the content type indicates JSON.
32 | Returns the parsed JSON, or the decoded text if parsing fails or the content is not JSON.
33 | """
34 | content_type = headers.get("Content-Type", "").lower() if headers else ""
35 |
36 | if "application/json" in content_type or "text/json" in content_type:
37 | try:
38 | return json.loads(content.decode(errors="ignore"))
39 | except json.JSONDecodeError:
40 | return content.decode(errors="ignore")
41 | return content.decode(errors="ignore")
42 |
```
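A minimal usage sketch for the helpers above (assuming the package is installed and a capture exists at `<DUMP_DIR>/example.dump`, where "example" is a hypothetical session id):

```python
import asyncio

from mitmproxy_mcp.flow_utils import get_flows_from_dump, parse_json_content

async def demo():
    # "example" names <DUMP_DIR>/example.dump (hypothetical session id)
    flows = await get_flows_from_dump("example")
    for flow in flows:
        if flow.type == "http" and flow.response:
            # Parsed dict/list for JSON bodies, decoded text for everything else
            body = parse_json_content(flow.response.content,
                                      dict(flow.response.headers))
            print(flow.request.url, type(body).__name__)

asyncio.run(demo())
```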
--------------------------------------------------------------------------------
/src/mitmproxy_mcp/json_utils.py:
--------------------------------------------------------------------------------
```python
1 | from typing import Any, Dict, List, Union
2 |
3 | def generate_json_structure(json_data: Any, max_depth: int = 2, current_depth: int = 0) -> Any:
4 | """
5 | Generate a simplified structure of JSON content, showing keys and types
6 | but replacing actual values with type indicators after a certain depth.
7 | """
8 | if current_depth >= max_depth:
9 | if isinstance(json_data, dict):
10 | return {"...": f"{len(json_data)} keys"}
11 | elif isinstance(json_data, list):
12 | return f"[{len(json_data)} items]"
13 | else:
14 | return f"({type(json_data).__name__})"
15 |
16 | if isinstance(json_data, dict):
17 | result = {}
18 | for key, value in json_data.items():
19 | result[key] = generate_json_structure(value, max_depth, current_depth + 1)
20 | return result
21 | elif isinstance(json_data, list):
22 | if not json_data:
23 | return []
24 | # For lists, show structure of first item and count
25 | sample = generate_json_structure(json_data[0], max_depth, current_depth + 1)
26 | return [sample, f"... ({len(json_data)-1} more items)"] if len(json_data) > 1 else [sample]
27 | else:
28 | return f"({type(json_data).__name__})"
29 |
30 | def extract_with_jsonpath(json_data: Any, path: str) -> Any:
31 | """
32 | Basic implementation of JSONPath extraction.
33 | Supports simple dot notation and array indexing.
34 | For more complex cases, consider using a full JSONPath library.
35 | """
36 | # Handle root object reference
37 | if path == "$":
38 | return json_data
39 |
40 | # Strip leading $ if present
41 | if path.startswith("$"):
42 | path = path[1:]
43 | if path.startswith("."):
44 | path = path[1:]
45 |
46 | parts = []
47 | # Parse the path - handle both dot notation and brackets
48 | current = ""
49 | in_brackets = False
50 | for char in path:
51 | if char == "[":
52 | if current:
53 | parts.append(current)
54 | current = ""
55 | in_brackets = True
56 | elif char == "]":
57 | if in_brackets:
58 | try:
59 | # Handle array index
60 | parts.append(int(current.strip()))
61 | except ValueError:
62 | # Handle quoted key
63 | quoted = current.strip()
64 | if (quoted.startswith("'") and quoted.endswith("'")) or \
65 | (quoted.startswith('"') and quoted.endswith('"')):
66 | parts.append(quoted[1:-1])
67 | else:
68 | parts.append(quoted)
69 | current = ""
70 | in_brackets = False
71 | elif char == "." and not in_brackets:
72 | if current:
73 | parts.append(current)
74 | current = ""
75 | else:
76 | current += char
77 |
78 | if current:
79 | parts.append(current)
80 |
81 | # Navigate through the data
82 | result = json_data
83 | for part in parts:
84 | try:
85 | if isinstance(result, dict):
86 | result = result.get(part)
87 | elif isinstance(result, list) and isinstance(part, int):
88 | if 0 <= part < len(result):
89 | result = result[part]
90 | else:
91 | return None
92 | else:
93 | return None
94 |
95 | if result is None:
96 | break
97 | except Exception:
98 | return None
99 |
100 | return result
101 |
```
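To make the preview and extraction behavior concrete, a small self-contained example (the sample payload is invented for illustration):

```python
from mitmproxy_mcp.json_utils import generate_json_structure, extract_with_jsonpath

payload = {
    "data": {"users": [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]},
    "meta": {"count": 2},
}

# Depth-limited preview: values beyond max_depth become type/size indicators
print(generate_json_structure(payload, max_depth=2))
# -> {'data': {'users': '[2 items]'}, 'meta': {'count': '(int)'}}

# Dot and bracket notation both work in this simplified JSONPath dialect
print(extract_with_jsonpath(payload, "$.data.users[0].name"))  # -> 'a'
print(extract_with_jsonpath(payload, "$['meta'].count"))       # -> 2
```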
--------------------------------------------------------------------------------
/src/mitmproxy_mcp/protection_analysis.py:
--------------------------------------------------------------------------------
```python
1 | import re
2 | from typing import Any, Dict, List
3 |
4 | # Known bot protection systems and their signatures
5 | BOT_PROTECTION_SIGNATURES = {
6 | "Cloudflare": [
7 | r"cf-ray", # Cloudflare Ray ID header
8 | r"__cf_bm", # Cloudflare Bot Management cookie
9 | r"cf_clearance", # Cloudflare challenge clearance cookie
10 | r"\"why_captcha\"", # Common in Cloudflare challenge responses
11 | r"challenge-platform", # Used in challenge scripts
12 | r"turnstile\.js", # Cloudflare Turnstile
13 | ],
14 | "Akamai Bot Manager": [
15 | r"_abck=", # Akamai Bot Manager cookie
16 | r"akam_", # Akamai cookie prefix
17 | r"bm_sz", # Bot Manager cookie
18 | r"sensor_data", # Bot detection data
19 | ],
20 | "PerimeterX": [
21 | r"_px\d?=", # PerimeterX cookies
22 | r"px\.js", # PerimeterX script
23 | r"px-captcha", # PerimeterX captcha
24 | ],
25 | "DataDome": [
26 | r"datadome=", # DataDome cookie
27 | r"datadome\.js", # DataDome script
28 | r"_dd_s", # DataDome session cookie
29 | ],
30 | "reCAPTCHA": [
31 | r"google\.com/recaptcha",
32 | r"recaptcha\.net",
33 | r"g-recaptcha",
34 | ],
35 | "hCaptcha": [
36 | r"hcaptcha\.com",
37 | r"h-captcha",
38 | ],
39 | "Generic Bot Detection": [
40 | r"bot=", # Generic bot cookie
41 | r"captcha", # Generic captcha reference
42 | r"challenge", # Generic challenge term
43 | r"detected automated traffic", # Common message
44 | r"verify you are human", # Common message
45 | ]
46 | }
47 |
48 | def extract_javascript(html_content: str) -> List[Dict[str, Any]]:
49 | """
50 | Extract JavaScript from HTML content and provide basic analysis.
51 | Returns list of dictionaries with script info.
52 | """
53 | scripts = []
54 |
55 | # Extract inline scripts
56 | inline_pattern = r'<script[^>]*>(.*?)</script>'
57 | inline_scripts = re.findall(inline_pattern, html_content, re.DOTALL)
58 |
59 | for i, script in enumerate(inline_scripts):
60 | if len(script.strip()) > 0:
61 | script_info = {
62 | "type": "inline",
63 | "index": i,
64 | "size": len(script),
65 | "content": script if len(script) < 1000 else script[:1000] + "... [truncated]",
66 | "summary": analyze_script(script)
67 | }
68 | scripts.append(script_info)
69 |
70 | # Extract external script references
71 | src_pattern = r'<script[^>]*src=[\'"]([^\'"]+)[\'"][^>]*>'
72 | external_scripts = re.findall(src_pattern, html_content)
73 |
74 | for i, src in enumerate(external_scripts):
75 | script_info = {
76 | "type": "external",
77 | "index": i,
78 | "src": src,
79 | "suspicious": any(term in src.lower() for term in [
80 | "captcha", "challenge", "bot", "protect", "security",
81 | "verify", "check", "shield", "defend", "guard"
82 | ])
83 | }
84 | scripts.append(script_info)
85 |
86 | return scripts
87 |
88 | def analyze_script(script: str) -> Dict[str, Any]:
89 | """
90 | Analyze JavaScript content for common protection patterns.
91 | """
92 | analysis = {
93 | "potential_protection": False,
94 | "fingerprinting_indicators": [],
95 | "token_generation_indicators": [],
96 | "obfuscation_level": "none",
97 | "key_functions": []
98 | }
99 |
100 | # Check for fingerprinting techniques
101 | fingerprinting_patterns = [
102 | (r'navigator\.', "Browser navigator object"),
103 | (r'screen\.', "Screen properties"),
104 | (r'canvas', "Canvas fingerprinting"),
105 | (r'webgl', "WebGL fingerprinting"),
106 | (r'font', "Font enumeration"),
107 | (r'audio', "Audio fingerprinting"),
108 | (r'plugins', "Plugin enumeration"),
109 | (r'User-Agent', "User-Agent checking"),
110 | (r'platform', "Platform detection")
111 | ]
112 |
113 | for pattern, description in fingerprinting_patterns:
114 | if re.search(pattern, script, re.IGNORECASE):
115 | analysis["fingerprinting_indicators"].append(description)
116 |
117 | # Check for token generation
118 | token_patterns = [
119 | (r'(token|captcha|challenge|clearance)', "Token/challenge reference"),
120 | (r'(generate|calculate|compute)', "Computation terms"),
121 | (r'(Math\.random|crypto)', "Random generation"),
122 | (r'(cookie|setCookie|document\.cookie)', "Cookie manipulation"),
123 | (r'(xhr|XMLHttpRequest|fetch)', "Request sending")
124 | ]
125 |
126 | for pattern, description in token_patterns:
127 | if re.search(pattern, script, re.IGNORECASE):
128 | analysis["token_generation_indicators"].append(description)
129 |
130 | # Check for common obfuscation techniques
131 | if len(re.findall(r'eval\(', script)) > 3:
132 | analysis["obfuscation_level"] = "high"
133 | elif len(re.findall(r'\\x[0-9a-f]{2}', script)) > 10:
134 | analysis["obfuscation_level"] = "high"
135 | elif len(re.findall(r'String\.fromCharCode', script)) > 3:
136 | analysis["obfuscation_level"] = "high"
137 | elif re.search(r'function\(\w{1,2},\w{1,2},\w{1,2}\)\{', script):
138 | analysis["obfuscation_level"] = "medium"
139 | elif sum(1 for c in script if c == ';') > len(script) / 10:
140 | analysis["obfuscation_level"] = "medium"
141 | elif sum(len(w) > 30 for w in re.findall(r'\w+', script)) > 10:
142 | analysis["obfuscation_level"] = "medium"
143 |
144 | # Extract potential key function names
145 | function_pattern = r'function\s+(\w+)\s*\('
146 | functions = re.findall(function_pattern, script)
147 |
148 | suspicious_terms = ["challenge", "token", "captcha", "verify", "bot", "check", "security"]
149 | for func in functions:
150 | if any(term in func.lower() for term in suspicious_terms):
151 | analysis["key_functions"].append(func)
152 |
153 | # Determine if this is potentially protection-related
154 | analysis["potential_protection"] = (
155 | len(analysis["fingerprinting_indicators"]) > 2 or
156 | len(analysis["token_generation_indicators"]) > 2 or
157 | analysis["obfuscation_level"] != "none" or
158 | len(analysis["key_functions"]) > 0
159 | )
160 |
161 | return analysis
162 |
163 | def analyze_cookies(headers: Dict[str, str]) -> List[Dict[str, Any]]:
164 | """
165 | Analyze cookies for common protection-related patterns.
166 | """
167 | cookie_header = headers.get("Cookie", "") or headers.get("Set-Cookie", "")
168 | if not cookie_header:
169 | return []
170 |
171 | # Split the header into individual cookies (note: Set-Cookie attributes like Path or Expires get split too)
172 | cookies = []
173 | for cookie_str in cookie_header.split(";"):
174 | parts = cookie_str.strip().split("=", 1)
175 | if len(parts) == 2:
176 | name, value = parts
177 | cookie = {
178 | "name": name.strip(),
179 | "value": value.strip() if len(value.strip()) < 50 else value.strip()[:50] + "... [truncated]",
180 | "protection_related": False,
181 | "vendor": "unknown"
182 | }
183 |
184 | # Check if this is a known protection cookie
185 | for vendor, signatures in BOT_PROTECTION_SIGNATURES.items():
186 | for sig in signatures:
187 | if re.search(sig, name, re.IGNORECASE):
188 | cookie["protection_related"] = True
189 | cookie["vendor"] = vendor
190 | break
191 | if cookie["protection_related"]:
192 | break
193 |
194 | cookies.append(cookie)
195 |
196 | return cookies
197 |
198 | def identify_protection_system(flow) -> List[Dict[str, Any]]:
199 | """
200 | Identify potential bot protection systems based on signatures.
201 | """
202 | protections = []
203 |
204 | # Combine all searchable content
205 | searchable_content = ""
206 | # Add request headers
207 | for k, v in flow.request.headers.items():
208 | searchable_content += f"{k}: {v}\n"
209 |
210 | # Check response if available
211 | if flow.response:
212 | # Add response headers
213 | for k, v in flow.response.headers.items():
214 | searchable_content += f"{k}: {v}\n"
215 |
216 | # Add response content if it's text
217 | content_type = flow.response.headers.get("Content-Type", "")
218 | if "text" in content_type or "javascript" in content_type or "json" in content_type:
219 | try:
220 | searchable_content += flow.response.content.decode('utf-8', errors='ignore')
221 | except Exception:
222 | pass
223 |
224 | # Check for protection signatures
225 | for vendor, signatures in BOT_PROTECTION_SIGNATURES.items():
226 | matches = []
227 | for sig in signatures:
228 | if re.search(sig, searchable_content, re.IGNORECASE):
229 | matches.append(sig)
230 |
231 | if matches:
232 | protections.append({
233 | "vendor": vendor,
234 | "confidence": len(matches) / len(signatures) * 100,
235 | "matching_signatures": matches
236 | })
237 |
238 | return sorted(protections, key=lambda x: x["confidence"], reverse=True)
239 |
240 | def analyze_response_for_challenge(flow) -> Dict[str, Any]:
241 | """
242 | Analyze a response to determine if it contains a challenge.
243 | """
244 | if not flow.response:
245 | return {"is_challenge": False}
246 |
247 | result = {
248 | "is_challenge": False,
249 | "challenge_indicators": [],
250 | "status_code": flow.response.status_code,
251 | "challenge_type": "unknown"
252 | }
253 |
254 | # Check status code
255 | if flow.response.status_code in [403, 429, 503]:
256 | result["challenge_indicators"].append(f"Suspicious status code: {flow.response.status_code}")
257 |
258 | # Check for challenge headers
259 | challenge_headers = {
260 | "cf-mitigated": "Cloudflare mitigation",
261 | "cf-chl-bypass": "Cloudflare challenge bypass",
262 | "x-datadome": "DataDome protection",
263 | "x-px": "PerimeterX",
264 | "x-amz-captcha": "AWS WAF Captcha"
265 | }
266 |
267 | for header, description in challenge_headers.items():
268 | if any(h.lower() == header.lower() for h in flow.response.headers.keys()):
269 | result["challenge_indicators"].append(f"Challenge header: {description}")
270 |
271 | # Check for challenge content patterns
272 | content = (flow.response.content or b"").decode('utf-8', errors='ignore')
273 | challenge_patterns = [
274 | (r'captcha', "CAPTCHA"),
275 | (r'challenge', "Challenge term"),
276 | (r'blocked', "Blocking message"),
277 | (r'verify.*human', "Human verification"),
278 | (r'suspicious.*activity', "Suspicious activity message"),
279 | (r'security.*check', "Security check message"),
280 | (r'ddos', "DDoS protection message"),
281 | (r'automated.*request', "Automated request detection")
282 | ]
283 |
284 | for pattern, description in challenge_patterns:
285 | if re.search(pattern, content, re.IGNORECASE):
286 | result["challenge_indicators"].append(f"Content indicator: {description}")
287 |
288 | # Determine if this is a challenge response
289 | result["is_challenge"] = len(result["challenge_indicators"]) > 0
290 |
291 | # Determine challenge type
292 | if "CAPTCHA" in " ".join(result["challenge_indicators"]):
293 | result["challenge_type"] = "captcha"
294 | elif "JavaScript" in content and result["is_challenge"]:
295 | result["challenge_type"] = "javascript"
296 | elif result["is_challenge"]:
297 | result["challenge_type"] = "other"
298 |
299 | return result
300 |
301 | def generate_suggestions(analysis: Dict[str, Any]) -> List[str]:
302 | """
303 | Generate remediation suggestions based on the protection analysis.
304 | """
305 | suggestions = []
306 |
307 | # Check if any protection system was detected
308 | if analysis.get("protection_systems"):
309 | top_system = analysis["protection_systems"][0]["vendor"]
310 | confidence = analysis["protection_systems"][0]["confidence"]
311 |
312 | if confidence > 50:
313 | suggestions.append(f"Detected {top_system} with {confidence:.1f}% confidence.")
314 |
315 | # Add vendor-specific suggestions
316 | if "Cloudflare" in top_system:
317 | suggestions.append("Cloudflare often uses JavaScript challenges. Check for cf_clearance cookie.")
318 | suggestions.append("Consider using proven techniques like cfscrape or cloudscraper libraries.")
319 | elif "Akamai" in top_system:
320 | suggestions.append("Akamai uses sensor_data for browser fingerprinting.")
321 | suggestions.append("Focus on _abck cookie which contains browser verification data.")
322 | elif "PerimeterX" in top_system:
323 | suggestions.append("PerimeterX relies on JavaScript execution and browser fingerprinting.")
324 | suggestions.append("Look for _px cookies which are essential for session validation.")
325 | elif "DataDome" in top_system:
326 | suggestions.append("DataDome uses advanced behavioral and fingerprinting techniques.")
327 | suggestions.append("The datadome cookie is critical for maintaining sessions.")
328 | elif "CAPTCHA" in top_system:
329 | suggestions.append("This site uses CAPTCHA challenges which may require manual solving or specialized services.")
330 |
331 | # Add suggestions based on challenge type
332 | if analysis.get("challenge_analysis", {}).get("is_challenge", False):
333 | challenge_type = analysis.get("challenge_analysis", {}).get("challenge_type", "unknown")
334 |
335 | if challenge_type == "javascript":
336 | suggestions.append("This response contains a JavaScript challenge that must be solved.")
337 | suggestions.append("Consider using a headless browser to execute the challenge JavaScript.")
338 |
339 | # If we have script analysis, add more specific suggestions
340 | if "scripts" in analysis:
341 | obfuscated_scripts = [s for s in analysis["scripts"] if s.get("summary", {}).get("obfuscation_level") in ["medium", "high"]]
342 | if obfuscated_scripts:
343 | suggestions.append(f"Found {len(obfuscated_scripts)} obfuscated script(s) that likely contain challenge logic.")
344 |
345 | fingerprinting_scripts = [s for s in analysis["scripts"] if s.get("summary", {}).get("fingerprinting_indicators")]
346 | if fingerprinting_scripts:
347 | techniques = set()
348 | for script in fingerprinting_scripts:
349 | techniques.update(script.get("summary", {}).get("fingerprinting_indicators", []))
350 | suggestions.append(f"Detected browser fingerprinting techniques: {', '.join(techniques)}.")
351 |
352 | elif challenge_type == "captcha":
353 | suggestions.append("This response contains a CAPTCHA challenge.")
354 | suggestions.append("Consider using a CAPTCHA solving service or manual intervention.")
355 |
356 | # Check for important cookies
357 | protection_cookies = [c for c in analysis.get("response_cookies", []) if c.get("protection_related")]
358 | if protection_cookies:
359 | cookie_names = [c["name"] for c in protection_cookies]
360 | suggestions.append(f"Important protection cookies to maintain: {', '.join(cookie_names)}.")
361 |
362 | # General suggestions
363 | if analysis.get("protection_systems") or analysis.get("challenge_analysis", {}).get("is_challenge", False):
364 | suggestions.append("General recommendations:")
365 | suggestions.append("- Maintain consistent User-Agent between requests")
366 | suggestions.append("- Preserve all cookies from the session")
367 | suggestions.append("- Add appropriate referer and origin headers")
368 | suggestions.append("- Consider adding delays between requests to avoid rate limiting")
369 | suggestions.append("- Use rotating IP addresses if available")
370 |
371 | return suggestions
372 |
```
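A quick sketch of the script analyzer on an invented snippet (the JavaScript below is illustrative, not taken from a real protection system):

```python
from mitmproxy_mcp.protection_analysis import analyze_script

snippet = """
function verifyToken() {
    var fp = navigator.userAgent + screen.width;
    document.cookie = "challenge=" + Math.random();
}
"""

report = analyze_script(snippet)
print(report["potential_protection"])       # True (token + cookie + randomness signals)
print(report["fingerprinting_indicators"])  # navigator/screen property access
print(report["key_functions"])              # ['verifyToken']
```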
--------------------------------------------------------------------------------
/src/mitmproxy_mcp/server.py:
--------------------------------------------------------------------------------
```python
1 | import asyncio
2 | import os
3 | import json
4 | import re
5 | import base64
6 | from typing import Any, Dict, List, Optional, Union, Tuple
7 | from mitmproxy import io
8 |
9 | from mcp.server.models import InitializationOptions
10 | import mcp.types as types
11 | from mcp.server import NotificationOptions, Server
12 | import mcp.server.stdio
13 |
14 | from mitmproxy_mcp.flow_utils import get_flows_from_dump, parse_json_content
15 | from mitmproxy_mcp.json_utils import generate_json_structure, extract_with_jsonpath
16 | from mitmproxy_mcp.protection_analysis import (
17 | analyze_response_for_challenge,
18 | analyze_script,
19 | analyze_cookies,
20 | extract_javascript,
21 | generate_suggestions,
22 | identify_protection_system,
23 | BOT_PROTECTION_SIGNATURES,
24 | )
25 |
26 | # Maximum content size in bytes before switching to structure preview
27 | MAX_CONTENT_SIZE = 2000
28 |
29 | server = Server("mitmproxy-mcp")
30 |
31 | @server.list_tools()
32 | async def handle_list_tools() -> list[types.Tool]:
33 | """
34 | List available tools.
35 | Each tool specifies its arguments using JSON Schema validation.
36 | """
37 | return [
38 | types.Tool(
39 | name="list_flows",
40 | description="Retrieves detailed HTTP request/response data including headers, content (or structure preview for large JSON), and metadata from specified flows",
41 | inputSchema={
42 | "type": "object",
43 | "properties": {
44 | "session_id": {
45 | "type": "string",
46 | "description": "The ID of the session to list flows from"
47 | }
48 | },
49 | "required": ["session_id"]
50 | }
51 | ),
52 | types.Tool(
53 | name="get_flow_details",
54 | description="Lists HTTP requests/responses from a mitmproxy capture session, showing method, URL, and status codes",
55 | inputSchema={
56 | "type": "object",
57 | "properties": {
58 | "session_id": {
59 | "type": "string",
60 | "description": "The ID of the session"
61 | },
62 | "flow_indexes": {
63 | "type": "array",
64 | "items": {
65 | "type": "integer"
66 | },
67 | "description": "The indexes of the flows"
68 | },
69 | "include_content": {
70 | "type": "boolean",
71 | "description": "Whether to include full content in the response (default: true)",
72 | "default": True
73 | }
74 | },
75 | "required": ["session_id", "flow_indexes"]
76 | }
77 | ),
78 | types.Tool(
79 | name="extract_json_fields",
80 | description="Extract specific fields from JSON content in a flow using JSONPath expressions",
81 | inputSchema={
82 | "type": "object",
83 | "properties": {
84 | "session_id": {
85 | "type": "string",
86 | "description": "The ID of the session"
87 | },
88 | "flow_index": {
89 | "type": "integer",
90 | "description": "The index of the flow"
91 | },
92 | "content_type": {
93 | "type": "string",
94 | "enum": ["request", "response"],
95 | "description": "Whether to extract from request or response content"
96 | },
97 | "json_paths": {
98 | "type": "array",
99 | "items": {
100 | "type": "string"
101 | },
102 | "description": "JSONPath expressions to extract (e.g. ['$.data.users', '$.metadata.timestamp'])"
103 | }
104 | },
105 | "required": ["session_id", "flow_index", "content_type", "json_paths"]
106 | }
107 | ),
108 | types.Tool(
109 | name="analyze_protection",
110 | description="Analyze flow for bot protection mechanisms and extract challenge details",
111 | inputSchema={
112 | "type": "object",
113 | "properties": {
114 | "session_id": {
115 | "type": "string",
116 | "description": "The ID of the session"
117 | },
118 | "flow_index": {
119 | "type": "integer",
120 | "description": "The index of the flow to analyze"
121 | },
122 | "extract_scripts": {
123 | "type": "boolean",
124 | "description": "Whether to extract and analyze JavaScript from the response (default: true)",
125 | "default": True
126 | }
127 | },
128 | "required": ["session_id", "flow_index"]
129 | }
130 | )
131 | ]
132 |
133 | async def list_flows(arguments: dict) -> list[types.TextContent]:
134 | """
135 | Lists HTTP flows from a mitmproxy dump file.
136 | """
137 | session_id = arguments.get("session_id")
138 | if not session_id:
139 | return [types.TextContent(type="text", text="Error: Missing session_id")]
140 |
141 | try:
142 | flows = await get_flows_from_dump(session_id)
143 |
144 | flow_list = []
145 | for i, flow in enumerate(flows):
146 | if flow.type == "http":
147 | request = flow.request
148 | response = flow.response
149 | flow_info = {
150 | "index": i,
151 | "method": request.method,
152 | "url": request.url,
153 | "status": response.status_code if response else None
154 | }
155 | flow_list.append(flow_info)
156 |
157 | return [types.TextContent(type="text", text=json.dumps(flow_list, indent=2))]
158 | except FileNotFoundError:
159 | return [types.TextContent(type="text", text="Error: Session not found")]
160 | except Exception as e:
161 | return [types.TextContent(type="text", text=f"Error reading flows: {str(e)}")]
162 |
163 | async def get_flow_details(arguments: dict) -> list[types.TextContent]:
164 | """
165 | Gets details of specific flows from a mitmproxy dump file.
166 | For large JSON content, returns structure preview instead of full content.
167 | """
168 | session_id = arguments.get("session_id")
169 | flow_indexes = arguments.get("flow_indexes")
170 | include_content = arguments.get("include_content", True)
171 |
172 | if not session_id:
173 | return [types.TextContent(type="text", text="Error: Missing session_id")]
174 | if not flow_indexes:
175 | return [types.TextContent(type="text", text="Error: Missing flow_indexes")]
176 |
177 | try:
178 | flows = await get_flows_from_dump(session_id)
179 | flow_details_list = []
180 |
181 | for flow_index in flow_indexes:
182 | try:
183 | flow = flows[flow_index]
184 |
185 | if flow.type == "http":
186 | request = flow.request
187 | response = flow.response
188 |
189 | # Parse content
190 | request_content = parse_json_content(request.content, dict(request.headers))
191 | response_content = None
192 | if response:
193 | response_content = parse_json_content(response.content, dict(response.headers))
194 |
195 | # Handle large content
196 | request_content_preview = None
197 | response_content_preview = None
198 |
199 | flow_details = {}
200 |
201 | # Check if request content is large and is JSON
202 | if include_content and len(request.content) > MAX_CONTENT_SIZE and isinstance(request_content, dict):
203 | request_content_preview = generate_json_structure(request_content)
204 | request_content = None # Don't include full content
205 | elif include_content and len(request.content) > MAX_CONTENT_SIZE:
206 | if isinstance(request_content, str):
207 | request_content = request_content[:MAX_CONTENT_SIZE] + " ...[truncated]"
208 | else:
209 | request_content = json.dumps(request_content)[:MAX_CONTENT_SIZE] + " ...[truncated]"  # large JSON arrays: serialize, then truncate
210 | flow_details["request_content_note"] = f"Request content truncated to {MAX_CONTENT_SIZE} bytes."
211 |
212 | # Check if response content is large and is JSON
213 | if response and include_content and len(response.content) > MAX_CONTENT_SIZE and isinstance(response_content, dict):
214 | response_content_preview = generate_json_structure(response_content)
215 | response_content = None # Don't include full content
216 | elif response and include_content and len(response.content) > MAX_CONTENT_SIZE:
217 | if isinstance(response_content, str):
218 | response_content = response_content[:MAX_CONTENT_SIZE] + " ...[truncated]"
219 | else:
220 | response_content = json.dumps(response_content)[:MAX_CONTENT_SIZE] + " ...[truncated]"  # large JSON arrays: serialize, then truncate
221 | flow_details["response_content_note"] = f"Response content truncated to {MAX_CONTENT_SIZE} bytes."
222 |
223 | # Build flow details
224 | flow_details.update({
225 | "index": flow_index,
226 | "method": request.method,
227 | "url": request.url,
228 | "request_headers": dict(request.headers),
229 | "status": response.status_code if response else None,
230 | "response_headers": dict(response.headers) if response else None,
231 | })
232 |
233 | # Add content or previews based on size
234 | if include_content:
235 | if request_content is not None:
236 | flow_details["request_content"] = request_content
237 | if request_content_preview is not None:
238 | flow_details["request_content_preview"] = request_content_preview
239 | flow_details["request_content_size"] = len(request.content)
240 | flow_details["request_content_note"] = "Content too large to display. Use extract_json_fields tool to get specific values."
241 |
242 | if response_content is not None:
243 | flow_details["response_content"] = response_content
244 | if response_content_preview is not None:
245 | flow_details["response_content_preview"] = response_content_preview
246 | flow_details["response_content_size"] = len(response.content) if response else 0
247 | flow_details["response_content_note"] = "Content too large to display. Use extract_json_fields tool to get specific values."
248 |
249 | flow_details_list.append(flow_details)
250 | else:
251 | flow_details_list.append({"error": f"Flow {flow_index} is not an HTTP flow"})
252 |
253 | except IndexError:
254 | flow_details_list.append({"error": f"Flow index {flow_index} out of range"})
255 |
256 | return [types.TextContent(type="text", text=json.dumps(flow_details_list, indent=2))]
257 |
258 | except FileNotFoundError:
259 | return [types.TextContent(type="text", text="Error: Session not found")]
260 | except Exception as e:
261 | return [types.TextContent(type="text", text=f"Error reading flow details: {str(e)}")]
262 |
263 | async def extract_json_fields(arguments: dict) -> list[types.TextContent]:
264 | """
265 | Extract specific fields from JSON content in a flow using JSONPath expressions.
266 | """
267 | session_id = arguments.get("session_id")
268 | flow_index = arguments.get("flow_index")
269 | content_type = arguments.get("content_type")
270 | json_paths = arguments.get("json_paths")
271 |
272 | if not session_id:
273 | return [types.TextContent(type="text", text="Error: Missing session_id")]
274 | if flow_index is None:
275 | return [types.TextContent(type="text", text="Error: Missing flow_index")]
276 | if not content_type:
277 | return [types.TextContent(type="text", text="Error: Missing content_type")]
278 | if not json_paths:
279 | return [types.TextContent(type="text", text="Error: Missing json_paths")]
280 |
281 | try:
282 | flows = await get_flows_from_dump(session_id)
283 |
284 | try:
285 | flow = flows[flow_index]
286 |
287 | if flow.type != "http":
288 | return [types.TextContent(type="text", text=f"Error: Flow {flow_index} is not an HTTP flow")]
289 |
290 | request = flow.request
291 | response = flow.response
292 |
293 | # Determine which content to extract from
294 | content = None
295 | headers = None
296 | if content_type == "request":
297 | content = request.content
298 | headers = dict(request.headers)
299 | elif content_type == "response":
300 | if not response:
301 | return [types.TextContent(type="text", text=f"Error: Flow {flow_index} has no response")]
302 | content = response.content
303 | headers = dict(response.headers)
304 | else:
305 | return [types.TextContent(type="text", text=f"Error: Invalid content_type. Must be 'request' or 'response'")]
306 |
307 | # Parse the content
308 | json_content = parse_json_content(content, headers)
309 |
310 | # Only extract from JSON content
311 | if not isinstance(json_content, (dict, list)):
312 | return [types.TextContent(type="text", text=f"Error: The {content_type} content is not valid JSON")]
313 |
314 | # Extract fields
315 | result = {}
316 | for path in json_paths:
317 | try:
318 | extracted = extract_with_jsonpath(json_content, path)
319 | result[path] = extracted
320 | except Exception as e:
321 | result[path] = f"Error extracting path: {str(e)}"
322 |
323 | return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
324 |
325 | except IndexError:
326 | return [types.TextContent(type="text", text=f"Error: Flow index {flow_index} out of range")]
327 |
328 | except FileNotFoundError:
329 | return [types.TextContent(type="text", text="Error: Session not found")]
330 | except Exception as e:
331 | return [types.TextContent(type="text", text=f"Error extracting JSON fields: {str(e)}")]
332 |
333 | async def analyze_protection(arguments: dict) -> list[types.TextContent]:
334 | """
335 | Analyze a flow for bot protection mechanisms and extract challenge details.
336 | """
337 | session_id = arguments.get("session_id")
338 | flow_index = arguments.get("flow_index")
339 | extract_scripts = arguments.get("extract_scripts", True)
340 |
341 | if not session_id:
342 | return [types.TextContent(type="text", text="Error: Missing session_id")]
343 | if flow_index is None:
344 | return [types.TextContent(type="text", text="Error: Missing flow_index")]
345 |
346 | try:
347 | flows = await get_flows_from_dump(session_id)
348 |
349 | try:
350 | flow = flows[flow_index]
351 |
352 | if flow.type != "http":
353 | return [types.TextContent(type="text", text=f"Error: Flow {flow_index} is not an HTTP flow")]
354 |
355 | # Analyze the flow for protection mechanisms
356 | analysis = {
357 | "flow_index": flow_index,
358 | "method": flow.request.method,
359 | "url": flow.request.url,
360 | "protection_systems": identify_protection_system(flow),
361 | "request_cookies": analyze_cookies(dict(flow.request.headers)),
362 | "has_response": flow.response is not None,
363 | }
364 |
365 | if flow.response:
366 | # Add response analysis
367 | content_type = flow.response.headers.get("Content-Type", "")
368 | is_html = "text/html" in content_type
369 |
370 | analysis.update({
371 | "status_code": flow.response.status_code,
372 | "response_cookies": analyze_cookies(dict(flow.response.headers)),
373 | "challenge_analysis": analyze_response_for_challenge(flow),
374 | "content_type": content_type,
375 | "is_html": is_html,
376 | })
377 |
378 | # If HTML and script extraction is requested, extract and analyze JavaScript
379 | if is_html and extract_scripts:
380 | try:
381 | html_content = flow.response.content.decode('utf-8', errors='ignore')
382 | analysis["scripts"] = extract_javascript(html_content)
383 | except Exception as e:
384 | analysis["script_extraction_error"] = str(e)
385 |
386 | # Add remediation suggestions based on findings
387 | analysis["suggestions"] = generate_suggestions(analysis)
388 |
389 | return [types.TextContent(type="text", text=json.dumps(analysis, indent=2))]
390 |
391 | except IndexError:
392 | return [types.TextContent(type="text", text=f"Error: Flow index {flow_index} out of range")]
393 |
394 | except FileNotFoundError:
395 | return [types.TextContent(type="text", text="Error: Session not found")]
396 | except Exception as e:
397 | return [types.TextContent(type="text", text=f"Error analyzing protection: {str(e)}")]
398 |
399 | @server.call_tool()
400 | async def handle_call_tool(
401 | name: str, arguments: dict | None
402 | ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
403 | """
404 | Handle tool execution requests.
405 | Delegates to specific functions based on the tool name.
406 | """
407 | if not arguments:
408 | raise ValueError("Missing arguments")
409 |
410 | if name == "list_flows":
411 | return await list_flows(arguments)
412 | elif name == "get_flow_details":
413 | return await get_flow_details(arguments)
414 | elif name == "extract_json_fields":
415 | return await extract_json_fields(arguments)
416 | elif name == "analyze_protection":
417 | return await analyze_protection(arguments)
418 | else:
419 | raise ValueError(f"Unknown tool: {name}")
420 |
421 | async def main():
422 | # Run the server using stdin/stdout streams
423 | async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
424 | await server.run(
425 | read_stream,
426 | write_stream,
427 | server.create_initialization_options(),
428 | )
```
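Since each tool handler is a plain async function, it can be smoke-tested without an MCP client; a minimal sketch, again assuming a hypothetical "example" session in `DUMP_DIR`:

```python
import asyncio

from mitmproxy_mcp.server import get_flow_details, list_flows

async def smoke_test():
    # "example" is a hypothetical session id naming <DUMP_DIR>/example.dump
    print((await list_flows({"session_id": "example"}))[0].text)
    details = await get_flow_details({
        "session_id": "example",
        "flow_indexes": [0],
        "include_content": False,
    })
    print(details[0].text)

asyncio.run(smoke_test())
```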