# Directory Structure

```
├── .gitignore
├── certpatrol.py
├── LICENSE
├── README.md
└── requirements.txt
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Virtual environments
venv/
env/
ENV/

# IDE
.vscode/
.idea/
*.swp
*.swo

# OS
.DS_Store
Thumbs.db

# Project specific
checkpoints/
debug_responses/
*.log
index.html
improveplan.md
pyproject.toml
dist/

# Environment variables
.env
.env.local
.venv/
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
[![PyPI Downloads](https://static.pepy.tech/personalized-badge/certpatrol?period=total&units=INTERNATIONAL_SYSTEM&left_color=BLACK&right_color=GREEN&left_text=downloads)](https://pepy.tech/projects/certpatrol)

# CertPatrol

<p align="center">
  <img width="609" height="250" alt="Torito Logo" src="https://torito.io/toritocertpatrol.png">
</p>

A fast, lightweight **Certificate Transparency (CT)** log tailer for your terminal. Filter domains with regex, run locally for privacy, and monitor in real time — no third-party servers, no tracking.  

A modern, local, privacy-friendly **CertStream** alternative.

> **Looking for a more advanced CertStream server alternative?**  
> Check out [Certstream Server Go](https://github.com/d-Rickyy-b/certstream-server-go) by [d-Rickyy-b](https://github.com/d-Rickyy-b) for a robust, production-grade solution.

---

## Installation

```bash
pip install certpatrol
```

---

## Quick start

```bash
# Find domains containing "example"
certpatrol --pattern "example"

# Find shop subdomains of amazon.com
certpatrol --pattern "shop.*\.amazon\.com$"

# Match against base domains only (e.g., example.co.uk)
certpatrol --pattern "argentina" --etld1
```

---

## Options

### Core Options
- `-p, --pattern PATTERN` – Regex pattern to match domains against (required)  
- `-l, --logs LOGS` – Specific CT logs to monitor (default: all usable logs)  
- `-v, --verbose` – Verbose output with extra info  
- `-h, --help` – Show help message and exit  

### Polling & Performance
- `-s, --poll-sleep SECONDS` – Initial poll interval (default: 3.0, adaptive)  
- `-mn, --min-poll-sleep` – Minimum poll sleep for adaptive polling (default: 1.0)  
- `-mx, --max-poll-sleep` – Maximum poll sleep for adaptive polling (default: 60.0)  
- `-b, --batch SIZE` – Batch size for fetching entries (default: 256)  
- `-m, --max-memory-mb` – Maximum memory usage in MB (default: 100)  

### Filtering & Output
- `-e, --etld1` – Match against base domains only (requires tldextract)  
- `-q, --quiet-warnings` – Suppress parse warnings (only show matches)  
- `-x, --quiet-parse-errors` – Suppress ASN.1 parsing warnings  
- `-d, --debug-all` – With -v, print detailed per-entry domain listings  

### Checkpoint Management
- `-c, --checkpoint-prefix` – Custom prefix for checkpoint files  
- `-k, --cleanup-checkpoints` – Clean up orphaned checkpoint files and exit  

---

## Examples

```bash
# Basic monitoring
certpatrol --pattern "petsdeli"

# Multiple patterns with verbose output
certpatrol --pattern "(petsdeli|pet-deli)" --verbose

# API subdomains with quiet mode
certpatrol --pattern "api.*\.google\.com$" --quiet-warnings

# All subdomains of a domain with custom memory limit
certpatrol --pattern ".*\.example\.com$" --max-memory-mb 200

# Base domain matching only
certpatrol --pattern "argentina" --etld1

# Run multiple instances with custom prefixes
certpatrol --pattern "domain1" --checkpoint-prefix "instance1" &
certpatrol --pattern "domain2" --checkpoint-prefix "instance2" &

# Clean up old checkpoint files
certpatrol --cleanup-checkpoints

# Performance tuning for high-volume monitoring
certpatrol --pattern "example" --batch 512 --min-poll-sleep 0.5 --max-poll-sleep 30

# Graceful shutdown examples
kill -TERM $(pgrep -f "certpatrol.*example")
# Or use Ctrl+C for immediate graceful shutdown
```

---

## Requirements

- Python 3.6+  
- requests  
- cryptography  
- idna  
- tldextract (optional, for --etld1)  
- psutil (optional, for memory monitoring)  
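
The optional packages are ordinary pip installs; for example, to get both `--etld1` support and memory monitoring:

```bash
pip install certpatrol tldextract psutil
```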

---

## Features

- **Real-time monitoring** – Starts from current time (no historical data)  
- **Graceful shutdown** – Handles SIGTERM, SIGINT, and SIGHUP signals properly  
- **Adaptive polling** – Automatically adjusts intervals based on activity and errors  
- **Memory management** – Monitors and limits memory usage to prevent excessive consumption  
- **Connection pooling** – Efficient HTTP session management with retry strategies  
- **Checkpoint persistence** – Automatic state saving with atomic writes  
- **Multi-instance support** – Unique checkpoint files per process with custom prefixes  

---

## Notes

- Checkpoints saved in `checkpoints/` folder with process-specific names  
- Signal handling ensures clean shutdown and checkpoint saving  
- Sleep periods are responsive to shutdown signals (checks every 0.5s)  
- Use Ctrl+C, `kill`, or system shutdown for graceful termination  

---

## License

MIT License — see [LICENSE](https://github.com/ToritoIO/CertPatrol/blob/main/LICENSE) file for details.

```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
# CertPatrol - Certificate Transparency monitoring tool
# Core dependencies
requests>=2.25.0
cryptography>=3.4.0
idna>=3.0

# Optional but recommended for --etld1 functionality
tldextract>=3.0.0
```

--------------------------------------------------------------------------------
/certpatrol.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Torito CertPatrol - Tiny local CT tailer that filters domains by a regex pattern.

Author: Martin Aberastegue
Website: https://torito.io
Repository: https://github.com/ToritoIO/CertPatrol

Options:
  -p, --pattern PATTERN     Regex pattern to match domains against (required)
  -l, --logs LOGS           CT logs to tail (default: fetch all usable logs)
  -b, --batch SIZE          Batch size for fetching entries (default: 256)
  -s, --poll-sleep SECONDS  Seconds to sleep between polls (default: 3.0)
  -v, --verbose             Verbose output (extra info for matches)
  -q, --quiet-warnings      Suppress parse warnings (only show actual matches)
  -e, --etld1               Match against registrable base domain instead of full domain
  -d, --debug-all           With -v, print per-batch and per-entry domain listings
  -x, --quiet-parse-errors  Suppress ASN.1 parsing warnings (common in CT logs)
  -c, --checkpoint-prefix   Custom prefix for checkpoint file (useful for multiple instances)
  -k, --cleanup-checkpoints Clean up orphaned checkpoint files and exit
  -m, --max-memory-mb       Maximum memory usage in MB for batch processing (default: 100)
  -mn, --min-poll-sleep     Minimum poll sleep time for adaptive polling (default: 1.0)
  -mx, --max-poll-sleep     Maximum poll sleep time for adaptive polling (default: 60.0)
  -h, --help                Show this help message and exit

Requirements:
  pip install requests cryptography idna
  # Optional but recommended for --etld1
  pip install tldextract
"""

# Suppress OpenSSL and cryptography warnings before the other imports below are loaded
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="urllib3")
warnings.filterwarnings("ignore", message=".*OpenSSL.*")
warnings.filterwarnings("ignore", message=".*LibreSSL.*")
warnings.filterwarnings("ignore", category=DeprecationWarning, module="cryptography")
warnings.filterwarnings("ignore", message=".*serial number.*")

try:
    from urllib3.exceptions import NotOpenSSLWarning
    warnings.filterwarnings("ignore", category=NotOpenSSLWarning)
except ImportError:
    pass

import argparse
import json
import logging
import os
import re
import signal
import sys
import time
import multiprocessing
import gc
from typing import List, Tuple, Optional, Dict, Any, Callable, Iterator, Union
from contextlib import contextmanager

import idna
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from cryptography import x509
from cryptography.hazmat.primitives import serialization

# Make checkpoint file unique per process to avoid conflicts when running multiple instances
CHECKPOINT_DIR = "checkpoints"
CHECKPOINT_FILE = os.path.join(CHECKPOINT_DIR, f"certpatrol_checkpoints_{os.getpid()}.json")
USER_AGENT = "torito-certpatrol/1.2.0 (+local)"
LOG_LIST_URL = "https://www.gstatic.com/ct/log_list/v3/log_list.json"

# Phase 2: Connection pooling and rate limiting constants
DEFAULT_POOL_CONNECTIONS = 10
DEFAULT_POOL_MAXSIZE = 20
DEFAULT_MAX_RETRIES = 3
DEFAULT_BACKOFF_FACTOR = 0.3
DEFAULT_TIMEOUT = (10, 30)  # (connect, read)

# Phase 2: Memory management constants
DEFAULT_MAX_MEMORY_MB = 100
MEMORY_CHECK_INTERVAL = 100  # Check memory every N entries

# Phase 2: Adaptive polling constants
DEFAULT_MIN_POLL_SLEEP = 1.0
DEFAULT_MAX_POLL_SLEEP = 60.0
BACKOFF_MULTIPLIER = 1.5
SUCCESS_REDUCTION_FACTOR = 0.8

# Dynamic CT log discovery - fetched from Google's official list
CT_LOGS = {}

# Phase 3: Logging setup
logger = logging.getLogger('certpatrol')

def setup_logging(verbose: bool = False, quiet_warnings: bool = False) -> None:
    """
    Setup logging configuration based on verbosity settings.
    Maintains backward compatibility with print-based output.
    
    Args:
        verbose: Enable debug level logging
        quiet_warnings: Suppress warning level messages
    """
    # Remove existing handlers
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
    
    # Create console handler
    console_handler = logging.StreamHandler()
    
    # Set logging level based on verbose and quiet settings
    if verbose:
        logger.setLevel(logging.DEBUG)
        console_handler.setLevel(logging.DEBUG)
    elif quiet_warnings:
        logger.setLevel(logging.ERROR)
        console_handler.setLevel(logging.ERROR)
    else:
        logger.setLevel(logging.INFO)
        console_handler.setLevel(logging.INFO)
    
    # Create formatter that mimics original print output
    formatter = logging.Formatter('%(message)s')
    console_handler.setFormatter(formatter)
    
    # Add handler to logger
    logger.addHandler(console_handler)
    logger.propagate = False

# Signal handling for graceful shutdown
class GracefulShutdownHandler:
    """
    Handles graceful shutdown on various signals (SIGTERM, SIGINT, etc.).
    
    This class manages the shutdown process by:
    - Catching termination signals
    - Saving current checkpoints
    - Cleaning up resources
    - Providing clean exit
    """
    
    def __init__(self):
        self.shutdown_requested = False
        self.checkpoints: Optional[Dict[str, int]] = None
        self.cleanup_functions: List[Callable[[], None]] = []
        
    def request_shutdown(self, signum: int, frame) -> None:
        """
        Signal handler that requests graceful shutdown.
        
        Args:
            signum: Signal number
            frame: Current stack frame
        """
        signal_names = {
            signal.SIGTERM: "SIGTERM",
            signal.SIGINT: "SIGINT",
            signal.SIGHUP: "SIGHUP",
        }
        signal_name = signal_names.get(signum, f"Signal {signum}")
        
        if not self.shutdown_requested:
            # Always show shutdown message regardless of quiet mode
            print(f"Received {signal_name}, initiating graceful shutdown...", flush=True)
            self.shutdown_requested = True
            
            # Save checkpoints if available
            if self.checkpoints is not None:
                try:
                    save_checkpoints(self.checkpoints)
                    print("Checkpoints saved successfully", flush=True)
                except CheckpointError as e:
                    print(f"Failed to save checkpoints during shutdown: {e}", flush=True)
            
            # Run cleanup functions
            for cleanup_func in self.cleanup_functions:
                try:
                    cleanup_func()
                except Exception as e:
                    print(f"Error during cleanup: {e}", flush=True)
        else:
            # Second signal, force exit
            print(f"Received second {signal_name}, forcing immediate exit", flush=True)
            sys.exit(1)
    
    def register_cleanup(self, cleanup_func: Callable[[], None]) -> None:
        """
        Register a cleanup function to be called during shutdown.
        
        Args:
            cleanup_func: Function to call during cleanup
        """
        self.cleanup_functions.append(cleanup_func)
    
    def set_checkpoints(self, checkpoints: Dict[str, int]) -> None:
        """
        Set the current checkpoints for potential saving during shutdown.
        
        Args:
            checkpoints: Current checkpoint data
        """
        self.checkpoints = checkpoints
    
    def should_shutdown(self) -> bool:
        """
        Check if shutdown has been requested.
        
        Returns:
            True if shutdown was requested, False otherwise
        """
        return self.shutdown_requested
    
    def setup_signal_handlers(self) -> None:
        """Setup signal handlers for graceful shutdown."""
        # Handle common termination signals
        signals_to_handle = [signal.SIGTERM, signal.SIGINT]
        
        # Add SIGHUP on Unix systems (not available on Windows)
        if hasattr(signal, 'SIGHUP'):
            signals_to_handle.append(signal.SIGHUP)
        
        for sig in signals_to_handle:
            signal.signal(sig, self.request_shutdown)

# Global shutdown handler instance
shutdown_handler = GracefulShutdownHandler()

# Phase 3: Configuration validation
def validate_config_args(args: argparse.Namespace) -> List[str]:
    """
    Validate command line arguments and configuration.
    
    This function performs comprehensive validation of all input parameters
    to catch configuration errors early and provide helpful error messages.
    
    Args:
        args: Parsed command line arguments
        
    Returns:
        List of validation error messages (empty if all valid)
    """
    errors = []
    
    # Validate pattern if provided (not required for cleanup operation)
    if hasattr(args, 'pattern') and args.pattern:
        try:
            re.compile(args.pattern, re.IGNORECASE)
        except re.error as e:
            errors.append(f"Invalid regex pattern: {e}")
    
    # Validate numeric parameters
    if hasattr(args, 'batch') and args.batch is not None:
        if args.batch <= 0:
            errors.append("Batch size must be positive")
        elif args.batch > 10000:
            errors.append("Batch size too large (max 10000)")
    
    if hasattr(args, 'poll_sleep') and args.poll_sleep is not None:
        if args.poll_sleep < 0:
            errors.append("Poll sleep cannot be negative")
        elif args.poll_sleep > 3600:
            errors.append("Poll sleep too large (max 3600 seconds)")
    
    if hasattr(args, 'max_memory_mb') and args.max_memory_mb is not None:
        if args.max_memory_mb <= 0:
            errors.append("Max memory must be positive")
        elif args.max_memory_mb < 10:
            errors.append("Max memory too small (min 10MB)")
        elif args.max_memory_mb > 10000:
            errors.append("Max memory too large (max 10GB)")
    
    if hasattr(args, 'min_poll_sleep') and args.min_poll_sleep is not None:
        if args.min_poll_sleep < 0:
            errors.append("Min poll sleep cannot be negative")
        elif args.min_poll_sleep > 300:
            errors.append("Min poll sleep too large (max 300 seconds)")
    
    if hasattr(args, 'max_poll_sleep') and args.max_poll_sleep is not None:
        if args.max_poll_sleep < 0:
            errors.append("Max poll sleep cannot be negative")
        elif args.max_poll_sleep > 3600:
            errors.append("Max poll sleep too large (max 3600 seconds)")
    
    # Cross-parameter validation
    if (hasattr(args, 'min_poll_sleep') and hasattr(args, 'max_poll_sleep') and 
        args.min_poll_sleep is not None and args.max_poll_sleep is not None):
        if args.max_poll_sleep < args.min_poll_sleep:
            errors.append("Max poll sleep must be >= min poll sleep")
    
    # Validate checkpoint prefix if provided
    if hasattr(args, 'checkpoint_prefix') and args.checkpoint_prefix:
        import string
        safe_chars = string.ascii_letters + string.digits + "_-"
        if not all(c in safe_chars for c in args.checkpoint_prefix):
            errors.append("Checkpoint prefix can only contain letters, digits, underscores, and hyphens")
        if len(args.checkpoint_prefix) > 50:
            errors.append("Checkpoint prefix too long (max 50 characters)")
    
    return errors

# Phase 1 Improvement: Custom exceptions for better error handling
class CertPatrolError(Exception):
    """
    Base exception for CertPatrol errors.
    
    All custom exceptions in CertPatrol inherit from this base class
    to allow for easy exception handling and categorization.
    """
    pass

class CheckpointError(CertPatrolError):
    """
    Raised when checkpoint operations fail.
    
    This includes scenarios such as:
    - Failed to create checkpoint directory
    - Corrupted checkpoint files
    - Invalid checkpoint data structure
    - Atomic write failures
    """
    pass

class CTLogError(CertPatrolError):
    """
    Raised when CT log operations fail.
    
    This includes scenarios such as:
    - Network errors when connecting to CT logs
    - Invalid responses from CT log endpoints
    - Malformed JSON data from CT logs
    - Authentication or rate limiting issues
    """
    pass

class MemoryError(CertPatrolError):
    """
    Raised when memory limits are exceeded.
    
    This custom memory error is separate from Python's built-in
    MemoryError to distinguish between system-level and
    application-level memory management issues.
    """
    pass

# Phase 2: HTTP Session Management with Connection Pooling
class HTTPSessionManager:
    """
    Manages HTTP sessions with connection pooling and retries.
    
    This class provides a centralized way to manage HTTP connections
    for CT log communication, implementing connection pooling for
    improved performance and retry strategies for better reliability.
    
    Attributes:
        timeout: Tuple of (connect_timeout, read_timeout) in seconds
        session: The underlying requests.Session object
    """
    
    def __init__(
        self, 
        pool_connections: int = DEFAULT_POOL_CONNECTIONS,
        pool_maxsize: int = DEFAULT_POOL_MAXSIZE,
        max_retries: int = DEFAULT_MAX_RETRIES,
        backoff_factor: float = DEFAULT_BACKOFF_FACTOR,
        timeout: Tuple[int, int] = DEFAULT_TIMEOUT
    ) -> None:
        """
        Initialize the HTTP session manager.
        
        Args:
            pool_connections: Number of connection pools to cache
            pool_maxsize: Maximum number of connections to save in the pool
            max_retries: Maximum number of retry attempts
            backoff_factor: Backoff factor for retry delays
            timeout: Tuple of (connect, read) timeout values in seconds
        """
        self.timeout = timeout
        self.session = self._create_session(pool_connections, pool_maxsize, max_retries, backoff_factor)
    
    def _create_session(
        self, 
        pool_connections: int, 
        pool_maxsize: int,
        max_retries: int, 
        backoff_factor: float
    ) -> requests.Session:
        """
        Create a requests session with connection pooling and retry strategy.
        
        Args:
            pool_connections: Number of connection pools to cache
            pool_maxsize: Maximum number of connections to save in the pool
            max_retries: Maximum number of retry attempts
            backoff_factor: Backoff factor for retry delays
            
        Returns:
            Configured requests.Session object
        """
        session = requests.Session()
        
        # Configure retry strategy
        retry_strategy = Retry(
            total=max_retries,
            status_forcelist=[429, 500, 502, 503, 504],
            backoff_factor=backoff_factor,
            allowed_methods=["HEAD", "GET", "OPTIONS"]
        )
        
        # Configure HTTP adapter with connection pooling
        adapter = HTTPAdapter(
            pool_connections=pool_connections,
            pool_maxsize=pool_maxsize,
            max_retries=retry_strategy,
            pool_block=False
        )
        
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        session.headers.update({"User-Agent": USER_AGENT})
        
        return session
    
    def get(self, url: str, **kwargs) -> requests.Response:
        """
        Make a GET request using the managed session.
        
        Args:
            url: The URL to request
            **kwargs: Additional arguments passed to requests.get()
            
        Returns:
            Response object from the request
        """
        kwargs.setdefault('timeout', self.timeout)
        return self.session.get(url, **kwargs)
    
    def close(self) -> None:
        """Close the session and clean up connections."""
        if hasattr(self, 'session'):
            self.session.close()
    
    def __enter__(self) -> 'HTTPSessionManager':
        """Context manager entry."""
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit with cleanup."""
        self.close()
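
# Illustrative sketch, not called anywhere by the tool: the session manager is meant to be
# used as a context manager so pooled connections are released even on errors. The URL below
# is the same fallback log URL used by fetch_usable_ct_logs() later in this file.
def _example_session_manager_usage() -> None:
    with HTTPSessionManager() as sm:
        resp = sm.get("https://ct.googleapis.com/logs/xenon2023/ct/v1/get-sth")
        resp.raise_for_status()
        print(resp.json().get("tree_size"))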

# Phase 2: Adaptive Rate Limiting
class AdaptiveRateLimiter:
    """
    Manages adaptive polling intervals with exponential backoff on errors.
    
    This class implements an adaptive rate limiting strategy that:
    - Increases sleep intervals on consecutive errors (exponential backoff)
    - Decreases sleep intervals on consecutive successes
    - Maintains configurable minimum and maximum sleep bounds
    
    Attributes:
        current_sleep: Current sleep interval in seconds
        min_sleep: Minimum allowed sleep interval
        max_sleep: Maximum allowed sleep interval
        consecutive_errors: Count of consecutive error occurrences
        consecutive_successes: Count of consecutive successful operations
    """
    
    def __init__(
        self, 
        initial_sleep: float, 
        min_sleep: float = DEFAULT_MIN_POLL_SLEEP,
        max_sleep: float = DEFAULT_MAX_POLL_SLEEP
    ) -> None:
        """
        Initialize the adaptive rate limiter.
        
        Args:
            initial_sleep: Starting sleep interval in seconds
            min_sleep: Minimum allowed sleep interval in seconds
            max_sleep: Maximum allowed sleep interval in seconds
        """
        self.current_sleep = initial_sleep
        self.min_sleep = min_sleep
        self.max_sleep = max_sleep
        self.consecutive_errors = 0
        self.consecutive_successes = 0
    
    def on_success(self) -> None:
        """
        Called when an operation succeeds.
        
        Resets error counter and potentially reduces sleep interval
        after multiple consecutive successes.
        """
        self.consecutive_errors = 0
        self.consecutive_successes += 1
        
        # Gradually reduce sleep time on consecutive successes
        if self.consecutive_successes >= 3:
            self.current_sleep = max(
                self.min_sleep,
                self.current_sleep * SUCCESS_REDUCTION_FACTOR
            )
            self.consecutive_successes = 0
    
    def on_error(self) -> None:
        """
        Called when an operation fails.
        
        Resets success counter and increases sleep interval
        using exponential backoff strategy.
        """
        self.consecutive_successes = 0
        self.consecutive_errors += 1
        
        # Exponential backoff on consecutive errors
        self.current_sleep = min(
            self.max_sleep,
            self.current_sleep * (BACKOFF_MULTIPLIER ** self.consecutive_errors)
        )
    
    def sleep(self) -> None:
        """Sleep for the current adaptive interval."""
        time.sleep(self.current_sleep)
    
    def get_current_sleep(self) -> float:
        """
        Get the current sleep interval.
        
        Returns:
            Current sleep interval in seconds
        """
        return self.current_sleep
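
# Illustrative sketch, never invoked by the tool: how the adaptive interval evolves.
# Consecutive errors push the interval up via BACKOFF_MULTIPLIER, and every third
# consecutive success shrinks it again by SUCCESS_REDUCTION_FACTOR.
def _example_adaptive_polling() -> None:
    limiter = AdaptiveRateLimiter(initial_sleep=3.0)
    limiter.on_error()            # 3.0 * 1.5**1 = 4.5s
    limiter.on_error()            # 4.5 * 1.5**2 = 10.125s
    for _ in range(3):
        limiter.on_success()      # third success: 10.125 * 0.8 = 8.1s
    print(f"current poll interval: {limiter.get_current_sleep():.3f}s")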

# Phase 2: Memory Monitor
class MemoryMonitor:
    """
    Monitors and manages memory usage during processing.
    
    This class provides memory monitoring capabilities to prevent
    excessive memory usage during certificate processing. It can
    trigger garbage collection and raise exceptions when limits
    are exceeded.
    
    Attributes:
        max_memory_bytes: Maximum allowed memory usage in bytes
        check_counter: Counter for periodic memory checks
    """
    
    def __init__(self, max_memory_mb: int = DEFAULT_MAX_MEMORY_MB) -> None:
        """
        Initialize the memory monitor.
        
        Args:
            max_memory_mb: Maximum allowed memory usage in megabytes
        """
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.check_counter = 0
    
    def check_memory(self) -> None:
        """
        Check current memory usage and trigger GC if needed.
        
        Raises:
            MemoryError: If memory usage exceeds the configured limit
                        even after garbage collection
        """
        self.check_counter += 1
        
        if self.check_counter % MEMORY_CHECK_INTERVAL == 0:
            try:
                import psutil  # type: ignore[import-untyped]
                process = psutil.Process()
                memory_info = process.memory_info()
                
                if memory_info.rss > self.max_memory_bytes:
                    # Force garbage collection
                    gc.collect()
                    
                    # Check again after GC
                    memory_info = process.memory_info()
                    if memory_info.rss > self.max_memory_bytes:
                        raise MemoryError(
                            f"Memory usage ({memory_info.rss / 1024 / 1024:.1f}MB) "
                            f"exceeds limit ({self.max_memory_bytes / 1024 / 1024:.1f}MB)"
                        )
            except ImportError:
                # psutil not available, use basic GC trigger
                if self.check_counter % (MEMORY_CHECK_INTERVAL * 10) == 0:
                    gc.collect()
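
# Illustrative sketch, never invoked by the tool: the monitor only inspects process RSS
# every MEMORY_CHECK_INTERVAL calls (via psutil when available), so it is cheap enough to
# call from the per-entry processing loop.
def _example_memory_monitor() -> None:
    monitor = MemoryMonitor(max_memory_mb=DEFAULT_MAX_MEMORY_MB)
    for _ in range(MEMORY_CHECK_INTERVAL * 2):
        monitor.check_memory()    # raises the module-level MemoryError if the limit is exceeded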

# --- Phase 1: Certificate parsing with cryptography library ---

def _read_uint24(b: bytes, offset: int) -> Tuple[int, int]:
    """
    Read a 3-byte big-endian unsigned int, return (value, new_offset).
    
    Args:
        b: Byte array to read from
        offset: Starting position in the byte array
        
    Returns:
        Tuple of (parsed_value, new_offset)
        
    Raises:
        ValueError: If there are insufficient bytes remaining
    """
    if offset + 3 > len(b):
        raise ValueError("Truncated uint24")
    return (b[offset] << 16) | (b[offset+1] << 8) | b[offset+2], offset + 3

def parse_tls_cert_chain(extra_data_b64: str) -> List[bytes]:
    """
    Parse certificates from CT 'extra_data' (base64).
    CT logs concatenate DER certificates directly, not in TLS structure.
    Returns a list of DER cert bytes [leaf, intermediates...].
    
    Phase 1: Replaced manual ASN.1 parsing with cryptography library-based parsing
    Phase 2: Added memory efficiency improvements
    """
    import base64
    try:
        raw = base64.b64decode(extra_data_b64)
        if len(raw) < 10:  # Minimum reasonable certificate size
            return []
        
        certs = []
        pos = 0
        
        # Phase 2: Process certificates with memory awareness
        while pos < len(raw):
            try:
                # Look for ASN.1 SEQUENCE start (0x30) which indicates start of certificate
                if pos + 1 >= len(raw) or raw[pos] != 0x30:
                    pos += 1
                    continue
                
                # Use a more robust approach: try different certificate lengths
                min_cert_size = 100  # Very small certificates are unlikely
                max_cert_size = min(len(raw) - pos, 10 * 1024 * 1024)  # Max 10MB per cert
                
                cert_found = False
                
                # Try to find the correct certificate boundary by testing if we can parse it
                for try_end in range(pos + min_cert_size, min(pos + max_cert_size + 1, len(raw) + 1)):
                    try:
                        candidate_der = raw[pos:try_end]
                        
                        # Attempt to parse with cryptography library - this validates the DER structure
                        test_cert = x509.load_der_x509_certificate(candidate_der)
                        
                        # If we got here, the certificate parsed successfully
                        certs.append(candidate_der)
                        pos = try_end
                        cert_found = True
                        break
                        
                    except (ValueError, TypeError, x509.ExtensionNotFound, x509.InvalidVersion) as e:
                        # These are expected for partial certificates or invalid DER
                        continue
                    except Exception:
                        # Unexpected error, skip this attempt
                        continue
                
                if not cert_found:
                    # No valid certificate found starting at this position, advance by 1
                    pos += 1
                    
            except Exception:
                # If anything goes wrong, advance position and continue
                pos += 1
                continue
        
        return certs
        
    except Exception:
        # Fallback: if base64 decode fails or other fundamental error
        return []

def extract_domains_from_der(der_bytes: bytes) -> List[str]:
    """
    Extract DNS names from SAN; if absent, fallback to CN when it looks like a DNS name.
    Returns lowercased, Unicode (IDNA-decoded) domains.
    """
    domains = []
    
    # Suppress warnings for certificates with non-compliant serial numbers
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        warnings.filterwarnings("ignore", message=".*serial number.*")
        cert = x509.load_der_x509_certificate(der_bytes)
    
    # Try SAN first
    try:
        san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
        for name in san.value.get_values_for_type(x509.DNSName):
            domains.append(name)
    except x509.ExtensionNotFound:
        pass

    # Fallback: subject CN
    if not domains:
        try:
            cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value
            # crude DNS-ish check: contains a dot or wildcard
            if "." in cn or cn.startswith("*."):
                domains.append(cn)
        except IndexError:
            pass

    # Normalize: lower-case, IDNA decode to Unicode for display, but keep ASCII if decode fails
    normed = []
    for d in domains:
        d = d.strip().lower()
        if d.startswith("*."):
            base = d[2:]
            try:
                u = idna.decode(base)
                normed.append("*." + u)
            except Exception:
                normed.append(d)
        else:
            try:
                u = idna.decode(d)
                normed.append(u)
            except Exception:
                normed.append(d)
    return list(dict.fromkeys(normed))  # dedupe, keep order

def registrable_domain(domain: str) -> str:
    """
    Return the registrable base domain (eTLD+1) for a given domain string.
    Falls back to a best-effort heuristic if tldextract is unavailable.
    Keeps Unicode/IDNA-decoded input as-is.
    """
    # Strip wildcard for matching purposes
    d = domain.lstrip("*.")
    try:
        # Import locally to avoid hard dependency unless used
        import tldextract  # type: ignore
        ext = tldextract.extract(d)
        if ext.domain and ext.suffix:
            return f"{ext.domain}.{ext.suffix}"
        return d
    except Exception:
        parts = d.split(".")
        if len(parts) >= 2:
            return ".".join(parts[-2:])
        return d
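
# Illustrative sketch, never invoked by the tool: what --etld1 changes about matching.
# With match_scope == "etld1" the regex only sees the registrable base domain, so an
# anchored pattern for the base domain matches even when the full domain would not.
def _example_etld1_matching() -> None:
    pattern = re.compile(r"^workers\.dev$", re.IGNORECASE)
    full = "tom-tochito.workers.dev"
    base = registrable_domain(full)                             # "workers.dev"
    print(f"full-domain match: {bool(pattern.search(full))}")   # False
    print(f"etld1 match: {bool(pattern.search(base))}")         # True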

# --- Phase 2: Enhanced Dynamic CT log discovery with connection pooling ---

def fetch_usable_ct_logs(verbose: bool = False, session_manager: HTTPSessionManager = None) -> Dict[str, str]:
    """
    Fetch the current list of usable CT logs from Google's official list.
    Returns a dict mapping log names to base URLs.
    
    Phase 2: Uses HTTP session manager for connection pooling.
    """
    close_session = False
    if session_manager is None:
        session_manager = HTTPSessionManager()
        close_session = True
    
    try:
        if verbose:
            logger.info("Fetching current CT log list from Google...")
        
        resp = session_manager.get(LOG_LIST_URL)
        resp.raise_for_status()
        data = resp.json()
        
        usable_logs = {}
        
        # Extract logs from all operators
        for operator in data.get("operators", []):
            operator_name = operator.get("name", "unknown")
            
            for log in operator.get("logs", []):
                # Check if log is usable/qualified
                state = log.get("state", {})
                if "usable" in state or "qualified" in state:
                    url = log["url"].rstrip("/")
                    description = log.get("description", "")
                    
                    # Create a simple name from description or URL
                    if description:
                        # Extract meaningful name from description
                        name = description.lower()
                        name = name.replace("'", "").replace('"', "")
                        name = name.replace(" log", "").replace(" ", "_")
                        # Take first part if too long
                        name = name.split("_")[0:2]
                        name = "_".join(name)
                    else:
                        # Fallback to URL-based name
                        name = url.split("/")[-1] or url.split("/")[-2]
                    
                    # Ensure unique names
                    original_name = name
                    counter = 1
                    while name in usable_logs:
                        name = f"{original_name}_{counter}"
                        counter += 1
                    
                    usable_logs[name] = url
                    
                    if verbose:
                        logger.info(f"Found usable log: {name} -> {url}")
        
        if verbose:
            logger.info(f"Found {len(usable_logs)} usable CT logs")
        
        return usable_logs
        
    except requests.RequestException as e:
        if verbose:
            logger.warning(f"Network error fetching CT log list: {e}")
        # Fallback to a known working log
        return {"xenon2023": "https://ct.googleapis.com/logs/xenon2023"}
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        if verbose:
            logger.warning(f"Failed to parse CT log list: {e}")
        # Fallback to a known working log
        return {"xenon2023": "https://ct.googleapis.com/logs/xenon2023"}
    except Exception as e:
        if verbose:
            logger.warning(f"Unexpected error fetching CT log list: {e}")
        # Fallback to a known working log
        return {"xenon2023": "https://ct.googleapis.com/logs/xenon2023"}
    finally:
        if close_session:
            session_manager.close()

def save_debug_response(name: str, entry: dict, absolute_idx: int) -> None:
    """
    Save a CT log entry to a debug file for analysis.
    
    Args:
        name: Name of the CT log
        entry: The CT log entry data to save
        absolute_idx: Absolute index of the entry in the log
    """
    debug_dir = "debug_responses"
    try:
        if not os.path.exists(debug_dir):
            os.makedirs(debug_dir)
        
        filename = f"{debug_dir}/{name}_{absolute_idx}.json"
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(entry, f, indent=2, default=str)
        logger.debug(f"Saved response to {filename}")
    except (OSError, IOError, TypeError, ValueError) as e:
        logger.debug(f"Failed to save response: {e}")

# --- Phase 2: Enhanced CT polling with connection pooling ---

def get_sth(base_url: str, session_manager: HTTPSessionManager) -> int:
    """
    Return current tree_size of the CT log.
    Phase 2: Uses HTTP session manager for connection pooling.
    """
    try:
        r = session_manager.get(f"{base_url}/ct/v1/get-sth")
        r.raise_for_status()
        data = r.json()
        return int(data["tree_size"])
    except requests.RequestException as e:
        raise CTLogError(f"Network error getting STH from {base_url}: {e}")
    except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
        raise CTLogError(f"Invalid STH response from {base_url}: {e}")

def get_entries(base_url: str, start: int, end: int, session_manager: HTTPSessionManager) -> List[dict]:
    """
    Fetch entries [start..end] inclusive (may return fewer).
    Phase 2: Uses HTTP session manager for connection pooling.
    """
    try:
        r = session_manager.get(
            f"{base_url}/ct/v1/get-entries",
            params={"start": start, "end": end}
        )
        r.raise_for_status()
        data = r.json()
        return data.get("entries", [])
    except requests.RequestException as e:
        raise CTLogError(f"Network error getting entries from {base_url}: {e}")
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        raise CTLogError(f"Invalid entries response from {base_url}: {e}")

# --- Phase 1: Enhanced Checkpoint Management ---

def ensure_checkpoint_dir() -> None:
    """Ensure the checkpoints directory exists."""
    try:
        if not os.path.exists(CHECKPOINT_DIR):
            os.makedirs(CHECKPOINT_DIR)
    except OSError as e:
        raise CheckpointError(f"Failed to create checkpoint directory: {e}")

def validate_checkpoint_data(data: Any) -> Dict[str, int]:
    """
    Validate checkpoint data structure and content.
    Returns validated data or raises CheckpointError.
    """
    if not isinstance(data, dict):
        raise CheckpointError("Checkpoint data must be a dictionary")
    
    validated = {}
    for key, value in data.items():
        if not isinstance(key, str):
            raise CheckpointError(f"Checkpoint key must be string, got {type(key)}")
        if not isinstance(value, (int, float)):
            raise CheckpointError(f"Checkpoint value must be numeric, got {type(value)}")
        
        # Convert to int and validate range
        try:
            int_value = int(value)
            if int_value < 0:
                raise CheckpointError(f"Checkpoint value must be non-negative, got {int_value}")
            validated[key] = int_value
        except (ValueError, OverflowError) as e:
            raise CheckpointError(f"Invalid checkpoint value for {key}: {e}")
    
    return validated

def load_checkpoints() -> Dict[str, int]:
    """
    Enhanced checkpoint loading with validation.
    
    Returns:
        Dictionary mapping log names to checkpoint positions
    """
    ensure_checkpoint_dir()
    if os.path.exists(CHECKPOINT_FILE):
        try:
            with open(CHECKPOINT_FILE, "r", encoding="utf-8") as fh:
                raw_data = json.load(fh)
            return validate_checkpoint_data(raw_data)
        except (json.JSONDecodeError, IOError) as e:
            logger.warning(f"Corrupted checkpoint file, starting fresh: {e}")
            return {}
        except CheckpointError as e:
            logger.warning(f"Invalid checkpoint data, starting fresh: {e}")
            return {}
    return {}

def save_checkpoints(cp: Dict[str, int]) -> None:
    """
    Enhanced atomic write with validation and integrity checks.
    
    Args:
        cp: Dictionary mapping log names to checkpoint positions
        
    Raises:
        CheckpointError: If checkpoint data is invalid or save operation fails
    """
    ensure_checkpoint_dir()
    
    # Validate checkpoint data before saving
    try:
        validated_cp = validate_checkpoint_data(cp)
    except CheckpointError as e:
        raise CheckpointError(f"Cannot save invalid checkpoint data: {e}")
    
    tmp = CHECKPOINT_FILE + ".tmp"
    max_retries = 3
    retry_delay = 0.1
    
    for attempt in range(max_retries):
        try:
            # Write to temporary file
            with open(tmp, "w", encoding="utf-8") as fh:
                json.dump(validated_cp, fh, indent=2)
            
            # Verify the file was written correctly by reading it back
            try:
                with open(tmp, "r", encoding="utf-8") as fh:
                    verify_data = json.load(fh)
                validate_checkpoint_data(verify_data)
            except Exception as e:
                raise CheckpointError(f"Checkpoint verification failed: {e}")
            
            # Atomically replace the original file
            os.replace(tmp, CHECKPOINT_FILE)
            return
            
        except (OSError, IOError, CheckpointError) as e:
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
                continue
            else:
                # Clean up temp file on final failure
                try:
                    if os.path.exists(tmp):
                        os.unlink(tmp)
                except OSError:
                    pass
                raise CheckpointError(f"Failed to save checkpoints after {max_retries} attempts: {e}")

def cleanup_checkpoint_file() -> None:
    """
    Clean up checkpoint file when process exits.
    
    This function is typically called during program shutdown
    to remove process-specific checkpoint files.
    """
    try:
        if os.path.exists(CHECKPOINT_FILE):
            os.unlink(CHECKPOINT_FILE)
    except OSError as e:
        logger.warning(f"Failed to cleanup checkpoint file: {e}")

def cleanup_orphaned_checkpoints() -> None:
    """
    Clean up checkpoint files from processes that are no longer running.
    
    This function scans for checkpoint files and removes those belonging
    to processes that are no longer active.
    """
    import glob
    ensure_checkpoint_dir()
    checkpoint_files = glob.glob(os.path.join(CHECKPOINT_DIR, "*.json"))
    cleaned = 0
    
    for checkpoint_file in checkpoint_files:
        try:
            # Extract filename without path
            filename = os.path.basename(checkpoint_file)
            # Try to parse the filename to extract PID
            prefix = "certpatrol_checkpoints_"
            if filename.startswith(prefix) and filename.endswith(".json"):
                # Strip the prefix and the ".json" suffix, leaving "<custom_prefix>_<pid>" or "<pid>"
                pid_part = filename[len(prefix):-len(".json")]
                if "_" in pid_part:
                    # Has custom prefix, extract PID from end
                    pid = pid_part.split("_")[-1]
                else:
                    # No custom prefix, entire part is PID
                    pid = pid_part
                
                try:
                    pid = int(pid)
                    # Check if process is still running
                    os.kill(pid, 0)
                    # Process exists, keep file
                except (ValueError, OSError):
                    # Process doesn't exist, remove file
                    os.unlink(checkpoint_file)
                    cleaned += 1
                    logger.info(f"Removed orphaned checkpoint: {filename}")
        except Exception as e:
            logger.warning(f"Failed to process checkpoint file {checkpoint_file}: {e}")
    
    if cleaned > 0:
        logger.info(f"Cleaned up {cleaned} orphaned checkpoint files")
    else:
        logger.info("No orphaned checkpoint files found")

# --- Phase 2: Enhanced streaming entry processor ---

def process_entries_streaming(
    entries: List[dict],
    pattern: re.Pattern,
    match_scope: str,
    verbose: bool,
    debug_all: bool,
    quiet_parse_errors: bool,
    quiet_warnings: bool,
    log_name: str,
    start_idx: int,
    memory_monitor: MemoryMonitor
) -> Iterator[str]:
    """
    Process CT log entries in streaming fashion to optimize memory usage.
    Phase 2: Added memory monitoring and streaming processing.
    
    Note: When match_scope == "etld1", the pattern is matched against the registrable
    base domain (eTLD+1), not the full domain. For example:
    - Full domain: "tom-tochito.workers.dev" 
    - Registrable domain: "workers.dev"
    - Pattern should match "workers.dev", not ".*\\.workers\\.dev$"
    """
    for i, entry in enumerate(entries):
        absolute_idx = start_idx + i
        
        # Phase 2: Check memory usage periodically
        memory_monitor.check_memory()
        
        try:
            chain = parse_tls_cert_chain(entry["extra_data"])
            if not chain:
                if verbose:
                    logger.debug(f"{log_name}@{absolute_idx}: no valid chain parsed")
                    # Save first few failed responses for debugging
                    if absolute_idx % 100 == 0:  # Save every 100th failed entry
                        save_debug_response(log_name, entry, absolute_idx)
                continue
                
            leaf_der = chain[0]  # end-entity first
            domains = extract_domains_from_der(leaf_der)
            
            if verbose and debug_all and domains:
                logger.debug(f"{log_name}@{absolute_idx}: found domains: {domains}")
                
            # Process matches immediately to reduce memory footprint
            for d in domains:
                target = registrable_domain(d) if match_scope == "etld1" else d
                if pattern.search(target):
                    yield d
                    # If verbose, also yield metadata
                    if verbose:
                        ts = entry.get("sct", {}).get("timestamp")
                        yield f"# matched {d} | log={log_name} idx={absolute_idx} ts={ts}"
            
            # Phase 2: Clear references to help with memory management
            del entry
            if i % 50 == 0:  # Force GC every 50 entries
                gc.collect()
                
        except Exception as e:
            if verbose and not quiet_warnings and not quiet_parse_errors:
                logger.warning(f"{log_name}@{absolute_idx}: parse failed: {e}")
                # Save first few failed responses for debugging
                if absolute_idx % 100 == 0:
                    save_debug_response(log_name, entry, absolute_idx)
            continue

# --- Phase 2: Main log tailing function with performance improvements ---

def tail_logs(
    logs: List[str],
    pattern: re.Pattern,
    batch: int = 256,
    poll_sleep: float = 3.0,
    verbose: bool = False,
    ct_logs: dict = None,
    quiet_warnings: bool = False,
    match_scope: str = "full",
    debug_all: bool = False,
    quiet_parse_errors: bool = False,
    max_memory_mb: int = DEFAULT_MAX_MEMORY_MB,
    min_poll_sleep: float = DEFAULT_MIN_POLL_SLEEP,
    max_poll_sleep: float = DEFAULT_MAX_POLL_SLEEP,
):
    """
    Main log tailing function.
    Phase 2: Added HTTP connection pooling, adaptive rate limiting, and memory management.
    """
    if ct_logs is None:
        ct_logs = CT_LOGS

    # Phase 2: Initialize performance components
    session_manager = HTTPSessionManager()
    rate_limiter = AdaptiveRateLimiter(poll_sleep, min_poll_sleep, max_poll_sleep)
    memory_monitor = MemoryMonitor(max_memory_mb)
    
    # Register session cleanup with shutdown handler
    shutdown_handler.register_cleanup(session_manager.close)
    
    try:
        checkpoints = load_checkpoints()
    except CheckpointError as e:
        logger.error(f"Failed to load checkpoints: {e}")
        return
    finally:
        # Ensure session cleanup on any exit
        import atexit
        atexit.register(session_manager.close)

    # Initialize checkpoints at current tree_size (tail-from-now semantics)
    for name in logs:
        # Check for shutdown during initialization
        if shutdown_handler.should_shutdown():
            print("Shutdown requested during initialization", flush=True)
            return
            
        if name not in ct_logs:
            if verbose:
                logger.warning(f"Unknown log: {name}")
            continue
        base = ct_logs[name]
        if name not in checkpoints:
            try:
                tree_size = get_sth(base, session_manager)
                checkpoints[name] = tree_size  # next index to fetch
                if verbose:
                    logger.info(f"{name}: starting at index {tree_size}")
                rate_limiter.on_success()  # STH fetch succeeded
            except CTLogError as e:
                logger.warning(f"{name}: failed to init STH: {e}")
                checkpoints[name] = 0
                rate_limiter.on_error()  # STH fetch failed

    try:
        save_checkpoints(checkpoints)
    except CheckpointError as e:
        logger.error(f"Failed to save initial checkpoints: {e}")
        return

    # Set up checkpoints for graceful shutdown
    shutdown_handler.set_checkpoints(checkpoints)

    if verbose:
        logger.info(f"Using adaptive polling: {min_poll_sleep}s - {max_poll_sleep}s")
        logger.info(f"Memory limit: {max_memory_mb}MB")

    while True:
        # Check for graceful shutdown request
        if shutdown_handler.should_shutdown():
            print("Graceful shutdown requested, exiting main loop", flush=True)
            break
        any_progress = False
        loop_start_time = time.time()
        
        for name in logs:
            if name not in ct_logs:
                continue
            base = ct_logs[name]
            
            # Phase 2: Check memory before processing each log
            try:
                memory_monitor.check_memory()
            except MemoryError as e:
                logger.error(str(e))
                # Force a more aggressive GC and continue
                gc.collect()
                continue
            
            # Determine target size
            try:
                tree_size = get_sth(base, session_manager)
                rate_limiter.on_success()  # STH fetch succeeded
            except CTLogError as e:
                if verbose:
                    logger.warning(f"{name}: get-sth failed: {e}")
                rate_limiter.on_error()  # STH fetch failed
                continue

            next_idx = checkpoints.get(name, 0)
            if next_idx >= tree_size:
                # nothing new
                continue

            any_progress = True
            # Fetch in batches up to current tree_size-1
            end_idx = min(next_idx + batch - 1, tree_size - 1)

            try:
                entries = get_entries(base, next_idx, end_idx, session_manager)
                rate_limiter.on_success()  # Entries fetch succeeded
            except CTLogError as e:
                if verbose:
                    logger.warning(f"{name}: get-entries {next_idx}-{end_idx} failed: {e}")
                rate_limiter.on_error()  # Entries fetch failed
                continue

            # Process entries with streaming
            if verbose and debug_all and entries:
                logger.debug(f"{name}: processing {len(entries)} entries from {next_idx} to {end_idx}")
            
            # Phase 2: Use streaming processor for better memory efficiency
            try:
                for result in process_entries_streaming(
                    entries, pattern, match_scope, verbose, debug_all,
                    quiet_parse_errors, quiet_warnings, name, next_idx, memory_monitor
                ):
                    print(result, flush=True)
            except MemoryError as e:
                logger.error(f"Memory limit exceeded processing {name}: {e}")
                # Skip this batch and continue
                continue

            try:
                checkpoints[name] = end_idx + 1
                save_checkpoints(checkpoints)
                # Update shutdown handler with latest checkpoints
                shutdown_handler.set_checkpoints(checkpoints)
            except CheckpointError as e:
                logger.error(f"Failed to save checkpoints for {name}: {e}")
                # Continue processing other logs even if checkpoint save fails

        # Phase 2: Adaptive sleep based on progress and errors
        if not any_progress:
            if verbose:
                current_sleep = rate_limiter.get_current_sleep()
                logger.debug(f"No progress, sleeping for {current_sleep:.1f}s")
            
            # Sleep in smaller chunks to allow for responsive shutdown
            sleep_time = rate_limiter.get_current_sleep()
            sleep_chunk = 0.5  # Check for shutdown every 0.5 seconds
            while sleep_time > 0 and not shutdown_handler.should_shutdown():
                chunk = min(sleep_chunk, sleep_time)
                time.sleep(chunk)
                sleep_time -= chunk
        else:
            # Had progress, reduce sleep time
            rate_limiter.on_success()
            
        # Phase 2: Show performance stats periodically
        if verbose and any_progress:
            loop_time = time.time() - loop_start_time
            logger.debug(f"Loop completed in {loop_time:.2f}s, current poll interval: {rate_limiter.get_current_sleep():.1f}s")

def main() -> int:
    """
    Main entry point for CertPatrol.
    
    Returns:
        Exit code (0 for success, non-zero for errors)
    """
    parser = argparse.ArgumentParser(
        description="Torito CertPatrol - Tiny local CT tailer that filters domains by a regex pattern",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        add_help=False
    )
    parser.add_argument(
        "--pattern", "-p",
        required=False,  # Make optional since cleanup-checkpoints doesn't need it
        help="Regex pattern to match domains against"
    )
    parser.add_argument(
        "--logs", "-l",
        nargs="+",
        default=None,
        help="CT logs to tail (default: fetch all usable logs)"
    )
    parser.add_argument(
        "--batch", "-b",
        type=int,
        default=256,
        help="Batch size for fetching entries (default: 256)"
    )
    parser.add_argument(
        "--poll-sleep", "-s",
        type=float,
        default=3.0,
        help="Initial seconds to sleep between polls (default: 3.0, adaptive)"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Verbose output"
    )
    parser.add_argument(
        "--quiet-warnings", "-q",
        action="store_true",
        help="Suppress parse warnings (only show actual matches)"
    )
    parser.add_argument(
        "--etld1", "-e",
        action="store_true",
        help="Match against registrable base domain instead of full domain (e.g., 'workers.dev' not 'example.workers.dev')"
    )
    parser.add_argument(
        "--debug-all", "-d",
        action="store_true",
        help="With -v, print per-batch and per-entry domain listings"
    )
    parser.add_argument(
        "--quiet-parse-errors", "-x",
        action="store_true",
        help="Suppress ASN.1 parsing warnings (common in CT logs)"
    )
    parser.add_argument(
        "--checkpoint-prefix", "-c",
        help="Custom prefix for checkpoint file (useful for multiple instances)"
    )
    parser.add_argument(
        "--cleanup-checkpoints", "-k",
        action="store_true",
        help="Clean up orphaned checkpoint files and exit"
    )
    # Phase 2: New performance and memory options
    parser.add_argument(
        "--max-memory-mb", "-m",
        type=int,
        default=DEFAULT_MAX_MEMORY_MB,
        help=f"Maximum memory usage in MB for batch processing (default: {DEFAULT_MAX_MEMORY_MB})"
    )
    parser.add_argument(
        "--min-poll-sleep", "-mn",
        type=float,
        default=DEFAULT_MIN_POLL_SLEEP,
        help=f"Minimum poll sleep time for adaptive polling (default: {DEFAULT_MIN_POLL_SLEEP})"
    )
    parser.add_argument(
        "--max-poll-sleep", "-mx",
        type=float,
        default=DEFAULT_MAX_POLL_SLEEP,
        help=f"Maximum poll sleep time for adaptive polling (default: {DEFAULT_MAX_POLL_SLEEP})"
    )
    parser.add_argument(
        "--help", "-h",
        action="store_true",
        help="Show this help message and exit"
    )

    args = parser.parse_args()
    
    # Phase 3: Setup logging early (before any output)
    setup_logging(verbose=getattr(args, 'verbose', False), 
                  quiet_warnings=getattr(args, 'quiet_warnings', False))
    
    # Setup signal handlers for graceful shutdown
    shutdown_handler.setup_signal_handlers()
    
    # Handle help command
    if args.help:
        print(__doc__)
        return 0
    
    # Handle cleanup command
    if args.cleanup_checkpoints:
        try:
            cleanup_orphaned_checkpoints()
            return 0
        except Exception as e:
            logger.error(f"Cleanup failed: {e}")
            return 1
    
    # Validate that pattern is provided for normal operation
    if not args.pattern:
        logger.error("--pattern/-p is required (unless using --cleanup-checkpoints)")
        return 1
    
    # Phase 3: Use comprehensive validation function
    validation_errors = validate_config_args(args)
    if validation_errors:
        for error in validation_errors:
            logger.error(f"Configuration error: {error}")
        return 1
        
    # Adjust poll sleep if outside adaptive range (with warning)
    if args.poll_sleep < args.min_poll_sleep or args.poll_sleep > args.max_poll_sleep:
        logger.warning(f"Poll sleep ({args.poll_sleep}) outside adaptive range [{args.min_poll_sleep}, {args.max_poll_sleep}], adjusting")
        args.poll_sleep = max(args.min_poll_sleep, min(args.max_poll_sleep, args.poll_sleep))
    
    # Set checkpoint file with custom prefix if provided
    global CHECKPOINT_FILE
    if args.checkpoint_prefix:
        CHECKPOINT_FILE = os.path.join(CHECKPOINT_DIR, f"certpatrol_checkpoints_{args.checkpoint_prefix}_{os.getpid()}.json")
    
    # Register cleanup function to remove checkpoint file on exit
    import atexit
    atexit.register(cleanup_checkpoint_file)
    
    # Compile regex pattern (already validated by validate_config_args)
    try:
        pattern = re.compile(args.pattern, re.IGNORECASE)
    except re.error as e:
        logger.error(f"Invalid regex pattern: {e}")
        return 1

    # Phase 2: Fetch current usable CT logs with session management
    try:
        with HTTPSessionManager() as session_manager:
            ct_logs = fetch_usable_ct_logs(verbose=args.verbose, session_manager=session_manager)
        if not ct_logs:
            logger.error("No usable CT logs found")
            return 1
    except Exception as e:
        logger.error(f"Failed to fetch CT logs: {e}")
        return 1

    # Use specified logs or default to all usable logs
    if args.logs is None:
        logs_to_use = list(ct_logs.keys())
    else:
        logs_to_use = args.logs
        # Validate log names
        invalid_logs = [name for name in logs_to_use if name not in ct_logs]
        if invalid_logs:
            logger.error(f"Unknown log(s): {', '.join(invalid_logs)}")
            logger.info(f"Available logs: {', '.join(sorted(ct_logs.keys()))}")
            return 1

    if args.verbose:
        logger.info(f"Tailing logs: {', '.join(logs_to_use)}")
        logger.info(f"Pattern: {args.pattern}")
        logger.info(f"Batch size: {args.batch}")
        logger.info(f"Initial poll sleep: {args.poll_sleep}s")
        logger.info(f"Adaptive polling range: {args.min_poll_sleep}s - {args.max_poll_sleep}s")
        logger.info(f"Memory limit: {args.max_memory_mb}MB")
        if args.checkpoint_prefix:
            logger.info(f"Checkpoint prefix: {args.checkpoint_prefix}")

    try:
        tail_logs(
            logs=logs_to_use,
            pattern=pattern,
            batch=args.batch,
            poll_sleep=args.poll_sleep,
            verbose=args.verbose,
            ct_logs=ct_logs,
            quiet_warnings=args.quiet_warnings,
            match_scope="etld1" if args.etld1 else "full",
            debug_all=args.debug_all,
            quiet_parse_errors=args.quiet_parse_errors,
            max_memory_mb=args.max_memory_mb,
            min_poll_sleep=args.min_poll_sleep,
            max_poll_sleep=args.max_poll_sleep,
        )
        
        # If tail_logs returns normally, it was either a graceful shutdown or an early exit
        if shutdown_handler.should_shutdown():
            print("Graceful shutdown completed successfully", flush=True)
        return 0
    except KeyboardInterrupt:
        # This should rarely happen now due to signal handling, but handle it gracefully
        logger.info("Interrupted by user (KeyboardInterrupt)")
        # Make sure checkpoints are saved
        if hasattr(shutdown_handler, 'checkpoints') and shutdown_handler.checkpoints:
            try:
                save_checkpoints(shutdown_handler.checkpoints)
                logger.info("Final checkpoints saved")
            except CheckpointError as e:
                logger.error(f"Failed to save final checkpoints: {e}")
        return 0
    except CheckpointError as e:
        logger.error(f"Checkpoint error: {e}")
        return 1
    except CTLogError as e:
        logger.error(f"CT log error: {e}")
        return 1
    except MemoryError as e:
        logger.error(f"Memory error: {e}")
        return 1
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        if args.verbose:
            import traceback
            traceback.print_exc()
        return 1

if __name__ == "__main__":
    sys.exit(main())
```