# Directory Structure ``` ├── .gitignore ├── certpatrol.py ├── LICENSE ├── README.md └── requirements.txt ``` # Files -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Python 2 | __pycache__/ 3 | *.py[cod] 4 | *$py.class 5 | *.so 6 | .Python 7 | build/ 8 | develop-eggs/ 9 | dist/ 10 | downloads/ 11 | eggs/ 12 | .eggs/ 13 | lib/ 14 | lib64/ 15 | parts/ 16 | sdist/ 17 | var/ 18 | wheels/ 19 | *.egg-info/ 20 | .installed.cfg 21 | *.egg 22 | 23 | # Virtual environments 24 | venv/ 25 | env/ 26 | ENV/ 27 | 28 | # IDE 29 | .vscode/ 30 | .idea/ 31 | *.swp 32 | *.swo 33 | 34 | # OS 35 | .DS_Store 36 | Thumbs.db 37 | 38 | # Project specific 39 | checkpoints/ 40 | debug_responses/ 41 | *.log 42 | index.html 43 | improveplan.md 44 | pyproject.toml 45 | dist/ 46 | 47 | # Environment variables 48 | .env 49 | .env.local 50 | .venv/ ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | [![PyPI Downloads](https://static.pepy.tech/badge/certpatrol)](https://pepy.tech/projects/certpatrol) 2 | 3 | # CertPatrol 4 | 5 | <p align="center"> 6 | <img width="609" height="250" alt="Torito Logo" src="https://torito.io/toritocertpatrol.png"> 7 | </p> 8 | 9 | A fast, lightweight **Certificate Transparency (CT)** log tailer for your terminal. Filter domains with regex, run locally for privacy, and monitor in real time — no third-party servers, no tracking. 10 | 11 | A modern, local, privacy-friendly **CertStream** alternative. 12 | 13 | > **Looking for a more advanced CertStream server alternative?** 14 | > Check out [Certstream Server Go](https://github.com/d-Rickyy-b/certstream-server-go) by [d-Rickyy-b](https://github.com/d-Rickyy-b) for a robust, production-grade solution.
15 | 16 | --- 17 | 18 | ## Installation 19 | 20 | ```bash 21 | pip install certpatrol 22 | ``` 23 | 24 | --- 25 | 26 | ## Quick start 27 | 28 | ```bash 29 | # Find domains containing "example" 30 | certpatrol --pattern "example" 31 | 32 | # Find shop subdomains of amazon.com 33 | certpatrol --pattern "shop.*\.amazon\.com$" 34 | 35 | # Match against base domains only (e.g., example.co.uk) 36 | certpatrol --pattern "argentina" --etld1 37 | ``` 38 | 39 | --- 40 | 41 | ## Options 42 | 43 | ### Core Options 44 | - `-p, --pattern PATTERN` – Regex pattern to match domains against (required) 45 | - `-l, --logs LOGS` – Specific CT logs to monitor (default: all usable logs) 46 | - `-v, --verbose` – Verbose output with extra info 47 | - `-h, --help` – Show help message and exit 48 | 49 | ### Polling & Performance 50 | - `-s, --poll-sleep SECONDS` – Initial poll interval (default: 3.0, adaptive) 51 | - `-mn, --min-poll-sleep` – Minimum poll sleep for adaptive polling (default: 1.0) 52 | - `-mx, --max-poll-sleep` – Maximum poll sleep for adaptive polling (default: 60.0) 53 | - `-b, --batch SIZE` – Batch size for fetching entries (default: 256) 54 | - `-m, --max-memory-mb` – Maximum memory usage in MB (default: 100) 55 | 56 | ### Filtering & Output 57 | - `-e, --etld1` – Match against base domains only (requires tldextract) 58 | - `-q, --quiet-warnings` – Suppress parse warnings (only show matches) 59 | - `-x, --quiet-parse-errors` – Suppress ASN.1 parsing warnings 60 | - `-d, --debug-all` – With -v, print detailed per-entry domain listings 61 | 62 | ### Checkpoint Management 63 | - `-c, --checkpoint-prefix` – Custom prefix for checkpoint files 64 | - `-k, --cleanup-checkpoints` – Clean up orphaned checkpoint files and exit 65 | 66 | --- 67 | 68 | ## Examples 69 | 70 | ```bash 71 | # Basic monitoring 72 | certpatrol --pattern "petsdeli" 73 | 74 | # Multiple patterns with verbose output 75 | certpatrol --pattern "(petsdeli|pet-deli)" --verbose 76 | 77 | # API subdomains with 
quiet mode 78 | certpatrol --pattern "api.*\.google\.com$" --quiet-warnings 79 | 80 | # All subdomains of a domain with custom memory limit 81 | certpatrol --pattern ".*\.example\.com$" --max-memory-mb 200 82 | 83 | # Base domain matching only 84 | certpatrol --pattern "argentina" --etld1 85 | 86 | # Run multiple instances with custom prefixes 87 | certpatrol --pattern "domain1" --checkpoint-prefix "instance1" & 88 | certpatrol --pattern "domain2" --checkpoint-prefix "instance2" & 89 | 90 | # Clean up old checkpoint files 91 | certpatrol --cleanup-checkpoints 92 | 93 | # Performance tuning for high-volume monitoring 94 | certpatrol --pattern "example" --batch 512 --min-poll-sleep 0.5 --max-poll-sleep 30 95 | 96 | # Graceful shutdown examples 97 | kill -TERM $(pgrep -f "certpatrol.*example") 98 | # Or press Ctrl+C for a graceful shutdown (press it a second time to force an immediate exit) 99 | ``` 100 | 101 | --- 102 | 103 | ## Requirements 104 | 105 | - Python 3.6+ 106 | - requests 107 | - cryptography 108 | - idna 109 | - tldextract (optional, for --etld1) 110 | - psutil (optional, for memory monitoring) 111 | 112 | --- 113 | 114 | ## Features 115 | 116 | - **Real-time monitoring** – Starts from current time (no historical data) 117 | - **Graceful shutdown** – Handles SIGTERM, SIGINT, and (on Unix) SIGHUP signals properly 118 | - **Adaptive polling** – Automatically adjusts intervals based on activity and errors 119 | - **Memory management** – Monitors and limits memory usage to prevent excessive consumption 120 | - **Connection pooling** – Efficient HTTP session management with retry strategies 121 | - **Checkpoint persistence** – Automatic state saving with atomic writes 122 | - **Multi-instance support** – Unique checkpoint files per process with custom prefixes 123 | 124 | --- 125 | 126 | ## Notes 127 | 128 | - Checkpoints saved in `checkpoints/` folder with process-specific names 129 | - Signal handling ensures clean shutdown and checkpoint saving 130 | - Sleep periods are responsive to shutdown signals (checks
every 0.5s) 131 | - Use Ctrl+C, `kill`, or system shutdown for graceful termination 132 | 133 | --- 134 | 135 | ## License 136 | 137 | MIT License — see [LICENSE](https://github.com/ToritoIO/CertPatrol/blob/main/LICENSE) file for details. 138 | ``` -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` 1 | # CertPatrol - Certificate Transparency monitoring tool 2 | # Core dependencies 3 | requests>=2.25.0 4 | cryptography>=3.4.0 5 | idna>=3.0 6 | 7 | # Optional but recommended for --etld1 functionality 8 | tldextract>=3.0.0 ``` -------------------------------------------------------------------------------- /certpatrol.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Torito CertPatrol - Tiny local CT tailer that filters domains by a regex pattern. 4 | 5 | Author: Martin Aberastegue 6 | Website: https://torito.io 7 | Repository: https://github.com/ToritoIO/CertPatrol 8 | 9 | Options: 10 | -p, --pattern PATTERN Regex pattern to match domains against (required) 11 | -l, --logs LOGS CT logs to tail (default: fetch all usable logs) 12 | -b, --batch SIZE Batch size for fetching entries (default: 256) 13 | -s, --poll-sleep SECONDS Seconds to sleep between polls (default: 3.0) 14 | -v, --verbose Verbose output (extra info for matches) 15 | -q, --quiet-warnings Suppress parse warnings (only show actual matches) 16 | -e, --etld1 Match against registrable base domain instead of full domain 17 | -d, --debug-all With -v, print per-batch and per-entry domain listings 18 | -x, --quiet-parse-errors Suppress ASN.1 parsing warnings (common in CT logs) 19 | -c, --checkpoint-prefix Custom prefix for checkpoint file (useful for multiple instances) 20 | -k, --cleanup-checkpoints Clean up orphaned checkpoint files and exit 21 | -m, --max-memory-mb Maximum memory usage in MB 
for batch processing (default: 100) 22 | -mn, --min-poll-sleep Minimum poll sleep time for adaptive polling (default: 1.0) 23 | -mx, --max-poll-sleep Maximum poll sleep time for adaptive polling (default: 60.0) 24 | -h, --help Show this help message and exit 25 | 26 | Requirements: 27 | pip install requests cryptography idna 28 | # Optional but recommended for --etld1 29 | pip install tldextract 30 | """ 31 | 32 | # Suppress OpenSSL and cryptography warnings BEFORE any imports 33 | import warnings 34 | warnings.filterwarnings("ignore", category=UserWarning, module="urllib3") 35 | warnings.filterwarnings("ignore", message=".*OpenSSL.*") 36 | warnings.filterwarnings("ignore", message=".*LibreSSL.*") 37 | warnings.filterwarnings("ignore", category=DeprecationWarning, module="cryptography") 38 | warnings.filterwarnings("ignore", message=".*serial number.*") 39 | 40 | try: 41 | from urllib3.exceptions import NotOpenSSLWarning 42 | warnings.filterwarnings("ignore", category=NotOpenSSLWarning) 43 | except ImportError: 44 | pass 45 | 46 | import argparse 47 | import json 48 | import logging 49 | import os 50 | import re 51 | import signal 52 | import sys 53 | import time 54 | import multiprocessing 55 | import gc 56 | from typing import List, Tuple, Optional, Dict, Any, Iterator, Union 57 | from contextlib import contextmanager 58 | 59 | import idna 60 | import requests 61 | from requests.adapters import HTTPAdapter 62 | from urllib3.util.retry import Retry 63 | from cryptography import x509 64 | from cryptography.hazmat.primitives import serialization 65 | 66 | # Make checkpoint file unique per process to avoid conflicts when running multiple instances 67 | CHECKPOINT_DIR = "checkpoints" 68 | CHECKPOINT_FILE = os.path.join(CHECKPOINT_DIR, f"certpatrol_checkpoints_{os.getpid()}.json") 69 | USER_AGENT = "torito-certpatrol/1.2.0 (+local)" 70 | LOG_LIST_URL = "https://www.gstatic.com/ct/log_list/v3/log_list.json" 71 | 72 | # Phase 2: Connection pooling and rate limiting 
constants 73 | DEFAULT_POOL_CONNECTIONS = 10 74 | DEFAULT_POOL_MAXSIZE = 20 75 | DEFAULT_MAX_RETRIES = 3 76 | DEFAULT_BACKOFF_FACTOR = 0.3 77 | DEFAULT_TIMEOUT = (10, 30) # (connect, read) 78 | 79 | # Phase 2: Memory management constants 80 | DEFAULT_MAX_MEMORY_MB = 100 81 | MEMORY_CHECK_INTERVAL = 100 # Check memory every N entries 82 | 83 | # Phase 2: Adaptive polling constants 84 | DEFAULT_MIN_POLL_SLEEP = 1.0 85 | DEFAULT_MAX_POLL_SLEEP = 60.0 86 | BACKOFF_MULTIPLIER = 1.5 87 | SUCCESS_REDUCTION_FACTOR = 0.8 88 | 89 | # Dynamic CT log discovery - fetched from Google's official list 90 | CT_LOGS = {} 91 | 92 | # Phase 3: Logging setup 93 | logger = logging.getLogger('certpatrol') 94 | 95 | def setup_logging(verbose: bool = False, quiet_warnings: bool = False) -> None: 96 | """ 97 | Setup logging configuration based on verbosity settings. 98 | Maintains backward compatibility with print-based output. 99 | 100 | Args: 101 | verbose: Enable debug level logging 102 | quiet_warnings: Suppress warning level messages 103 | """ 104 | # Remove existing handlers 105 | for handler in logger.handlers[:]: 106 | logger.removeHandler(handler) 107 | 108 | # Create console handler 109 | console_handler = logging.StreamHandler() 110 | 111 | # Set logging level based on verbose and quiet settings 112 | if verbose: 113 | logger.setLevel(logging.DEBUG) 114 | console_handler.setLevel(logging.DEBUG) 115 | elif quiet_warnings: 116 | logger.setLevel(logging.ERROR) 117 | console_handler.setLevel(logging.ERROR) 118 | else: 119 | logger.setLevel(logging.INFO) 120 | console_handler.setLevel(logging.INFO) 121 | 122 | # Create formatter that mimics original print output 123 | formatter = logging.Formatter('%(message)s') 124 | console_handler.setFormatter(formatter) 125 | 126 | # Add handler to logger 127 | logger.addHandler(console_handler) 128 | logger.propagate = False 129 | 130 | # Signal handling for graceful shutdown 131 | class GracefulShutdownHandler: 132 | """ 133 | Handles graceful 
shutdown on various signals (SIGTERM, SIGINT, etc.). 134 | 135 | This class manages the shutdown process by: 136 | - Catching termination signals 137 | - Saving current checkpoints 138 | - Cleaning up resources 139 | - Providing clean exit 140 | """ 141 | 142 | def __init__(self): 143 | self.shutdown_requested = False 144 | self.checkpoints: Optional[Dict[str, int]] = None 145 | self.cleanup_functions: List[callable] = [] 146 | 147 | def request_shutdown(self, signum: int, frame) -> None: 148 | """ 149 | Signal handler that requests graceful shutdown. 150 | 151 | Args: 152 | signum: Signal number 153 | frame: Current stack frame 154 | """ 155 | signal_names = { 156 | signal.SIGTERM: "SIGTERM", 157 | signal.SIGINT: "SIGINT", 158 | signal.SIGHUP: "SIGHUP", 159 | } 160 | signal_name = signal_names.get(signum, f"Signal {signum}") 161 | 162 | if not self.shutdown_requested: 163 | # Always show shutdown message regardless of quiet mode 164 | print(f"Received {signal_name}, initiating graceful shutdown...", flush=True) 165 | self.shutdown_requested = True 166 | 167 | # Save checkpoints if available 168 | if self.checkpoints is not None: 169 | try: 170 | save_checkpoints(self.checkpoints) 171 | print("Checkpoints saved successfully", flush=True) 172 | except CheckpointError as e: 173 | print(f"Failed to save checkpoints during shutdown: {e}", flush=True) 174 | 175 | # Run cleanup functions 176 | for cleanup_func in self.cleanup_functions: 177 | try: 178 | cleanup_func() 179 | except Exception as e: 180 | print(f"Error during cleanup: {e}", flush=True) 181 | else: 182 | # Second signal, force exit 183 | print(f"Received second {signal_name}, forcing immediate exit", flush=True) 184 | sys.exit(1) 185 | 186 | def register_cleanup(self, cleanup_func: callable) -> None: 187 | """ 188 | Register a cleanup function to be called during shutdown. 
189 | 190 | Args: 191 | cleanup_func: Function to call during cleanup 192 | """ 193 | self.cleanup_functions.append(cleanup_func) 194 | 195 | def set_checkpoints(self, checkpoints: Dict[str, int]) -> None: 196 | """ 197 | Set the current checkpoints for potential saving during shutdown. 198 | 199 | Args: 200 | checkpoints: Current checkpoint data 201 | """ 202 | self.checkpoints = checkpoints 203 | 204 | def should_shutdown(self) -> bool: 205 | """ 206 | Check if shutdown has been requested. 207 | 208 | Returns: 209 | True if shutdown was requested, False otherwise 210 | """ 211 | return self.shutdown_requested 212 | 213 | def setup_signal_handlers(self) -> None: 214 | """Setup signal handlers for graceful shutdown.""" 215 | # Handle common termination signals 216 | signals_to_handle = [signal.SIGTERM, signal.SIGINT] 217 | 218 | # Add SIGHUP on Unix systems (not available on Windows) 219 | if hasattr(signal, 'SIGHUP'): 220 | signals_to_handle.append(signal.SIGHUP) 221 | 222 | for sig in signals_to_handle: 223 | signal.signal(sig, self.request_shutdown) 224 | 225 | # Global shutdown handler instance 226 | shutdown_handler = GracefulShutdownHandler() 227 | 228 | # Phase 3: Configuration validation 229 | def validate_config_args(args: argparse.Namespace) -> List[str]: 230 | """ 231 | Validate command line arguments and configuration. 232 | 233 | This function performs comprehensive validation of all input parameters 234 | to catch configuration errors early and provide helpful error messages. 
235 | 236 | Args: 237 | args: Parsed command line arguments 238 | 239 | Returns: 240 | List of validation error messages (empty if all valid) 241 | """ 242 | errors = [] 243 | 244 | # Validate pattern if provided (not required for cleanup operation) 245 | if hasattr(args, 'pattern') and args.pattern: 246 | try: 247 | re.compile(args.pattern, re.IGNORECASE) 248 | except re.error as e: 249 | errors.append(f"Invalid regex pattern: {e}") 250 | 251 | # Validate numeric parameters 252 | if hasattr(args, 'batch') and args.batch is not None: 253 | if args.batch <= 0: 254 | errors.append("Batch size must be positive") 255 | elif args.batch > 10000: 256 | errors.append("Batch size too large (max 10000)") 257 | 258 | if hasattr(args, 'poll_sleep') and args.poll_sleep is not None: 259 | if args.poll_sleep < 0: 260 | errors.append("Poll sleep cannot be negative") 261 | elif args.poll_sleep > 3600: 262 | errors.append("Poll sleep too large (max 3600 seconds)") 263 | 264 | if hasattr(args, 'max_memory_mb') and args.max_memory_mb is not None: 265 | if args.max_memory_mb <= 0: 266 | errors.append("Max memory must be positive") 267 | elif args.max_memory_mb < 10: 268 | errors.append("Max memory too small (min 10MB)") 269 | elif args.max_memory_mb > 10000: 270 | errors.append("Max memory too large (max 10GB)") 271 | 272 | if hasattr(args, 'min_poll_sleep') and args.min_poll_sleep is not None: 273 | if args.min_poll_sleep < 0: 274 | errors.append("Min poll sleep cannot be negative") 275 | elif args.min_poll_sleep > 300: 276 | errors.append("Min poll sleep too large (max 300 seconds)") 277 | 278 | if hasattr(args, 'max_poll_sleep') and args.max_poll_sleep is not None: 279 | if args.max_poll_sleep < 0: 280 | errors.append("Max poll sleep cannot be negative") 281 | elif args.max_poll_sleep > 3600: 282 | errors.append("Max poll sleep too large (max 3600 seconds)") 283 | 284 | # Cross-parameter validation 285 | if (hasattr(args, 'min_poll_sleep') and hasattr(args, 'max_poll_sleep') and 286 
| args.min_poll_sleep is not None and args.max_poll_sleep is not None): 287 | if args.max_poll_sleep < args.min_poll_sleep: 288 | errors.append("Max poll sleep must be >= min poll sleep") 289 | 290 | # Validate checkpoint prefix if provided 291 | if hasattr(args, 'checkpoint_prefix') and args.checkpoint_prefix: 292 | import string 293 | safe_chars = string.ascii_letters + string.digits + "_-" 294 | if not all(c in safe_chars for c in args.checkpoint_prefix): 295 | errors.append("Checkpoint prefix can only contain letters, digits, underscores, and hyphens") 296 | if len(args.checkpoint_prefix) > 50: 297 | errors.append("Checkpoint prefix too long (max 50 characters)") 298 | 299 | return errors 300 | 301 | # Phase 1 Improvement: Custom exceptions for better error handling 302 | class CertPatrolError(Exception): 303 | """ 304 | Base exception for CertPatrol errors. 305 | 306 | All custom exceptions in CertPatrol inherit from this base class 307 | to allow for easy exception handling and categorization. 308 | """ 309 | pass 310 | 311 | class CheckpointError(CertPatrolError): 312 | """ 313 | Raised when checkpoint operations fail. 314 | 315 | This includes scenarios such as: 316 | - Failed to create checkpoint directory 317 | - Corrupted checkpoint files 318 | - Invalid checkpoint data structure 319 | - Atomic write failures 320 | """ 321 | pass 322 | 323 | class CTLogError(CertPatrolError): 324 | """ 325 | Raised when CT log operations fail. 326 | 327 | This includes scenarios such as: 328 | - Network errors when connecting to CT logs 329 | - Invalid responses from CT log endpoints 330 | - Malformed JSON data from CT logs 331 | - Authentication or rate limiting issues 332 | """ 333 | pass 334 | 335 | class MemoryError(CertPatrolError): 336 | """ 337 | Raised when memory limits are exceeded. 338 | 339 | This custom memory error is separate from Python's built-in 340 | MemoryError to distinguish between system-level and 341 | application-level memory management issues. 
342 | """ 343 | pass 344 | 345 | # Phase 2: HTTP Session Management with Connection Pooling 346 | class HTTPSessionManager: 347 | """ 348 | Manages HTTP sessions with connection pooling and retries. 349 | 350 | This class provides a centralized way to manage HTTP connections 351 | for CT log communication, implementing connection pooling for 352 | improved performance and retry strategies for better reliability. 353 | 354 | Attributes: 355 | timeout: Tuple of (connect_timeout, read_timeout) in seconds 356 | session: The underlying requests.Session object 357 | """ 358 | 359 | def __init__( 360 | self, 361 | pool_connections: int = DEFAULT_POOL_CONNECTIONS, 362 | pool_maxsize: int = DEFAULT_POOL_MAXSIZE, 363 | max_retries: int = DEFAULT_MAX_RETRIES, 364 | backoff_factor: float = DEFAULT_BACKOFF_FACTOR, 365 | timeout: Tuple[int, int] = DEFAULT_TIMEOUT 366 | ) -> None: 367 | """ 368 | Initialize the HTTP session manager. 369 | 370 | Args: 371 | pool_connections: Number of connection pools to cache 372 | pool_maxsize: Maximum number of connections to save in the pool 373 | max_retries: Maximum number of retry attempts 374 | backoff_factor: Backoff factor for retry delays 375 | timeout: Tuple of (connect, read) timeout values in seconds 376 | """ 377 | self.timeout = timeout 378 | self.session = self._create_session(pool_connections, pool_maxsize, max_retries, backoff_factor) 379 | 380 | def _create_session( 381 | self, 382 | pool_connections: int, 383 | pool_maxsize: int, 384 | max_retries: int, 385 | backoff_factor: float 386 | ) -> requests.Session: 387 | """ 388 | Create a requests session with connection pooling and retry strategy. 
389 | 390 | Args: 391 | pool_connections: Number of connection pools to cache 392 | pool_maxsize: Maximum number of connections to save in the pool 393 | max_retries: Maximum number of retry attempts 394 | backoff_factor: Backoff factor for retry delays 395 | 396 | Returns: 397 | Configured requests.Session object 398 | """ 399 | session = requests.Session() 400 | 401 | # Configure retry strategy 402 | retry_strategy = Retry( 403 | total=max_retries, 404 | status_forcelist=[429, 500, 502, 503, 504], 405 | backoff_factor=backoff_factor, 406 | allowed_methods=["HEAD", "GET", "OPTIONS"] 407 | ) 408 | 409 | # Configure HTTP adapter with connection pooling 410 | adapter = HTTPAdapter( 411 | pool_connections=pool_connections, 412 | pool_maxsize=pool_maxsize, 413 | max_retries=retry_strategy, 414 | pool_block=False 415 | ) 416 | 417 | session.mount("http://", adapter) 418 | session.mount("https://", adapter) 419 | session.headers.update({"User-Agent": USER_AGENT}) 420 | 421 | return session 422 | 423 | def get(self, url: str, **kwargs) -> requests.Response: 424 | """ 425 | Make a GET request using the managed session. 426 | 427 | Args: 428 | url: The URL to request 429 | **kwargs: Additional arguments passed to requests.get() 430 | 431 | Returns: 432 | Response object from the request 433 | """ 434 | kwargs.setdefault('timeout', self.timeout) 435 | return self.session.get(url, **kwargs) 436 | 437 | def close(self) -> None: 438 | """Close the session and clean up connections.""" 439 | if hasattr(self, 'session'): 440 | self.session.close() 441 | 442 | def __enter__(self) -> 'HTTPSessionManager': 443 | """Context manager entry.""" 444 | return self 445 | 446 | def __exit__(self, exc_type, exc_val, exc_tb) -> None: 447 | """Context manager exit with cleanup.""" 448 | self.close() 449 | 450 | # Phase 2: Adaptive Rate Limiting 451 | class AdaptiveRateLimiter: 452 | """ 453 | Manages adaptive polling intervals with exponential backoff on errors. 
454 | 455 | This class implements an adaptive rate limiting strategy that: 456 | - Increases sleep intervals on consecutive errors (exponential backoff) 457 | - Decreases sleep intervals on consecutive successes 458 | - Maintains configurable minimum and maximum sleep bounds 459 | 460 | Attributes: 461 | current_sleep: Current sleep interval in seconds 462 | min_sleep: Minimum allowed sleep interval 463 | max_sleep: Maximum allowed sleep interval 464 | consecutive_errors: Count of consecutive error occurrences 465 | consecutive_successes: Count of consecutive successful operations 466 | """ 467 | 468 | def __init__( 469 | self, 470 | initial_sleep: float, 471 | min_sleep: float = DEFAULT_MIN_POLL_SLEEP, 472 | max_sleep: float = DEFAULT_MAX_POLL_SLEEP 473 | ) -> None: 474 | """ 475 | Initialize the adaptive rate limiter. 476 | 477 | Args: 478 | initial_sleep: Starting sleep interval in seconds 479 | min_sleep: Minimum allowed sleep interval in seconds 480 | max_sleep: Maximum allowed sleep interval in seconds 481 | """ 482 | self.current_sleep = initial_sleep 483 | self.min_sleep = min_sleep 484 | self.max_sleep = max_sleep 485 | self.consecutive_errors = 0 486 | self.consecutive_successes = 0 487 | 488 | def on_success(self) -> None: 489 | """ 490 | Called when an operation succeeds. 491 | 492 | Resets error counter and potentially reduces sleep interval 493 | after multiple consecutive successes. 494 | """ 495 | self.consecutive_errors = 0 496 | self.consecutive_successes += 1 497 | 498 | # Gradually reduce sleep time on consecutive successes 499 | if self.consecutive_successes >= 3: 500 | self.current_sleep = max( 501 | self.min_sleep, 502 | self.current_sleep * SUCCESS_REDUCTION_FACTOR 503 | ) 504 | self.consecutive_successes = 0 505 | 506 | def on_error(self) -> None: 507 | """ 508 | Called when an operation fails. 509 | 510 | Resets success counter and increases sleep interval 511 | using exponential backoff strategy. 
512 | """ 513 | self.consecutive_successes = 0 514 | self.consecutive_errors += 1 515 | 516 | # Exponential backoff on consecutive errors 517 | self.current_sleep = min( 518 | self.max_sleep, 519 | self.current_sleep * (BACKOFF_MULTIPLIER ** self.consecutive_errors) 520 | ) 521 | 522 | def sleep(self) -> None: 523 | """Sleep for the current adaptive interval.""" 524 | time.sleep(self.current_sleep) 525 | 526 | def get_current_sleep(self) -> float: 527 | """ 528 | Get the current sleep interval. 529 | 530 | Returns: 531 | Current sleep interval in seconds 532 | """ 533 | return self.current_sleep 534 | 535 | # Phase 2: Memory Monitor 536 | class MemoryMonitor: 537 | """ 538 | Monitors and manages memory usage during processing. 539 | 540 | This class provides memory monitoring capabilities to prevent 541 | excessive memory usage during certificate processing. It can 542 | trigger garbage collection and raise exceptions when limits 543 | are exceeded. 544 | 545 | Attributes: 546 | max_memory_bytes: Maximum allowed memory usage in bytes 547 | check_counter: Counter for periodic memory checks 548 | """ 549 | 550 | def __init__(self, max_memory_mb: int = DEFAULT_MAX_MEMORY_MB) -> None: 551 | """ 552 | Initialize the memory monitor. 553 | 554 | Args: 555 | max_memory_mb: Maximum allowed memory usage in megabytes 556 | """ 557 | self.max_memory_bytes = max_memory_mb * 1024 * 1024 558 | self.check_counter = 0 559 | 560 | def check_memory(self) -> None: 561 | """ 562 | Check current memory usage and trigger GC if needed. 
563 | 564 | Raises: 565 | MemoryError: If memory usage exceeds the configured limit 566 | even after garbage collection 567 | """ 568 | self.check_counter += 1 569 | 570 | if self.check_counter % MEMORY_CHECK_INTERVAL == 0: 571 | try: 572 | import psutil # type: ignore[import-untyped] 573 | process = psutil.Process() 574 | memory_info = process.memory_info() 575 | 576 | if memory_info.rss > self.max_memory_bytes: 577 | # Force garbage collection 578 | gc.collect() 579 | 580 | # Check again after GC 581 | memory_info = process.memory_info() 582 | if memory_info.rss > self.max_memory_bytes: 583 | raise MemoryError( 584 | f"Memory usage ({memory_info.rss / 1024 / 1024:.1f}MB) " 585 | f"exceeds limit ({self.max_memory_bytes / 1024 / 1024:.1f}MB)" 586 | ) 587 | except ImportError: 588 | # psutil not available, use basic GC trigger 589 | if self.check_counter % (MEMORY_CHECK_INTERVAL * 10) == 0: 590 | gc.collect() 591 | 592 | # --- Phase 1: Certificate parsing with cryptography library --- 593 | 594 | def _read_uint24(b: bytes, offset: int) -> Tuple[int, int]: 595 | """ 596 | Read a 3-byte big-endian unsigned int, return (value, new_offset). 597 | 598 | Args: 599 | b: Byte array to read from 600 | offset: Starting position in the byte array 601 | 602 | Returns: 603 | Tuple of (parsed_value, new_offset) 604 | 605 | Raises: 606 | ValueError: If there are insufficient bytes remaining 607 | """ 608 | if offset + 3 > len(b): 609 | raise ValueError("Truncated uint24") 610 | return (b[offset] << 16) | (b[offset+1] << 8) | b[offset+2], offset + 3 611 | 612 | def parse_tls_cert_chain(extra_data_b64: str) -> List[bytes]: 613 | """ 614 | Parse certificates from CT 'extra_data' (base64). 615 | CT logs concatenate DER certificates directly, not in TLS structure. 616 | Returns a list of DER cert bytes [leaf, intermediates...]. 
617 | 618 | Phase 1: Replaced manual ASN.1 parsing with cryptography library-based parsing 619 | Phase 2: Added memory efficiency improvements 620 | """ 621 | import base64 622 | try: 623 | raw = base64.b64decode(extra_data_b64) 624 | if len(raw) < 10: # Minimum reasonable certificate size 625 | return [] 626 | 627 | certs = [] 628 | pos = 0 629 | 630 | # Phase 2: Process certificates with memory awareness 631 | while pos < len(raw): 632 | try: 633 | # Look for ASN.1 SEQUENCE start (0x30) which indicates start of certificate 634 | if pos + 1 >= len(raw) or raw[pos] != 0x30: 635 | pos += 1 636 | continue 637 | 638 | # Use a more robust approach: try different certificate lengths 639 | min_cert_size = 100 # Very small certificates are unlikely 640 | max_cert_size = min(len(raw) - pos, 10 * 1024 * 1024) # Max 10MB per cert 641 | 642 | cert_found = False 643 | 644 | # Try to find the correct certificate boundary by testing if we can parse it 645 | for try_end in range(pos + min_cert_size, min(pos + max_cert_size + 1, len(raw) + 1)): 646 | try: 647 | candidate_der = raw[pos:try_end] 648 | 649 | # Attempt to parse with cryptography library - this validates the DER structure 650 | test_cert = x509.load_der_x509_certificate(candidate_der) 651 | 652 | # If we got here, the certificate parsed successfully 653 | certs.append(candidate_der) 654 | pos = try_end 655 | cert_found = True 656 | break 657 | 658 | except (ValueError, TypeError, x509.ExtensionNotFound, x509.InvalidVersion) as e: 659 | # These are expected for partial certificates or invalid DER 660 | continue 661 | except Exception: 662 | # Unexpected error, skip this attempt 663 | continue 664 | 665 | if not cert_found: 666 | # No valid certificate found starting at this position, advance by 1 667 | pos += 1 668 | 669 | except Exception: 670 | # If anything goes wrong, advance position and continue 671 | pos += 1 672 | continue 673 | 674 | return certs 675 | 676 | except Exception: 677 | # Fallback: if base64 decode 
fails or other fundamental error 678 | return [] 679 | 680 | def extract_domains_from_der(der_bytes: bytes) -> List[str]: 681 | """ 682 | Extract DNS names from SAN; if absent, fallback to CN when it looks like a DNS name. 683 | Returns lowercased, Unicode (IDNA-decoded) domains. 684 | """ 685 | domains = [] 686 | 687 | # Suppress warnings for certificates with non-compliant serial numbers 688 | with warnings.catch_warnings(): 689 | warnings.filterwarnings("ignore", category=DeprecationWarning) 690 | warnings.filterwarnings("ignore", message=".*serial number.*") 691 | cert = x509.load_der_x509_certificate(der_bytes) 692 | 693 | # Try SAN first 694 | try: 695 | san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName) 696 | for name in san.value.get_values_for_type(x509.DNSName): 697 | domains.append(name) 698 | except x509.ExtensionNotFound: 699 | pass 700 | 701 | # Fallback: subject CN 702 | if not domains: 703 | try: 704 | cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value 705 | # crude DNS-ish check: contains a dot or wildcard 706 | if "." in cn or cn.startswith("*."): 707 | domains.append(cn) 708 | except IndexError: 709 | pass 710 | 711 | # Normalize: lower-case, IDNA decode to Unicode for display, but keep ASCII if decode fails 712 | normed = [] 713 | for d in domains: 714 | d = d.strip().lower() 715 | if d.startswith("*."): 716 | base = d[2:] 717 | try: 718 | u = idna.decode(base) 719 | normed.append("*." + u) 720 | except Exception: 721 | normed.append(d) 722 | else: 723 | try: 724 | u = idna.decode(d) 725 | normed.append(u) 726 | except Exception: 727 | normed.append(d) 728 | return list(dict.fromkeys(normed)) # dedupe, keep order 729 | 730 | def registrable_domain(domain: str) -> str: 731 | """ 732 | Return the registrable base domain (eTLD+1) for a given domain string. 733 | Falls back to a best-effort heuristic if tldextract is unavailable. 734 | Keeps Unicode/IDNA-decoded input as-is. 
735 | """ 736 | # Strip wildcard for matching purposes 737 | d = domain.lstrip("*.") 738 | try: 739 | # Import locally to avoid hard dependency unless used 740 | import tldextract # type: ignore 741 | ext = tldextract.extract(d) 742 | if ext.domain and ext.suffix: 743 | return f"{ext.domain}.{ext.suffix}" 744 | return d 745 | except Exception: 746 | parts = d.split(".") 747 | if len(parts) >= 2: 748 | return ".".join(parts[-2:]) 749 | return d 750 | 751 | # --- Phase 2: Enhanced Dynamic CT log discovery with connection pooling --- 752 | 753 | def fetch_usable_ct_logs(verbose: bool = False, session_manager: HTTPSessionManager = None) -> Dict[str, str]: 754 | """ 755 | Fetch the current list of usable CT logs from Google's official list. 756 | Returns a dict mapping log names to base URLs. 757 | 758 | Phase 2: Uses HTTP session manager for connection pooling. 759 | """ 760 | close_session = False 761 | if session_manager is None: 762 | session_manager = HTTPSessionManager() 763 | close_session = True 764 | 765 | try: 766 | if verbose: 767 | logger.info("Fetching current CT log list from Google...") 768 | 769 | resp = session_manager.get(LOG_LIST_URL) 770 | resp.raise_for_status() 771 | data = resp.json() 772 | 773 | usable_logs = {} 774 | 775 | # Extract logs from all operators 776 | for operator in data.get("operators", []): 777 | operator_name = operator.get("name", "unknown") 778 | 779 | for log in operator.get("logs", []): 780 | # Check if log is usable/qualified 781 | state = log.get("state", {}) 782 | if "usable" in state or "qualified" in state: 783 | url = log["url"].rstrip("/") 784 | description = log.get("description", "") 785 | 786 | # Create a simple name from description or URL 787 | if description: 788 | # Extract meaningful name from description 789 | name = description.lower() 790 | name = name.replace("'", "").replace('"', "") 791 | name = name.replace(" log", "").replace(" ", "_") 792 | # Take first part if too long 793 | name = name.split("_")[0:2] 
794 | name = "_".join(name) 795 | else: 796 | # Fallback to URL-based name 797 | name = url.split("/")[-1] or url.split("/")[-2] 798 | 799 | # Ensure unique names 800 | original_name = name 801 | counter = 1 802 | while name in usable_logs: 803 | name = f"{original_name}_{counter}" 804 | counter += 1 805 | 806 | usable_logs[name] = url 807 | 808 | if verbose: 809 | logger.info(f"Found usable log: {name} -> {url}") 810 | 811 | if verbose: 812 | logger.info(f"Found {len(usable_logs)} usable CT logs") 813 | 814 | return usable_logs 815 | 816 | except requests.RequestException as e: 817 | if verbose: 818 | logger.warning(f"Network error fetching CT log list: {e}") 819 | # Fallback to a known working log 820 | return {"xenon2023": "https://ct.googleapis.com/logs/xenon2023"} 821 | except (json.JSONDecodeError, KeyError, TypeError) as e: 822 | if verbose: 823 | logger.warning(f"Failed to parse CT log list: {e}") 824 | # Fallback to a known working log 825 | return {"xenon2023": "https://ct.googleapis.com/logs/xenon2023"} 826 | except Exception as e: 827 | if verbose: 828 | logger.warning(f"Unexpected error fetching CT log list: {e}") 829 | # Fallback to a known working log 830 | return {"xenon2023": "https://ct.googleapis.com/logs/xenon2023"} 831 | finally: 832 | if close_session: 833 | session_manager.close() 834 | 835 | def save_debug_response(name: str, entry: dict, absolute_idx: int) -> None: 836 | """ 837 | Save a CT log entry to a debug file for analysis. 
838 | 839 | Args: 840 | name: Name of the CT log 841 | entry: The CT log entry data to save 842 | absolute_idx: Absolute index of the entry in the log 843 | """ 844 | debug_dir = "debug_responses" 845 | try: 846 | if not os.path.exists(debug_dir): 847 | os.makedirs(debug_dir) 848 | 849 | filename = f"{debug_dir}/{name}_{absolute_idx}.json" 850 | with open(filename, "w", encoding="utf-8") as f: 851 | json.dump(entry, f, indent=2, default=str) 852 | logger.debug(f"Saved response to {filename}") 853 | except (OSError, IOError, TypeError, ValueError) as e: 854 | logger.debug(f"Failed to save response: {e}") 855 | 856 | # --- Phase 2: Enhanced CT polling with connection pooling --- 857 | 858 | def get_sth(base_url: str, session_manager: HTTPSessionManager) -> int: 859 | """ 860 | Return current tree_size of the CT log. 861 | Phase 2: Uses HTTP session manager for connection pooling. 862 | """ 863 | try: 864 | r = session_manager.get(f"{base_url}/ct/v1/get-sth") 865 | r.raise_for_status() 866 | data = r.json() 867 | return int(data["tree_size"]) 868 | except requests.RequestException as e: 869 | raise CTLogError(f"Network error getting STH from {base_url}: {e}") 870 | except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e: 871 | raise CTLogError(f"Invalid STH response from {base_url}: {e}") 872 | 873 | def get_entries(base_url: str, start: int, end: int, session_manager: HTTPSessionManager) -> List[dict]: 874 | """ 875 | Fetch entries [start..end] inclusive (may return fewer). 876 | Phase 2: Uses HTTP session manager for connection pooling. 
877 | """ 878 | try: 879 | r = session_manager.get( 880 | f"{base_url}/ct/v1/get-entries", 881 | params={"start": start, "end": end} 882 | ) 883 | r.raise_for_status() 884 | data = r.json() 885 | return data.get("entries", []) 886 | except requests.RequestException as e: 887 | raise CTLogError(f"Network error getting entries from {base_url}: {e}") 888 | except (json.JSONDecodeError, KeyError, TypeError) as e: 889 | raise CTLogError(f"Invalid entries response from {base_url}: {e}") 890 | 891 | # --- Phase 1: Enhanced Checkpoint Management --- 892 | 893 | def ensure_checkpoint_dir(): 894 | """Ensure the checkpoints directory exists.""" 895 | try: 896 | if not os.path.exists(CHECKPOINT_DIR): 897 | os.makedirs(CHECKPOINT_DIR) 898 | except OSError as e: 899 | raise CheckpointError(f"Failed to create checkpoint directory: {e}") 900 | 901 | def validate_checkpoint_data(data: Any) -> Dict[str, int]: 902 | """ 903 | Validate checkpoint data structure and content. 904 | Returns validated data or raises CheckpointError. 905 | """ 906 | if not isinstance(data, dict): 907 | raise CheckpointError("Checkpoint data must be a dictionary") 908 | 909 | validated = {} 910 | for key, value in data.items(): 911 | if not isinstance(key, str): 912 | raise CheckpointError(f"Checkpoint key must be string, got {type(key)}") 913 | if not isinstance(value, (int, float)): 914 | raise CheckpointError(f"Checkpoint value must be numeric, got {type(value)}") 915 | 916 | # Convert to int and validate range 917 | try: 918 | int_value = int(value) 919 | if int_value < 0: 920 | raise CheckpointError(f"Checkpoint value must be non-negative, got {int_value}") 921 | validated[key] = int_value 922 | except (ValueError, OverflowError) as e: 923 | raise CheckpointError(f"Invalid checkpoint value for {key}: {e}") 924 | 925 | return validated 926 | 927 | def load_checkpoints() -> Dict[str, int]: 928 | """ 929 | Enhanced checkpoint loading with validation. 
930 | 931 | Returns: 932 | Dictionary mapping log names to checkpoint positions 933 | """ 934 | ensure_checkpoint_dir() 935 | if os.path.exists(CHECKPOINT_FILE): 936 | try: 937 | with open(CHECKPOINT_FILE, "r", encoding="utf-8") as fh: 938 | raw_data = json.load(fh) 939 | return validate_checkpoint_data(raw_data) 940 | except (json.JSONDecodeError, IOError) as e: 941 | logger.warning(f"Corrupted checkpoint file, starting fresh: {e}") 942 | return {} 943 | except CheckpointError as e: 944 | logger.warning(f"Invalid checkpoint data, starting fresh: {e}") 945 | return {} 946 | return {} 947 | 948 | def save_checkpoints(cp: Dict[str, int]) -> None: 949 | """ 950 | Enhanced atomic write with validation and integrity checks. 951 | 952 | Args: 953 | cp: Dictionary mapping log names to checkpoint positions 954 | 955 | Raises: 956 | CheckpointError: If checkpoint data is invalid or save operation fails 957 | """ 958 | ensure_checkpoint_dir() 959 | 960 | # Validate checkpoint data before saving 961 | try: 962 | validated_cp = validate_checkpoint_data(cp) 963 | except CheckpointError as e: 964 | raise CheckpointError(f"Cannot save invalid checkpoint data: {e}") 965 | 966 | tmp = CHECKPOINT_FILE + ".tmp" 967 | max_retries = 3 968 | retry_delay = 0.1 969 | 970 | for attempt in range(max_retries): 971 | try: 972 | # Write to temporary file 973 | with open(tmp, "w", encoding="utf-8") as fh: 974 | json.dump(validated_cp, fh, indent=2) 975 | 976 | # Verify the file was written correctly by reading it back 977 | try: 978 | with open(tmp, "r", encoding="utf-8") as fh: 979 | verify_data = json.load(fh) 980 | validate_checkpoint_data(verify_data) 981 | except Exception as e: 982 | raise CheckpointError(f"Checkpoint verification failed: {e}") 983 | 984 | # Atomically replace the original file 985 | os.replace(tmp, CHECKPOINT_FILE) 986 | return 987 | 988 | except (OSError, IOError, CheckpointError) as e: 989 | if attempt < max_retries - 1: 990 | time.sleep(retry_delay) 991 | retry_delay 
*= 2 # Exponential backoff 992 | continue 993 | else: 994 | # Clean up temp file on final failure 995 | try: 996 | if os.path.exists(tmp): 997 | os.unlink(tmp) 998 | except OSError: 999 | pass 1000 | raise CheckpointError(f"Failed to save checkpoints after {max_retries} attempts: {e}") 1001 | 1002 | def cleanup_checkpoint_file() -> None: 1003 | """ 1004 | Clean up checkpoint file when process exits. 1005 | 1006 | This function is typically called during program shutdown 1007 | to remove process-specific checkpoint files. 1008 | """ 1009 | try: 1010 | if os.path.exists(CHECKPOINT_FILE): 1011 | os.unlink(CHECKPOINT_FILE) 1012 | except OSError as e: 1013 | logger.warning(f"Failed to cleanup checkpoint file: {e}") 1014 | 1015 | def cleanup_orphaned_checkpoints() -> None: 1016 | """ 1017 | Clean up checkpoint files from processes that are no longer running. 1018 | 1019 | This function scans for checkpoint files and removes those belonging 1020 | to processes that are no longer active. 1021 | """ 1022 | import glob 1023 | ensure_checkpoint_dir() 1024 | checkpoint_files = glob.glob(os.path.join(CHECKPOINT_DIR, "*.json")) 1025 | cleaned = 0 1026 | 1027 | for checkpoint_file in checkpoint_files: 1028 | try: 1029 | # Extract filename without path 1030 | filename = os.path.basename(checkpoint_file) 1031 | # Try to parse the filename to extract PID 1032 | if filename.startswith("certpatrol_checkpoints_") and filename.endswith(".json"): 1033 | pid_part = filename[24:-5] # Remove "certpatrol_checkpoints_" prefix and ".json" suffix 1034 | if "_" in pid_part: 1035 | # Has custom prefix, extract PID from end 1036 | pid = pid_part.split("_")[-1] 1037 | else: 1038 | # No custom prefix, entire part is PID 1039 | pid = pid_part 1040 | 1041 | try: 1042 | pid = int(pid) 1043 | # Check if process is still running 1044 | os.kill(pid, 0) 1045 | # Process exists, keep file 1046 | except (ValueError, OSError): 1047 | # Process doesn't exist, remove file 1048 | os.unlink(checkpoint_file) 1049 
| cleaned += 1 1050 | logger.info(f"Removed orphaned checkpoint: {filename}") 1051 | except Exception as e: 1052 | logger.warning(f"Failed to process checkpoint file {checkpoint_file}: {e}") 1053 | 1054 | if cleaned > 0: 1055 | logger.info(f"Cleaned up {cleaned} orphaned checkpoint files") 1056 | else: 1057 | logger.info("No orphaned checkpoint files found") 1058 | 1059 | # --- Phase 2: Enhanced streaming entry processor --- 1060 | 1061 | def process_entries_streaming( 1062 | entries: List[dict], 1063 | pattern: re.Pattern, 1064 | match_scope: str, 1065 | verbose: bool, 1066 | debug_all: bool, 1067 | quiet_parse_errors: bool, 1068 | quiet_warnings: bool, 1069 | log_name: str, 1070 | start_idx: int, 1071 | memory_monitor: MemoryMonitor 1072 | ) -> Iterator[str]: 1073 | """ 1074 | Process CT log entries in streaming fashion to optimize memory usage. 1075 | Phase 2: Added memory monitoring and streaming processing. 1076 | 1077 | Note: When match_scope == "etld1", the pattern is matched against the registrable 1078 | base domain (eTLD+1), not the full domain. 
For example: 1079 | - Full domain: "tom-tochito.workers.dev" 1080 | - Registrable domain: "workers.dev" 1081 | - Pattern should match "workers.dev", not ".*\\.workers\\.dev$" 1082 | """ 1083 | for i, entry in enumerate(entries): 1084 | absolute_idx = start_idx + i 1085 | 1086 | # Phase 2: Check memory usage periodically 1087 | memory_monitor.check_memory() 1088 | 1089 | try: 1090 | chain = parse_tls_cert_chain(entry["extra_data"]) 1091 | if not chain: 1092 | if verbose: 1093 | logger.debug(f"{log_name}@{absolute_idx}: no valid chain parsed") 1094 | # Save first few failed responses for debugging 1095 | if absolute_idx % 100 == 0: # Save every 100th failed entry 1096 | save_debug_response(log_name, entry, absolute_idx) 1097 | continue 1098 | 1099 | leaf_der = chain[0] # end-entity first 1100 | domains = extract_domains_from_der(leaf_der) 1101 | 1102 | if verbose and debug_all and domains: 1103 | logger.debug(f"{log_name}@{absolute_idx}: found domains: {domains}") 1104 | 1105 | # Process matches immediately to reduce memory footprint 1106 | for d in domains: 1107 | target = registrable_domain(d) if match_scope == "etld1" else d 1108 | if pattern.search(target): 1109 | yield d 1110 | # If verbose, also yield metadata 1111 | if verbose: 1112 | ts = entry.get("sct", {}).get("timestamp") 1113 | yield f"# matched {d} | log={log_name} idx={absolute_idx} ts={ts}" 1114 | 1115 | # Phase 2: Clear references to help with memory management 1116 | del entry 1117 | if i % 50 == 0: # Force GC every 50 entries 1118 | gc.collect() 1119 | 1120 | except Exception as e: 1121 | if verbose and not quiet_warnings and not quiet_parse_errors: 1122 | logger.warning(f"{log_name}@{absolute_idx}: parse failed: {e}") 1123 | # Save first few failed responses for debugging 1124 | if absolute_idx % 100 == 0: 1125 | save_debug_response(log_name, entry, absolute_idx) 1126 | continue 1127 | 1128 | # --- Phase 2: Main log tailing function with performance improvements --- 1129 | 1130 | def tail_logs( 
1131 | logs: List[str], 1132 | pattern: re.Pattern, 1133 | batch: int = 256, 1134 | poll_sleep: float = 3.0, 1135 | verbose: bool = False, 1136 | ct_logs: dict = None, 1137 | quiet_warnings: bool = False, 1138 | match_scope: str = "full", 1139 | debug_all: bool = False, 1140 | quiet_parse_errors: bool = False, 1141 | max_memory_mb: int = DEFAULT_MAX_MEMORY_MB, 1142 | min_poll_sleep: float = DEFAULT_MIN_POLL_SLEEP, 1143 | max_poll_sleep: float = DEFAULT_MAX_POLL_SLEEP, 1144 | ): 1145 | """ 1146 | Main log tailing function. 1147 | Phase 2: Added HTTP connection pooling, adaptive rate limiting, and memory management. 1148 | """ 1149 | if ct_logs is None: 1150 | ct_logs = CT_LOGS 1151 | 1152 | # Phase 2: Initialize performance components 1153 | session_manager = HTTPSessionManager() 1154 | rate_limiter = AdaptiveRateLimiter(poll_sleep, min_poll_sleep, max_poll_sleep) 1155 | memory_monitor = MemoryMonitor(max_memory_mb) 1156 | 1157 | # Register session cleanup with shutdown handler 1158 | shutdown_handler.register_cleanup(session_manager.close) 1159 | 1160 | try: 1161 | checkpoints = load_checkpoints() 1162 | except CheckpointError as e: 1163 | logger.error(f"Failed to load checkpoints: {e}") 1164 | return 1165 | finally: 1166 | # Ensure session cleanup on any exit 1167 | import atexit 1168 | atexit.register(session_manager.close) 1169 | 1170 | # Initialize checkpoints at current tree_size (tail-from-now semantics) 1171 | for name in logs: 1172 | # Check for shutdown during initialization 1173 | if shutdown_handler.should_shutdown(): 1174 | print("Shutdown requested during initialization", flush=True) 1175 | return 1176 | 1177 | if name not in ct_logs: 1178 | if verbose: 1179 | logger.warning(f"Unknown log: {name}") 1180 | continue 1181 | base = ct_logs[name] 1182 | if name not in checkpoints: 1183 | try: 1184 | tree_size = get_sth(base, session_manager) 1185 | checkpoints[name] = tree_size # next index to fetch 1186 | if verbose: 1187 | logger.info(f"{name}: starting 
at index {tree_size}") 1188 | rate_limiter.on_success() # STH fetch succeeded 1189 | except CTLogError as e: 1190 | logger.warning(f"{name}: failed to init STH: {e}") 1191 | checkpoints[name] = 0 1192 | rate_limiter.on_error() # STH fetch failed 1193 | 1194 | try: 1195 | save_checkpoints(checkpoints) 1196 | except CheckpointError as e: 1197 | logger.error(f"Failed to save initial checkpoints: {e}") 1198 | return 1199 | 1200 | # Set up checkpoints for graceful shutdown 1201 | shutdown_handler.set_checkpoints(checkpoints) 1202 | 1203 | if verbose: 1204 | logger.info(f"Using adaptive polling: {min_poll_sleep}s - {max_poll_sleep}s") 1205 | logger.info(f"Memory limit: {max_memory_mb}MB") 1206 | 1207 | while True: 1208 | # Check for graceful shutdown request 1209 | if shutdown_handler.should_shutdown(): 1210 | print("Graceful shutdown requested, exiting main loop", flush=True) 1211 | break 1212 | any_progress = False 1213 | loop_start_time = time.time() 1214 | 1215 | for name in logs: 1216 | if name not in ct_logs: 1217 | continue 1218 | base = ct_logs[name] 1219 | 1220 | # Phase 2: Check memory before processing each log 1221 | try: 1222 | memory_monitor.check_memory() 1223 | except MemoryError as e: 1224 | logger.error(str(e)) 1225 | # Force a more aggressive GC and continue 1226 | gc.collect() 1227 | continue 1228 | 1229 | # Determine target size 1230 | try: 1231 | tree_size = get_sth(base, session_manager) 1232 | rate_limiter.on_success() # STH fetch succeeded 1233 | except CTLogError as e: 1234 | if verbose: 1235 | logger.warning(f"{name}: get-sth failed: {e}") 1236 | rate_limiter.on_error() # STH fetch failed 1237 | continue 1238 | 1239 | next_idx = checkpoints.get(name, 0) 1240 | if next_idx >= tree_size: 1241 | # nothing new 1242 | continue 1243 | 1244 | any_progress = True 1245 | # Fetch in batches up to current tree_size-1 1246 | end_idx = min(next_idx + batch - 1, tree_size - 1) 1247 | 1248 | try: 1249 | entries = get_entries(base, next_idx, end_idx, 
session_manager) 1250 | rate_limiter.on_success() # Entries fetch succeeded 1251 | except CTLogError as e: 1252 | if verbose: 1253 | logger.warning(f"{name}: get-entries {next_idx}-{end_idx} failed: {e}") 1254 | rate_limiter.on_error() # Entries fetch failed 1255 | continue 1256 | 1257 | # Process entries with streaming 1258 | if verbose and debug_all and entries: 1259 | logger.debug(f"{name}: processing {len(entries)} entries from {next_idx} to {end_idx}") 1260 | 1261 | # Phase 2: Use streaming processor for better memory efficiency 1262 | try: 1263 | for result in process_entries_streaming( 1264 | entries, pattern, match_scope, verbose, debug_all, 1265 | quiet_parse_errors, quiet_warnings, name, next_idx, memory_monitor 1266 | ): 1267 | print(result, flush=True) 1268 | except MemoryError as e: 1269 | logger.error(f"Memory limit exceeded processing {name}: {e}") 1270 | # Skip this batch and continue 1271 | continue 1272 | 1273 | try: 1274 | checkpoints[name] = end_idx + 1 1275 | save_checkpoints(checkpoints) 1276 | # Update shutdown handler with latest checkpoints 1277 | shutdown_handler.set_checkpoints(checkpoints) 1278 | except CheckpointError as e: 1279 | logger.error(f"Failed to save checkpoints for {name}: {e}") 1280 | # Continue processing other logs even if checkpoint save fails 1281 | 1282 | # Phase 2: Adaptive sleep based on progress and errors 1283 | if not any_progress: 1284 | if verbose: 1285 | current_sleep = rate_limiter.get_current_sleep() 1286 | logger.debug(f"No progress, sleeping for {current_sleep:.1f}s") 1287 | 1288 | # Sleep in smaller chunks to allow for responsive shutdown 1289 | sleep_time = rate_limiter.get_current_sleep() 1290 | sleep_chunk = 0.5 # Check for shutdown every 0.5 seconds 1291 | while sleep_time > 0 and not shutdown_handler.should_shutdown(): 1292 | chunk = min(sleep_chunk, sleep_time) 1293 | time.sleep(chunk) 1294 | sleep_time -= chunk 1295 | else: 1296 | # Had progress, reduce sleep time 1297 | rate_limiter.on_success() 
1298 | 1299 | # Phase 2: Show performance stats periodically 1300 | if verbose and any_progress: 1301 | loop_time = time.time() - loop_start_time 1302 | logger.debug(f"Loop completed in {loop_time:.2f}s, current poll interval: {rate_limiter.get_current_sleep():.1f}s") 1303 | 1304 | def main() -> int: 1305 | """ 1306 | Main entry point for CertPatrol. 1307 | 1308 | Returns: 1309 | Exit code (0 for success, non-zero for errors) 1310 | """ 1311 | parser = argparse.ArgumentParser( 1312 | description="Torito CertPatrol - Tiny local CT tailer that filters domains by a regex pattern", 1313 | formatter_class=argparse.RawDescriptionHelpFormatter, 1314 | add_help=False 1315 | ) 1316 | parser.add_argument( 1317 | "--pattern", "-p", 1318 | required=False, # Make optional since cleanup-checkpoints doesn't need it 1319 | help="Regex pattern to match domains against" 1320 | ) 1321 | parser.add_argument( 1322 | "--logs", "-l", 1323 | nargs="+", 1324 | default=None, 1325 | help="CT logs to tail (default: fetch all usable logs)" 1326 | ) 1327 | parser.add_argument( 1328 | "--batch", "-b", 1329 | type=int, 1330 | default=256, 1331 | help="Batch size for fetching entries (default: 256)" 1332 | ) 1333 | parser.add_argument( 1334 | "--poll-sleep", "-s", 1335 | type=float, 1336 | default=3.0, 1337 | help="Initial seconds to sleep between polls (default: 3.0, adaptive)" 1338 | ) 1339 | parser.add_argument( 1340 | "--verbose", "-v", 1341 | action="store_true", 1342 | help="Verbose output" 1343 | ) 1344 | parser.add_argument( 1345 | "--quiet-warnings", "-q", 1346 | action="store_true", 1347 | help="Suppress parse warnings (only show actual matches)" 1348 | ) 1349 | parser.add_argument( 1350 | "--etld1", "-e", 1351 | action="store_true", 1352 | help="Match against registrable base domain instead of full domain (e.g., 'workers.dev' not 'example.workers.dev')" 1353 | ) 1354 | parser.add_argument( 1355 | "--debug-all", "-d", 1356 | action="store_true", 1357 | help="With -v, print per-batch and 
per-entry domain listings" 1358 | ) 1359 | parser.add_argument( 1360 | "--quiet-parse-errors", "-x", 1361 | action="store_true", 1362 | help="Suppress ASN.1 parsing warnings (common in CT logs)" 1363 | ) 1364 | parser.add_argument( 1365 | "--checkpoint-prefix", "-c", 1366 | help="Custom prefix for checkpoint file (useful for multiple instances)" 1367 | ) 1368 | parser.add_argument( 1369 | "--cleanup-checkpoints", "-k", 1370 | action="store_true", 1371 | help="Clean up orphaned checkpoint files and exit" 1372 | ) 1373 | # Phase 2: New performance and memory options 1374 | parser.add_argument( 1375 | "--max-memory-mb", "-m", 1376 | type=int, 1377 | default=DEFAULT_MAX_MEMORY_MB, 1378 | help=f"Maximum memory usage in MB for batch processing (default: {DEFAULT_MAX_MEMORY_MB})" 1379 | ) 1380 | parser.add_argument( 1381 | "--min-poll-sleep", "-mn", 1382 | type=float, 1383 | default=DEFAULT_MIN_POLL_SLEEP, 1384 | help=f"Minimum poll sleep time for adaptive polling (default: {DEFAULT_MIN_POLL_SLEEP})" 1385 | ) 1386 | parser.add_argument( 1387 | "--max-poll-sleep", "-mx", 1388 | type=float, 1389 | default=DEFAULT_MAX_POLL_SLEEP, 1390 | help=f"Maximum poll sleep time for adaptive polling (default: {DEFAULT_MAX_POLL_SLEEP})" 1391 | ) 1392 | parser.add_argument( 1393 | "--help", "-h", 1394 | action="store_true", 1395 | help="Show this help message and exit" 1396 | ) 1397 | 1398 | args = parser.parse_args() 1399 | 1400 | # Phase 3: Setup logging early (before any output) 1401 | setup_logging(verbose=getattr(args, 'verbose', False), 1402 | quiet_warnings=getattr(args, 'quiet_warnings', False)) 1403 | 1404 | # Setup signal handlers for graceful shutdown 1405 | shutdown_handler.setup_signal_handlers() 1406 | 1407 | # Handle help command 1408 | if args.help: 1409 | print(__doc__) 1410 | return 0 1411 | 1412 | # Handle cleanup command 1413 | if args.cleanup_checkpoints: 1414 | try: 1415 | cleanup_orphaned_checkpoints() 1416 | return 0 1417 | except Exception as e: 1418 | 
logger.error(f"Cleanup failed: {e}") 1419 | return 1 1420 | 1421 | # Validate that pattern is provided for normal operation 1422 | if not args.pattern: 1423 | logger.error("--pattern/-p is required (unless using --cleanup-checkpoints)") 1424 | return 1 1425 | 1426 | # Phase 3: Use comprehensive validation function 1427 | validation_errors = validate_config_args(args) 1428 | if validation_errors: 1429 | for error in validation_errors: 1430 | logger.error(f"Configuration error: {error}") 1431 | return 1 1432 | 1433 | # Adjust poll sleep if outside adaptive range (with warning) 1434 | if args.poll_sleep < args.min_poll_sleep or args.poll_sleep > args.max_poll_sleep: 1435 | logger.warning(f"Poll sleep ({args.poll_sleep}) outside adaptive range [{args.min_poll_sleep}, {args.max_poll_sleep}], adjusting") 1436 | args.poll_sleep = max(args.min_poll_sleep, min(args.max_poll_sleep, args.poll_sleep)) 1437 | 1438 | # Set checkpoint file with custom prefix if provided 1439 | global CHECKPOINT_FILE 1440 | if args.checkpoint_prefix: 1441 | CHECKPOINT_FILE = os.path.join(CHECKPOINT_DIR, f"certpatrol_checkpoints_{args.checkpoint_prefix}_{os.getpid()}.json") 1442 | 1443 | # Register cleanup function to remove checkpoint file on exit 1444 | import atexit 1445 | atexit.register(cleanup_checkpoint_file) 1446 | 1447 | # Compile regex pattern (already validated by validate_config_args) 1448 | try: 1449 | pattern = re.compile(args.pattern, re.IGNORECASE) 1450 | except re.error as e: 1451 | logger.error(f"Invalid regex pattern: {e}") 1452 | return 1 1453 | 1454 | # Phase 2: Fetch current usable CT logs with session management 1455 | try: 1456 | with HTTPSessionManager() as session_manager: 1457 | ct_logs = fetch_usable_ct_logs(verbose=args.verbose, session_manager=session_manager) 1458 | if not ct_logs: 1459 | logger.error("No usable CT logs found") 1460 | return 1 1461 | except Exception as e: 1462 | logger.error(f"Failed to fetch CT logs: {e}") 1463 | return 1 1464 | 1465 | # Use 
specified logs or default to all usable logs 1466 | if args.logs is None: 1467 | logs_to_use = list(ct_logs.keys()) 1468 | else: 1469 | logs_to_use = args.logs 1470 | # Validate log names 1471 | invalid_logs = [name for name in logs_to_use if name not in ct_logs] 1472 | if invalid_logs: 1473 | logger.error(f"Unknown log(s): {', '.join(invalid_logs)}") 1474 | logger.info(f"Available logs: {', '.join(sorted(ct_logs.keys()))}") 1475 | return 1 1476 | 1477 | if args.verbose: 1478 | logger.info(f"Tailing logs: {', '.join(logs_to_use)}") 1479 | logger.info(f"Pattern: {args.pattern}") 1480 | logger.info(f"Batch size: {args.batch}") 1481 | logger.info(f"Initial poll sleep: {args.poll_sleep}s") 1482 | logger.info(f"Adaptive polling range: {args.min_poll_sleep}s - {args.max_poll_sleep}s") 1483 | logger.info(f"Memory limit: {args.max_memory_mb}MB") 1484 | if args.checkpoint_prefix: 1485 | logger.info(f"Checkpoint prefix: {args.checkpoint_prefix}") 1486 | 1487 | try: 1488 | tail_logs( 1489 | logs=logs_to_use, 1490 | pattern=pattern, 1491 | batch=args.batch, 1492 | poll_sleep=args.poll_sleep, 1493 | verbose=args.verbose, 1494 | ct_logs=ct_logs, 1495 | quiet_warnings=args.quiet_warnings, 1496 | match_scope="etld1" if args.etld1 else "full", 1497 | debug_all=args.debug_all, 1498 | quiet_parse_errors=args.quiet_parse_errors, 1499 | max_memory_mb=args.max_memory_mb, 1500 | min_poll_sleep=args.min_poll_sleep, 1501 | max_poll_sleep=args.max_poll_sleep, 1502 | ) 1503 | 1504 | # If we exit the tail_logs normally, it was due to graceful shutdown 1505 | if shutdown_handler.should_shutdown(): 1506 | print("Graceful shutdown completed successfully", flush=True) 1507 | return 0 1508 | except KeyboardInterrupt: 1509 | # This should rarely happen now due to signal handling, but handle it gracefully 1510 | logger.info("Interrupted by user (KeyboardInterrupt)") 1511 | # Make sure checkpoints are saved 1512 | if hasattr(shutdown_handler, 'checkpoints') and shutdown_handler.checkpoints: 1513 | 
try: 1514 | save_checkpoints(shutdown_handler.checkpoints) 1515 | logger.info("Final checkpoints saved") 1516 | except CheckpointError as e: 1517 | logger.error(f"Failed to save final checkpoints: {e}") 1518 | return 0 1519 | except CheckpointError as e: 1520 | logger.error(f"Checkpoint error: {e}") 1521 | return 1 1522 | except CTLogError as e: 1523 | logger.error(f"CT log error: {e}") 1524 | return 1 1525 | except MemoryError as e: 1526 | logger.error(f"Memory error: {e}") 1527 | return 1 1528 | except Exception as e: 1529 | logger.error(f"Unexpected error: {e}") 1530 | if args.verbose: 1531 | import traceback 1532 | traceback.print_exc() 1533 | return 1 1534 | 1535 | if __name__ == "__main__": 1536 | exit(main()) ```