This is page 1 of 2. Use http://codebase.md/data-goblin/claude-goblin?page={x} to view the full context. # Directory Structure ``` ├── .gitignore ├── .python-version ├── CHANGELOG.md ├── docs │ ├── commands.md │ ├── images │ │ ├── dashboard.png │ │ ├── heatmap.png │ │ └── status-bar.png │ └── versions │ ├── 0.1.0.md │ ├── 0.1.1.md │ ├── 0.1.2.md │ └── 0.1.3.md ├── LICENSE ├── pyproject.toml ├── README.md └── src ├── __init__.py ├── aggregation │ ├── __init__.py │ ├── daily_stats.py │ └── usage_limits.py ├── cli.py ├── commands │ ├── __init__.py │ ├── delete_usage.py │ ├── export.py │ ├── help.py │ ├── limits.py │ ├── restore_backup.py │ ├── stats.py │ ├── status_bar.py │ ├── update_usage.py │ └── usage.py ├── config │ ├── __init__.py │ ├── settings.py │ └── user_config.py ├── data │ ├── __init__.py │ └── jsonl_parser.py ├── hooks │ ├── __init__.py │ ├── audio_tts.py │ ├── audio.py │ ├── manager.py │ ├── png.py │ ├── scripts │ │ └── audio_tts_hook.sh │ └── usage.py ├── models │ ├── __init__.py │ └── usage_record.py ├── storage │ ├── __init__.py │ └── snapshot_db.py ├── utils │ ├── __init__.py │ ├── _system.py │ └── text_analysis.py └── visualization ├── __init__.py ├── activity_graph.py ├── dashboard.py ├── export.py └── usage_bars.py ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.13 ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python-generated files __pycache__/ *.py[cod] *$py.class build/ dist/ wheels/ *.egg-info .eggs/ *.egg # Virtual environments .venv/ venv/ ENV/ env/ # IDE & AI tools .claude/ CLAUDE.md AGENTS.md .cursor/ .codex/ .gemini/ .vscode/ .idea/ *.swp *.swo *~ # Testing .pytest_cache/ .coverage htmlcov/ .tox/ .mypy_cache/ tests/ # OS .DS_Store Thumbs.db # 
Project-specific *.svg *.db *.db-journal main.py test_*.py RELEASING.md ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # Claude Code Goblin     Python command line tool to help with Claude Code utilities and Claude Code usage analytics and long-term tracking. **Quick Start:** Install with `pip install claude-goblin` and use `ccg --help` for commands or `ccg usage` to start tracking. Below are some examples of outputs that this command line can give you. > [!NOTE] > Both `claude-goblin` and `ccg` work interchangeably as command aliases. ## Example outputs **TUI Dashboard:**  --- **MacOS status bar for usage limits:**  --- **GitHub activity-style heatmap of annual usage:**  --- > [!NOTE] > This tool was developed and tested on macOS (Python 3.13). Should work on Linux and Windows but is untested on those platforms. ## Features - Local snapshotting of Claude Code logs for analytics - Local snapshotting of usage limits from the Claude Code `/usage` command - Dashboard and stats of usage and limit history - Project anonymization for sharing screenshots (`--anon` flag) - Hook setup to automate data logging or analysis of Claude Code - Audio notifications for Claude Code completion, permission requests, and conversation compaction - Text-to-speech (TTS) notifications with customizable hook selection (macOS only) ## Installation ### From PyPI (recommended) ```bash # Install from PyPI pip install claude-goblin # Optional: Install export dependencies for PNG/SVG generation pip install "claude-goblin[export]" ``` ### From source ```bash # Clone the repository git clone https://github.com/data-goblin/claude-goblin.git cd claude-goblin # Install with pip pip install -e . 
# Optional: Install export dependencies pip install -e ".[export]" ``` ## First-Time Setup After installation, start tracking your Claude Code usage: ```bash # View your current usage dashboard ccg usage # (Optional) Enable automatic tracking with hooks ccg setup-hooks usage ``` **Note**: The `usage` command automatically saves your data to the historical database every time you run it. No manual setup required. ### Commands Explained - **`update-usage`**: Update historical database with latest data and fill in missing date gaps with empty records (use when you want continuous date coverage for the heatmap) For most users, just run `usage` regularly and it will handle data tracking automatically. Use `setup-hooks usage` to automate this completely. ## Commands | Command | Description | |---------|-------------| | **Dashboard & Analytics** | | | `ccg usage` | Show usage dashboard with KPI cards and breakdowns | | `ccg usage --live` | Auto-refresh dashboard every 5 seconds | | `ccg usage --fast` | Skip live limits for faster rendering | | `ccg usage --anon` | Anonymize project names (project-001, project-002, etc.) 
| | `ccg limits` | Show current usage limits (session, week, Opus) | | `ccg stats` | Show detailed statistics and cost analysis | | `ccg stats --fast` | Skip live limits for faster rendering | | `ccg status-bar [type]` | Launch macOS menu bar app (session\|weekly\|opus) | | **Export** | | | `ccg export` | Export yearly heatmap as PNG (default) | | `ccg export --svg` | Export as SVG image | | `ccg export --open` | Export and open the image | | `ccg export -y 2024` | Export specific year | | `ccg export -o output.png` | Specify output file path | | **Data Management** | | | `ccg update-usage` | Update historical database with latest data | | `ccg delete-usage --force` | Delete historical database (requires --force) | | `ccg restore-backup` | Restore from backup | | **Hooks (Advanced)** | | | `ccg setup-hooks usage` | Auto-track usage after each Claude response | | `ccg setup-hooks audio` | Play sounds for completion, permission & compaction | | `ccg setup-hooks audio-tts` | Speak notifications using TTS (macOS, multi-hook) | | `ccg setup-hooks png` | Auto-generate PNG after each response | | `ccg remove-hooks [type]` | Remove hooks (usage\|audio\|audio-tts\|png, or all) | ## Data Source Claude Goblin reads usage data from Claude Code's local session logs: ``` ~/.claude/projects/*.jsonl ``` **Important**: Claude Code retains session logs for approximately **30 days** (rolling window). There is no way to get other historical data without contacting Anthropic support. 
Claude Goblin solves this by: - Automatically saving data to an SQLite database (`~/.claude/usage/usage_history.db`) whenever you run `ccg usage` - Preserving historical data indefinitely - Merging current + historical data for complete analytics - Configuration to choose between saving detailed or aggregate data ## How It Works ```mermaid graph TD A[Claude Code] -->|writes| B[JSONL Files<br/>~/.claude/projects/*.jsonl] A -.->|triggers| H[Hooks] B --> ING{Ingestion<br/>--usage<br/>--update-usage} H -.->|automates| ING ING --> DB[(Database<br/>~/.claude/usage/usage_history.db)] DB --> CMD1{--usage} DB --> CMD2{--stats} DB --> CMD3{--export} CMD1 --> OUT1[TUI Dashboard] CMD2 --> OUT2[Summary Stats<br/>in Terminal] CMD3 --> OUT3[Annual Activity PNG] H -.->|automates| CMD3 style A fill:#e0e0e0,stroke:#333,color:#000 style B fill:#ff8800,stroke:#333,color:#000 style DB fill:#4a9eff,stroke:#333,color:#fff style OUT1 fill:#90ee90,stroke:#333,color:#000 style OUT2 fill:#90ee90,stroke:#333,color:#000 style OUT3 fill:#90ee90,stroke:#333,color:#000 style H fill:#ffeb3b,stroke:#333,color:#000 ``` **Key Points:** - **JSONL files** are raw logs with a 30-day rolling window (older data disappears) - **Ingestion** step reads JSONL and saves to DB (with automatic deduplication via `UNIQUE` constraint) - **Database** is the single source of truth - all display commands read from here only - **Hooks** can automate ingestion after each Claude response ### Command Behavior **`ccg usage`** (Display + Ingestion) 1. **Ingestion**: Reads JSONL files from `~/.claude/projects/*.jsonl` and saves to DB 2. **Display**: Reads data from DB and renders dashboard **`ccg export`** (Display only) 1. Reads data from DB at `~/.claude/usage/usage_history.db` 2. Generates yearly heatmap 3. Exports to `~/.claude/usage/claude-usage-<timestamp>.png` by default (or specify a path with `-o`) **`ccg stats`** (Display + Ingestion) 1. **Ingestion**: Reads JSONL files from `~/.claude/projects/*.jsonl` and saves to DB 2. 
**Display**: Reads data from DB and displays comprehensive statistics **`ccg update-usage`** (Ingestion only) 1. Reads JSONL files from `~/.claude/projects/*.jsonl` 2. Saves to DB at `~/.claude/usage/usage_history.db` (with automatic deduplication) 3. Fills in missing dates with empty records (ensures continuous heatmap) ### File Locations | File | Location | Purpose | |------|----------|---------| | **JSONL logs** | `~/.claude/projects/*.jsonl` | Current 30-day usage data from Claude Code | | **SQLite DB** | `~/.claude/usage/usage_history.db` | Historical usage data preserved indefinitely | | **Default exports** | `~/.claude/usage/claude-usage-<timestamp>.png` | PNG/SVG heatmaps (default location unless `-o` is used) | | **Hook exports** | `~/.claude/usage/claude-usage.png` | Default location for PNG hook auto-updates | ## --usage TUI dashboard Example TUI:  ## --export Heatmap Export a GitHub-style yearly activity heatmap: ```bash ccg export --open ``` Example heatmap:  ### --export Formats - **PNG** (default): `ccg export` ## --status-bar (macOS only) Launch a menu bar app showing your Claude Code usage limits: ```bash # Show weekly usage (default) ccg status-bar weekly # Show session usage ccg status-bar session # Show Opus weekly usage ccg status-bar opus ``` The menu bar displays "CC: XX%" and clicking it shows all three limits (Session, Weekly, Opus) with reset times. **Running in background:** - Use `&` to run in background: `ccg status-bar weekly &` - Use `nohup` to persist after terminal closes: `nohup ccg status-bar weekly > /dev/null 2>&1 &` Example:  ## Hooks Claude Goblin can integrate with Claude Code's hook system to automate various tasks. Hooks trigger automatically based on Claude Code events. 
### Available Hook Types #### Usage Hook Automatically tracks usage data after each Claude response: ```bash ccg setup-hooks usage ``` This adds a hook that runs `ccg update-usage --fast` after each Claude response, keeping your historical database up-to-date. #### Audio Hook Plays system sounds for three different events: ```bash ccg setup-hooks audio ``` You'll be prompted to select three sounds: 1. **Completion sound**: Plays when Claude finishes responding 2. **Permission sound**: Plays when Claude requests permission 3. **Compaction sound**: Plays before conversation compaction Supports macOS (10 built-in sounds), Windows, and Linux. #### Audio TTS Hook (macOS only) Speaks notifications aloud using macOS text-to-speech: ```bash ccg setup-hooks audio-tts ``` **Multi-hook selection** - Choose which events to speak: 1. Notification only (permission requests) - **[recommended]** 2. Stop only (when Claude finishes responding) 3. PreCompact only (before conversation compaction) 4. Notification + Stop 5. Notification + PreCompact 6. Stop + PreCompact 7. All three (Notification + Stop + PreCompact) You can also select from 7 different voices (Samantha, Alex, Daniel, Karen, Moira, Fred, Zarvox). 
**Example messages:** - Notification: Speaks the permission request message - Stop: "Claude finished responding" - PreCompact: "Auto compacting conversation" or "Manually compacting conversation" #### PNG Hook Auto-generates usage heatmap PNG after each Claude response: ```bash ccg setup-hooks png ``` Requires export dependencies: `pip install "claude-goblin[export]"` ### Removing Hooks ```bash # Remove specific hook type ccg remove-hooks usage ccg remove-hooks audio ccg remove-hooks audio-tts ccg remove-hooks png # Remove all Claude Goblin hooks ccg remove-hooks ``` ## Project Anonymization The `--anon` flag anonymizes project names when displaying usage data, perfect for sharing screenshots: ```bash ccg usage --anon ccg stats --anon ``` Projects are renamed to `project-001`, `project-002`, etc., ranked by total token usage (project-001 has the highest usage). ## Historical Data Claude Goblin automatically saves data every time you run `usage`. To manage the history manually: ```bash # View historical stats ccg stats # Update database with latest data and fill date gaps ccg update-usage # Delete all history ccg delete-usage -f ``` ## What It Tracks - **Tokens**: Input, output, cache creation, cache read (by model and project) - **Prompts**: User prompts and assistant responses - **Sessions**: Unique conversation threads - **Models**: Which Claude models you've used (Sonnet, Opus, Haiku) - **Projects**: Folders/directories where you've used Claude - **Time**: Daily activity patterns throughout the year - **Usage Limits**: Real-time session, weekly, and Opus limits It will also compute how much you would have had to pay if you used API pricing instead of a $200 Max plan. ## Technical Details ### Timezone Handling All timestamps in Claude Code's JSONL files seem to be stored in **UTC**. Claude Goblin converts them to your **local timezone** when grouping activity by date. This has only been tested with European CET. 
### Cache Efficiency The token breakdown shows cache efficiency. High "Cache Read" percentages (80-90%+) mean Claude Code is effectively reusing context, which: - Speeds up responses - Can reduce costs on usage-based plans - Indicates good context management ## Requirements - Python >= 3.10 - Claude Code (for generating usage data) - Rich >= 13.7.0 (terminal UI) - Typer >= 0.9.0 (command-line interface) - rumps >= 0.4.0 (macOS menu bar app, macOS only) - Pillow + CairoSVG (optional, for PNG/SVG export) ## License MIT License - see LICENSE file for details ## Contributing Contributions welcome! Please: 1. Fork the repository 2. Create a feature branch 3. Submit a pull request I don't have much time but I'll review PRs when I can. ## Troubleshooting ### "No Claude Code data found" - Ensure Claude Code is installed and you've used it at least once - Check that `~/.claude/projects/` exists and contains `.jsonl` files ### Limits showing "Could not parse usage data" - Run `claude` in a trusted folder first - Claude needs folder trust to display usage limits ### Export fails - Install export dependencies: `pip install "claude-goblin[export]"` (or `pip install -e ".[export]"` from a source checkout) - For PNG: requires Pillow and CairoSVG ### Database errors - Try deleting and recreating: `ccg delete-usage --force` - Then run: `ccg usage` to rebuild from current data ## **AI Tools Disclaimer**: This project was developed with assistance from Claude Code. 
## Credits Built with: - [Rich](https://github.com/Textualize/rich) - Terminal UI framework - [Pillow](https://python-pillow.org/) - Image processing (optional) - [CairoSVG](https://cairosvg.org/) - SVG to PNG conversion (optional) ``` -------------------------------------------------------------------------------- /src/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/aggregation/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/config/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/data/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/models/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/visualization/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/hooks/__init__.py: -------------------------------------------------------------------------------- ```python # Hooks module ``` -------------------------------------------------------------------------------- /src/utils/__init__.py: -------------------------------------------------------------------------------- ```python # Utils module ``` -------------------------------------------------------------------------------- /src/commands/__init__.py: 
-------------------------------------------------------------------------------- ```python # Commands module ``` -------------------------------------------------------------------------------- /src/storage/__init__.py: -------------------------------------------------------------------------------- ```python """Storage layer for historical usage snapshots.""" ``` -------------------------------------------------------------------------------- /docs/versions/0.1.0.md: -------------------------------------------------------------------------------- ```markdown # Version 0.1.0 Initial release -- I'm too lazy to write all the initial release features and just using this to track feature adds. :) ``` -------------------------------------------------------------------------------- /docs/versions/0.1.1.md: -------------------------------------------------------------------------------- ```markdown # Version 0.1.1 ## Features Added - Added `--status-bar` command for macOS menu bar integration - Menu bar shows real-time usage percentages (session, weekly, or opus) - Auto-refresh every 5 minutes ## Improvements - Enhanced documentation with status bar examples - Added background execution instructions for menu bar app ``` -------------------------------------------------------------------------------- /docs/versions/0.1.2.md: -------------------------------------------------------------------------------- ```markdown # Version 0.1.2 ## Critical Bug Fixes - **Data Loss Fix**: Fixed critical bug in "full" storage mode where `INSERT OR REPLACE` was recalculating ALL daily_snapshots from current usage_records, causing data loss when JSONL files aged out (30-day window) - Now only updates dates that currently have records, preserving historical daily_snapshots forever ## Features Added - Migrated CLI from argparse to Typer for better command structure - New command syntax: `claude-goblin <command>` instead of `claude-goblin --<command>` - Old: `claude-goblin --usage` → New: 
`claude-goblin usage` - Old: `claude-goblin --stats` → New: `claude-goblin stats` - Old: `claude-goblin --export` → New: `claude-goblin export` - Updated hooks to use new command syntax automatically ## Improvements - Better command-line interface with clearer help messages - Improved documentation structure ``` -------------------------------------------------------------------------------- /src/config/settings.py: -------------------------------------------------------------------------------- ```python #region Imports from pathlib import Path from typing import Final #endregion #region Constants # Claude data directory CLAUDE_DATA_DIR: Final[Path] = Path.home() / ".claude" / "projects" # Default refresh interval for dashboard (seconds) DEFAULT_REFRESH_INTERVAL: Final[int] = 5 # Number of days to show in activity graph ACTIVITY_GRAPH_DAYS: Final[int] = 365 # Graph dimensions GRAPH_WEEKS: Final[int] = 52 # 52 weeks = 364 days (close to 365) GRAPH_DAYS_PER_WEEK: Final[int] = 7 #endregion #region Functions def get_claude_jsonl_files() -> list[Path]: """ Get all JSONL files from Claude's project data directory. Returns: List of Path objects pointing to JSONL files Raises: FileNotFoundError: If Claude data directory doesn't exist """ if not CLAUDE_DATA_DIR.exists(): raise FileNotFoundError( f"Claude data directory not found at {CLAUDE_DATA_DIR}. " "Make sure Claude Code has been run at least once." 
) return list(CLAUDE_DATA_DIR.rglob("*.jsonl")) #endregion ``` -------------------------------------------------------------------------------- /src/hooks/scripts/audio_tts_hook.sh: -------------------------------------------------------------------------------- ```bash #!/bin/bash # Audio TTS Hook for Claude Code # Reads hook JSON from stdin and speaks it using macOS 'say' # Read JSON from stdin json_input=$(cat) # Extract the message content from the JSON # Try different fields depending on hook type message=$(echo "$json_input" | python3 -c " import sys import json try: data = json.load(sys.stdin) hook_type = data.get('hook_event_name', '') # Get appropriate message based on hook type if hook_type == 'Notification': msg = data.get('message', 'Claude requesting permission') elif hook_type == 'Stop': msg = 'Claude finished responding' elif hook_type == 'PreCompact': trigger = data.get('trigger', 'unknown') if trigger == 'auto': msg = 'Auto compacting conversation' else: msg = 'Manually compacting conversation' else: msg = data.get('message', 'Claude event') print(msg) except: print('Claude event') ") # Speak the message using macOS 'say' with selected voice (run in background to avoid blocking) echo "$message" | say -v Samantha & # Optional: Log for debugging # echo "$(date): TTS spoke: $message" >> ~/.claude/tts_hook.log ``` -------------------------------------------------------------------------------- /docs/versions/0.1.3.md: -------------------------------------------------------------------------------- ```markdown # Version 0.1.3 ## Features Added - **`--fast` flag** for `usage` command: Skip all updates and read from database only for faster rendering - **`--fast` flag** for `export` command: Skip all updates and export directly from database - Shows last update timestamp when using `--fast` mode ## Improvements - Default export location changed to `~/.claude/usage/` (same as hook exports) - Export only saves to current directory when `-o` flag is 
explicitly used - Fixed command syntax in all error messages and hints (removed old `--command` style references) - Updated help text to use new command syntax throughout ## Documentation - Created `/docs/commands.md` with comprehensive command reference - Added separate sections for commands, flags, and arguments with proper syntax notation - Created `/docs/versions/` directory to track version-specific features - Reorganized README commands section into a clear table format - Updated all file location documentation to reflect new default export path ## Bug Fixes - Fixed dashboard tip showing incorrect syntax: `claude-goblin --export --open` → `claude-goblin export --open` - Fixed all references to old command syntax in: - `help.py` - Updated all command examples - `delete_usage.py` - Fixed deletion confirmation message - `hooks/manager.py` - Fixed setup and removal hints - `hooks/usage.py` - Fixed restore backup hint ``` -------------------------------------------------------------------------------- /src/commands/delete_usage.py: -------------------------------------------------------------------------------- ```python #region Imports import sys from rich.console import Console from src.storage.snapshot_db import ( DEFAULT_DB_PATH, get_database_stats, ) #endregion #region Functions def run(console: Console) -> None: """ Delete all historical usage data from the database. Requires -f or --force flag to prevent accidental deletion. 
Args: console: Rich console for output Flags: -f or --force: Required flag to confirm deletion """ force = "-f" in sys.argv or "--force" in sys.argv if not force: console.print("[red]WARNING: This will delete ALL historical usage data![/red]") console.print("[yellow]To confirm deletion, use: ccg delete-usage --force[/yellow]") return db_path = DEFAULT_DB_PATH if not db_path.exists(): console.print("[yellow]No historical database found.[/yellow]") return try: # Show stats before deletion db_stats = get_database_stats() if db_stats["total_records"] > 0: console.print("[cyan]Current database:[/cyan]") console.print(f" Records: {db_stats['total_records']:,}") console.print(f" Days: {db_stats['total_days']}") console.print(f" Range: {db_stats['oldest_date']} to {db_stats['newest_date']}\n") # Delete the database file db_path.unlink() console.print("[green]✓ Successfully deleted historical usage database[/green]") console.print(f"[dim]Deleted: {db_path}[/dim]") except Exception as e: console.print(f"[red]Error deleting database: {e}[/red]") #endregion ``` -------------------------------------------------------------------------------- /src/visualization/usage_bars.py: -------------------------------------------------------------------------------- ```python #region Imports from datetime import datetime from rich.console import Console from src.aggregation.usage_limits import UsageLimits #endregion #region Functions def render_usage_limits(limits: UsageLimits, console: Console) -> None: """ Render usage limits as simple percentages with reset times. 
Displays: - Session: X% (resets at TIME) - Week: X% (resets on DATE) - Opus: X% (resets on DATE) [if applicable] Args: limits: UsageLimits object with usage data console: Rich console for output Common failure modes: - None values are handled gracefully - Percentages over 100% are shown as-is (no capping) """ console.print() # Session session_pct = limits.session_percentage reset_str = "" if limits.session_reset_time: local_time = limits.session_reset_time.astimezone() reset_str = local_time.strftime("%I:%M%p").lstrip('0') console.print(f"[bold cyan]Session:[/bold cyan] {session_pct:.0f}% [dim](resets {reset_str})[/dim]") # Week (all models) week_pct = limits.week_percentage week_reset_str = "" if limits.week_reset_time: local_time = limits.week_reset_time.astimezone() week_reset_str = local_time.strftime("%b %d").replace(' 0', ' ') console.print(f"[bold cyan]Week:[/bold cyan] {week_pct:.0f}% [dim](resets {week_reset_str})[/dim]") # Opus (only for Max plans) if limits.opus_limit > 0: opus_pct = limits.opus_percentage console.print(f"[bold cyan]Opus:[/bold cyan] {opus_pct:.0f}% [dim](resets {week_reset_str})[/dim]") console.print() #endregion ``` -------------------------------------------------------------------------------- /src/utils/_system.py: -------------------------------------------------------------------------------- ```python #region Imports import platform import subprocess from pathlib import Path from typing import Optional #endregion #region Functions def open_file(file_path: Path) -> None: """ Open a file with the default application (cross-platform). 
Args: file_path: Path to the file to open """ system = platform.system() try: if system == "Darwin": # macOS subprocess.run(["open", str(file_path)], check=False) elif system == "Windows": subprocess.run(["start", str(file_path)], shell=True, check=False) else: # Linux and others subprocess.run(["xdg-open", str(file_path)], check=False) except Exception: pass # Silently fail if opening doesn't work def get_sound_command(sound_name: str) -> Optional[str]: """ Get the command to play a sound (cross-platform). Args: sound_name: Name of the sound file (without extension) Returns: Command string to play the sound, or None if not supported """ system = platform.system() if system == "Darwin": # macOS return f"afplay /System/Library/Sounds/{sound_name}.aiff &" elif system == "Windows": # Windows Media Player command for playing system sounds return f'powershell -c "(New-Object Media.SoundPlayer \'C:\\Windows\\Media\\{sound_name}.wav\').PlaySync();" &' else: # Linux # Try to use paplay (PulseAudio) or aplay (ALSA) # Most Linux systems have one of these return f"(paplay /usr/share/sounds/freedesktop/stereo/{sound_name}.oga 2>/dev/null || aplay /usr/share/sounds/alsa/{sound_name}.wav 2>/dev/null) &" #endregion ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "claude-goblin" version = "0.1.5" description = "Python CLI for Claude Code utilities and usage tracking/analytics" readme = "README.md" requires-python = ">=3.10" license = {text = "MIT"} authors = [ {name = "Kurt Buhler"} ] keywords = ["claude", "claude-code", "usage", "analytics", "tui", "dashboard", "visualization"] classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", 
"Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", "Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Utilities", ] dependencies = [ "rich>=13.7.0", "typer>=0.9.0", "rumps>=0.4.0; sys_platform == 'darwin'", ] [project.urls] Homepage = "https://github.com/data-goblin/claude-goblin" Repository = "https://github.com/data-goblin/claude-goblin" Issues = "https://github.com/data-goblin/claude-goblin/issues" [project.optional-dependencies] export = [ "pillow>=10.0.0", "cairosvg>=2.7.0", ] [project.scripts] claude-goblin = "src.cli:main" ccg = "src.cli:main" [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["src"] [dependency-groups] dev = [ "pytest>=7.4.0", "pytest-cov>=4.1.0", "pytest-asyncio>=0.23.0", "pytest-faker>=2.0.0", "mypy>=1.7.0", ] [tool.pytest.ini_options] testpaths = ["tests"] python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] addopts = "-v --strict-markers" [tool.mypy] python_version = "3.10" strict = true warn_return_any = true warn_unused_configs = true disallow_untyped_defs = true ``` -------------------------------------------------------------------------------- /src/hooks/png.py: -------------------------------------------------------------------------------- ```python #region Imports from pathlib import Path from rich.console import Console #endregion #region Functions def setup(console: Console, settings: dict, settings_path: Path) -> None: """ Set up the PNG auto-update hook. 
Args: console: Rich console for output settings: Settings dictionary to modify settings_path: Path to settings.json file """ # Ask for output path default_output = str(Path.home() / ".claude" / "usage" / "claude-usage.png") console.print("[bold cyan]Configure PNG auto-update:[/bold cyan]\n") console.print(f"[dim]Default output: {default_output}[/dim]") console.print("[dim]Enter custom path (or press Enter for default):[/dim] ", end="") try: user_input = input().strip() output_path = user_input if user_input else default_output except (EOFError, KeyboardInterrupt): console.print("\n[yellow]Cancelled[/yellow]") return # Create directory if it doesn't exist output_dir = Path(output_path).parent output_dir.mkdir(parents=True, exist_ok=True) hook_command = f"ccg export -o {output_path} > /dev/null 2>&1 &" # Remove existing PNG hooks original_count = len(settings["hooks"]["Stop"]) settings["hooks"]["Stop"] = [ hook for hook in settings["hooks"]["Stop"] if not is_hook(hook) ] png_hook_removed = len(settings["hooks"]["Stop"]) < original_count # Add new hook settings["hooks"]["Stop"].append({ "matcher": "*", "hooks": [{ "type": "command", "command": hook_command }] }) if png_hook_removed: console.print("[cyan]Replaced existing PNG auto-update hook[/cyan]") console.print(f"[green]✓ Successfully configured PNG auto-update hook[/green]") console.print("\n[bold]What this does:[/bold]") console.print(" • Exports PNG after each Claude response completes") console.print(f" • Overwrites: {output_path}") console.print(" • Runs silently in the background") def is_hook(hook) -> bool: """ Check if a hook is a PNG export hook. Recognizes both old-style (--export) and new-style (export) commands. 
Args: hook: Hook configuration dictionary Returns: True if this is a PNG export hook, False otherwise """ if not isinstance(hook, dict) or "hooks" not in hook: return False for h in hook.get("hooks", []): cmd = h.get("command", "") # Support both old-style (--export) and new-style (export) # Also support both claude-goblin and ccg aliases if (("claude-goblin --export" in cmd or "claude-goblin export" in cmd or "ccg --export" in cmd or "ccg export" in cmd) and "-o" in cmd): return True return False #endregion ``` -------------------------------------------------------------------------------- /src/models/usage_record.py: -------------------------------------------------------------------------------- ```python #region Imports from dataclasses import dataclass from datetime import datetime from typing import Optional #endregion #region Data Classes @dataclass(frozen=True) class TokenUsage: """ Represents token usage for a single API call. Attributes: input_tokens: Number of input tokens output_tokens: Number of output tokens cache_creation_tokens: Number of tokens written to cache cache_read_tokens: Number of tokens read from cache """ input_tokens: int output_tokens: int cache_creation_tokens: int cache_read_tokens: int @property def total_tokens(self) -> int: """Calculate total tokens across all categories.""" return ( self.input_tokens + self.output_tokens + self.cache_creation_tokens + self.cache_read_tokens ) @dataclass(frozen=True) class UsageRecord: """ Represents a single usage event from Claude Code. 
Attributes: timestamp: When the event occurred session_id: UUID of the conversation session message_uuid: UUID of the specific message message_type: Type of message ('user' or 'assistant') model: Model name (e.g., 'claude-sonnet-4-5-20250929') folder: Project folder path git_branch: Current git branch (if available) version: Claude Code version token_usage: Token usage details (None for user messages) content: Message content text (for analysis) char_count: Character count of message content """ timestamp: datetime session_id: str message_uuid: str message_type: str model: Optional[str] folder: str git_branch: Optional[str] version: str token_usage: Optional[TokenUsage] content: Optional[str] = None char_count: int = 0 @property def date_key(self) -> str: """ Get date string in YYYY-MM-DD format for grouping. Converts UTC timestamp to local timezone before extracting date. This ensures activity is grouped by the user's local calendar day, not UTC days. For example, activity at 23:30 local time will be grouped into the correct local day, even though it may be a different UTC day. 
Returns: Date string in YYYY-MM-DD format (local timezone) """ local_timestamp = self.timestamp.astimezone() # Convert to local timezone return local_timestamp.strftime("%Y-%m-%d") @property def is_user_prompt(self) -> bool: """Check if this is a user prompt message.""" return self.message_type == "user" @property def is_assistant_response(self) -> bool: """Check if this is an assistant response message.""" return self.message_type == "assistant" #endregion ``` -------------------------------------------------------------------------------- /src/commands/restore_backup.py: -------------------------------------------------------------------------------- ```python #region Imports import os import shutil from datetime import datetime from rich.console import Console from src.storage.snapshot_db import ( DEFAULT_DB_PATH, get_database_stats, ) #endregion #region Functions def run(console: Console) -> None: """ Restore database from backup file. Restores the usage history database from a backup file (.db.bak). Creates a safety backup of the current database before restoring. 
Args: console: Rich console for output """ backup_path = DEFAULT_DB_PATH.parent / "usage_history.db.bak" if not backup_path.exists(): console.print("[yellow]No backup file found.[/yellow]") console.print(f"[dim]Expected location: {backup_path}[/dim]") return console.print("[bold cyan]Restore Database from Backup[/bold cyan]\n") console.print(f"[yellow]Backup file: {backup_path}[/yellow]") console.print(f"[yellow]This will replace: {DEFAULT_DB_PATH}[/yellow]") # Show backup file info backup_size = os.path.getsize(backup_path) backup_time = os.path.getmtime(backup_path) backup_date = datetime.fromtimestamp(backup_time).strftime("%Y-%m-%d %H:%M:%S") console.print(f"[dim]Backup size: {backup_size:,} bytes[/dim]") console.print(f"[dim]Backup date: {backup_date}[/dim]") console.print("") if DEFAULT_DB_PATH.exists(): console.print("[bold red]⚠️ WARNING: This will overwrite your current database![/bold red]") console.print("[yellow]Consider backing up your current database first.[/yellow]") console.print("") console.print("[cyan]Continue with restore? 
(yes/no):[/cyan] ", end="") try: confirm = input().strip().lower() if confirm not in ["yes", "y"]: console.print("[yellow]Restore cancelled[/yellow]") return except (EOFError, KeyboardInterrupt): console.print("\n[yellow]Restore cancelled[/yellow]") return try: # Create a backup of current DB if it exists if DEFAULT_DB_PATH.exists(): current_backup = DEFAULT_DB_PATH.parent / "usage_history.db.before_restore" shutil.copy2(DEFAULT_DB_PATH, current_backup) console.print(f"[dim]Current database backed up to: {current_backup}[/dim]") # Restore from backup shutil.copy2(backup_path, DEFAULT_DB_PATH) console.print(f"[green]✓ Database restored from backup[/green]") console.print(f"[dim]Restored: {DEFAULT_DB_PATH}[/dim]") # Show restored stats db_stats = get_database_stats() if db_stats["total_records"] > 0: console.print("") console.print("[cyan]Restored database contains:[/cyan]") console.print(f" Records: {db_stats['total_records']:,}") console.print(f" Days: {db_stats['total_days']}") console.print(f" Range: {db_stats['oldest_date']} to {db_stats['newest_date']}") except Exception as e: console.print(f"[red]Error restoring backup: {e}[/red]") #endregion ``` -------------------------------------------------------------------------------- /src/commands/help.py: -------------------------------------------------------------------------------- ```python #region Imports from rich.console import Console #endregion #region Functions def run(console: Console) -> None: """ Display help message. Shows comprehensive usage information including: - Available commands and their flags - Key features of the tool - Data sources and storage locations - Recommended setup workflow Args: console: Rich console for output """ help_text = """ [bold cyan]Claude Goblin Usage Tracker[/bold cyan] Track and visualize your Claude Code usage with GitHub-style activity graphs. Automatically saves historical snapshots to preserve data beyond the 30-day rolling window. 
[bold]Usage:[/bold] ccg Show this help message ccg limits Show usage percentages (session, week, opus) ccg status-bar [type] Launch macOS menu bar app (session|weekly|opus) Defaults to weekly if type not specified ccg usage Show usage stats (single shot) ccg usage --live Show usage with auto-refresh ccg update-usage Update historical database with latest data ccg setup-hooks <type> Configure Claude Code hooks (usage|audio|png) ccg remove-hooks [type] Remove hooks (usage|audio|png, or all if not specified) ccg export Export heatmap as PNG image (default) Use --svg for SVG format Use --open to open after export Use -o FILE to specify output path Use --year YYYY to select year (default: current) ccg stats Show historical database statistics ccg restore-backup Restore database from backup (.db.bak file) ccg delete-usage -f Delete all historical data (requires --force) ccg help Show this help message [bold]Features:[/bold] • GitHub-style 365-day activity heatmap • Token usage breakdown (input, output, cache) • Session and prompt counts • Model and project folder breakdowns • Live auto-refresh dashboard • Automatic historical data preservation • Claude Code hooks integration for real-time tracking [bold]Data Sources:[/bold] Current (30 days): ~/.claude/projects/*.jsonl Historical: ~/.claude/usage/usage_history.db [bold]Recommended Setup:[/bold] 1. Run: ccg usage (View your dashboard and save initial snapshot) 2. Optional: ccg setup-hooks usage (Configure automatic tracking after each Claude response) 3. Optional: ccg setup-hooks audio (Play sound when Claude is ready for input) [bold]Exit:[/bold] Press Ctrl+C to exit [bold]Note:[/bold] Claude Code keeps a rolling 30-day window of logs. This tool automatically snapshots your data each time you run it, building a complete history over time. With hooks enabled, tracking happens automatically in the background. 
""" console.print(help_text) #endregion ``` -------------------------------------------------------------------------------- /src/config/user_config.py: -------------------------------------------------------------------------------- ```python #region Imports import json from pathlib import Path from typing import Optional #endregion #region Constants CONFIG_PATH = Path.home() / ".claude" / "goblin_config.json" #endregion #region Functions def load_config() -> dict: """ Load user configuration from disk. Returns: Configuration dictionary with user preferences """ if not CONFIG_PATH.exists(): return get_default_config() try: with open(CONFIG_PATH, "r") as f: return json.load(f) except (json.JSONDecodeError, IOError): return get_default_config() def save_config(config: dict) -> None: """ Save user configuration to disk. Args: config: Configuration dictionary to save Raises: IOError: If config cannot be written """ CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) with open(CONFIG_PATH, "w") as f: json.dump(config, f, indent=2) def get_default_config() -> dict: """ Get default configuration values. Returns: Default configuration dictionary """ return { "storage_mode": "aggregate", # "aggregate" or "full" "plan_type": "max_20x", # "pro", "max_5x", or "max_20x" "tracking_mode": "both", # "both", "tokens", or "limits" "version": "1.0" } def get_storage_mode() -> str: """ Get the current storage mode setting. Returns: Either "aggregate" or "full" """ config = load_config() return config.get("storage_mode", "aggregate") def set_storage_mode(mode: str) -> None: """ Set the storage mode. Args: mode: Either "aggregate" or "full" Raises: ValueError: If mode is not valid """ if mode not in ["aggregate", "full"]: raise ValueError(f"Invalid storage mode: {mode}. Must be 'aggregate' or 'full'") config = load_config() config["storage_mode"] = mode save_config(config) def get_plan_type() -> str: """ Get the current Claude Code plan type. 
#region Imports
import re
from typing import Optional
#endregion


#region Constants
# Swear word patterns (comprehensive list with common misspellings)
SWEAR_PATTERNS = [
    # F-word variations
    r'\bf[u\*]c?k+(?:ing|ed|er|s)?\b',
    r'\bf+[aeiou]*c?k+\b',
    r'\bfck(?:ing|ed|er|s)?\b',
    r'\bfuk(?:ing|ed|er|s)?\b',
    r'\bphuck(?:ing|ed|er|s)?\b',

    # S-word variations
    r'\bsh[i\*]t+(?:ty|ting|ted|s)?\b',
    r'\bsht(?:ty|ting|ted|s)?\b',
    r'\bshyt(?:ty|ting|ted|s)?\b',
    r'\bcr[a\*]p+(?:py|ping|ped|s)?\b',

    # A-word variations
    r'\bass+h[o\*]le?s?\b',
    r'\ba+rse+(?:hole)?s?\b',

    # D-word variations
    r'\bd[a\*]mn+(?:ed|ing|s)?\b',
    r'\bd[a\*]m+(?:ed|ing|s)?\b',

    # B-word variations
    r'\bb[i\*]tch+(?:ing|ed|es|y)?\b',
    r'\bbstard+s?\b',

    # Other common variations
    r'\bhell+\b',
    r'\bpiss+(?:ed|ing|es)?\b',
    r'\bc[o\*]ck+(?:s)?\b',
    r'\bd[i\*]ck+(?:s|head)?\b',
    r'\btw[a\*]t+s?\b',
]

# Specific phrase patterns
PERFECT_PATTERNS = [
    r'\bperfect!',
    r'\bperfect\.',
    r'\bexcellent!',
    r'\bexcellent\.',
]

ABSOLUTELY_RIGHT_PATTERNS = [
    r"\byou'?re?\s+absolutely\s+right\b",
    r"\byou\s+are\s+absolutely\s+right\b",
]

# Politeness patterns
THANK_PATTERNS = [
    r'\bthank+(?:s|you|u)?\b',
    r'\bthn?x\b',
    r'\bty\b',
    r'\bthanku\b',
    r'\bthnk+s?\b',
]

PLEASE_PATTERNS = [
    r'\bplease\b',
    r'\bpl[sz]e?\b',
    r'\bples[ae]?\b',
    r'\bpls\b',
]
#endregion


#region Functions


def _count_pattern_hits(text: Optional[str], patterns: list[str]) -> int:
    """
    Count non-overlapping occurrences of any pattern in text.

    The patterns are joined into a single alternation and the lowercased
    text is scanned once. Scanning each pattern separately would count a
    word once per pattern that matches it (e.g. "pls" matches two of the
    PLEASE_PATTERNS), inflating the totals.

    Args:
        text: Text to analyze (None or empty counts as no matches)
        patterns: Regular expression sources to search for

    Returns:
        Number of distinct matches found
    """
    if not text or not patterns:
        return 0

    # re caches compiled patterns, so rebuilding the alternation per call
    # is cheap and keeps the public pattern lists the single source of truth.
    combined = re.compile("|".join(f"(?:{pattern})" for pattern in patterns))
    return sum(1 for _ in combined.finditer(text.lower()))


def count_swears(text: Optional[str]) -> int:
    """
    Count swear words in text using comprehensive pattern matching.

    Each word is counted once, even when several SWEAR_PATTERNS match it.

    Args:
        text: Text to analyze

    Returns:
        Count of swear words found

    Reasons for failure:
        - None (returns 0 if text is None/empty)
    """
    return _count_pattern_hits(text, SWEAR_PATTERNS)


def count_perfect_phrases(text: Optional[str]) -> int:
    """
    Count instances of "Perfect!" / "Perfect." / "Excellent!" / "Excellent."
    in text (case-insensitive).

    Args:
        text: Text to analyze

    Returns:
        Count of matching phrases found
    """
    return _count_pattern_hits(text, PERFECT_PATTERNS)


def count_absolutely_right_phrases(text: Optional[str]) -> int:
    """
    Count instances of "You're absolutely right!" in text.

    Matches both the contracted form ("you're" / "youre") and the
    expanded form ("you are"), case-insensitively.

    Args:
        text: Text to analyze

    Returns:
        Count of "You're absolutely right!" phrases found
    """
    return _count_pattern_hits(text, ABSOLUTELY_RIGHT_PATTERNS)


def count_thank_phrases(text: Optional[str]) -> int:
    """
    Count instances of "thank you" and variations in text.

    Each word is counted once, even when several THANK_PATTERNS match it.

    Args:
        text: Text to analyze

    Returns:
        Count of thank you phrases found
    """
    return _count_pattern_hits(text, THANK_PATTERNS)


def count_please_phrases(text: Optional[str]) -> int:
    """
    Count instances of "please" and variations in text.

    Each word is counted once, even when several PLEASE_PATTERNS match it.

    Args:
        text: Text to analyze

    Returns:
        Count of please phrases found
    """
    return _count_pattern_hits(text, PLEASE_PATTERNS)


def get_character_count(text: Optional[str]) -> int:
    """
    Get character count of text.

    Args:
        text: Text to analyze

    Returns:
        Number of characters (0 if text is None/empty)
    """
    if not text:
        return 0

    return len(text)
#endregion
Ensures complete date coverage from earliest record to today Args: console: Rich console for output """ try: tracking_mode = get_tracking_mode() # Save current snapshot (tokens) if tracking_mode in ["both", "tokens"]: jsonl_files = get_claude_jsonl_files() if jsonl_files: records = parse_all_jsonl_files(jsonl_files) if records: saved_count = save_snapshot(records, storage_mode=get_storage_mode()) console.print(f"[green]Saved {saved_count} new token records[/green]") # Capture and save limits if tracking_mode in ["both", "limits"]: limits = capture_limits() if limits and "error" not in limits: save_limits_snapshot( session_pct=limits["session_pct"], week_pct=limits["week_pct"], opus_pct=limits["opus_pct"], session_reset=limits["session_reset"], week_reset=limits["week_reset"], opus_reset=limits["opus_reset"], ) console.print(f"[green]Saved limits snapshot (Session: {limits['session_pct']}%, Week: {limits['week_pct']}%, Opus: {limits['opus_pct']}%)[/green]") # Get database stats to determine date range db_stats = get_database_stats() if db_stats["total_records"] == 0: console.print("[yellow]No data to process.[/yellow]") return # Fill in gaps from oldest date to today init_database() conn = sqlite3.connect(DEFAULT_DB_PATH) try: cursor = conn.cursor() # Get all dates that have data cursor.execute("SELECT DISTINCT date FROM usage_records ORDER BY date") existing_dates = {row[0] for row in cursor.fetchall()} # Generate complete date range start_date = datetime.strptime(db_stats["oldest_date"], "%Y-%m-%d").date() end_date = datetime.now().date() current_date = start_date filled_count = 0 while current_date <= end_date: date_str = current_date.strftime("%Y-%m-%d") if date_str not in existing_dates: # Insert empty daily snapshot for this date cursor.execute(""" INSERT OR IGNORE INTO daily_snapshots ( date, total_prompts, total_responses, total_sessions, total_tokens, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, snapshot_timestamp ) VALUES (?, 0, 
0, 0, 0, 0, 0, 0, 0, ?) """, (date_str, datetime.now().isoformat())) filled_count += 1 current_date += timedelta(days=1) conn.commit() if filled_count > 0: console.print(f"[cyan]Filled {filled_count} empty days[/cyan]") # Show updated stats db_stats = get_database_stats() console.print( f"[green]Complete! Coverage: {db_stats['oldest_date']} to {db_stats['newest_date']}[/green]" ) finally: conn.close() except Exception as e: console.print(f"[red]Error updating usage: {e}[/red]") import traceback traceback.print_exc() #endregion ``` -------------------------------------------------------------------------------- /CHANGELOG.md: -------------------------------------------------------------------------------- ```markdown # Changelog All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [0.1.5] - 2025-10-13 ### Added - Added `--fast` flag to `stats` command for faster rendering (skips all updates, reads from database) ### Fixed - Fixed missing limits updates in `stats` command - now automatically saves limits to database like other commands ## [0.1.4] - 2025-10-12 ### Added - Added `--anon` flag to `usage` command to anonymize project names (displays as project-001, project-002, etc., ranked by token usage) - Added `PreCompact` hook support for audio notifications (plays sound before conversation compaction) - Added multi-hook selection for `audio-tts` setup (choose between Notification, Stop, PreCompact, or combinations) - Audio hook now supports three sounds: completion, permission requests, and conversation compaction ### Changed - `audio-tts` hook now supports configurable hook types (Notification only by default, with 7 selection options) - Audio hook setup now prompts for three sounds instead of two (added compaction sound) - TTS hook script intelligently handles different hook 
types with appropriate messages - Enhanced hook removal to properly clean up PreCompact hooks ### Fixed - Fixed `AttributeError` in `--anon` flag where `total_tokens` was accessed incorrectly on UsageRecord objects ## [0.1.3] - 2025-10-12 ### Fixed - Fixed audio `Notification` hook format to properly trigger on permission requests (removed incorrect `matcher` field) - Fixed missing limits data in heatmap exports - `usage` command now automatically saves limits to database - Fixed double `claude` command execution - dashboard now uses cached limits from database instead of fetching live ### Changed - Improved status messages to show three distinct steps: "Updating usage data", "Updating usage limits", "Preparing dashboard" - Dashboard now displays limits from database after initial fetch, eliminating redundant API calls ### Added - Added `get_latest_limits()` function to retrieve most recent limits from database - Added `--fast` flag to `usage` command for faster dashboard rendering (skips all updates, reads directly from database) - Added `--fast` flag to `export` command for faster exports (skips all updates, reads directly from database) - Added database existence check for `--fast` mode with helpful error message - Added timestamp warning when using `--fast` mode showing last database update date ## [0.1.2] - 2025-10-11 ### Added - Enhanced audio hook to support both `Stop` and `Notification` hooks - Completion sound: Plays when Claude finishes responding (`Stop` hook) - Permission sound: Plays when Claude requests permission (`Notification` hook) - User now selects two different sounds during `setup-hooks audio` for better distinction - Expanded macOS sound library from 5 to 10 sounds ### Changed - Updated `claude-goblin setup-hooks audio` to prompt for two sounds instead of one - Audio hook removal now cleans up both `Stop` and `Notification` hooks - Updated documentation to reflect dual audio notification capability ### Fixed - Fixed `NameError: name 'fast' 
is not defined` in usage command when `--fast` flag was used ## [0.1.1] - 2025-10-11 ### Fixed - **CRITICAL**: Fixed data loss bug in "full" storage mode where `daily_snapshots` were being recalculated from scratch, causing historical data to be lost when JSONL files aged out (30-day window) - Now only updates `daily_snapshots` for dates that currently have records, preserving all historical data forever ### Changed - Migrated CLI from manual `sys.argv` parsing to `typer` for better UX and automatic help generation - Updated command syntax: `claude-goblin <command>` instead of `claude-goblin --<command>` - Old: `claude-goblin --usage` → New: `claude-goblin usage` - Old: `claude-goblin --stats` → New: `claude-goblin stats` - Old: `claude-goblin --export` → New: `claude-goblin export` - All other commands follow the same pattern - Updated hooks to use new command syntax (`claude-goblin update-usage` instead of `claude-goblin --update-usage`) - Improved help messages with examples and better descriptions ### Added - Added `typer>=0.9.0` as a dependency for CLI framework - Added backward compatibility in hooks to recognize both old and new command syntax ## [0.1.0] - 2025-10-10 ### Added - Initial release - Usage tracking and analytics for Claude Code - GitHub-style activity heatmap visualization - TUI dashboard with real-time stats - Cost analysis and API pricing comparison - Export functionality (PNG/SVG) - Hook integration for automatic tracking - macOS menu bar app for usage monitoring - Support for both "aggregate" and "full" storage modes - Historical database preservation (SQLite) - Text analysis (politeness markers, phrase counting) - Model and project breakdown statistics ``` -------------------------------------------------------------------------------- /docs/commands.md: -------------------------------------------------------------------------------- ```markdown # Commands Reference Complete reference for all `claude-goblin` commands. 
## Commands ### Dashboard & Analytics #### `claude-goblin usage` Show usage dashboard with KPI cards and breakdowns. Displays: - Total tokens, prompts, and sessions - Current usage limits (session, weekly, Opus) - Token breakdown by model - Token breakdown by project #### `claude-goblin limits` Show current usage limits (session, week, Opus). Displays current usage percentages and reset times for all three limit types. **Note:** Must be run from a trusted folder where Claude Code has been used. #### `claude-goblin stats` Show detailed statistics and cost analysis. Displays: - Summary: total tokens, prompts, responses, sessions, days tracked - Cost analysis: estimated API costs vs Max Plan costs - Averages: tokens per session/response, cost per session/response - Text analysis: prompt length, politeness markers, phrase counts - Usage by model: token distribution across different models #### `claude-goblin status-bar <type>` Launch macOS menu bar app (macOS only). Shows "CC: XX%" in your menu bar with auto-refresh every 5 minutes. **Arguments:** - `type` - Type of limit to display: `session`, `weekly`, or `opus` (default: `weekly`) ### Export #### `claude-goblin export` Export yearly heatmap as PNG or SVG. Generates a GitHub-style activity heatmap showing Claude Code usage throughout the year. ### Data Management #### `claude-goblin update-usage` Update historical database with latest data. This command: 1. Saves current usage data from JSONL files 2. Fills in missing days with zero-usage records 3. Ensures complete date coverage from earliest record to today Useful for ensuring continuous heatmap data without gaps. #### `claude-goblin delete-usage` Delete historical usage database. **WARNING:** This will permanently delete all historical usage data! A backup is automatically created before deletion. #### `claude-goblin restore-backup` Restore database from backup file. Restores the usage history database from `~/.claude/usage/usage_history.db.bak`. 
Creates a safety backup of the current database before restoring. ### Hooks (Advanced) #### `claude-goblin setup-hooks <type>` Setup Claude Code hooks for automation. **Arguments:** - `type` - Hook type to setup: `usage`, `audio`, or `png` Hook types: - `usage` - Auto-track usage after each Claude response - `audio` - Play sounds for completion and permission requests - `png` - Auto-update usage PNG after each Claude response #### `claude-goblin remove-hooks [type]` Remove Claude Code hooks configured by this tool. **Arguments:** - `type` (optional) - Hook type to remove: `usage`, `audio`, `png`, or omit to remove all ## Flags & Arguments ### Global Flags None currently available. ### Command-Specific Flags #### `usage` command - `--live` - Auto-refresh dashboard every 5 seconds - `--fast` - Skip live limits for faster rendering #### `export` command - `--svg` - Export as SVG instead of PNG - `--open` - Open file after export - `-y, --year <YYYY>` - Filter by year (default: current year) - `-o, --output <path>` - Output file path #### `delete-usage` command - `-f, --force` - Force deletion without confirmation (required) #### `status-bar` command Arguments: - `<type>` - Limit type: `session`, `weekly`, or `opus` (default: `weekly`) #### `setup-hooks` command Arguments: - `<type>` - Hook type: `usage`, `audio`, or `png` (required) #### `remove-hooks` command Arguments: - `[type]` - Hook type to remove: `usage`, `audio`, `png`, or omit for all (optional) ## Examples ```bash # View dashboard claude-goblin usage # View dashboard with auto-refresh claude-goblin usage --live # Export current year as PNG and open it claude-goblin export --open # Export specific year claude-goblin export -y 2024 # Export as SVG to specific path claude-goblin export --svg -o ~/reports/usage.svg # Show current limits claude-goblin limits # Launch menu bar with weekly usage claude-goblin status-bar weekly # Setup automatic usage tracking claude-goblin setup-hooks usage # Setup audio 
def parse_jsonl_file(file_path: Path) -> Iterator[UsageRecord]:
    """
    Parse a single JSONL file and yield UsageRecord objects.

    Extracts usage data from Claude Code session logs, including:
    - Token usage (input, output, cache creation, cache read)
    - Session metadata (model, folder, version, branch)
    - Timestamps and identifiers

    Lines that are not valid JSON, are not JSON objects, or cannot be
    converted into a record (for example an unparseable timestamp) are
    skipped, so one bad line does not discard the rest of the file.

    Args:
        file_path: Path to the JSONL file to parse

    Yields:
        UsageRecord objects for each user or assistant message that
        _parse_record accepts

    Raises:
        FileNotFoundError: If the file doesn't exist. Because this is a
            generator, the error surfaces when iteration starts, not
            when the function is called.
    """
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    with open(file_path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue

            try:
                data = json.loads(line)
            except json.JSONDecodeError as e:
                # Skip malformed lines but continue processing
                print(f"Warning: Skipping malformed JSON at {file_path}:{line_num}: {e}")
                continue

            # _parse_record calls data.get(...); a JSON array/number/string
            # on its own line is not a log record.
            if not isinstance(data, dict):
                continue

            try:
                record = _parse_record(data)
            except (ValueError, TypeError, AttributeError) as e:
                # e.g. datetime.fromisoformat rejecting the timestamp, or a
                # null/non-object "message" field
                print(f"Warning: Skipping unparseable record at {file_path}:{line_num}: {e}")
                continue

            if record:
                yield record
Args: data: Parsed JSON object from JSONL line Returns: UsageRecord for user or assistant messages, None otherwise """ message_type = data.get("type") # Only process user and assistant messages if message_type not in ("user", "assistant"): return None # Parse timestamp timestamp_str = data.get("timestamp") if not timestamp_str: return None timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) # Extract metadata (common to both user and assistant) session_id = data.get("sessionId", "unknown") message_uuid = data.get("uuid", "unknown") folder = data.get("cwd", "unknown") git_branch = data.get("gitBranch") version = data.get("version", "unknown") # Extract message data message = data.get("message", {}) model = message.get("model") # Filter out synthetic models (test/internal artifacts) if model == "<synthetic>": return None # Extract content for analysis content = None char_count = 0 if isinstance(message.get("content"), str): content = message["content"] char_count = len(content) elif isinstance(message.get("content"), list): # Handle content blocks (concatenate text) text_parts = [] for block in message["content"]: if isinstance(block, dict) and block.get("type") == "text": text_parts.append(block.get("text", "")) content = "\n".join(text_parts) if text_parts else None char_count = len(content) if content else 0 # Extract token usage (only available for assistant messages) token_usage = None if message_type == "assistant": usage_data = message.get("usage") if usage_data: cache_creation = usage_data.get("cache_creation", {}) cache_creation_tokens = ( cache_creation.get("cache_creation_input_tokens", 0) + cache_creation.get("ephemeral_5m_input_tokens", 0) + cache_creation.get("ephemeral_1h_input_tokens", 0) ) token_usage = TokenUsage( input_tokens=usage_data.get("input_tokens", 0), output_tokens=usage_data.get("output_tokens", 0), cache_creation_tokens=cache_creation_tokens, cache_read_tokens=usage_data.get("cache_read_input_tokens", 0), ) return 
UsageRecord( timestamp=timestamp, session_id=session_id, message_uuid=message_uuid, message_type=message_type, model=model, folder=folder, git_branch=git_branch, version=version, token_usage=token_usage, content=content, char_count=char_count, ) #endregion ``` -------------------------------------------------------------------------------- /src/commands/status_bar.py: -------------------------------------------------------------------------------- ```python #region Imports import sys import time import re from typing import Literal from rich.console import Console try: import rumps except ImportError: rumps = None #endregion #region Functions def _strip_timezone(reset_time: str) -> str: """ Remove timezone information from reset time string. Converts "in 2 hours (PST)" to "in 2 hours" Converts "Monday at 9:00 AM PST" to "Monday at 9:00 AM" Args: reset_time: Reset time string with optional timezone Returns: Reset time without timezone info """ # Remove timezone in parentheses: "(PST)", "(UTC)", etc. result = re.sub(r'\s*\([A-Z]{2,5}\)', '', reset_time) # Remove trailing timezone abbreviations: "PST", "UTC", etc. result = re.sub(r'\s+[A-Z]{2,5}$', '', result) return result.strip() def run(console: Console, limit_type: Literal["session", "weekly", "opus"]) -> None: """ Launch macOS menu bar app showing Claude Code usage percentage. Displays "CC: XX%" in the menu bar, updating every 5 minutes. 
def run(console: Console, limit_type: Literal["session", "weekly", "opus"]) -> None:
    """
    Start the macOS menu bar app that shows Claude Code usage.

    The menu bar title reads "CC: XX%", where XX is the percentage for the
    chosen limit_type:
    - session: Current session usage
    - weekly: Current week (all models) usage
    - opus: Current week (Opus only) usage

    The drop-down menu always lists all three limits with their reset times.

    Args:
        console: Rich console for output
        limit_type: Type of limit to display ("session", "weekly", or "opus")

    Raises:
        SystemExit: If not running on macOS or rumps is not available
    """
    # Menu bar apps only exist on macOS
    if sys.platform != 'darwin':
        console.print("[red]Error: --status-bar is only available on macOS[/red]")
        sys.exit(1)

    # rumps is an optional dependency; the module-level import falls back to None
    if rumps is None:
        console.print("[red]Error: rumps library not installed[/red]")
        console.print("[yellow]Install with: uv pip install rumps[/yellow]")
        sys.exit(1)

    # Imported here, after the platform checks, as in the rest of the command
    from src.commands.limits import capture_limits

    class ClaudeStatusApp(rumps.App):
        """
        Menu bar application showing Claude Code usage as "CC: XX%".

        Refreshes itself through a 300 second rumps timer and on demand
        through the "Refresh Now" menu item.
        """

        def __init__(self, limit_type: str):
            super().__init__("CC: --", quit_button="Quit")
            self.limit_type = limit_type
            self.update_interval = 300  # 5 minutes in seconds

            # Placeholder items; their titles are filled in by update_usage
            self.menu_refresh = rumps.MenuItem("Refresh Now", callback=self.manual_refresh)
            self.menu_session = rumps.MenuItem("Loading...")
            self.menu_weekly = rumps.MenuItem("Loading...")
            self.menu_opus = rumps.MenuItem("Loading...")

            for entry in (
                self.menu_refresh,
                rumps.separator,
                self.menu_session,
                self.menu_weekly,
                self.menu_opus,
            ):
                self.menu.add(entry)

            # Populate the display once before the timer first fires
            self.update_usage()

        def _show_error(self, message: str) -> None:
            """
            Put the app into its error state.

            Args:
                message: Text shown in the first menu entry
            """
            self.title = "CC: ??"
            self.menu_session.title = message
            self.menu_weekly.title = ""
            self.menu_opus.title = ""

        @rumps.timer(300)  # Update every 5 minutes
        def update_usage(self, _: rumps.Timer | None = None) -> None:
            """
            Fetch the current limits and refresh title and menu entries.

            Args:
                _: Timer object (unused, required by rumps.timer decorator)
            """
            limits = capture_limits()

            if limits is None:
                self._show_error("Error: Could not fetch usage data")
                return

            # capture_limits reports the folder-trust prompt as an error dict
            if "error" in limits:
                self._show_error("Error: " + limits.get("message", "Unknown error"))
                return

            session_pct, week_pct, opus_pct = (
                limits.get(key, 0) for key in ("session_pct", "week_pct", "opus_pct")
            )
            session_reset, week_reset, opus_reset = (
                _strip_timezone(limits.get(key, "Unknown"))
                for key in ("session_reset", "week_reset", "opus_reset")
            )

            # Pick the percentage that goes into the menu bar title
            pct_by_type = {"session": session_pct, "weekly": week_pct, "opus": opus_pct}
            if self.limit_type not in pct_by_type:
                self._show_error(f"Error: Invalid limit type '{self.limit_type}'")
                return

            self.title = f"CC: {pct_by_type[self.limit_type]}%"

            # The menu always shows all three limits
            self.menu_session.title = f"Session: {session_pct}% (resets {session_reset})"
            self.menu_weekly.title = f"Weekly: {week_pct}% (resets {week_reset})"
            self.menu_opus.title = f"Opus: {opus_pct}% (resets {opus_reset})"

        def manual_refresh(self, _: rumps.MenuItem) -> None:
            """
            Refresh immediately when "Refresh Now" is clicked.

            Args:
                _: Menu item that triggered the callback (unused)
            """
            self.update_usage()

    # Launch the app
    console.print(f"[green]Launching status bar app (showing {limit_type} usage)...[/green]")
    console.print("[dim]The app will appear in your menu bar as 'CC: XX%'[/dim]")
    console.print("[dim]Press Ctrl+C or select 'Quit' from the menu to stop[/dim]")

    ClaudeStatusApp(limit_type).run()
def aggregate_by_day(records: list[UsageRecord]) -> dict[str, DailyStats]:
    """
    Aggregate usage records by day.

    Groups records by their ``date_key`` and calculates totals for each group.

    Args:
        records: List of usage records to aggregate

    Returns:
        Dictionary mapping date strings (YYYY-MM-DD) to DailyStats objects.
        An empty list of records yields an empty dictionary (no exception
        is raised).
    """
    if not records:
        return {}

    # Group records by date
    daily_data: DefaultDict[str, list[UsageRecord]] = defaultdict(list)
    for record in records:
        daily_data[record.date_key].append(record)

    # Aggregate statistics for each day
    return {
        date: _calculate_day_stats(date, day_records)
        for date, day_records in daily_data.items()
    }
def _calculate_day_stats(date: str, records: list[UsageRecord]) -> DailyStats:
    """
    Fold a group of usage records into a single DailyStats.

    Args:
        date: Label stored in the result's ``date`` field
        records: Usage records to summarise

    Returns:
        DailyStats object with aggregated metrics
    """
    session_ids = set()
    model_names = set()
    project_folders = set()

    prompts = 0
    responses = 0
    tokens = {
        "total": 0,
        "input": 0,
        "output": 0,
        "cache_creation": 0,
        "cache_read": 0,
    }

    for rec in records:
        session_ids.add(rec.session_id)
        project_folders.add(rec.folder)
        if rec.model:
            model_names.add(rec.model)

        # A record counts as a prompt or as a response, never both
        if rec.is_user_prompt:
            prompts += 1
        elif rec.is_assistant_response:
            responses += 1

        # Records without token usage contribute nothing to the token totals
        if not rec.token_usage:
            continue
        tokens["total"] += rec.token_usage.total_tokens
        tokens["input"] += rec.token_usage.input_tokens
        tokens["output"] += rec.token_usage.output_tokens
        tokens["cache_creation"] += rec.token_usage.cache_creation_tokens
        tokens["cache_read"] += rec.token_usage.cache_read_tokens

    return DailyStats(
        date=date,
        total_prompts=prompts,
        total_responses=responses,
        total_sessions=len(session_ids),
        total_tokens=tokens["total"],
        input_tokens=tokens["input"],
        output_tokens=tokens["output"],
        cache_creation_tokens=tokens["cache_creation"],
        cache_read_tokens=tokens["cache_read"],
        models=model_names,
        folders=project_folders,
    )
def setup(console: Console, settings: dict, settings_path: Path) -> None:
    """
    Set up the usage tracking hook.

    Interactively asks (via stdin) which storage mode to use, optionally
    backs up the database when the mode changes for an already installed
    hook, persists the chosen mode and appends a "Stop" hook entry to
    ``settings`` when none is present yet. The settings dictionary is only
    mutated in memory; this function does not write ``settings_path``.

    Args:
        console: Rich console for output
        settings: Settings dictionary to modify; must already contain
            settings["hooks"]["Stop"] as a list
        settings_path: Path to settings.json file (not used in this function)
    """
    # Check current storage mode
    current_mode = get_storage_mode()

    # Ask user to choose storage mode
    console.print("[bold cyan]Choose storage mode:[/bold cyan]\n")
    console.print(" [bold]1. Aggregate (default)[/bold] - Daily totals only (smaller, faster)")
    console.print(" • Stores: date, prompts count, tokens totals")
    console.print(" • ~10-50 KB for a year of data")
    console.print(" • Good for: Activity tracking, usage trends\n")
    console.print(" [bold]2. Full Analytics[/bold] - Every individual message (larger, detailed)")
    console.print(" • Stores: every prompt with model, folder, timestamps")
    console.print(" • ~5-10 MB for a year of heavy usage")
    console.print(" • Good for: Detailed analysis, per-project breakdowns\n")

    if current_mode == "full":
        console.print(f"[dim]Current mode: Full Analytics[/dim]")
    else:
        console.print(f"[dim]Current mode: Aggregate[/dim]")

    console.print("[dim]Enter 1 or 2 (or press Enter for default):[/dim] ", end="")

    try:
        user_input = input().strip()
        # Anything other than "2" (including an empty answer) selects aggregate,
        # regardless of the mode that is currently configured
        if user_input == "2":
            storage_mode = "full"
        else:
            storage_mode = "aggregate"
    except (EOFError, KeyboardInterrupt):
        console.print("\n[yellow]Cancelled[/yellow]")
        return

    hook_command = "ccg update-usage > /dev/null 2>&1 &"

    # Check if already exists
    hook_exists = any(is_hook(hook) for hook in settings["hooks"]["Stop"])

    # Warn if changing storage modes
    # (only relevant when a hook is already installed)
    if current_mode != storage_mode and hook_exists:
        console.print("\n[bold yellow]⚠️ WARNING: Changing storage mode[/bold yellow]")
        console.print(f"[yellow]Current mode: {current_mode.title()}[/yellow]")
        console.print(f"[yellow]New mode: {storage_mode.title()}[/yellow]")
        console.print("")

        if current_mode == "full" and storage_mode == "aggregate":
            console.print("[yellow]• New data will only save daily totals (no individual messages)[/yellow]")
            console.print("[yellow]• Existing detailed records will remain but won't be updated[/yellow]")
        else:
            console.print("[yellow]• New data will save full details for each message[/yellow]")
            console.print("[yellow]• Historical aggregates will still be available[/yellow]")

        console.print("")
        console.print("[bold cyan]Would you like to create a backup of your database?[/bold cyan]")
        console.print(f"[dim]Database: {DEFAULT_DB_PATH}[/dim]")
        console.print("[dim]Backup will be saved as: usage_history.db.bak[/dim]")
        console.print("")
        console.print("[cyan]Create backup? (yes/no) [recommended: yes]:[/cyan] ", end="")

        try:
            backup_choice = input().strip().lower()
            if backup_choice in ["yes", "y"]:
                # Create backup
                # (written next to the database; an existing .bak is overwritten by copy2)
                backup_path = DEFAULT_DB_PATH.parent / "usage_history.db.bak"
                if DEFAULT_DB_PATH.exists():
                    shutil.copy2(DEFAULT_DB_PATH, backup_path)
                    console.print(f"[green]✓ Backup created: {backup_path}[/green]")
                    console.print(f"[dim]To restore: ccg restore-backup[/dim]")
                else:
                    console.print("[yellow]No database file found to backup[/yellow]")
        except (EOFError, KeyboardInterrupt):
            console.print("\n[yellow]Cancelled[/yellow]")
            return

        console.print("")
        console.print("[cyan]Continue with mode change? (yes/no):[/cyan] ", end="")

        try:
            confirm = input().strip().lower()
            # Any answer other than yes/y keeps the current mode
            if confirm not in ["yes", "y"]:
                console.print(f"[yellow]Cancelled - keeping current mode ({current_mode})[/yellow]")
                return
        except (EOFError, KeyboardInterrupt):
            console.print("\n[yellow]Cancelled[/yellow]")
            return

    # Save storage mode preference
    set_storage_mode(storage_mode)

    # An existing hook is left as it is; only the stored mode changes
    if hook_exists:
        console.print(f"\n[yellow]Usage tracking hook already configured![/yellow]")
        console.print(f"[cyan]Storage mode updated to: {storage_mode}[/cyan]")
        return

    # Add hook
    settings["hooks"]["Stop"].append({
        "matcher": "*",
        "hooks": [{
            "type": "command",
            "command": hook_command
        }]
    })

    console.print(f"[green]✓ Successfully configured usage tracking hook ({storage_mode} mode)[/green]")
    console.print("\n[bold]What this does:[/bold]")
    console.print(" • Runs after each Claude response completes")
    if storage_mode == "aggregate":
        console.print(" • Saves daily usage totals (lightweight)")
    else:
        console.print(" • Saves every individual message (detailed analytics)")
    console.print(" • Fills in gaps with empty records")
    console.print(" • Runs silently in the background")
Args: hook: Hook configuration dictionary Returns: True if this is a usage tracking hook, False otherwise """ if not isinstance(hook, dict) or "hooks" not in hook: return False for h in hook.get("hooks", []): command = h.get("command", "") # Support both old-style (--update-usage) and new-style (update-usage) # Also support both claude-goblin and ccg aliases if ("claude-goblin --update-usage" in command or "claude-goblin update-usage" in command or "ccg --update-usage" in command or "ccg update-usage" in command): return True return False #endregion ``` -------------------------------------------------------------------------------- /src/commands/limits.py: -------------------------------------------------------------------------------- ```python #region Imports import subprocess import re import os import pty import select import time from rich.console import Console #endregion #region Functions def _strip_ansi(text: str) -> str: """ Remove ANSI escape codes from text. Args: text: Text with ANSI codes Returns: Clean text without ANSI codes """ ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') return ansi_escape.sub('', text) def capture_limits() -> dict | None: """ Capture usage limits from `claude /usage` without displaying output. 
Returns: Dictionary with keys: session_pct, week_pct, opus_pct, session_reset, week_reset, opus_reset, or None if capture failed """ try: # Create a pseudo-terminal pair master, slave = pty.openpty() # Start claude /usage with the PTY process = subprocess.Popen( ['claude', '/usage'], stdin=slave, stdout=slave, stderr=slave, close_fds=True ) # Close slave in parent process (child keeps it open) os.close(slave) # Read output until we see complete data output = b'' start_time = time.time() max_wait = 10 while time.time() - start_time < max_wait: # Check if data is available to read ready, _, _ = select.select([master], [], [], 0.1) if ready: try: chunk = os.read(master, 4096) if chunk: output += chunk # Check if we hit trust prompt early - no point waiting if b'Do you trust the files in this folder?' in output: # We got the trust prompt, stop waiting time.sleep(0.5) # Give it a bit more time to finish rendering break # Check if we have complete data # Look for the usage screen's exit message, not the loading screen's "esc to interrupt" if b'Current week (Opus)' in output and b'Esc to exit' in output: # Wait a tiny bit more to ensure all data is flushed time.sleep(0.2) # Try to read any remaining data try: while True: ready, _, _ = select.select([master], [], [], 0.05) if not ready: break chunk = os.read(master, 4096) if chunk: output += chunk except: pass break except OSError: break # Send ESC to exit cleanly try: os.write(master, b'\x1b') time.sleep(0.1) except: pass # Clean up try: process.terminate() process.wait(timeout=1) except: process.kill() os.close(master) # Decode output output_str = output.decode('utf-8', errors='replace') # Strip ANSI codes clean_output = _strip_ansi(output_str) # Check if we hit the trust prompt if 'Do you trust the files in this folder?' in clean_output: return { "error": "trust_prompt", "message": "Claude prompted for folder trust. Please run 'claude' in a trusted folder first, or cd to a project directory." 
} # Parse for percentages and reset times session_match = re.search(r'Current session.*?(\d+)%\s+used.*?Resets\s+(.+?)(?:\n|$)', clean_output, re.DOTALL) week_match = re.search(r'Current week \(all models\).*?(\d+)%\s+used.*?Resets\s+(.+?)(?:\n|$)', clean_output, re.DOTALL) opus_match = re.search(r'Current week \(Opus\).*?(\d+)%\s+used.*?Resets\s+(.+?)(?:\n|$)', clean_output, re.DOTALL) if session_match and week_match and opus_match: return { "session_pct": int(session_match.group(1)), "week_pct": int(week_match.group(1)), "opus_pct": int(opus_match.group(1)), "session_reset": session_match.group(2).strip(), "week_reset": week_match.group(2).strip(), "opus_reset": opus_match.group(2).strip(), } return None except Exception as e: # Debug: print the error to help diagnose issues import sys print(f"[DEBUG] capture_limits failed: {e}", file=sys.stderr) import traceback traceback.print_exc(file=sys.stderr) return None def run(console: Console) -> None: """ Show current usage limits by parsing `claude /usage` output. Uses Python's pty module to create a pseudo-terminal for capturing TUI output from `claude /usage`, then strips ANSI codes and extracts percentage values. 
def run(console: Console) -> None:
    """
    Print the current usage limits reported by `claude /usage`.

    Delegates the capture to capture_limits() and renders either the three
    limits, the error message it returned, or a parse-failure notice.

    Args:
        console: Rich console for output
    """
    try:
        limits = capture_limits()
        console.print()

        if not limits:
            console.print("[yellow]Could not parse usage data from 'claude /usage'[/yellow]")
        elif "error" in limits:
            # Error response (e.g. folder-trust prompt): show its message only
            console.print(f"[yellow]{limits['message']}[/yellow]")
        else:
            console.print(f"[bold]Session:[/bold] [#ff8800]{limits['session_pct']}%[/#ff8800] (resets [not bold cyan]{limits['session_reset']}[/not bold cyan])")
            console.print(f"[bold]Week:[/bold] [#ff8800]{limits['week_pct']}%[/#ff8800] (resets [not bold cyan]{limits['week_reset']}[/not bold cyan])")
            console.print(f"[bold]Opus:[/bold] [#ff8800]{limits['opus_pct']}%[/#ff8800] (resets [not bold cyan]{limits['opus_reset']}[/not bold cyan])")

        console.print()

    except FileNotFoundError:
        console.print("[red]Error: 'claude' command not found[/red]")
    except Exception as e:
        console.print(f"[red]Error: {e}[/red]")
        import traceback
        traceback.print_exc()
def run(console: Console) -> None:
    """
    Export the heatmap to PNG or SVG.

    Exports a GitHub-style activity heatmap as an image file.
    Supports PNG (default) and SVG formats, with optional file opening.
    All options are read directly from ``sys.argv``. Unless ``--fast`` is
    given, usage data (and, depending on the tracking mode, limits data) is
    refreshed in the database before the export.

    Args:
        console: Rich console for output

    Flags:
        svg: Export as SVG instead of PNG
        --open: Open file after export
        --fast: Skip updates, read directly from database (faster)
        --year YYYY or -y YYYY: Filter by year (default: current year)
        -o FILE or --output FILE: Specify output file path; a relative path
            is resolved against the current working directory. Without this
            flag the file goes to ~/.claude/usage/ under a timestamped name.
    """
    from src.visualization.export import export_heatmap_svg, export_heatmap_png

    # Check for --fast flag
    fast_mode = "--fast" in sys.argv

    # Determine format from arguments (PNG is default)
    format_type = "png"
    if "svg" in sys.argv:
        format_type = "svg"

    # Check for --open flag
    should_open = "--open" in sys.argv

    # Parse year filter (--year YYYY)
    # Only the first occurrence is considered; a flag without a value is ignored
    year_filter = None
    for i, arg in enumerate(sys.argv):
        if arg in ["--year", "-y"] and i + 1 < len(sys.argv):
            try:
                year_filter = int(sys.argv[i + 1])
            except ValueError:
                console.print(f"[red]Invalid year: {sys.argv[i + 1]}[/red]")
                return
            break

    # Default to current year if not specified
    if year_filter is None:
        year_filter = datetime.now().year

    # Determine output path
    output_file = None
    custom_output = False
    for i, arg in enumerate(sys.argv):
        if arg in ["-o", "--output"] and i + 1 < len(sys.argv):
            output_file = sys.argv[i + 1]
            custom_output = True
            break

    if not output_file:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f"claude-usage-{timestamp}.{format_type}"

    # Use absolute path, or resolve based on whether -o flag was used
    output_path = Path(output_file)
    if not output_path.is_absolute():
        if custom_output:
            # If -o flag was used, resolve relative to current working directory
            output_path = Path.cwd() / output_path
        else:
            # Default location: ~/.claude/usage/
            default_dir = Path.home() / ".claude" / "usage"
            default_dir.mkdir(parents=True, exist_ok=True)
            output_path = default_dir / output_file

    try:
        # Check if database exists when using --fast
        if fast_mode and not DEFAULT_DB_PATH.exists():
            console.print("[red]Error: Cannot use --fast flag without existing database.[/red]")
            console.print("[yellow]Run 'ccg usage' or 'ccg update-usage' first to create the database.[/yellow]")
            return

        # If fast mode, show warning with last update timestamp
        if fast_mode:
            db_stats = get_database_stats()
            if db_stats.get("newest_timestamp"):
                # Format ISO timestamp to be more readable
                timestamp_str = db_stats["newest_timestamp"]
                try:
                    dt = datetime.fromisoformat(timestamp_str)
                    formatted_time = dt.strftime("%Y-%m-%d %H:%M:%S")
                    console.print(f"[bold red]⚠ Fast mode: Reading from last update ({formatted_time})[/bold red]")
                except (ValueError, AttributeError):
                    # Unparseable timestamp: show the stored value verbatim
                    console.print(f"[bold red]⚠ Fast mode: Reading from last update ({timestamp_str})[/bold red]")
            else:
                console.print("[bold red]⚠ Fast mode: Reading from database (no timestamp available)[/bold red]")

        # Update data unless in fast mode
        if not fast_mode:
            # Step 1: Update usage data
            with console.status("[bold #ff8800]Updating usage data...", spinner="dots", spinner_style="#ff8800"):
                jsonl_files = get_claude_jsonl_files()
                if jsonl_files:
                    current_records = parse_all_jsonl_files(jsonl_files)
                    if current_records:
                        save_snapshot(current_records, storage_mode=get_storage_mode())

            # Step 2: Update limits data (if enabled)
            tracking_mode = get_tracking_mode()
            if tracking_mode in ["both", "limits"]:
                with console.status("[bold #ff8800]Updating usage limits...", spinner="dots", spinner_style="#ff8800"):
                    limits = capture_limits()
                    # Skip saving when capture failed or returned an error dict
                    if limits and "error" not in limits:
                        save_limits_snapshot(
                            session_pct=limits["session_pct"],
                            week_pct=limits["week_pct"],
                            opus_pct=limits["opus_pct"],
                            session_reset=limits["session_reset"],
                            week_reset=limits["week_reset"],
                            opus_reset=limits["opus_reset"],
                        )

        # Load data from database
        with console.status(f"[bold #ff8800]Loading data for {year_filter}...", spinner="dots", spinner_style="#ff8800"):
            all_records = load_historical_records()

            if not all_records:
                console.print("[yellow]No usage data found in database. Run 'ccg usage' to ingest data first.[/yellow]")
                return

            stats = aggregate_all(all_records)

        # Load limits data and tracking mode
        limits_data = get_limits_data()
        tracking_mode = get_tracking_mode()

        console.print(f"[cyan]Exporting to {format_type.upper()}...[/cyan]")

        # Only the PNG export receives limits data and the tracking mode
        if format_type == "png":
            export_heatmap_png(stats, output_path, limits_data=limits_data, year=year_filter, tracking_mode=tracking_mode)
        else:
            export_heatmap_svg(stats, output_path, year=year_filter)

        console.print(f"[green]✓ Exported to: {output_path.absolute()}[/green]")

        # Open the file if --open flag is present
        if should_open:
            console.print(f"[cyan]Opening {format_type.upper()}...[/cyan]")
            open_file(output_path)

    except ImportError as e:
        # Printed without traceback, unlike the generic handler below
        console.print(f"[red]{e}[/red]")
    except Exception as e:
        console.print(f"[red]Error exporting: {e}[/red]")
        import traceback
        traceback.print_exc()
def setup_hooks(console: Console, hook_type: Optional[str] = None) -> None:
    """
    Install one of this tool's Claude Code hooks into ~/.claude/settings.json.

    Without a hook type only the list of available hooks is printed. With a
    hook type the settings file is loaded (or started empty), the matching
    hook module's setup() is run against it and the result is written back.

    Args:
        console: Rich console for output
        hook_type: Type of hook to set up ('usage', 'audio', 'audio-tts',
            'png', or None for menu)
    """
    settings_path = Path.home() / ".claude" / "settings.json"

    if hook_type is None:
        # Show menu
        for line in (
            "[bold cyan]Available hooks to set up:[/bold cyan]\n",
            " [bold]usage[/bold] - Auto-track usage after each response",
            " [bold]audio[/bold] - Play sounds for completion & permission requests",
            " [bold]audio-tts[/bold] - Speak permission requests using TTS (macOS only)",
            " [bold]png[/bold] - Auto-update usage PNG after each response\n",
            "Usage: ccg setup-hooks <type>",
            "Example: ccg setup-hooks usage",
        ):
            console.print(line)
        return

    console.print(f"[bold cyan]Setting up {hook_type} hook[/bold cyan]\n")

    try:
        # Start from the existing settings when there are any
        settings = {}
        if settings_path.exists():
            with open(settings_path, "r") as f:
                settings = json.load(f)

        # Make sure the containers the hook modules append to are present
        if "hooks" not in settings:
            settings["hooks"] = {}
        for event in ("Stop", "Notification"):
            if event not in settings["hooks"]:
                settings["hooks"][event] = []

        # Hook type -> module providing setup()
        installers = {
            "usage": usage,
            "audio": audio,
            "audio-tts": audio_tts,
            "png": png,
        }
        installer = installers.get(hook_type)
        if installer is None:
            console.print(f"[red]Unknown hook type: {hook_type}[/red]")
            console.print("Valid types: usage, audio, audio-tts, png")
            return

        installer.setup(console, settings, settings_path)

        # Write settings back
        with open(settings_path, "w") as f:
            json.dump(settings, f, indent=2)

        console.print("\n[dim]Hook location: ~/.claude/settings.json[/dim]")
        console.print(f"[dim]To remove: ccg remove-hooks {hook_type}[/dim]")

    except Exception as e:
        console.print(f"[red]Error setting up hooks: {e}[/red]")
        import traceback
        traceback.print_exc()
def remove_hooks(console: Console, hook_type: Optional[str] = None) -> None:
    """
    Remove Claude Code hooks configured by this tool.

    Filters this tool's entries out of the "Stop", "Notification" and
    "PreCompact" hook lists in ~/.claude/settings.json and writes the file
    back when at least one entry was removed.

    Args:
        console: Rich console for output
        hook_type: Type of hook to remove ('usage', 'audio', 'audio-tts',
            'png', or None for all)
    """
    settings_path = Path.home() / ".claude" / "settings.json"

    if not settings_path.exists():
        console.print("[yellow]No Claude Code settings file found.[/yellow]")
        return

    console.print(f"[bold cyan]Removing hooks[/bold cyan]\n")

    try:
        # Read existing settings
        with open(settings_path, "r") as f:
            settings = json.load(f)

        if "hooks" not in settings:
            console.print("[yellow]No hooks configured.[/yellow]")
            return

        hooks = settings["hooks"]
        events = ("Stop", "Notification", "PreCompact")

        # Initialize hook lists if they don't exist
        for event in events:
            if event not in hooks:
                hooks[event] = []

        original_total = sum(len(hooks[event]) for event in events)

        # For each hook type: which modules' is_hook() to apply to which event
        if hook_type == "usage":
            removed_type = "usage tracking"
            matchers = {"Stop": (usage,)}
        elif hook_type == "audio":
            removed_type = "audio notification"
            matchers = {event: (audio,) for event in events}
        elif hook_type == "audio-tts":
            removed_type = "audio TTS"
            matchers = {event: (audio_tts,) for event in events}
        elif hook_type == "png":
            removed_type = "PNG auto-update"
            matchers = {"Stop": (png,)}
        else:
            # Remove all our hooks
            # NOTE(review): any unrecognised hook_type also lands here and
            # removes everything; confirm the CLI validates the value.
            removed_type = "all claude-goblin"
            matchers = {
                # audio_tts hooks can be registered under "Stop" as well (the
                # 'audio-tts' branch filters them there), so include them here
                "Stop": (usage, audio, png, audio_tts),
                "Notification": (usage, audio, png, audio_tts),
                "PreCompact": (audio, audio_tts),
            }

        for event, modules in matchers.items():
            hooks[event] = [
                hook for hook in hooks[event]
                if not any(module.is_hook(hook) for module in modules)
            ]

        removed_count = original_total - sum(len(hooks[event]) for event in events)

        if removed_count == 0:
            console.print(f"[yellow]No {removed_type} hooks found to remove.[/yellow]")
            return

        # Write settings back
        with open(settings_path, "w") as f:
            json.dump(settings, f, indent=2)

        console.print(f"[green]✓ Removed {removed_count} {removed_type} hook(s)[/green]")
        console.print(f"[dim]Settings file: ~/.claude/settings.json[/dim]")

    except Exception as e:
        console.print(f"[red]Error removing hooks: {e}[/red]")
        import traceback
        traceback.print_exc()
def run(console: Console, live: bool = False, fast: bool = False, anon: bool = False) -> None:
    """
    Entry point for the usage command.

    Locates the Claude Code JSONL session files and hands them to the
    dashboard renderer, either once or in auto-refresh (live) mode.

    Args:
        console: Rich console for output
        live: Keep refreshing the dashboard until interrupted (default: False)
        fast: Passed to the renderer as its skip_limits flag (default: False)
        anon: Anonymize project names to project-001, project-002, etc (default: False)

    Exit:
        Exits with status 0 on Ctrl+C and with status 1 on any error;
        returns normally when the dashboard was shown or no data exists
    """
    # Hooks still call the command old-style, so every flag may also arrive via sys.argv.
    cli_args = sys.argv
    run_live = live or "--live" in cli_args
    skip_limits = fast or "--fast" in cli_args
    anonymize = anon or "--anon" in cli_args

    try:
        with console.status(
            "[bold #ff8800]Loading Claude Code usage data...",
            spinner="dots",
            spinner_style="#ff8800",
        ):
            jsonl_files = get_claude_jsonl_files()

        if jsonl_files:
            console.print(f"[dim]Found {len(jsonl_files)} session files[/dim]", end="")

            # Both renderers take the same arguments, so choose one and call it.
            render = _run_live_dashboard if run_live else _display_dashboard
            render(jsonl_files, console, skip_limits, anonymize)
        else:
            console.print(
                "[yellow]No Claude Code data found. "
                "Make sure you've used Claude Code at least once.[/yellow]"
            )

    except FileNotFoundError as missing:
        console.print(f"[red]Error: {missing}[/red]")
        sys.exit(1)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to leave live mode, so it is a clean exit.
        console.print("\n[cyan]Exiting...[/cyan]")
        sys.exit(0)
    except Exception as failure:
        console.print(f"[red]Unexpected error: {failure}[/red]")
        import traceback
        traceback.print_exc()
        sys.exit(1)
Display: Read from DB and render dashboard Args: jsonl_files: List of JSONL files to parse console: Rich console for output skip_limits: Skip ALL updates, read directly from DB (fast mode) anonymize: Anonymize project names to project-001, project-002, etc """ from src.storage.snapshot_db import get_latest_limits, DEFAULT_DB_PATH, get_database_stats # Check if database exists when using --fast if skip_limits and not DEFAULT_DB_PATH.exists(): console.clear() console.print("[red]Error: Cannot use --fast flag without existing database.[/red]") console.print("[yellow]Run 'ccg usage' (without --fast) first to create the database.[/yellow]") return # Update data unless in fast mode if not skip_limits: # Step 1: Update usage data with console.status("[bold #ff8800]Updating usage data...", spinner="dots", spinner_style="#ff8800"): current_records = parse_all_jsonl_files(jsonl_files) # Save to database (with automatic deduplication via UNIQUE constraint) if current_records: save_snapshot(current_records, storage_mode=get_storage_mode()) # Step 2: Update limits data (if enabled) tracking_mode = get_tracking_mode() if tracking_mode in ["both", "limits"]: with console.status("[bold #ff8800]Updating usage limits...", spinner="dots", spinner_style="#ff8800"): limits = capture_limits() if limits and "error" not in limits: save_limits_snapshot( session_pct=limits["session_pct"], week_pct=limits["week_pct"], opus_pct=limits["opus_pct"], session_reset=limits["session_reset"], week_reset=limits["week_reset"], opus_reset=limits["opus_reset"], ) # Step 3: Prepare dashboard from database with console.status("[bold #ff8800]Preparing dashboard...", spinner="dots", spinner_style="#ff8800"): all_records = load_historical_records() # Get latest limits from DB (if we saved them above or if they exist) limits_from_db = get_latest_limits() if not all_records: console.clear() console.print( "[yellow]No usage data found in database. 
Run --update-usage to ingest data.[/yellow]" ) return # Clear screen before displaying dashboard console.clear() # Get date range for footer dates = sorted(set(r.date_key for r in all_records)) date_range = None if dates: date_range = f"{dates[0]} to {dates[-1]}" # Anonymize project names if requested if anonymize: all_records = _anonymize_projects(all_records) # Aggregate statistics stats = aggregate_all(all_records) # Render dashboard with limits from DB (no live fetch needed) render_dashboard(stats, all_records, console, skip_limits=True, clear_screen=False, date_range=date_range, limits_from_db=limits_from_db, fast_mode=skip_limits) def _anonymize_projects(records: list) -> list: """ Anonymize project folder names by ranking them by total tokens and replacing with project-001, project-002, etc (where project-001 is the highest usage). Args: records: List of UsageRecord objects Returns: List of UsageRecord objects with anonymized folder names """ from collections import defaultdict from dataclasses import replace # Calculate total tokens per project project_totals = defaultdict(int) for record in records: if record.token_usage: project_totals[record.folder] += record.token_usage.total_tokens # Sort projects by total tokens (descending) and create mapping sorted_projects = sorted(project_totals.items(), key=lambda x: x[1], reverse=True) project_mapping = { folder: f"project-{str(i+1).zfill(3)}" for i, (folder, _) in enumerate(sorted_projects) } # Replace folder names in records anonymized_records = [] for record in records: anonymized_records.append( replace(record, folder=project_mapping.get(record.folder, record.folder)) ) return anonymized_records #endregion ``` -------------------------------------------------------------------------------- /src/aggregation/usage_limits.py: -------------------------------------------------------------------------------- ```python #region Imports from datetime import datetime, timedelta, timezone from typing import Optional 
def get_current_session_usage(
    records: list[UsageRecord],
    session_window_hours: int = 5
) -> tuple[int, Optional[datetime]]:
    """
    Calculate token usage for the current 5-hour session window.

    A session starts with the first message and expires
    ``session_window_hours`` later; the first message after an expired
    session opens a new one. Records are replayed in chronological order to
    find the session that is active right now, and its tokens are summed.

    Args:
        records: List of usage records
        session_window_hours: Hours in the session window (default: 5)

    Returns:
        Tuple of (total_tokens, session_reset_time). ``session_reset_time``
        is the moment the active session expires.

    Common failure modes:
        - Empty records list returns (0, None)
        - Records without timestamps are skipped
        - Naive timestamps are interpreted as UTC
        - Records timestamped in the future are ignored
        - If the latest session has already expired, returns (0, None)
    """
    if not records:
        return 0, None

    now = datetime.now(timezone.utc)
    session_window = timedelta(hours=session_window_hours)

    # Normalise to timezone-aware values first: Python refuses to order a mix
    # of naive and aware datetimes, so sorting raw timestamps could raise.
    timed_records = []
    for record in records:
        stamp = getattr(record, "timestamp", None)
        if stamp is None:
            continue
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)
        timed_records.append((stamp, record))

    # Sort on the timestamp only, so equal timestamps never compare records.
    timed_records.sort(key=lambda pair: pair[0])

    session_start = None
    total_tokens = 0
    for stamp, record in timed_records:
        if stamp > now:
            # Chronological order: everything from here on is in the future.
            break
        if session_start is None or stamp >= session_start + session_window:
            # Previous session (if any) had expired; this message opens a new one.
            session_start = stamp
            total_tokens = 0
        if record.token_usage:
            total_tokens += record.token_usage.total_tokens

    if session_start is None:
        return 0, None

    session_end = session_start + session_window
    if now >= session_end:
        # The most recent session is over and nothing has started since.
        return 0, None

    return total_tokens, session_end
def calculate_usage_limits(
    records: list[UsageRecord],
    plan_type: str = "max_20x"
) -> UsageLimits:
    """
    Build a UsageLimits summary for the current session and the current week.

    Token counts come from get_current_session_usage() and
    get_weekly_usage(); each is expressed as a percentage of the plan's
    entry in SESSION_LIMITS / WEEKLY_LIMITS.

    Args:
        records: List of usage records
        plan_type: One of "pro", "max_5x", "max_20x"

    Returns:
        UsageLimits object with current usage and percentages

    Common failure modes:
        - Unknown plan_type silently falls back to "max_20x"
        - A limit of 0 yields a percentage of 0.0 instead of dividing by zero
    """
    plan = plan_type if plan_type in SESSION_LIMITS else "max_20x"

    def _percent_of(used: int, limit: int) -> float:
        # Guard against plans whose limit is zero.
        return (used / limit * 100) if limit > 0 else 0.0

    # Session figures
    session_tokens, session_reset = get_current_session_usage(records)
    session_cap = SESSION_LIMITS[plan]

    # Weekly figures (all models, then Opus only)
    weekly = get_weekly_usage(records)
    plan_week = WEEKLY_LIMITS[plan]
    week_cap = plan_week["total"]
    opus_cap = plan_week["opus"]

    # The week resets next Monday at 00:00 UTC. isoweekday() is 1 for Monday,
    # so this is 7 days ahead on a Monday and 1 day ahead on a Sunday.
    now = datetime.now(timezone.utc)
    next_monday = now + timedelta(days=8 - now.isoweekday())
    week_reset = next_monday.replace(hour=0, minute=0, second=0, microsecond=0)

    return UsageLimits(
        plan_type=plan,
        current_session_tokens=session_tokens,
        session_limit=session_cap,
        session_percentage=_percent_of(session_tokens, session_cap),
        session_reset_time=session_reset,
        current_week_tokens=weekly.total_tokens,
        week_limit=week_cap,
        week_percentage=_percent_of(weekly.total_tokens, week_cap),
        week_reset_time=week_reset,
        current_week_opus_tokens=weekly.opus_tokens,
        opus_limit=opus_cap,
        opus_percentage=_percent_of(weekly.opus_tokens, opus_cap),
    )
def run(console: Console, fast: bool = False) -> None:
    """
    Print the statistics report for the historical usage database.

    Unless fast mode is active, the database is first refreshed from the
    JSONL files and (when the tracking mode is "both" or "limits") a fresh
    limits snapshot is stored. The report then prints, in order: summary,
    cost analysis, averages, text analysis, usage by model, database info.

    Args:
        console: Rich console for output
        fast: Skip updates, read directly from database (default: False)
    """
    # The flag may also arrive through sys.argv (backward compatibility).
    fast_mode = fast or "--fast" in sys.argv

    if fast_mode:
        # Fast mode only reads, so there must already be a database.
        if not DEFAULT_DB_PATH.exists():
            console.print("[red]Error: Cannot use --fast flag without existing database.[/red]")
            console.print("[yellow]Run 'ccg stats' (without --fast) first to create the database.[/yellow]")
            return

        # Tell the user how old the data being shown is.
        newest = get_database_stats().get("newest_timestamp")
        if not newest:
            console.print("[bold red]⚠ Fast mode: Reading from database (no timestamp available)[/bold red]\n")
        else:
            try:
                shown = datetime.fromisoformat(newest).strftime("%Y-%m-%d %H:%M:%S")
            except (ValueError, AttributeError):
                # Not an ISO timestamp: show the stored value unchanged.
                shown = newest
            console.print(f"[bold red]⚠ Fast mode: Reading from last update ({shown})[/bold red]\n")
    else:
        # Step 1: Ingestion - parse JSONL and save to DB
        with console.status("[bold #ff8800]Updating database...", spinner="dots", spinner_style="#ff8800"):
            jsonl_files = get_claude_jsonl_files()
            parsed = parse_all_jsonl_files(jsonl_files) if jsonl_files else None
            if parsed:
                save_snapshot(parsed, storage_mode=get_storage_mode())

        # Step 2: Update limits data (if enabled)
        if get_tracking_mode() in ("both", "limits"):
            with console.status("[bold #ff8800]Updating usage limits...", spinner="dots", spinner_style="#ff8800"):
                captured = capture_limits()
                if captured and "error" not in captured:
                    limit_fields = (
                        "session_pct", "week_pct", "opus_pct",
                        "session_reset", "week_reset", "opus_reset",
                    )
                    save_limits_snapshot(**{name: captured[name] for name in limit_fields})

    # Step 3: Display stats from DB
    db = get_database_stats()

    if db["total_records"] == 0 and db["total_prompts"] == 0:
        console.print("[yellow]No historical data found. Run ccg usage to start tracking.[/yellow]")
        return

    console.print("[bold cyan]Claude Code Usage Statistics[/bold cyan]\n")

    # Summary Statistics
    console.print("[bold]Summary[/bold]")
    summary_rows = (
        ("Total Tokens", "total_tokens"),
        ("Total Prompts", "total_prompts"),
        ("Total Responses", "total_responses"),
        ("Total Sessions", "total_sessions"),
        ("Days Tracked", "total_days"),
    )
    for label, key in summary_rows:
        console.print(f" {label}: {db[key]:>15,}")
    console.print(f" Date Range: {db['oldest_date']} to {db['newest_date']}")

    # Cost Summary (if using API pricing)
    has_cost = db["total_cost"] > 0
    if has_cost:
        first_day = datetime.strptime(db["oldest_date"], "%Y-%m-%d")
        last_day = datetime.strptime(db["newest_date"], "%Y-%m-%d")

        # Walk month by month from the first to the last day, collecting
        # every (year, month) pair touched by the date range.
        months_covered = set()
        cursor = first_day
        while cursor <= last_day:
            months_covered.add((cursor.year, cursor.month))
            if cursor.month == 12:
                cursor = cursor.replace(year=cursor.year + 1, month=1, day=1)
            else:
                cursor = cursor.replace(month=cursor.month + 1, day=1)

        num_months = len(months_covered)
        plan_cost = num_months * 200.0  # $200/month Max Plan
        savings = db["total_cost"] - plan_cost
        plural = "s" if num_months > 1 else ""

        console.print("\n[bold]Cost Analysis[/bold]")
        console.print(f" Est. Cost (if using API): ${db['total_cost']:>10,.2f}")
        console.print(f" Plan Cost: ${plan_cost:>14,.2f} ({num_months} month{plural} @ $200/mo)")

        if savings > 0:
            console.print(f" You Saved: ${savings:>14,.2f} (vs API)")
        else:
            console.print(f" Plan Costs More: ${abs(savings):>14,.2f}")
            console.print(" [dim]Light usage - API would be cheaper[/dim]")

    # Averages
    console.print("\n[bold]Averages[/bold]")
    console.print(f" Tokens per Session: {db['avg_tokens_per_session']:>15,}")
    console.print(f" Tokens per Response: {db['avg_tokens_per_response']:>15,}")
    if has_cost:
        console.print(f" Cost per Session: ${db['avg_cost_per_session']:>14,.2f}")
        console.print(f" Cost per Response: ${db['avg_cost_per_response']:>14,.4f}")

    # Text Analysis
    text = get_text_analysis_stats()
    if text["avg_user_prompt_chars"] > 0:
        console.print("\n[bold]Text Analysis[/bold]")
        console.print(f" Avg Prompt Length: {text['avg_user_prompt_chars']:>15,} chars")
        phrase_rows = (
            ("User Swears", "user_swears"),
            ("Claude Swears", "assistant_swears"),
            ("User Thanks", "user_thanks"),
            ("User Please", "user_please"),
        )
        for label, key in phrase_rows:
            console.print(f" {label}: {text[key]:>15,}")
        console.print(f" Claude \"Perfect!\"/\"Excellent!\": {text['perfect_count']:>10,}")
        console.print(f" Claude \"You're absolutely right!\": {text['absolutely_right_count']:>6,}")

    # Tokens by Model
    by_model = db["tokens_by_model"]
    if by_model:
        console.print("\n[bold]Usage by Model[/bold]")
        for model, tokens in by_model.items():
            share = (tokens / db["total_tokens"] * 100) if db["total_tokens"] > 0 else 0
            row = f" {model:30s} {tokens:>15,} ({share:5.1f}%)"
            cost = db["cost_by_model"].get(model, 0.0)
            if cost > 0:
                # Cost column only appears for models that have a cost.
                row += f" ${cost:>10,.2f}"
            console.print(row)

    # Database Info
    console.print("\n[dim]Database: ~/.claude/usage/usage_history.db[/dim]")
    if db["total_records"] > 0:
        console.print(f"[dim]Detail records: {db['total_records']:,} (full analytics mode)[/dim]")
    else:
        console.print("[dim]Storage mode: aggregate (daily totals only)[/dim]")
(y/n):[/dim] ", end="") try: user_input = input().strip().lower() if user_input != "y": console.print("[yellow]Cancelled[/yellow]") return except (EOFError, KeyboardInterrupt): console.print("\n[yellow]Cancelled[/yellow]") return console.print() system = platform.system() if system == "Darwin": sounds = [ ("Glass", "Clear glass sound (recommended for completion)"), ("Ping", "Short ping sound (recommended for permission)"), ("Purr", "Soft purr sound"), ("Tink", "Quick tink sound"), ("Pop", "Pop sound"), ("Basso", "Low bass sound"), ("Blow", "Blow sound"), ("Bottle", "Bottle sound"), ("Frog", "Frog sound"), ("Funk", "Funk sound"), ] elif system == "Windows": sounds = [ ("Windows Notify", "Default notification"), ("Windows Ding", "Ding sound"), ("chimes", "Chimes sound"), ("chord", "Chord sound"), ("notify", "System notify"), ] else: # Linux sounds = [ ("complete", "Completion sound"), ("bell", "Bell sound"), ("message", "Message sound"), ("dialog-information", "Info dialog"), ("service-login", "Login sound"), ] # Choose completion sound console.print("[bold]Sound for when Claude finishes responding:[/bold]") for idx, (name, desc) in enumerate(sounds, 1): console.print(f" {idx}. {name} - {desc}") console.print("\n[dim]Enter number (default: 1):[/dim] ", end="") try: user_input = input().strip() if user_input == "": completion_sound = sounds[0][0] elif user_input.isdigit() and 1 <= int(user_input) <= len(sounds): completion_sound = sounds[int(user_input) - 1][0] else: console.print("[yellow]Invalid selection, using default[/yellow]") completion_sound = sounds[0][0] except (EOFError, KeyboardInterrupt): console.print("\n[yellow]Cancelled[/yellow]") return # Choose permission sound console.print("\n[bold]Sound for when Claude requests permission:[/bold]") for idx, (name, desc) in enumerate(sounds, 1): console.print(f" {idx}. 
{name} - {desc}") console.print("\n[dim]Enter number (default: 2):[/dim] ", end="") try: user_input = input().strip() if user_input == "": # Default to second sound if available permission_sound = sounds[1][0] if len(sounds) > 1 else sounds[0][0] elif user_input.isdigit() and 1 <= int(user_input) <= len(sounds): permission_sound = sounds[int(user_input) - 1][0] else: console.print("[yellow]Invalid selection, using default[/yellow]") permission_sound = sounds[1][0] if len(sounds) > 1 else sounds[0][0] except (EOFError, KeyboardInterrupt): console.print("\n[yellow]Cancelled[/yellow]") return # Choose compaction sound console.print("\n[bold]Sound for before conversation compaction:[/bold]") for idx, (name, desc) in enumerate(sounds, 1): console.print(f" {idx}. {name} - {desc}") console.print("\n[dim]Enter number (default: 3):[/dim] ", end="") try: user_input = input().strip() if user_input == "": # Default to third sound if available compaction_sound = sounds[2][0] if len(sounds) > 2 else sounds[0][0] elif user_input.isdigit() and 1 <= int(user_input) <= len(sounds): compaction_sound = sounds[int(user_input) - 1][0] else: console.print("[yellow]Invalid selection, using default[/yellow]") compaction_sound = sounds[2][0] if len(sounds) > 2 else sounds[0][0] except (EOFError, KeyboardInterrupt): console.print("\n[yellow]Cancelled[/yellow]") return completion_command = get_sound_command(completion_sound) permission_command = get_sound_command(permission_sound) compaction_command = get_sound_command(compaction_sound) if not completion_command or not permission_command or not compaction_command: console.print("[red]Audio hooks not supported on this platform[/red]") return # Initialize hook structures if "Stop" not in settings["hooks"]: settings["hooks"]["Stop"] = [] if "Notification" not in settings["hooks"]: settings["hooks"]["Notification"] = [] if "PreCompact" not in settings["hooks"]: settings["hooks"]["PreCompact"] = [] # Remove existing audio hooks stop_removed = 
def is_hook(hook) -> bool:
    """
    Check if a hook is an audio notification hook.

    A hook counts as an audio hook when any of its nested command strings
    mentions one of the sound players this tool emits (afplay, powershell,
    paplay, aplay).

    The settings file is user-editable, so malformed data is tolerated
    rather than raised on: a "hooks" value that is not a list, entries that
    are not dictionaries, and commands that are not strings are all ignored.

    Args:
        hook: Hook configuration dictionary

    Returns:
        True if this is an audio notification hook, False otherwise
    """
    if not isinstance(hook, dict):
        return False

    entries = hook.get("hooks")
    if not isinstance(entries, (list, tuple)):
        # Missing key, null, or an unexpected type: nothing to inspect.
        return False

    audio_players = ("afplay", "powershell", "paplay", "aplay")
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        command = entry.get("command")
        if isinstance(command, str) and any(player in command for player in audio_players):
            return True

    return False
""" usage.run(console, live=live, fast=fast, anon=anon) @app.command(name="stats") def stats_command( fast: bool = typer.Option(False, "--fast", help="Skip updates, read from database only (faster)"), ): """ Show detailed statistics and cost analysis. Displays comprehensive statistics including: - Summary: total tokens, prompts, responses, sessions, days tracked - Cost analysis: estimated API costs vs Max Plan costs - Averages: tokens per session/response, cost per session/response - Text analysis: prompt length, politeness markers, phrase counts - Usage by model: token distribution across different models Use --fast to skip all updates and read from database only (requires existing database). """ stats.run(console, fast=fast) @app.command(name="limits") def limits_command(): """ Show current usage limits (session, week, Opus). Displays current usage percentages and reset times for: - Session limit (resets after inactivity) - Weekly limit for all models (resets weekly) - Weekly Opus limit (resets weekly) Note: Must be run from a trusted folder where Claude Code has been used. """ limits.run(console) @app.command(name="export") def export_command( svg: bool = typer.Option(False, "--svg", help="Export as SVG instead of PNG"), open_file: bool = typer.Option(False, "--open", help="Open file after export"), fast: bool = typer.Option(False, "--fast", help="Skip updates, read from database only (faster)"), year: Optional[int] = typer.Option(None, "--year", "-y", help="Filter by year (default: current year)"), output: Optional[str] = typer.Option(None, "--output", "-o", help="Output file path"), ): """ Export yearly heatmap as PNG or SVG. Generates a GitHub-style activity heatmap showing your Claude Code usage throughout the year. By default exports as PNG for the current year. Use --fast to skip all updates and read from database only (requires existing database). 
@app.command(name="delete-usage")
def delete_usage_command(
    force: bool = typer.Option(False, "--force", "-f", help="Force deletion without confirmation"),
):
    """
    Delete historical usage database.

    WARNING: This will permanently delete all historical usage data!

    Requires --force flag to prevent accidental deletion.
    A backup is automatically created before deletion.

    Example:
        ccg delete-usage --force
    """
    import sys

    # The flag is handed over through sys.argv for backward compatibility.
    # "-f" is parsed by typer but never appears as "--force" in sys.argv,
    # so add the long spelling in that case.
    flag_missing = "--force" not in sys.argv
    if force and flag_missing:
        sys.argv.append("--force")

    delete_usage.run(console)
Expected backup location: ~/.claude/usage/usage_history.db.bak """ restore_backup.run(console) @app.command(name="status-bar") def status_bar_command( limit_type: str = typer.Argument("weekly", help="Type of limit to display: session, weekly, or opus"), ): """ Launch macOS menu bar app (macOS only). Displays "CC: XX%" in your menu bar, showing current usage percentage. Updates automatically every 5 minutes. Arguments: limit_type: Which limit to display (session, weekly, or opus). Defaults to weekly. Examples: ccg status-bar weekly Show weekly usage (default) ccg status-bar session Show session usage ccg status-bar opus Show Opus weekly usage Running in background: nohup ccg status-bar weekly > /dev/null 2>&1 & """ if limit_type not in ["session", "weekly", "opus"]: console.print(f"[red]Error: Invalid limit type '{limit_type}'[/red]") console.print("[yellow]Valid types: session, weekly, opus[/yellow]") raise typer.Exit(1) status_bar.run(console, limit_type) @app.command(name="setup-hooks") def setup_hooks_command( hook_type: Optional[str] = typer.Argument(None, help="Hook type: usage, audio, audio-tts, or png"), ): """ Setup Claude Code hooks for automation. Available hooks: - usage: Auto-track usage after each Claude response - audio: Play sounds for completion, permission, and compaction (3 sounds) - audio-tts: Speak messages using TTS with hook selection (macOS only) - png: Auto-update usage PNG after each Claude response Examples: ccg setup-hooks usage Enable automatic usage tracking ccg setup-hooks audio Enable audio notifications (3 sounds) ccg setup-hooks audio-tts Enable TTS (choose which hooks) ccg setup-hooks png Enable automatic PNG exports """ setup_hooks(console, hook_type) @app.command(name="remove-hooks") def remove_hooks_command( hook_type: Optional[str] = typer.Argument(None, help="Hook type to remove: usage, audio, audio-tts, png, or leave empty for all"), ): """ Remove Claude Code hooks configured by this tool. 
Examples: ccg remove-hooks Remove all hooks ccg remove-hooks usage Remove only usage tracking hook ccg remove-hooks audio Remove only audio notification hook ccg remove-hooks audio-tts Remove only audio TTS hook ccg remove-hooks png Remove only PNG export hook """ remove_hooks(console, hook_type) @app.command(name="help", hidden=True) def help_command(): """ Show detailed help message. Displays comprehensive usage information including: - Available commands and their flags - Key features of the tool - Data sources and storage locations - Recommended setup workflow """ help_cmd.run(console) def main() -> None: """ Main CLI entry point for Claude Goblin Usage tracker. Loads Claude Code usage data and provides commands for viewing, analyzing, and exporting usage statistics. Usage: ccg --help Show available commands ccg usage Show usage dashboard ccg usage --live Show dashboard with auto-refresh ccg stats Show detailed statistics ccg export Export yearly heatmap Exit: Press Ctrl+C to exit """ app() if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /src/hooks/audio_tts.py: -------------------------------------------------------------------------------- ```python #region Imports import json import subprocess import sys import platform from pathlib import Path from rich.console import Console #endregion #region Functions def setup(console: Console, settings: dict, settings_path: Path) -> None: """ Set up the audio TTS notification hook. Speaks messages using the system's text-to-speech engine (macOS 'say' command). 
Args: console: Rich console for output settings: Settings dictionary to modify settings_path: Path to settings.json file """ # Check if macOS (currently only supports macOS 'say' command) system = platform.system() if system != "Darwin": console.print("[red]Error: Audio TTS hook is currently only supported on macOS[/red]") console.print("[yellow]Requires the 'say' command which is macOS-specific[/yellow]") return console.print("[bold cyan]Setting up Audio TTS Hook[/bold cyan]\n") console.print("[dim]This hook speaks messages aloud using macOS text-to-speech.[/dim]\n") # Check if regular audio notification hook exists if "Notification" in settings.get("hooks", {}) or "Stop" in settings.get("hooks", {}) or "PreCompact" in settings.get("hooks", {}): from src.hooks import audio existing_audio_hooks = [] for hook_type in ["Notification", "Stop", "PreCompact"]: if hook_type in settings.get("hooks", {}): existing_audio_hooks.extend([hook for hook in settings["hooks"][hook_type] if audio.is_hook(hook)]) if existing_audio_hooks: console.print("[yellow]⚠ Warning: You already have audio notification hooks configured.[/yellow]") console.print("[yellow]Setting up audio-tts will replace them with TTS notifications.[/yellow]\n") console.print("[dim]Continue? (y/n):[/dim] ", end="") try: user_input = input().strip().lower() if user_input != "y": console.print("[yellow]Cancelled[/yellow]") return except (EOFError, KeyboardInterrupt): console.print("\n[yellow]Cancelled[/yellow]") return console.print() # Hook type selection console.print("[bold]Which hooks do you want to enable TTS for?[/bold]") console.print(" 1. Notification only (permission requests) [recommended]") console.print(" 2. Stop only (when Claude finishes responding)") console.print(" 3. PreCompact only (before conversation compaction)") console.print(" 4. Notification + Stop") console.print(" 5. Notification + PreCompact") console.print(" 6. Stop + PreCompact") console.print(" 7. 
All three (Notification + Stop + PreCompact)") console.print("\n[dim]Enter number (default: 1 - Notification only):[/dim] ", end="") try: user_input = input().strip() if user_input == "" or user_input == "1": hook_types = ["Notification"] elif user_input == "2": hook_types = ["Stop"] elif user_input == "3": hook_types = ["PreCompact"] elif user_input == "4": hook_types = ["Notification", "Stop"] elif user_input == "5": hook_types = ["Notification", "PreCompact"] elif user_input == "6": hook_types = ["Stop", "PreCompact"] elif user_input == "7": hook_types = ["Notification", "Stop", "PreCompact"] else: console.print("[yellow]Invalid selection, using default (Notification only)[/yellow]") hook_types = ["Notification"] except (EOFError, KeyboardInterrupt): console.print("\n[yellow]Cancelled[/yellow]") return console.print() # Voice selection console.print("[bold]Choose a voice for TTS:[/bold]") voices = [ ("Samantha", "Clear, natural female voice (recommended)"), ("Alex", "Clear, natural male voice"), ("Daniel", "British English male voice"), ("Karen", "Australian English female voice"), ("Moira", "Irish English female voice"), ("Fred", "Classic robotic voice"), ("Zarvox", "Sci-fi robotic voice"), ] for idx, (name, desc) in enumerate(voices, 1): console.print(f" {idx}. 
{name} - {desc}") console.print("\n[dim]Enter number (default: 1 - Samantha):[/dim] ", end="") try: user_input = input().strip() if user_input == "": voice = voices[0][0] elif user_input.isdigit() and 1 <= int(user_input) <= len(voices): voice = voices[int(user_input) - 1][0] else: console.print("[yellow]Invalid selection, using default (Samantha)[/yellow]") voice = voices[0][0] except (EOFError, KeyboardInterrupt): console.print("\n[yellow]Cancelled[/yellow]") return # Get path to the TTS hook script hook_script = Path(__file__).parent / "scripts" / "audio_tts_hook.sh" # Create the hook script if it doesn't exist hook_script.parent.mkdir(parents=True, exist_ok=True) # Write the hook script with selected voice hook_script_content = f"""#!/bin/bash # Audio TTS Hook for Claude Code # Reads hook JSON from stdin and speaks it using macOS 'say' # Read JSON from stdin json_input=$(cat) # Extract the message content from the JSON # Try different fields depending on hook type message=$(echo "$json_input" | python3 -c " import sys import json try: data = json.load(sys.stdin) hook_type = data.get('hook_event_name', '') # Get appropriate message based on hook type if hook_type == 'Notification': msg = data.get('message', 'Claude requesting permission') elif hook_type == 'Stop': msg = 'Claude finished responding' elif hook_type == 'PreCompact': trigger = data.get('trigger', 'unknown') if trigger == 'auto': msg = 'Auto compacting conversation' else: msg = 'Manually compacting conversation' else: msg = data.get('message', 'Claude event') print(msg) except: print('Claude event') ") # Speak the message using macOS 'say' with selected voice (run in background to avoid blocking) echo "$message" | say -v {voice} & # Optional: Log for debugging # echo "$(date): TTS spoke: $message" >> ~/.claude/tts_hook.log """ hook_script.write_text(hook_script_content) hook_script.chmod(0o755) # Make executable # Initialize hook structures for hook_type in ["Notification", "Stop", "PreCompact"]: if 
# Command substrings that mark a regular (non-TTS) audio notification hook.
_AUDIO_PLAYER_MARKERS = ("afplay", "powershell", "paplay", "aplay")
# File name of the script that the TTS hook registers as its command.
_TTS_SCRIPT_NAME = "audio_tts_hook.sh"


def _hook_commands(hook) -> list[str]:
    """
    Collect the command strings of a hook configuration entry.

    Settings are user-editable JSON, so every level is validated instead of
    trusted: anything that is not shaped like
    ``{"hooks": [{"command": "<str>"}, ...]}`` is skipped rather than raising.

    Args:
        hook: Hook configuration entry (expected to be a dictionary)

    Returns:
        List of command strings found under the entry's "hooks" list
    """
    if not isinstance(hook, dict):
        return []

    entries = hook.get("hooks")
    if not isinstance(entries, (list, tuple)):
        # Missing key, null, or a wrong type: nothing to inspect.
        return []

    commands: list[str] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        command = entry.get("command", "")
        if isinstance(command, str):
            commands.append(command)
    return commands


def _is_audio_hook(hook) -> bool:
    """
    Check if a hook is a regular audio notification hook (not TTS).

    A command counts when it mentions one of the known sound-player
    programs and does not reference the TTS hook script.

    Args:
        hook: Hook configuration dictionary

    Returns:
        True if this is a regular audio notification hook, False otherwise
    """
    return any(
        _TTS_SCRIPT_NAME not in command
        and any(marker in command for marker in _AUDIO_PLAYER_MARKERS)
        for command in _hook_commands(hook)
    )


def is_hook(hook) -> bool:
    """
    Check if a hook is an audio TTS notification hook.

    Args:
        hook: Hook configuration dictionary

    Returns:
        True if any command references the TTS hook script, False otherwise
    """
    return any(_TTS_SCRIPT_NAME in command for command in _hook_commands(hook))
def _create_layout(stats: AggregatedStats) -> Group:
    """
    Assemble every section of the report into one renderable group.

    The sections appear in a fixed order (header, daily timeline, token
    table, breakdown tables) with an empty Text between neighbours acting
    as a blank separator line.

    Args:
        stats: Aggregated statistics

    Returns:
        Rich Group containing all visualization elements
    """
    totals = stats.overall_totals
    per_day = stats.daily_stats

    sections = [
        _create_header(totals, per_day),
        _create_timeline_view(per_day),
        _create_stats_table(totals),
        _create_breakdown_tables(totals),
    ]

    renderables = []
    for position, section in enumerate(sections):
        if position:
            # Blank line between consecutive sections
            renderables.append(Text(""))
        renderables.append(section)

    return Group(*renderables)
Args: overall: Overall statistics daily_stats: Dictionary of daily statistics to determine date range Returns: Rich Panel with header information """ # Get date range if daily_stats: dates = sorted(daily_stats.keys()) date_range_str = f"{dates[0]} to {dates[-1]}" else: date_range_str = "No data" header_text = Text() header_text.append("Claude Code Usage Tracker", style="bold cyan") header_text.append(f" ({date_range_str})", style="dim") header_text.append("\n") header_text.append(f"Total Tokens: ", style="white") header_text.append(f"{overall.total_tokens:,}", style="bold yellow") header_text.append(" | ", style="dim") header_text.append(f"Prompts: ", style="white") header_text.append(f"{overall.total_prompts:,}", style="bold yellow") header_text.append(" | ", style="dim") header_text.append(f"Sessions: ", style="white") header_text.append(f"{overall.total_sessions:,}", style="bold yellow") header_text.append("\n") header_text.append("Note: Claude Code keeps ~30 days of history (rolling window)", style="dim italic") return Panel(header_text, border_style="cyan") def _create_activity_graph(daily_stats: dict[str, DailyStats]) -> Panel: """ Create the GitHub-style activity heatmap showing full year. 
Args: daily_stats: Dictionary of daily statistics Returns: Rich Panel containing the activity graph """ # Always show full year: Jan 1 to Dec 31 of current year today = datetime.now().date() start_date = datetime(today.year, 1, 1).date() end_date = datetime(today.year, 12, 31).date() # Calculate max tokens for scaling max_tokens = max( (stats.total_tokens for stats in daily_stats.values()), default=1 ) if daily_stats else 1 # Build weeks structure # GitHub starts weeks on Sunday, so calculate which day of week Jan 1 is # weekday() returns 0=Monday, 6=Sunday # We want 0=Sunday, 6=Saturday jan1_day = (start_date.weekday() + 1) % 7 # Convert to Sunday=0 weeks: list[list[tuple[Optional[DailyStats], Optional[datetime.date]]]] = [] current_week: list[tuple[Optional[DailyStats], Optional[datetime.date]]] = [] # Pad the first week with None entries before Jan 1 for i in range(jan1_day): # Use None for padding - we'll handle this specially in rendering current_week.append((None, None)) # Now add all days from Jan 1 to Dec 31 current_date = start_date while current_date <= end_date: date_key = current_date.strftime("%Y-%m-%d") day_stats = daily_stats.get(date_key) current_week.append((day_stats, current_date)) # If we've completed a week (Sunday-Saturday), start a new one if len(current_week) == 7: weeks.append(current_week) current_week = [] current_date += timedelta(days=1) # Add any remaining days and pad the final week if current_week: while len(current_week) < 7: # Pad with None for dates after Dec 31 current_week.append((None, None)) weeks.append(current_week) # Create month labels for the top row month_labels = _create_month_labels_github_style(weeks) # Create table for graph with equal spacing between columns # Use width=4 for better spacing and readability table = Table.grid(padding=(0, 0)) table.add_column(justify="right", style=CLAUDE_TEXT_SECONDARY, width=5) # Day labels for _ in range(len(weeks)): table.add_column(justify="center", width=4) # Wider columns for 
better spacing # Add month labels row at the top with Claude secondary color month_labels_styled = [Text(label, style=CLAUDE_TEXT_SECONDARY) for label in month_labels] table.add_row("", *month_labels_styled) # Show all day labels for clarity with Claude secondary color day_labels = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"] # Render each day of week as a row (Sunday=0 to Saturday=6) for day_idx in range(7): row = [Text(day_labels[day_idx], style=CLAUDE_TEXT_SECONDARY)] for week in weeks: if day_idx < len(week): day_stats, date = week[day_idx] cell = _get_intensity_cell(day_stats, max_tokens, date) else: # Shouldn't happen, but handle it anyway cell = Text(" ", style="dim") row.append(cell) table.add_row(*row) # Create legend with dot sizes (skip the first one which is empty space) legend = Text() legend.append("Less ", style=CLAUDE_TEXT_SECONDARY) # Show all dot sizes from smallest to largest (skip index 0 which is empty space) for i, dot in enumerate(DOT_SIZES[1:], start=1): # Map to intensity range intensity = 0.3 + ((i - 1) / (len(DOT_SIZES) - 2)) * 0.7 r = int(CLAUDE_ORANGE_RGB[0] * intensity) g = int(CLAUDE_ORANGE_RGB[1] * intensity) b = int(CLAUDE_ORANGE_RGB[2] * intensity) legend.append(dot, style=f"rgb({r},{g},{b})") legend.append(" ", style="dim") legend.append(" More", style=CLAUDE_TEXT_SECONDARY) # Add contribution count total_days = len([d for w in weeks for d, _ in w if d is not None]) contrib_text = Text() contrib_text.append(f"{total_days} days with activity in {today.year}", style="dim") return Panel( Group(table, Text(""), legend, Text(""), contrib_text), title=f"Activity in {today.year}", border_style="blue", expand=False, # Don't expand to full terminal width width=None, # Let content determine width ) def _create_month_labels_github_style( weeks: list[list[tuple[Optional[DailyStats], Optional[datetime.date]]]] ) -> list[str]: """ Create month labels for the X-axis in GitHub style. 
def _create_timeline_view(daily_stats: dict[str, DailyStats]) -> Panel:
    """
    Create a detailed timeline view showing daily activity with bar chart.

    Only the 15 most recent dates are listed. Bar lengths are scaled
    against the busiest day (by prompt count) across all of daily_stats,
    not just the listed ones, with the busiest day drawn 30 cells wide.

    Args:
        daily_stats: Dictionary of daily statistics keyed by "YYYY-MM-DD"

    Returns:
        Rich Panel containing timeline visualization
    """
    if not daily_stats:
        return Panel(Text("No activity data", style="dim"), title="Daily Timeline", border_style="yellow")

    # ISO-formatted keys sort chronologically as plain strings
    dates = sorted(daily_stats.keys())

    # Scaling reference. This can legitimately be 0 when every tracked day
    # has no prompts, so it must never be used as a divisor unguarded.
    max_prompts = max(stats.total_prompts for stats in daily_stats.values())

    table = Table(title="Daily Activity Timeline", border_style="yellow", show_header=True)
    table.add_column("Date", style="cyan", justify="left", width=12)
    table.add_column("Prompts", style="magenta", justify="right", width=8)
    table.add_column("Activity", style="green", justify="left", width=40)
    table.add_column("Tokens", style="yellow", justify="right", width=15)

    # Show the last 15 dates present in the data
    for date in dates[-15:]:
        stats = daily_stats[date]

        date_str = datetime.strptime(date, "%Y-%m-%d").date().strftime("%b %d")

        # Bar for prompts; empty when there is nothing to scale against
        if max_prompts > 0:
            bar_width = int((stats.total_prompts / max_prompts) * 30)
        else:
            bar_width = 0
        bar = "█" * bar_width

        # Abbreviate token counts (M / K) for the narrow column
        if stats.total_tokens >= 1_000_000:
            tokens_str = f"{stats.total_tokens / 1_000_000:.1f}M"
        elif stats.total_tokens >= 1_000:
            tokens_str = f"{stats.total_tokens / 1_000:.1f}K"
        else:
            tokens_str = str(stats.total_tokens)

        table.add_row(
            date_str,
            f"{stats.total_prompts:,}",
            bar,
            tokens_str,
        )

    return Panel(table, border_style="yellow")
def _create_stats_table(overall: DailyStats) -> Table:
    """
    Build the token usage table: one row per token category plus a total.

    Each category row shows its raw count and its share of
    overall.total_tokens. The final "Total" row is rendered bold and
    always reports 100.0%.

    Args:
        overall: Overall statistics

    Returns:
        Rich Table with token breakdown
    """
    table = Table(title="Token Usage Breakdown", border_style="green")
    for heading, colour, alignment in (
        ("Metric", "cyan", "left"),
        ("Count", "yellow", "right"),
        ("Percentage", "magenta", "right"),
    ):
        table.add_column(heading, style=colour, justify=alignment)

    # Fall back to 1 so the percentage maths below never divides by zero
    if overall.total_tokens > 0:
        denominator = overall.total_tokens
    else:
        denominator = 1

    categories = (
        ("Input Tokens", overall.input_tokens),
        ("Output Tokens", overall.output_tokens),
        ("Cache Creation", overall.cache_creation_tokens),
        ("Cache Read", overall.cache_read_tokens),
    )
    for label, count in categories:
        share = count / denominator * 100
        table.add_row(label, f"{count:,}", f"{share:.1f}%")

    table.add_row("Total", f"{overall.total_tokens:,}", "100.0%", style="bold")

    return table
def _format_number(num: int) -> str:
    """
    Format a number compactly with a magnitude suffix.

    Values of 1,000 and above are scaled to one decimal place and suffixed
    with "K", "M" or "bn". A value that would round up to "1000.0" of its
    unit is promoted to the next unit, so 999,999 renders as "1.0M" rather
    than "1000.0K".

    Values below 1,000 are returned as plain digits. Negative values never
    reach a suffix branch; they keep the legacy rendering with "." as the
    thousands separator (e.g. -5000 -> "-5.000").

    Args:
        num: Number to format

    Returns:
        Formatted string (e.g., "1.4bn", "523.7M", "45.2K", "999")
    """
    # Ordered largest first so the first matching threshold wins
    units = (
        (1_000_000_000, "bn"),
        (1_000_000, "M"),
        (1_000, "K"),
    )

    for position, (unit, suffix) in enumerate(units):
        if num < unit:
            continue

        scaled = f"{num / unit:.1f}"
        if scaled == "1000.0" and position > 0:
            # Rounding pushed the value over the boundary: use the larger unit
            unit, suffix = units[position - 1]
            scaled = f"{num / unit:.1f}"
        return f"{scaled}{suffix}"

    return f"{num:,}".replace(",", ".")
def _create_kpi_section(overall, skip_limits: bool = False, console: Console = None, limits_from_db: dict | None = None) -> Group:
    """
    Build the KPI card row and, when limit data is usable, a row of limit boxes.

    Limit data comes from limits_from_db when given. Otherwise it is
    captured live via capture_limits(), unless skip_limits is set. If a
    console is supplied, a status spinner is shown during the live capture.
    Limit boxes are only added when the limits mapping is non-empty and has
    no "error" key.

    Args:
        overall: Overall statistics (total_tokens, total_prompts, total_sessions are read)
        skip_limits: If True, skip fetching current limits (faster)
        console: Console instance for showing spinner
        limits_from_db: Pre-fetched limits from database (avoids live fetch)

    Returns:
        Group containing KPI cards and, if available, limit boxes
    """
    def _three_column_grid() -> Table:
        # Both rows share the same centred three-column layout
        grid = Table.grid(padding=(0, 2), expand=False)
        for _ in range(3):
            grid.add_column(justify="center")
        return grid

    def _kpi_card(heading: str, amount, text_style: str) -> Panel:
        return Panel(
            Text(_format_number(amount), style=text_style),
            title=heading,
            border_style="white",
            width=28,
        )

    def _without_timezone(reset_text):
        # Drop a trailing " (timezone)" suffix when one is present
        if '(' in reset_text:
            return reset_text.split(' (')[0]
        return reset_text

    # Prefer limits supplied by the caller; otherwise fetch live unless skipped
    limits = limits_from_db
    if limits is None and not skip_limits:
        from src.commands.limits import capture_limits
        if console:
            with console.status(f"[bold {ORANGE}]Loading usage limits...", spinner="dots", spinner_style=ORANGE):
                limits = capture_limits()
        else:
            limits = capture_limits()

    # KPI cards
    kpi_grid = _three_column_grid()
    kpi_grid.add_row(
        _kpi_card("Total Tokens", overall.total_tokens, f"bold {ORANGE}"),
        _kpi_card("Prompts Sent", overall.total_prompts, "bold white"),
        _kpi_card("Active Sessions", overall.total_sessions, "bold white"),
    )

    if not limits or "error" in limits:
        return Group(kpi_grid)

    limit_grid = _three_column_grid()

    box_specs = (
        ("[red]Session Limit", "session"),
        ("[red]Weekly Limit", "week"),
        ("[red]Opus Limit", "opus"),
    )

    # Resolve every reset label before any box is built
    reset_labels = [_without_timezone(limits[f"{prefix}_reset"]) for _, prefix in box_specs]

    boxes = []
    for (heading, prefix), reset_label in zip(box_specs, reset_labels):
        usage_bar = _create_bar(limits[f"{prefix}_pct"], 100, width=16, color="red")
        content = Text()
        content.append(f"{limits[f'{prefix}_pct']}% ", style="bold red")
        content.append(usage_bar)
        content.append(f"\nResets: {reset_label}", style="white")
        boxes.append(
            Panel(
                content,
                title=heading,
                border_style="white",
                width=28,
            )
        )

    limit_grid.add_row(*boxes)

    # A lone newline separates the KPI cards from the limit boxes
    return Group(kpi_grid, Text("\n"), limit_grid)
def _create_limits_bars() -> Panel | None:
    """
    Render the current usage limits as a panel of progress bars.

    Limits are captured live via ``capture_limits``; one row is drawn for
    each of the session, week and Opus limits.

    Returns:
        Panel with limit progress bars, or None if no limits data
    """
    # Function-scope import (presumably avoids an import cycle — TODO confirm)
    from src.commands.limits import capture_limits

    limits = capture_limits()
    if not limits or "error" in limits:
        # Nothing captured, or the capture reported an error: no panel
        return None

    table = Table(show_header=False, box=None, padding=(0, 2))
    column_specs = (
        ("Label", {"style": "white", "justify": "left"}),
        ("Bar", {"justify": "left"}),
        ("Percent", {"style": ORANGE, "justify": "right"}),
        ("Reset", {"style": CYAN, "justify": "left"}),
    )
    for header, options in column_specs:
        table.add_column(header, **options)

    # (row label, percentage key, reset-time key) for each limit
    limit_rows = (
        ("[bold]Session", "session_pct", "session_reset"),
        ("[bold]Week", "week_pct", "week_reset"),
        ("[bold]Opus", "opus_pct", "opus_reset"),
    )
    for label, pct_key, reset_key in limit_rows:
        pct = limits[pct_key]
        table.add_row(
            label,
            _create_bar(pct, 100, width=30),
            f"{pct}%",
            f"resets {limits[reset_key]}",
        )

    return Panel(table, title="[bold]Usage Limits", border_style="white")
def _create_model_breakdown(records: list[UsageRecord]) -> Panel:
    """
    Summarise token consumption per model as a bar table.

    Records without a model or without token usage are ignored, as are
    records whose model is "<synthetic>". Models are listed from highest to
    lowest token count; bars are scaled against the busiest model and
    percentages against the sum over all listed models.

    Args:
        records: List of usage records

    Returns:
        Panel with model breakdown table, or a placeholder panel when no
        record qualifies
    """
    panel_title = "[bold]Tokens by Model"

    # Aggregate tokens by model
    per_model: dict[str, int] = defaultdict(int)
    for rec in records:
        if not rec.model or not rec.token_usage or rec.model == "<synthetic>":
            continue
        per_model[rec.model] += rec.token_usage.total_tokens

    if not per_model:
        return Panel(
            Text("No model data available", style=DIM),
            title=panel_title,
            border_style="white",
        )

    # Highest usage first; the head of the ranking is therefore the maximum
    ranked = sorted(per_model.items(), key=lambda entry: entry[1], reverse=True)
    grand_total = sum(count for _, count in ranked)
    peak = ranked[0][1]

    table = Table(show_header=False, box=None, padding=(0, 2))
    table.add_column("Model", style="white", justify="left", width=25)
    table.add_column("Bar", justify="left")
    table.add_column("Tokens", style=ORANGE, justify="right")
    table.add_column("Percentage", style=CYAN, justify="right")

    for model_id, count in ranked:
        # Keep only the part after the last "/" and drop the "claude-" prefix
        short_name = model_id.rsplit("/", 1)[-1].replace("claude-", "")

        # Guard against a zero total so the share never divides by zero
        share = (count / grand_total * 100) if grand_total > 0 else 0

        table.add_row(
            short_name,
            _create_bar(count, peak, width=20),
            _format_number(count),
            f"{share:.1f}%",
        )

    return Panel(table, title=panel_title, border_style="white")
def _create_project_breakdown(records: list[UsageRecord]) -> Panel:
    """
    Summarise token consumption per project folder as a bar table.

    Only the ten folders with the most tokens are listed. Bars are scaled
    against the busiest listed folder, while percentages are taken against
    the total over every folder, listed or not.

    Args:
        records: List of usage records

    Returns:
        Panel with project breakdown table, or a placeholder panel when no
        record carries token usage
    """
    panel_title = "[bold]Tokens by Project"

    # Aggregate tokens by folder
    per_folder: dict[str, int] = defaultdict(int)
    for rec in records:
        if rec.token_usage:
            per_folder[rec.folder] += rec.token_usage.total_tokens

    if not per_folder:
        return Panel(
            Text("No project data available", style=DIM),
            title=panel_title,
            border_style="white",
        )

    # The total spans all folders; the ranking is cut to the top 10
    grand_total = sum(per_folder.values())
    top_folders = sorted(
        per_folder.items(), key=lambda entry: entry[1], reverse=True
    )[:10]
    # Sorted descending, so the first entry holds the largest count
    peak = top_folders[0][1]

    table = Table(show_header=False, box=None, padding=(0, 2))
    table.add_column("Project", style="white", justify="left", overflow="crop")
    table.add_column("Bar", justify="left", overflow="crop")
    table.add_column("Tokens", style=ORANGE, justify="right")
    table.add_column("Percentage", style=CYAN, justify="right")

    for folder, count in top_folders:
        segments = folder.split("/")
        if len(segments) <= 2:
            # Short paths are shown unchanged
            label = folder
        else:
            # Longer paths keep their last two parts; a ".../" prefix marks
            # paths that had more than three parts
            label = "/".join(segments[-2:])
            if len(segments) > 3:
                label = ".../" + label

        # Hard cut at 35 characters, without an ellipsis
        label = label[:35]

        share = (count / grand_total * 100) if grand_total > 0 else 0

        table.add_row(
            label,
            _create_bar(count, peak, width=20),
            _format_number(count),
            f"{share:.1f}%",
        )

    return Panel(table, title=panel_title, border_style="white")
def _create_footer(date_range: str | None = None, fast_mode: bool = False) -> Text:
    """
    Create footer with export command info and date range.

    Args:
        date_range: Optional date range string to display
        fast_mode: If True, prepend a warning that the data was read from
            the database, including the newest stored timestamp when the
            database reports one

    Returns:
        Text with the optional fast-mode warning, the optional date range
        and the export tip
    """
    footer = Text()

    # Add fast mode warning if enabled
    if fast_mode:
        # Function-scope import (presumably avoids an import cycle — TODO confirm)
        from src.storage.snapshot_db import get_database_stats

        newest = get_database_stats().get("newest_timestamp")
        if newest:
            # Prefer a readable timestamp; fall back to the raw value when it
            # cannot be parsed. fromisoformat raises ValueError for malformed
            # strings and TypeError for non-string values.
            try:
                shown = datetime.fromisoformat(newest).strftime("%Y-%m-%d %H:%M:%S")
            except (ValueError, TypeError, AttributeError):
                shown = newest
            footer.append(
                f"⚠ Fast mode: Reading from last update ({shown})\n\n",
                style="bold red",
            )
        else:
            footer.append(
                "⚠ Fast mode: Reading from database (no timestamp available)\n\n",
                style="bold red",
            )

    # Add date range if provided
    if date_range:
        footer.append("Data range: ", style=DIM)
        footer.append(f"{date_range}\n", style=f"bold {CYAN}")

    # Add export tip
    footer.append("Tip: ", style=DIM)
    footer.append("View yearly heatmap with ", style=DIM)
    footer.append("ccg export --open", style=f"bold {CYAN}")

    return footer