This is page 2 of 2. Use http://codebase.md/data-goblin/claude-goblin?lines=true&page={x} to view the full context. # Directory Structure ``` ├── .gitignore ├── .python-version ├── CHANGELOG.md ├── docs │ ├── commands.md │ ├── images │ │ ├── dashboard.png │ │ ├── heatmap.png │ │ └── status-bar.png │ └── versions │ ├── 0.1.0.md │ ├── 0.1.1.md │ ├── 0.1.2.md │ └── 0.1.3.md ├── LICENSE ├── pyproject.toml ├── README.md └── src ├── __init__.py ├── aggregation │ ├── __init__.py │ ├── daily_stats.py │ └── usage_limits.py ├── cli.py ├── commands │ ├── __init__.py │ ├── delete_usage.py │ ├── export.py │ ├── help.py │ ├── limits.py │ ├── restore_backup.py │ ├── stats.py │ ├── status_bar.py │ ├── update_usage.py │ └── usage.py ├── config │ ├── __init__.py │ ├── settings.py │ └── user_config.py ├── data │ ├── __init__.py │ └── jsonl_parser.py ├── hooks │ ├── __init__.py │ ├── audio_tts.py │ ├── audio.py │ ├── manager.py │ ├── png.py │ ├── scripts │ │ └── audio_tts_hook.sh │ └── usage.py ├── models │ ├── __init__.py │ └── usage_record.py ├── storage │ ├── __init__.py │ └── snapshot_db.py ├── utils │ ├── __init__.py │ ├── _system.py │ └── text_analysis.py └── visualization ├── __init__.py ├── activity_graph.py ├── dashboard.py ├── export.py └── usage_bars.py ``` # Files -------------------------------------------------------------------------------- /src/hooks/audio_tts.py: -------------------------------------------------------------------------------- ```python 1 | #region Imports 2 | import json 3 | import subprocess 4 | import sys 5 | import platform 6 | from pathlib import Path 7 | 8 | from rich.console import Console 9 | #endregion 10 | 11 | 12 | #region Functions 13 | 14 | 15 | def setup(console: Console, settings: dict, settings_path: Path) -> None: 16 | """ 17 | Set up the audio TTS notification hook. 18 | 19 | Speaks messages using the system's text-to-speech engine (macOS 'say' command). 20 | 21 | Args: 22 | console: Rich console for output 23 | settings: Settings dictionary to modify 24 | settings_path: Path to settings.json file 25 | """ 26 | # Check if macOS (currently only supports macOS 'say' command) 27 | system = platform.system() 28 | if system != "Darwin": 29 | console.print("[red]Error: Audio TTS hook is currently only supported on macOS[/red]") 30 | console.print("[yellow]Requires the 'say' command which is macOS-specific[/yellow]") 31 | return 32 | 33 | console.print("[bold cyan]Setting up Audio TTS Hook[/bold cyan]\n") 34 | console.print("[dim]This hook speaks messages aloud using macOS text-to-speech.[/dim]\n") 35 | 36 | # Check if regular audio notification hook exists 37 | if "Notification" in settings.get("hooks", {}) or "Stop" in settings.get("hooks", {}) or "PreCompact" in settings.get("hooks", {}): 38 | from src.hooks import audio 39 | existing_audio_hooks = [] 40 | for hook_type in ["Notification", "Stop", "PreCompact"]: 41 | if hook_type in settings.get("hooks", {}): 42 | existing_audio_hooks.extend([hook for hook in settings["hooks"][hook_type] if audio.is_hook(hook)]) 43 | 44 | if existing_audio_hooks: 45 | console.print("[yellow]⚠ Warning: You already have audio notification hooks configured.[/yellow]") 46 | console.print("[yellow]Setting up audio-tts will replace them with TTS notifications.[/yellow]\n") 47 | console.print("[dim]Continue? (y/n):[/dim] ", end="") 48 | try: 49 | user_input = input().strip().lower() 50 | if user_input != "y": 51 | console.print("[yellow]Cancelled[/yellow]") 52 | return 53 | except (EOFError, KeyboardInterrupt): 54 | console.print("\n[yellow]Cancelled[/yellow]") 55 | return 56 | console.print() 57 | 58 | # Hook type selection 59 | console.print("[bold]Which hooks do you want to enable TTS for?[/bold]") 60 | console.print(" 1. Notification only (permission requests) [recommended]") 61 | console.print(" 2. Stop only (when Claude finishes responding)") 62 | console.print(" 3. PreCompact only (before conversation compaction)") 63 | console.print(" 4. Notification + Stop") 64 | console.print(" 5. Notification + PreCompact") 65 | console.print(" 6. Stop + PreCompact") 66 | console.print(" 7. All three (Notification + Stop + PreCompact)") 67 | 68 | console.print("\n[dim]Enter number (default: 1 - Notification only):[/dim] ", end="") 69 | 70 | try: 71 | user_input = input().strip() 72 | if user_input == "" or user_input == "1": 73 | hook_types = ["Notification"] 74 | elif user_input == "2": 75 | hook_types = ["Stop"] 76 | elif user_input == "3": 77 | hook_types = ["PreCompact"] 78 | elif user_input == "4": 79 | hook_types = ["Notification", "Stop"] 80 | elif user_input == "5": 81 | hook_types = ["Notification", "PreCompact"] 82 | elif user_input == "6": 83 | hook_types = ["Stop", "PreCompact"] 84 | elif user_input == "7": 85 | hook_types = ["Notification", "Stop", "PreCompact"] 86 | else: 87 | console.print("[yellow]Invalid selection, using default (Notification only)[/yellow]") 88 | hook_types = ["Notification"] 89 | except (EOFError, KeyboardInterrupt): 90 | console.print("\n[yellow]Cancelled[/yellow]") 91 | return 92 | 93 | console.print() 94 | 95 | # Voice selection 96 | console.print("[bold]Choose a voice for TTS:[/bold]") 97 | voices = [ 98 | ("Samantha", "Clear, natural female voice (recommended)"), 99 | ("Alex", "Clear, natural male voice"), 100 | ("Daniel", "British English male voice"), 101 | ("Karen", "Australian English female voice"), 102 | ("Moira", "Irish English female voice"), 103 | ("Fred", "Classic robotic voice"), 104 | ("Zarvox", "Sci-fi robotic voice"), 105 | ] 106 | 107 | for idx, (name, desc) in enumerate(voices, 1): 108 | console.print(f" {idx}. {name} - {desc}") 109 | 110 | console.print("\n[dim]Enter number (default: 1 - Samantha):[/dim] ", end="") 111 | 112 | try: 113 | user_input = input().strip() 114 | if user_input == "": 115 | voice = voices[0][0] 116 | elif user_input.isdigit() and 1 <= int(user_input) <= len(voices): 117 | voice = voices[int(user_input) - 1][0] 118 | else: 119 | console.print("[yellow]Invalid selection, using default (Samantha)[/yellow]") 120 | voice = voices[0][0] 121 | except (EOFError, KeyboardInterrupt): 122 | console.print("\n[yellow]Cancelled[/yellow]") 123 | return 124 | 125 | # Get path to the TTS hook script 126 | hook_script = Path(__file__).parent / "scripts" / "audio_tts_hook.sh" 127 | 128 | # Create the hook script if it doesn't exist 129 | hook_script.parent.mkdir(parents=True, exist_ok=True) 130 | 131 | # Write the hook script with selected voice 132 | hook_script_content = f"""#!/bin/bash 133 | # Audio TTS Hook for Claude Code 134 | # Reads hook JSON from stdin and speaks it using macOS 'say' 135 | 136 | # Read JSON from stdin 137 | json_input=$(cat) 138 | 139 | # Extract the message content from the JSON 140 | # Try different fields depending on hook type 141 | message=$(echo "$json_input" | python3 -c " 142 | import sys 143 | import json 144 | try: 145 | data = json.load(sys.stdin) 146 | hook_type = data.get('hook_event_name', '') 147 | 148 | # Get appropriate message based on hook type 149 | if hook_type == 'Notification': 150 | msg = data.get('message', 'Claude requesting permission') 151 | elif hook_type == 'Stop': 152 | msg = 'Claude finished responding' 153 | elif hook_type == 'PreCompact': 154 | trigger = data.get('trigger', 'unknown') 155 | if trigger == 'auto': 156 | msg = 'Auto compacting conversation' 157 | else: 158 | msg = 'Manually compacting conversation' 159 | else: 160 | msg = data.get('message', 'Claude event') 161 | 162 | print(msg) 163 | except: 164 | print('Claude event') 165 | ") 166 | 167 | # Speak the message using macOS 'say' with selected voice (run in background to avoid blocking) 168 | echo "$message" | say -v {voice} & 169 | 170 | # Optional: Log for debugging 171 | # echo "$(date): TTS spoke: $message" >> ~/.claude/tts_hook.log 172 | """ 173 | 174 | hook_script.write_text(hook_script_content) 175 | hook_script.chmod(0o755) # Make executable 176 | 177 | # Initialize hook structures 178 | for hook_type in ["Notification", "Stop", "PreCompact"]: 179 | if hook_type not in settings["hooks"]: 180 | settings["hooks"][hook_type] = [] 181 | 182 | # Remove existing TTS hooks and regular audio hooks from selected hook types 183 | removed_count = 0 184 | for hook_type in hook_types: 185 | original_count = len(settings["hooks"][hook_type]) 186 | settings["hooks"][hook_type] = [ 187 | hook for hook in settings["hooks"][hook_type] 188 | if not is_hook(hook) and not _is_audio_hook(hook) 189 | ] 190 | removed_count += original_count - len(settings["hooks"][hook_type]) 191 | 192 | # Add new TTS hook to selected hook types 193 | for hook_type in hook_types: 194 | hook_config = { 195 | "hooks": [{ 196 | "type": "command", 197 | "command": str(hook_script.absolute()) 198 | }] 199 | } 200 | 201 | # Add matcher for Stop hook 202 | if hook_type == "Stop": 203 | hook_config["matcher"] = "*" 204 | 205 | settings["hooks"][hook_type].append(hook_config) 206 | 207 | if removed_count > 0: 208 | console.print(f"[cyan]Replaced {removed_count} existing audio notification hook(s)[/cyan]") 209 | 210 | console.print(f"[green]✓ Successfully configured audio TTS hooks[/green]") 211 | console.print("\n[bold]What this does:[/bold]") 212 | for hook_type in hook_types: 213 | if hook_type == "Notification": 214 | console.print(" • Notification: Speaks permission request messages aloud") 215 | elif hook_type == "Stop": 216 | console.print(" • Stop: Announces when Claude finishes responding") 217 | elif hook_type == "PreCompact": 218 | console.print(" • PreCompact: Announces before conversation compaction") 219 | console.print(f" • Uses the '{voice}' voice") 220 | console.print(" • Runs in background to avoid blocking Claude Code") 221 | console.print(f"\n[dim]Hook script: {hook_script}[/dim]") 222 | 223 | 224 | def _is_audio_hook(hook) -> bool: 225 | """ 226 | Check if a hook is a regular audio notification hook (not TTS). 227 | 228 | Args: 229 | hook: Hook configuration dictionary 230 | 231 | Returns: 232 | True if this is a regular audio notification hook, False otherwise 233 | """ 234 | if not isinstance(hook, dict) or "hooks" not in hook: 235 | return False 236 | for h in hook.get("hooks", []): 237 | cmd = h.get("command", "") 238 | if any(audio_cmd in cmd for audio_cmd in ["afplay", "powershell", "paplay", "aplay"]) and "audio_tts_hook.sh" not in cmd: 239 | return True 240 | return False 241 | 242 | 243 | def is_hook(hook) -> bool: 244 | """ 245 | Check if a hook is an audio TTS notification hook. 246 | 247 | Args: 248 | hook: Hook configuration dictionary 249 | 250 | Returns: 251 | True if this is an audio TTS notification hook, False otherwise 252 | """ 253 | if not isinstance(hook, dict) or "hooks" not in hook: 254 | return False 255 | for h in hook.get("hooks", []): 256 | cmd = h.get("command", "") 257 | if "audio_tts_hook.sh" in cmd: 258 | return True 259 | return False 260 | 261 | 262 | #endregion 263 | ``` -------------------------------------------------------------------------------- /src/visualization/activity_graph.py: -------------------------------------------------------------------------------- ```python 1 | #region Imports 2 | from datetime import datetime, timedelta 3 | from typing import Optional 4 | 5 | from rich.console import Console, Group 6 | from rich.panel import Panel 7 | from rich.table import Table 8 | from rich.text import Text 9 | 10 | from src.aggregation.daily_stats import AggregatedStats, DailyStats 11 | #endregion 12 | 13 | 14 | #region Constants 15 | # Claude UI color scheme 16 | CLAUDE_BG = "#262624" 17 | CLAUDE_TEXT = "#FAF9F5" 18 | CLAUDE_TEXT_SECONDARY = "#C2C0B7" 19 | CLAUDE_DARK_GREY = "grey15" # Past days with no activity 20 | CLAUDE_LIGHT_GREY = "grey50" # Future days 21 | 22 | # Claude orange base color (fully bright) 23 | CLAUDE_ORANGE_RGB = (203, 123, 93) # #CB7B5D 24 | 25 | # Dot sizes for terminal visualization (smallest to largest) 26 | DOT_SIZES = [" ", "·", "•", "●", "⬤"] # Empty space for 0, then dots of increasing size 27 | 28 | DAYS_OF_WEEK = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"] 29 | #endregion 30 | 31 | 32 | #region Functions 33 | 34 | 35 | def render_activity_graph(stats: AggregatedStats, console: Console) -> None: 36 | """ 37 | Render a GitHub-style activity graph to the console. 38 | 39 | Displays a heatmap of token usage over the past 365 days, 40 | along with summary statistics. 41 | 42 | Args: 43 | stats: Aggregated statistics to visualize 44 | console: Rich console instance for rendering 45 | """ 46 | # Create the main layout 47 | layout = _create_layout(stats) 48 | 49 | # Render to console 50 | console.clear() 51 | console.print(layout) 52 | 53 | 54 | def _create_layout(stats: AggregatedStats) -> Group: 55 | """ 56 | Create the complete layout with graph and statistics. 57 | 58 | Args: 59 | stats: Aggregated statistics 60 | 61 | Returns: 62 | Rich Group containing all visualization elements 63 | """ 64 | # Create header 65 | header = _create_header(stats.overall_totals, stats.daily_stats) 66 | 67 | # Create timeline view 68 | timeline = _create_timeline_view(stats.daily_stats) 69 | 70 | # Create statistics table 71 | stats_table = _create_stats_table(stats.overall_totals) 72 | 73 | # Create breakdown tables 74 | breakdown = _create_breakdown_tables(stats.overall_totals) 75 | 76 | return Group( 77 | header, 78 | Text(""), # Blank line 79 | timeline, 80 | Text(""), # Blank line 81 | stats_table, 82 | Text(""), # Blank line 83 | breakdown, 84 | ) 85 | 86 | 87 | def _create_header(overall: DailyStats, daily_stats: dict[str, DailyStats]) -> Panel: 88 | """ 89 | Create header panel with title and key metrics. 90 | 91 | Args: 92 | overall: Overall statistics 93 | daily_stats: Dictionary of daily statistics to determine date range 94 | 95 | Returns: 96 | Rich Panel with header information 97 | """ 98 | # Get date range 99 | if daily_stats: 100 | dates = sorted(daily_stats.keys()) 101 | date_range_str = f"{dates[0]} to {dates[-1]}" 102 | else: 103 | date_range_str = "No data" 104 | 105 | header_text = Text() 106 | header_text.append("Claude Code Usage Tracker", style="bold cyan") 107 | header_text.append(f" ({date_range_str})", style="dim") 108 | header_text.append("\n") 109 | header_text.append(f"Total Tokens: ", style="white") 110 | header_text.append(f"{overall.total_tokens:,}", style="bold yellow") 111 | header_text.append(" | ", style="dim") 112 | header_text.append(f"Prompts: ", style="white") 113 | header_text.append(f"{overall.total_prompts:,}", style="bold yellow") 114 | header_text.append(" | ", style="dim") 115 | header_text.append(f"Sessions: ", style="white") 116 | header_text.append(f"{overall.total_sessions:,}", style="bold yellow") 117 | header_text.append("\n") 118 | header_text.append("Note: Claude Code keeps ~30 days of history (rolling window)", style="dim italic") 119 | 120 | return Panel(header_text, border_style="cyan") 121 | 122 | 123 | def _create_activity_graph(daily_stats: dict[str, DailyStats]) -> Panel: 124 | """ 125 | Create the GitHub-style activity heatmap showing full year. 126 | 127 | Args: 128 | daily_stats: Dictionary of daily statistics 129 | 130 | Returns: 131 | Rich Panel containing the activity graph 132 | """ 133 | # Always show full year: Jan 1 to Dec 31 of current year 134 | today = datetime.now().date() 135 | start_date = datetime(today.year, 1, 1).date() 136 | end_date = datetime(today.year, 12, 31).date() 137 | 138 | # Calculate max tokens for scaling 139 | max_tokens = max( 140 | (stats.total_tokens for stats in daily_stats.values()), default=1 141 | ) if daily_stats else 1 142 | 143 | # Build weeks structure 144 | # GitHub starts weeks on Sunday, so calculate which day of week Jan 1 is 145 | # weekday() returns 0=Monday, 6=Sunday 146 | # We want 0=Sunday, 6=Saturday 147 | jan1_day = (start_date.weekday() + 1) % 7 # Convert to Sunday=0 148 | 149 | weeks: list[list[tuple[Optional[DailyStats], Optional[datetime.date]]]] = [] 150 | current_week: list[tuple[Optional[DailyStats], Optional[datetime.date]]] = [] 151 | 152 | # Pad the first week with None entries before Jan 1 153 | for i in range(jan1_day): 154 | # Use None for padding - we'll handle this specially in rendering 155 | current_week.append((None, None)) 156 | 157 | # Now add all days from Jan 1 to Dec 31 158 | current_date = start_date 159 | while current_date <= end_date: 160 | date_key = current_date.strftime("%Y-%m-%d") 161 | day_stats = daily_stats.get(date_key) 162 | current_week.append((day_stats, current_date)) 163 | 164 | # If we've completed a week (Sunday-Saturday), start a new one 165 | if len(current_week) == 7: 166 | weeks.append(current_week) 167 | current_week = [] 168 | 169 | current_date += timedelta(days=1) 170 | 171 | # Add any remaining days and pad the final week 172 | if current_week: 173 | while len(current_week) < 7: 174 | # Pad with None for dates after Dec 31 175 | current_week.append((None, None)) 176 | weeks.append(current_week) 177 | 178 | # Create month labels for the top row 179 | month_labels = _create_month_labels_github_style(weeks) 180 | 181 | # Create table for graph with equal spacing between columns 182 | # Use width=4 for better spacing and readability 183 | table = Table.grid(padding=(0, 0)) 184 | table.add_column(justify="right", style=CLAUDE_TEXT_SECONDARY, width=5) # Day labels 185 | 186 | for _ in range(len(weeks)): 187 | table.add_column(justify="center", width=4) # Wider columns for better spacing 188 | 189 | # Add month labels row at the top with Claude secondary color 190 | month_labels_styled = [Text(label, style=CLAUDE_TEXT_SECONDARY) for label in month_labels] 191 | table.add_row("", *month_labels_styled) 192 | 193 | # Show all day labels for clarity with Claude secondary color 194 | day_labels = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"] 195 | 196 | # Render each day of week as a row (Sunday=0 to Saturday=6) 197 | for day_idx in range(7): 198 | row = [Text(day_labels[day_idx], style=CLAUDE_TEXT_SECONDARY)] 199 | 200 | for week in weeks: 201 | if day_idx < len(week): 202 | day_stats, date = week[day_idx] 203 | cell = _get_intensity_cell(day_stats, max_tokens, date) 204 | else: 205 | # Shouldn't happen, but handle it anyway 206 | cell = Text(" ", style="dim") 207 | row.append(cell) 208 | 209 | table.add_row(*row) 210 | 211 | # Create legend with dot sizes (skip the first one which is empty space) 212 | legend = Text() 213 | legend.append("Less ", style=CLAUDE_TEXT_SECONDARY) 214 | # Show all dot sizes from smallest to largest (skip index 0 which is empty space) 215 | for i, dot in enumerate(DOT_SIZES[1:], start=1): 216 | # Map to intensity range 217 | intensity = 0.3 + ((i - 1) / (len(DOT_SIZES) - 2)) * 0.7 218 | r = int(CLAUDE_ORANGE_RGB[0] * intensity) 219 | g = int(CLAUDE_ORANGE_RGB[1] * intensity) 220 | b = int(CLAUDE_ORANGE_RGB[2] * intensity) 221 | legend.append(dot, style=f"rgb({r},{g},{b})") 222 | legend.append(" ", style="dim") 223 | legend.append(" More", style=CLAUDE_TEXT_SECONDARY) 224 | 225 | # Add contribution count 226 | total_days = len([d for w in weeks for d, _ in w if d is not None]) 227 | contrib_text = Text() 228 | contrib_text.append(f"{total_days} days with activity in {today.year}", style="dim") 229 | 230 | return Panel( 231 | Group(table, Text(""), legend, Text(""), contrib_text), 232 | title=f"Activity in {today.year}", 233 | border_style="blue", 234 | expand=False, # Don't expand to full terminal width 235 | width=None, # Let content determine width 236 | ) 237 | 238 | 239 | def _create_month_labels_github_style( 240 | weeks: list[list[tuple[Optional[DailyStats], Optional[datetime.date]]]] 241 | ) -> list[str]: 242 | """ 243 | Create month labels for the X-axis in GitHub style. 244 | 245 | Shows month name at the start of each month that appears in the graph. 246 | 247 | Args: 248 | weeks: List of weeks (each week is a list of day tuples) 249 | 250 | Returns: 251 | List of strings for month labels (one per week column) 252 | """ 253 | labels: list[str] = [] 254 | last_month = None 255 | 256 | for week_idx, week in enumerate(weeks): 257 | # Get the first valid date in this week 258 | week_start_month = None 259 | month_name = "" 260 | for day_stats, date in week: 261 | if date is not None: 262 | week_start_month = date.month 263 | month_name = date.strftime("%b") 264 | break 265 | 266 | # Show month label when month changes, with proper width for new column size 267 | if week_start_month and week_start_month != last_month: 268 | # Center month abbreviation in 4-char width 269 | labels.append(f"{month_name:^4}") 270 | last_month = week_start_month 271 | else: 272 | labels.append(" ") 273 | 274 | return labels 275 | 276 | 277 | def _create_month_labels( 278 | weeks: list[list[tuple[Optional[DailyStats], datetime.date]]], 279 | week_dates: list[datetime.date] 280 | ) -> list[Text]: 281 | """ 282 | Create month labels for the X-axis of the activity graph. 283 | 284 | Args: 285 | weeks: List of weeks (each week is a list of day tuples) 286 | week_dates: List of dates for the first day of each week 287 | 288 | Returns: 289 | List of Text objects for month labels (one per week column) 290 | """ 291 | labels: list[Text] = [] 292 | last_month = None 293 | 294 | for week_idx, week_start in enumerate(week_dates): 295 | current_month = week_start.strftime("%b") 296 | 297 | # Show month label on first week or when month changes 298 | if last_month != current_month and week_idx < len(weeks): 299 | labels.append(Text(current_month[:3], style="dim")) 300 | last_month = current_month 301 | else: 302 | labels.append(Text(" ", style="dim")) 303 | 304 | return labels 305 | 306 | 307 | def _create_timeline_view(daily_stats: dict[str, DailyStats]) -> Panel: 308 | """ 309 | Create a detailed timeline view showing daily activity with bar chart. 310 | 311 | Args: 312 | daily_stats: Dictionary of daily statistics 313 | 314 | Returns: 315 | Rich Panel containing timeline visualization 316 | """ 317 | if not daily_stats: 318 | return Panel(Text("No activity data", style="dim"), title="Daily Timeline", border_style="yellow") 319 | 320 | # Get sorted dates 321 | dates = sorted(daily_stats.keys()) 322 | 323 | # Calculate max for scaling 324 | max_prompts = max((stats.total_prompts for stats in daily_stats.values()), default=1) 325 | max_tokens = max((stats.total_tokens for stats in daily_stats.values()), default=1) 326 | 327 | # Create table 328 | table = Table(title="Daily Activity Timeline", border_style="yellow", show_header=True) 329 | table.add_column("Date", style="cyan", justify="left", width=12) 330 | table.add_column("Prompts", style="magenta", justify="right", width=8) 331 | table.add_column("Activity", style="green", justify="left", width=40) 332 | table.add_column("Tokens", style="yellow", justify="right", width=15) 333 | 334 | # Show last 15 days with activity 335 | recent_dates = dates[-15:] 336 | 337 | for date in recent_dates: 338 | stats = daily_stats[date] 339 | 340 | # Format date 341 | date_obj = datetime.strptime(date, "%Y-%m-%d").date() 342 | date_str = date_obj.strftime("%b %d") 343 | 344 | # Create bar for prompts 345 | bar_width = int((stats.total_prompts / max_prompts) * 30) 346 | bar = "█" * bar_width 347 | 348 | # Format tokens (abbreviated) 349 | if stats.total_tokens >= 1_000_000: 350 | tokens_str = f"{stats.total_tokens / 1_000_000:.1f}M" 351 | elif stats.total_tokens >= 1_000: 352 | tokens_str = f"{stats.total_tokens / 1_000:.1f}K" 353 | else: 354 | tokens_str = str(stats.total_tokens) 355 | 356 | table.add_row( 357 | date_str, 358 | f"{stats.total_prompts:,}", 359 | bar, 360 | tokens_str, 361 | ) 362 | 363 | return Panel(table, border_style="yellow") 364 | 365 | 366 | def _get_intensity_cell( 367 | day_stats: Optional[DailyStats], max_tokens: int, date: Optional[datetime.date] 368 | ) -> Text: 369 | """ 370 | Get the colored cell for a specific day based on token usage. 371 | Uses different-sized dots for terminal, gradient for export. 372 | 373 | Args: 374 | day_stats: Statistics for the day (None if no activity) 375 | max_tokens: Maximum tokens in any day (for scaling) 376 | date: The date of this cell (None for padding) 377 | 378 | Returns: 379 | Rich Text object with appropriate color and symbol 380 | """ 381 | if date is None: 382 | # Padding cell 383 | return Text(" ", style="dim") 384 | 385 | today = datetime.now().date() 386 | 387 | # Future days: empty space 388 | if date > today: 389 | return Text(" ") 390 | 391 | # Past days with no activity: empty space 392 | if not day_stats or day_stats.total_tokens == 0: 393 | return Text(" ") 394 | 395 | # Calculate intensity ratio (0.0 to 1.0) 396 | ratio = day_stats.total_tokens / max_tokens if max_tokens > 0 else 0 397 | 398 | # Apply non-linear scaling to make differences more visible 399 | # Using square root makes lower values more distinguishable 400 | ratio = ratio ** 0.5 401 | 402 | # Choose dot size based on activity level (1-4, since 0 is empty space) 403 | if ratio >= 0.8: 404 | dot_idx = 4 # Largest (⬤) 405 | elif ratio >= 0.6: 406 | dot_idx = 3 # Large (●) 407 | elif ratio >= 0.4: 408 | dot_idx = 2 # Medium (•) 409 | else: 410 | dot_idx = 1 # Small (·) - for any activity > 0 411 | 412 | dot = DOT_SIZES[dot_idx] 413 | 414 | # Calculate color intensity 415 | base_r, base_g, base_b = CLAUDE_ORANGE_RGB 416 | min_intensity = 0.3 417 | intensity = min_intensity + (ratio * (1.0 - min_intensity)) 418 | 419 | r = int(base_r * intensity) 420 | g = int(base_g * intensity) 421 | b = int(base_b * intensity) 422 | 423 | return Text(dot, style=f"rgb({r},{g},{b})") 424 | 425 | 426 | def _create_stats_table(overall: DailyStats) -> Table: 427 | """ 428 | Create table with detailed token statistics. 429 | 430 | Args: 431 | overall: Overall statistics 432 | 433 | Returns: 434 | Rich Table with token breakdown 435 | """ 436 | table = Table(title="Token Usage Breakdown", border_style="green") 437 | 438 | table.add_column("Metric", style="cyan", justify="left") 439 | table.add_column("Count", style="yellow", justify="right") 440 | table.add_column("Percentage", style="magenta", justify="right") 441 | 442 | total = overall.total_tokens if overall.total_tokens > 0 else 1 443 | 444 | table.add_row( 445 | "Input Tokens", 446 | f"{overall.input_tokens:,}", 447 | f"{(overall.input_tokens / total * 100):.1f}%", 448 | ) 449 | table.add_row( 450 | "Output Tokens", 451 | f"{overall.output_tokens:,}", 452 | f"{(overall.output_tokens / total * 100):.1f}%", 453 | ) 454 | table.add_row( 455 | "Cache Creation", 456 | f"{overall.cache_creation_tokens:,}", 457 | f"{(overall.cache_creation_tokens / total * 100):.1f}%", 458 | ) 459 | table.add_row( 460 | "Cache Read", 461 | f"{overall.cache_read_tokens:,}", 462 | f"{(overall.cache_read_tokens / total * 100):.1f}%", 463 | ) 464 | table.add_row( 465 | "Total", 466 | f"{overall.total_tokens:,}", 467 | "100.0%", 468 | style="bold", 469 | ) 470 | 471 | return table 472 | 473 | 474 | def _create_breakdown_tables(overall: DailyStats) -> Group: 475 | """ 476 | Create tables showing breakdown by model and folder. 477 | 478 | Args: 479 | overall: Overall statistics 480 | 481 | Returns: 482 | Rich Group containing breakdown tables 483 | """ 484 | # Models table 485 | models_table = Table(title="Models Used", border_style="blue") 486 | models_table.add_column("Model", style="cyan") 487 | for model in sorted(overall.models): 488 | # Shorten long model names for display 489 | display_name = model.split("/")[-1] if "/" in model else model 490 | models_table.add_row(display_name) 491 | 492 | # Folders table 493 | folders_table = Table(title="Project Folders", border_style="yellow") 494 | folders_table.add_column("Folder", style="cyan") 495 | for folder in sorted(overall.folders): 496 | # Show only last 2 parts of path for brevity 497 | parts = folder.split("/") 498 | display_name = "/".join(parts[-2:]) if len(parts) > 2 else folder 499 | folders_table.add_row(display_name) 500 | 501 | # Create side-by-side layout using Table.grid 502 | layout = Table.grid(padding=(0, 2)) 503 | layout.add_column() 504 | layout.add_column() 505 | layout.add_row(models_table, folders_table) 506 | 507 | return Group(layout) 508 | #endregion 509 | ``` -------------------------------------------------------------------------------- /src/visualization/dashboard.py: -------------------------------------------------------------------------------- ```python 1 | #region Imports 2 | from collections import defaultdict 3 | from datetime import datetime 4 | 5 | from rich.console import Console, Group 6 | from rich.panel import Panel 7 | from rich.table import Table 8 | from rich.text import Text 9 | from rich.layout import Layout 10 | from rich.progress import Progress, BarColumn, TextColumn 11 | from rich.spinner import Spinner 12 | 13 | from src.aggregation.daily_stats import AggregatedStats 14 | from src.models.usage_record import UsageRecord 15 | from src.storage.snapshot_db import get_limits_data 16 | #endregion 17 | 18 | 19 | #region Constants 20 | # Claude-inspired color scheme 21 | ORANGE = "#ff8800" 22 | CYAN = "cyan" 23 | DIM = "grey50" 24 | BAR_WIDTH = 20 25 | #endregion 26 | 27 | 28 | #region Functions 29 | 30 | 31 | def _format_number(num: int) -> str: 32 | """ 33 | Format number with thousands separator and appropriate suffix. 34 | 35 | Args: 36 | num: Number to format 37 | 38 | Returns: 39 | Formatted string (e.g., "1.4bn", "523.7M", "45.2K", "1.234") 40 | """ 41 | if num >= 1_000_000_000: 42 | return f"{num / 1_000_000_000:.1f}bn".replace(".", ".") 43 | elif num >= 1_000_000: 44 | return f"{num / 1_000_000:.1f}M".replace(".", ".") 45 | elif num >= 1_000: 46 | return f"{num / 1_000:.1f}K".replace(".", ".") 47 | else: 48 | # Add thousands separator for numbers < 1000 49 | return f"{num:,}".replace(",", ".") 50 | 51 | 52 | def _create_bar(value: int, max_value: int, width: int = BAR_WIDTH, color: str = ORANGE) -> Text: 53 | """ 54 | Create a simple text bar for visualization. 55 | 56 | Args: 57 | value: Current value 58 | max_value: Maximum value for scaling 59 | width: Width of bar in characters 60 | color: Color for the filled portion of the bar 61 | 62 | Returns: 63 | Rich Text object with colored bar 64 | """ 65 | if max_value == 0: 66 | return Text("░" * width, style=DIM) 67 | 68 | filled = int((value / max_value) * width) 69 | bar = Text() 70 | bar.append("█" * filled, style=color) 71 | bar.append("░" * (width - filled), style=DIM) 72 | return bar 73 | 74 | 75 | def render_dashboard(stats: AggregatedStats, records: list[UsageRecord], console: Console, skip_limits: bool = False, clear_screen: bool = True, date_range: str = None, limits_from_db: dict | None = None, fast_mode: bool = False) -> None: 76 | """ 77 | Render a concise, modern dashboard with KPI cards and breakdowns. 78 | 79 | Args: 80 | stats: Aggregated statistics 81 | records: Raw usage records for detailed breakdowns 82 | console: Rich console for rendering 83 | skip_limits: If True, skip fetching current limits for faster display 84 | clear_screen: If True, clear the screen before rendering (default True) 85 | date_range: Optional date range string to display in footer 86 | limits_from_db: Pre-fetched limits from database (avoids live fetch) 87 | fast_mode: If True, show warning that data is from last update 88 | """ 89 | # Create KPI cards with limits (shows spinner if loading limits) 90 | kpi_section = _create_kpi_section(stats.overall_totals, skip_limits=skip_limits, console=console, limits_from_db=limits_from_db) 91 | 92 | # Create breakdowns 93 | model_breakdown = _create_model_breakdown(records) 94 | project_breakdown = _create_project_breakdown(records) 95 | 96 | # Create footer with export info and date range 97 | footer = _create_footer(date_range, fast_mode=fast_mode) 98 | 99 | # Optionally clear screen and render all components 100 | if clear_screen: 101 | console.clear() 102 | console.print(kpi_section, end="") 103 | console.print() # Blank line between sections 104 | console.print(model_breakdown, end="") 105 | console.print() # Blank line between sections 106 | console.print(project_breakdown, end="") 107 | console.print() # Blank line before footer 108 | console.print(footer) 109 | 110 | 111 | def _create_kpi_section(overall, skip_limits: bool = False, console: Console = None, limits_from_db: dict | None = None) -> Group: 112 | """ 113 | Create KPI cards with individual limit boxes beneath each. 114 | 115 | Args: 116 | overall: Overall statistics 117 | skip_limits: If True, skip fetching current limits (faster) 118 | console: Console instance for showing spinner 119 | limits_from_db: Pre-fetched limits from database (avoids live fetch) 120 | 121 | Returns: 122 | Group containing KPI cards and limit boxes 123 | """ 124 | # Use limits from DB if provided, otherwise fetch live (unless skipped) 125 | limits = limits_from_db 126 | if limits is None and not skip_limits: 127 | from src.commands.limits import capture_limits 128 | if console: 129 | with console.status(f"[bold {ORANGE}]Loading usage limits...", spinner="dots", spinner_style=ORANGE): 130 | limits = capture_limits() 131 | else: 132 | limits = capture_limits() 133 | 134 | # Create KPI cards 135 | kpi_grid = Table.grid(padding=(0, 2), expand=False) 136 | kpi_grid.add_column(justify="center") 137 | kpi_grid.add_column(justify="center") 138 | kpi_grid.add_column(justify="center") 139 | 140 | # Total Tokens card 141 | tokens_card = Panel( 142 | Text(_format_number(overall.total_tokens), style=f"bold {ORANGE}"), 143 | title="Total Tokens", 144 | border_style="white", 145 | width=28, 146 | ) 147 | 148 | # Total Prompts card 149 | prompts_card = Panel( 150 | Text(_format_number(overall.total_prompts), style="bold white"), 151 | title="Prompts Sent", 152 | border_style="white", 153 | width=28, 154 | ) 155 | 156 | # Total Sessions card 157 | sessions_card = Panel( 158 | Text(_format_number(overall.total_sessions), style="bold white"), 159 | title="Active Sessions", 160 | border_style="white", 161 | width=28, 162 | ) 163 | 164 | kpi_grid.add_row(tokens_card, prompts_card, sessions_card) 165 | 166 | # Create individual limit boxes if available 167 | if limits and "error" not in limits: 168 | limit_grid = Table.grid(padding=(0, 2), expand=False) 169 | limit_grid.add_column(justify="center") 170 | limit_grid.add_column(justify="center") 171 | limit_grid.add_column(justify="center") 172 | 173 | # Remove timezone info from reset dates 174 | session_reset = limits['session_reset'].split(' (')[0] if '(' in limits['session_reset'] else limits['session_reset'] 175 | week_reset = limits['week_reset'].split(' (')[0] if '(' in limits['week_reset'] else limits['week_reset'] 176 | opus_reset = limits['opus_reset'].split(' (')[0] if '(' in limits['opus_reset'] else limits['opus_reset'] 177 | 178 | # Session limit box 179 | session_bar = _create_bar(limits["session_pct"], 100, width=16, color="red") 180 | session_content = Text() 181 | session_content.append(f"{limits['session_pct']}% ", style="bold red") 182 | session_content.append(session_bar) 183 | session_content.append(f"\nResets: {session_reset}", style="white") 184 | session_box = Panel( 185 | session_content, 186 | title="[red]Session Limit", 187 | border_style="white", 188 | width=28, 189 | ) 190 | 191 | # Week limit box 192 | week_bar = _create_bar(limits["week_pct"], 100, width=16, color="red") 193 | week_content = Text() 194 | week_content.append(f"{limits['week_pct']}% ", style="bold red") 195 | week_content.append(week_bar) 196 | week_content.append(f"\nResets: {week_reset}", style="white") 197 | week_box = Panel( 198 | week_content, 199 | title="[red]Weekly Limit", 200 | border_style="white", 201 | width=28, 202 | ) 203 | 204 | # Opus limit box 205 | opus_bar = _create_bar(limits["opus_pct"], 100, width=16, color="red") 206 | opus_content = Text() 207 | opus_content.append(f"{limits['opus_pct']}% ", style="bold red") 208 | opus_content.append(opus_bar) 209 | opus_content.append(f"\nResets: {opus_reset}", style="white") 210 | opus_box = Panel( 211 | opus_content, 212 | title="[red]Opus Limit", 213 | border_style="white", 214 | width=28, 215 | ) 216 | 217 | limit_grid.add_row(session_box, week_box, opus_box) 218 | 219 | # Add spacing between KPI cards and limits with a simple newline 220 | spacing = Text("\n") 221 | return Group(kpi_grid, spacing, limit_grid) 222 | else: 223 | return Group(kpi_grid) 224 | 225 | 226 | def _create_kpi_cards(overall) -> Table: 227 | """ 228 | Create 3 KPI cards showing key metrics. 229 | 230 | Args: 231 | overall: Overall statistics 232 | 233 | Returns: 234 | Table grid with KPI cards 235 | """ 236 | grid = Table.grid(padding=(0, 2), expand=False) 237 | grid.add_column(justify="center") 238 | grid.add_column(justify="center") 239 | grid.add_column(justify="center") 240 | 241 | # Total Tokens card 242 | tokens_card = Panel( 243 | Text.assemble( 244 | (_format_number(overall.total_tokens), f"bold {ORANGE}"), 245 | "\n", 246 | ("Total Tokens", DIM), 247 | ), 248 | border_style="white", 249 | width=28, 250 | ) 251 | 252 | # Total Prompts card 253 | prompts_card = Panel( 254 | Text.assemble( 255 | (_format_number(overall.total_prompts), f"bold {ORANGE}"), 256 | "\n", 257 | ("Prompts Sent", DIM), 258 | ), 259 | border_style="white", 260 | width=28, 261 | ) 262 | 263 | # Total Sessions card 264 | sessions_card = Panel( 265 | Text.assemble( 266 | (_format_number(overall.total_sessions), f"bold {ORANGE}"), 267 | "\n", 268 | ("Active Sessions", DIM), 269 | ), 270 | border_style="white", 271 | width=28, 272 | ) 273 | 274 | grid.add_row(tokens_card, prompts_card, sessions_card) 275 | return grid 276 | 277 | 278 | def _create_limits_bars() -> Panel | None: 279 | """ 280 | Create progress bars showing current usage limits. 281 | 282 | Returns: 283 | Panel with limit progress bars, or None if no limits data 284 | """ 285 | # Try to capture current limits 286 | from src.commands.limits import capture_limits 287 | 288 | limits = capture_limits() 289 | if not limits or "error" in limits: 290 | return None 291 | 292 | table = Table(show_header=False, box=None, padding=(0, 2)) 293 | table.add_column("Label", style="white", justify="left") 294 | table.add_column("Bar", justify="left") 295 | table.add_column("Percent", style=ORANGE, justify="right") 296 | table.add_column("Reset", style=CYAN, justify="left") 297 | 298 | # Session limit 299 | session_bar = _create_bar(limits["session_pct"], 100, width=30) 300 | table.add_row( 301 | "[bold]Session", 302 | session_bar, 303 | f"{limits['session_pct']}%", 304 | f"resets {limits['session_reset']}", 305 | ) 306 | 307 | # Week limit 308 | week_bar = _create_bar(limits["week_pct"], 100, width=30) 309 | table.add_row( 310 | "[bold]Week", 311 | week_bar, 312 | f"{limits['week_pct']}%", 313 | f"resets {limits['week_reset']}", 314 | ) 315 | 316 | # Opus limit 317 | opus_bar = _create_bar(limits["opus_pct"], 100, width=30) 318 | table.add_row( 319 | "[bold]Opus", 320 | opus_bar, 321 | f"{limits['opus_pct']}%", 322 | f"resets {limits['opus_reset']}", 323 | ) 324 | 325 | return Panel( 326 | table, 327 | title="[bold]Usage Limits", 328 | border_style="white", 329 | ) 330 | 331 | 332 | def _create_model_breakdown(records: list[UsageRecord]) -> Panel: 333 | """ 334 | Create table showing token usage per model. 335 | 336 | Args: 337 | records: List of usage records 338 | 339 | Returns: 340 | Panel with model breakdown table 341 | """ 342 | # Aggregate tokens by model 343 | model_tokens: dict[str, int] = defaultdict(int) 344 | 345 | for record in records: 346 | if record.model and record.token_usage and record.model != "<synthetic>": 347 | model_tokens[record.model] += record.token_usage.total_tokens 348 | 349 | if not model_tokens: 350 | return Panel( 351 | Text("No model data available", style=DIM), 352 | title="[bold]Tokens by Model", 353 | border_style="white", 354 | ) 355 | 356 | # Calculate total and max 357 | total_tokens = sum(model_tokens.values()) 358 | max_tokens = max(model_tokens.values()) 359 | 360 | # Sort by usage 361 | sorted_models = sorted(model_tokens.items(), key=lambda x: x[1], reverse=True) 362 | 363 | # Create table 364 | table = Table(show_header=False, box=None, padding=(0, 2)) 365 | table.add_column("Model", style="white", justify="left", width=25) 366 | table.add_column("Bar", justify="left") 367 | table.add_column("Tokens", style=ORANGE, justify="right") 368 | table.add_column("Percentage", style=CYAN, justify="right") 369 | 370 | for model, tokens in sorted_models: 371 | # Shorten model name 372 | display_name = model.split("/")[-1] if "/" in model else model 373 | if "claude" in display_name.lower(): 374 | display_name = display_name.replace("claude-", "") 375 | 376 | percentage = (tokens / total_tokens * 100) if total_tokens > 0 else 0 377 | 378 | # Create bar 379 | bar = _create_bar(tokens, max_tokens, width=20) 380 | 381 | table.add_row( 382 | display_name, 383 | bar, 384 | _format_number(tokens), 385 | f"{percentage:.1f}%", 386 | ) 387 | 388 | return Panel( 389 | table, 390 | title="[bold]Tokens by Model", 391 | border_style="white", 392 | ) 393 | 394 | 395 | def _create_project_breakdown(records: list[UsageRecord]) -> Panel: 396 | """ 397 | Create table showing token usage per project. 398 | 399 | Args: 400 | records: List of usage records 401 | 402 | Returns: 403 | Panel with project breakdown table 404 | """ 405 | # Aggregate tokens by folder 406 | folder_tokens: dict[str, int] = defaultdict(int) 407 | 408 | for record in records: 409 | if record.token_usage: 410 | folder_tokens[record.folder] += record.token_usage.total_tokens 411 | 412 | if not folder_tokens: 413 | return Panel( 414 | Text("No project data available", style=DIM), 415 | title="[bold]Tokens by Project", 416 | border_style="white", 417 | ) 418 | 419 | # Calculate total and max 420 | total_tokens = sum(folder_tokens.values()) 421 | 422 | # Sort by usage 423 | sorted_folders = sorted(folder_tokens.items(), key=lambda x: x[1], reverse=True) 424 | 425 | # Limit to top 10 projects 426 | sorted_folders = sorted_folders[:10] 427 | max_tokens = max(tokens for _, tokens in sorted_folders) 428 | 429 | # Create table 430 | table = Table(show_header=False, box=None, padding=(0, 2)) 431 | table.add_column("Project", style="white", justify="left", overflow="crop") 432 | table.add_column("Bar", justify="left", overflow="crop") 433 | table.add_column("Tokens", style=ORANGE, justify="right") 434 | table.add_column("Percentage", style=CYAN, justify="right") 435 | 436 | for folder, tokens in sorted_folders: 437 | # Show only last 2-3 parts of path and truncate if needed 438 | parts = folder.split("/") 439 | if len(parts) > 3: 440 | display_name = ".../" + "/".join(parts[-2:]) 441 | elif len(parts) > 2: 442 | display_name = "/".join(parts[-2:]) 443 | else: 444 | display_name = folder 445 | 446 | # Manually truncate to 35 chars without ellipses 447 | if len(display_name) > 35: 448 | display_name = display_name[:35] 449 | 450 | percentage = (tokens / total_tokens * 100) if total_tokens > 0 else 0 451 | 452 | # Create bar 453 | bar = _create_bar(tokens, max_tokens, width=20) 454 | 455 | table.add_row( 456 | display_name, 457 | bar, 458 | _format_number(tokens), 459 | f"{percentage:.1f}%", 460 | ) 461 | 462 | return Panel( 463 | table, 464 | title="[bold]Tokens by Project", 465 | border_style="white", 466 | ) 467 | 468 | 469 | def _create_footer(date_range: str = None, fast_mode: bool = False) -> Text: 470 | """ 471 | Create footer with export command info and date range. 472 | 473 | Args: 474 | date_range: Optional date range string to display 475 | fast_mode: If True, show warning about fast mode 476 | 477 | Returns: 478 | Text with export instructions and date range 479 | """ 480 | footer = Text() 481 | 482 | # Add fast mode warning if enabled 483 | if fast_mode: 484 | from src.storage.snapshot_db import get_database_stats 485 | db_stats = get_database_stats() 486 | if db_stats.get("newest_timestamp"): 487 | # Format ISO timestamp to be more readable 488 | timestamp_str = db_stats["newest_timestamp"] 489 | try: 490 | dt = datetime.fromisoformat(timestamp_str) 491 | formatted_time = dt.strftime("%Y-%m-%d %H:%M:%S") 492 | footer.append("⚠ Fast mode: Reading from last update (", style="bold red") 493 | footer.append(f"{formatted_time}", style="bold red") 494 | footer.append(")\n\n", style="bold red") 495 | except (ValueError, AttributeError): 496 | footer.append(f"⚠ Fast mode: Reading from last update ({timestamp_str})\n\n", style="bold red") 497 | else: 498 | footer.append("⚠ Fast mode: Reading from database (no timestamp available)\n\n", style="bold red") 499 | 500 | # Add date range if provided 501 | if date_range: 502 | footer.append("Data range: ", style=DIM) 503 | footer.append(f"{date_range}\n", style=f"bold {CYAN}") 504 | 505 | # Add export tip 506 | footer.append("Tip: ", style=DIM) 507 | footer.append("View yearly heatmap with ", style=DIM) 508 | footer.append("ccg export --open", style=f"bold {CYAN}") 509 | 510 | return footer 511 | 512 | 513 | #endregion 514 | ``` -------------------------------------------------------------------------------- /src/visualization/export.py: -------------------------------------------------------------------------------- ```python 1 | #region Imports 2 | from datetime import datetime, timedelta 3 | from pathlib import Path 4 | from typing import Optional 5 | 6 | from src.aggregation.daily_stats import AggregatedStats, DailyStats 7 | #endregion 8 | 9 | 10 | #region Constants 11 | # Claude UI color scheme 12 | CLAUDE_BG = "#262624" 13 | CLAUDE_TEXT = "#FAF9F5" 14 | CLAUDE_TEXT_SECONDARY = "#C2C0B7" 15 | CLAUDE_DARK_GREY = "#3C3C3A" # Past days with no activity 16 | CLAUDE_LIGHT_GREY = "#6B6B68" # Future days 17 | CLAUDE_ORANGE_RGB = (203, 123, 93) # #CB7B5D 18 | 19 | # Diverging gradients for limits (matching orange tone) 20 | CLAUDE_BLUE_RGB = (93, 150, 203) # Blue with similar tone to orange 21 | CLAUDE_GREEN_RGB = (93, 203, 123) # Green with similar tone to orange 22 | CLAUDE_RED_RGB = (203, 93, 93) # Red for 100% 23 | CLAUDE_DARK_RED_RGB = (120, 40, 80) # Dark red/purple for >100% 24 | 25 | # Export at higher resolution for sharp output 26 | SCALE_FACTOR = 3 # 3x resolution 27 | CELL_SIZE = 12 * SCALE_FACTOR 28 | CELL_GAP = 3 * SCALE_FACTOR 29 | CELL_TOTAL = CELL_SIZE + CELL_GAP 30 | #endregion 31 | 32 | 33 | #region Functions 34 | 35 | 36 | def export_heatmap_svg( 37 | stats: AggregatedStats, 38 | output_path: Path, 39 | title: Optional[str] = None, 40 | year: Optional[int] = None 41 | ) -> None: 42 | """ 43 | Export the activity heatmap as an SVG file. 44 | 45 | Args: 46 | stats: Aggregated statistics to visualize 47 | output_path: Path where SVG file will be saved 48 | title: Optional title for the graph 49 | year: Year to display (defaults to current year) 50 | 51 | Raises: 52 | IOError: If file cannot be written 53 | """ 54 | # Show full year: Jan 1 to Dec 31 55 | today = datetime.now().date() 56 | display_year = year if year is not None else today.year 57 | start_date = datetime(display_year, 1, 1).date() 58 | end_date = datetime(display_year, 12, 31).date() 59 | 60 | # Build weeks structure 61 | jan1_day = (start_date.weekday() + 1) % 7 62 | weeks: list[list[tuple[Optional[DailyStats], Optional[datetime.date]]]] = [] 63 | current_week: list[tuple[Optional[DailyStats], Optional[datetime.date]]] = [] 64 | 65 | # Pad first week with None 66 | for _ in range(jan1_day): 67 | current_week.append((None, None)) 68 | 69 | # Add all days from Jan 1 to Dec 31 70 | current_date = start_date 71 | while current_date <= end_date: 72 | date_key = current_date.strftime("%Y-%m-%d") 73 | day_stats = stats.daily_stats.get(date_key) 74 | current_week.append((day_stats, current_date)) 75 | 76 | if len(current_week) == 7: 77 | weeks.append(current_week) 78 | current_week = [] 79 | 80 | current_date += timedelta(days=1) 81 | 82 | # Pad final week with None 83 | if current_week: 84 | while len(current_week) < 7: 85 | current_week.append((None, None)) 86 | weeks.append(current_week) 87 | 88 | # Calculate dimensions 89 | num_weeks = len(weeks) 90 | width = (num_weeks * CELL_TOTAL) + 120 # Extra space for labels 91 | height = (7 * CELL_TOTAL) + 80 # Extra space for title and legend 92 | 93 | # Calculate max tokens for scaling 94 | max_tokens = max( 95 | (s.total_tokens for s in stats.daily_stats.values()), default=1 96 | ) if stats.daily_stats else 1 97 | 98 | # Generate SVG with dynamic title 99 | default_title = f"Your Claude Code activity in {display_year}" 100 | svg = _generate_svg(weeks, width, height, max_tokens, title or default_title) 101 | 102 | # Write to file 103 | output_path.write_text(svg, encoding="utf-8") 104 | 105 | 106 | def export_heatmap_png( 107 | stats: AggregatedStats, 108 | output_path: Path, 109 | limits_data: Optional[dict[str, dict[str, int]]] = None, 110 | title: Optional[str] = None, 111 | year: Optional[int] = None, 112 | tracking_mode: str = "both" 113 | ) -> None: 114 | """ 115 | Export activity heatmaps as a PNG file: tokens, week %, and/or opus %. 116 | 117 | Requires Pillow: pip install pillow 118 | 119 | Args: 120 | stats: Aggregated statistics to visualize 121 | output_path: Path where PNG file will be saved 122 | limits_data: Dictionary mapping dates to {"week_pct": int, "opus_pct": int} 123 | title: Optional title for the graph 124 | year: Year to display (defaults to current year) 125 | tracking_mode: One of "both", "tokens", or "limits" 126 | 127 | Raises: 128 | ImportError: If Pillow is not installed 129 | IOError: If file cannot be written 130 | """ 131 | if limits_data is None: 132 | limits_data = {} 133 | try: 134 | from PIL import Image, ImageDraw, ImageFont 135 | except ImportError: 136 | raise ImportError( 137 | "PNG export requires Pillow. " 138 | "Install with: pip install pillow" 139 | ) 140 | 141 | # Build weeks structure (same as SVG) 142 | today = datetime.now().date() 143 | display_year = year if year is not None else today.year 144 | start_date = datetime(display_year, 1, 1).date() 145 | end_date = datetime(display_year, 12, 31).date() 146 | 147 | jan1_day = (start_date.weekday() + 1) % 7 148 | weeks: list[list[tuple[Optional[DailyStats], Optional[datetime.date]]]] = [] 149 | current_week: list[tuple[Optional[DailyStats], Optional[datetime.date]]] = [] 150 | 151 | for _ in range(jan1_day): 152 | current_week.append((None, None)) 153 | 154 | current_date = start_date 155 | while current_date <= end_date: 156 | date_key = current_date.strftime("%Y-%m-%d") 157 | day_stats = stats.daily_stats.get(date_key) 158 | current_week.append((day_stats, current_date)) 159 | 160 | if len(current_week) == 7: 161 | weeks.append(current_week) 162 | current_week = [] 163 | 164 | current_date += timedelta(days=1) 165 | 166 | if current_week: 167 | while len(current_week) < 7: 168 | current_week.append((None, None)) 169 | weeks.append(current_week) 170 | 171 | # Calculate dimensions based on tracking mode 172 | num_weeks = len(weeks) 173 | 174 | # Base grid dimensions (one heatmap) 175 | grid_width = num_weeks * CELL_TOTAL 176 | grid_height = 7 * CELL_TOTAL 177 | 178 | # Layout: Vertical stack with titles and legends for each 179 | base_padding = int(40 * SCALE_FACTOR * 0.66) 180 | day_label_space = 35 * SCALE_FACTOR 181 | heatmap_vertical_gap = 40 * SCALE_FACTOR # Gap between vertically stacked heatmaps 182 | heatmap_title_space = 20 * SCALE_FACTOR # Space for individual heatmap titles 183 | month_label_space = 12 * SCALE_FACTOR # Space for month labels above each grid 184 | legend_height = CELL_SIZE + (8 * SCALE_FACTOR) # Legend squares + small buffer 185 | 186 | # Main title at the top 187 | main_title_height = 20 * SCALE_FACTOR 188 | main_title_to_first_heatmap = 25 * SCALE_FACTOR 189 | 190 | # Each heatmap section includes: title + month labels + grid + legend 191 | single_heatmap_section_height = heatmap_title_space + month_label_space + grid_height + legend_height 192 | 193 | # Calculate number of heatmaps based on tracking mode 194 | if tracking_mode == "tokens": 195 | num_heatmaps = 1 196 | elif tracking_mode == "limits": 197 | num_heatmaps = 2 # Week and Opus 198 | else: # "both" 199 | num_heatmaps = 3 200 | 201 | # Total height 202 | top_padding = base_padding + main_title_height + main_title_to_first_heatmap 203 | content_height = (num_heatmaps * single_heatmap_section_height) + ((num_heatmaps - 1) * heatmap_vertical_gap) 204 | bottom_padding = base_padding 205 | 206 | width = base_padding + day_label_space + grid_width + base_padding 207 | height = top_padding + content_height + bottom_padding 208 | 209 | # Calculate max tokens 210 | max_tokens = max( 211 | (s.total_tokens for s in stats.daily_stats.values()), default=1 212 | ) if stats.daily_stats else 1 213 | 214 | # Create image 215 | img = Image.new('RGB', (width, height), _hex_to_rgb(CLAUDE_BG)) 216 | draw = ImageDraw.Draw(img) 217 | 218 | # Try to load a system font with scaled sizes (cross-platform) 219 | try: 220 | # Try common font paths across different systems 221 | font_paths = [ 222 | "/System/Library/Fonts/Helvetica.ttc", # macOS 223 | "C:\\Windows\\Fonts\\arial.ttf", # Windows 224 | "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", # Linux 225 | "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf", # Linux alternative 226 | ] 227 | 228 | title_font = None 229 | label_font = None 230 | 231 | for font_path in font_paths: 232 | try: 233 | title_font = ImageFont.truetype(font_path, 16 * SCALE_FACTOR) 234 | label_font = ImageFont.truetype(font_path, 10 * SCALE_FACTOR) 235 | break 236 | except: 237 | continue 238 | 239 | if title_font is None: 240 | raise Exception("No system font found") 241 | except: 242 | title_font = ImageFont.load_default() 243 | label_font = ImageFont.load_default() 244 | 245 | # Calculate common X positions 246 | day_label_x = base_padding 247 | grid_x = base_padding + day_label_space 248 | 249 | # Calculate Y positions for each heatmap section dynamically 250 | heatmap_y_positions = [] 251 | current_y = top_padding 252 | for i in range(num_heatmaps): 253 | heatmap_y_positions.append(current_y) 254 | current_y += single_heatmap_section_height + heatmap_vertical_gap 255 | 256 | # Draw main title and icon at the very top 257 | title_x = base_padding 258 | title_y = base_padding 259 | pixel_size = int(SCALE_FACTOR * 4) 260 | icon_width = _draw_claude_guy(draw, title_x, title_y, pixel_size) 261 | title_text_x = title_x + icon_width + (8 * SCALE_FACTOR) 262 | default_title = f"Your Claude Code activity in {display_year}" 263 | draw.text((title_text_x, title_y), title or default_title, fill=_hex_to_rgb(CLAUDE_TEXT), font=title_font) 264 | 265 | corner_radius = 2 * SCALE_FACTOR 266 | day_names = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"] 267 | 268 | # Helper function to draw one complete heatmap section 269 | def draw_heatmap_section(section_y_start, heatmap_title, gradient_func, is_tokens=True, base_color_rgb=None, reset_info=None): 270 | # Positions within this section 271 | title_y = section_y_start 272 | month_y = title_y + heatmap_title_space 273 | grid_y = month_y + month_label_space 274 | legend_y = grid_y + grid_height + (CELL_GAP * 2) 275 | legend_square_y = legend_y - (CELL_SIZE // 4) 276 | 277 | # Parse reset date if provided 278 | reset_date = None 279 | if reset_info: 280 | import re 281 | # Parse "Oct 16, 10:59am (Europe/Brussels)" format 282 | match = re.search(r'([A-Za-z]+)\s+(\d+)', reset_info) 283 | if match: 284 | month_name = match.group(1) 285 | day = int(match.group(2)) 286 | # Convert month name to number 287 | from datetime import datetime 288 | month_num = datetime.strptime(month_name, "%b").month 289 | reset_date = datetime(display_year, month_num, day).date() 290 | 291 | # Draw heatmap title 292 | draw.text((grid_x, title_y), heatmap_title, fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font) 293 | 294 | # Draw day labels (vertically centered with row) 295 | for day_idx, day_name in enumerate(day_names): 296 | y = grid_y + (day_idx * CELL_TOTAL) + (CELL_SIZE // 2) 297 | draw.text((day_label_x, y), day_name, fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font, anchor="lm") # left-middle anchor 298 | 299 | # Draw month labels 300 | last_month = None 301 | for week_idx, week in enumerate(weeks): 302 | for day_stats, date in week: 303 | if date is not None: 304 | month = date.month 305 | if month != last_month: 306 | x = grid_x + (week_idx * CELL_TOTAL) 307 | month_name = date.strftime("%b") 308 | draw.text((x, month_y), month_name, fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font) 309 | last_month = month 310 | break 311 | 312 | # Draw heatmap cells 313 | for week_idx, week in enumerate(weeks): 314 | for day_idx, (day_stats, date) in enumerate(week): 315 | if date is None: 316 | continue 317 | 318 | x = grid_x + (week_idx * CELL_TOTAL) 319 | y = grid_y + (day_idx * CELL_TOTAL) 320 | 321 | color = gradient_func(day_stats, date) 322 | draw.rounded_rectangle([x, y, x + CELL_SIZE, y + CELL_SIZE], 323 | radius=corner_radius, fill=color, outline=_hex_to_rgb(CLAUDE_BG)) 324 | 325 | # Draw orange dot on reset day 326 | if reset_date and date == reset_date: 327 | dot_radius = int(CELL_SIZE * 0.2) # 20% of cell size 328 | dot_center_x = x + (CELL_SIZE // 2) 329 | dot_center_y = y + (CELL_SIZE // 2) 330 | draw.ellipse([ 331 | dot_center_x - dot_radius, 332 | dot_center_y - dot_radius, 333 | dot_center_x + dot_radius, 334 | dot_center_y + dot_radius 335 | ], fill=CLAUDE_ORANGE_RGB) 336 | 337 | # Draw legend 338 | draw.text((grid_x, legend_y), "Less" if is_tokens else "0%", fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font) 339 | text_bbox = draw.textbbox((grid_x, legend_y), "Less" if is_tokens else "0%", font=label_font) 340 | text_width = text_bbox[2] - text_bbox[0] 341 | 342 | legend_extra_gap = int(CELL_GAP * 0.3) 343 | legend_square_spacing = CELL_SIZE + CELL_GAP + legend_extra_gap 344 | squares_start = grid_x + text_width + (CELL_GAP * 2) 345 | 346 | if is_tokens: 347 | # Token legend: dark grey + orange gradient 348 | draw.rounded_rectangle([squares_start, legend_square_y, squares_start + CELL_SIZE, legend_square_y + CELL_SIZE], 349 | radius=corner_radius, fill=_hex_to_rgb(CLAUDE_DARK_GREY)) 350 | for i in range(1, 5): 351 | intensity = 0.2 + ((i - 1) / 3) * 0.8 352 | r = int(CLAUDE_ORANGE_RGB[0] * intensity) 353 | g = int(CLAUDE_ORANGE_RGB[1] * intensity) 354 | b = int(CLAUDE_ORANGE_RGB[2] * intensity) 355 | x = squares_start + (i * legend_square_spacing) 356 | draw.rounded_rectangle([x, legend_square_y, x + CELL_SIZE, legend_square_y + CELL_SIZE], 357 | radius=corner_radius, fill=(r, g, b)) 358 | end_text = "More" 359 | else: 360 | # Limits legend: base_color → red → dark red 361 | for i in range(5): 362 | if i == 0: 363 | color = base_color_rgb 364 | elif i == 4: 365 | color = CLAUDE_DARK_RED_RGB 366 | else: 367 | ratio = i / 3.0 368 | r = int(base_color_rgb[0] + (CLAUDE_RED_RGB[0] - base_color_rgb[0]) * ratio) 369 | g = int(base_color_rgb[1] + (CLAUDE_RED_RGB[1] - base_color_rgb[1]) * ratio) 370 | b = int(base_color_rgb[2] + (CLAUDE_RED_RGB[2] - base_color_rgb[2]) * ratio) 371 | color = (r, g, b) 372 | x = squares_start + (i * legend_square_spacing) 373 | draw.rounded_rectangle([x, legend_square_y, x + CELL_SIZE, legend_square_y + CELL_SIZE], 374 | radius=corner_radius, fill=color) 375 | end_text = "100%+" 376 | 377 | more_x = squares_start + (5 * legend_square_spacing) + CELL_GAP 378 | draw.text((more_x, legend_y), end_text, fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font) 379 | 380 | # Add reset dot legend if applicable 381 | if reset_info: 382 | reset_legend_x = more_x + 100 * SCALE_FACTOR # Space after "100%+" 383 | dot_radius = int(CELL_SIZE * 0.2) 384 | dot_center_x = reset_legend_x + dot_radius + (CELL_GAP) 385 | dot_center_y = legend_square_y + (CELL_SIZE // 2) 386 | 387 | # Draw orange dot 388 | draw.ellipse([ 389 | dot_center_x - dot_radius, 390 | dot_center_y - dot_radius, 391 | dot_center_x + dot_radius, 392 | dot_center_y + dot_radius 393 | ], fill=CLAUDE_ORANGE_RGB) 394 | 395 | # Draw label 396 | reset_text = f"Resets: {reset_info}" 397 | reset_text_x = dot_center_x + dot_radius + (CELL_GAP) 398 | draw.text((reset_text_x, legend_y), reset_text, fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font) 399 | 400 | # Define gradient functions for each heatmap 401 | def tokens_gradient(day_stats, date): 402 | color_str = _get_color(day_stats, max_tokens, date, today) 403 | return _parse_rgb(color_str) if color_str.startswith('rgb(') else _hex_to_rgb(color_str) 404 | 405 | def week_gradient(day_stats, date): 406 | date_key = date.strftime("%Y-%m-%d") 407 | week_pct = limits_data.get(date_key, {}).get("week_pct", 0) 408 | return _get_limits_color(week_pct, CLAUDE_BLUE_RGB, date, today) 409 | 410 | def opus_gradient(day_stats, date): 411 | date_key = date.strftime("%Y-%m-%d") 412 | opus_pct = limits_data.get(date_key, {}).get("opus_pct", 0) 413 | return _get_limits_color(opus_pct, CLAUDE_GREEN_RGB, date, today) 414 | 415 | # Get most recent reset info from limits_data 416 | week_reset_info = None 417 | opus_reset_info = None 418 | if limits_data: 419 | # Get the most recent limits snapshot to extract reset times 420 | from src.storage.snapshot_db import DEFAULT_DB_PATH 421 | import sqlite3 422 | try: 423 | conn = sqlite3.connect(DEFAULT_DB_PATH) 424 | cursor = conn.cursor() 425 | cursor.execute("SELECT week_reset, opus_reset FROM limits_snapshots ORDER BY timestamp DESC LIMIT 1") 426 | result = cursor.fetchone() 427 | if result: 428 | week_reset_info = result[0] 429 | opus_reset_info = result[1] 430 | conn.close() 431 | except: 432 | pass 433 | 434 | # Draw heatmap sections based on tracking mode 435 | heatmap_idx = 0 436 | 437 | if tracking_mode in ["both", "tokens"]: 438 | draw_heatmap_section(heatmap_y_positions[heatmap_idx], "Token Usage", tokens_gradient, is_tokens=True) 439 | heatmap_idx += 1 440 | 441 | if tracking_mode in ["both", "limits"]: 442 | draw_heatmap_section(heatmap_y_positions[heatmap_idx], "Week Limit %", week_gradient, is_tokens=False, base_color_rgb=CLAUDE_BLUE_RGB, reset_info=week_reset_info) 443 | heatmap_idx += 1 444 | draw_heatmap_section(heatmap_y_positions[heatmap_idx], "Opus Limit %", opus_gradient, is_tokens=False, base_color_rgb=CLAUDE_GREEN_RGB, reset_info=opus_reset_info) 445 | 446 | # Save image 447 | img.save(output_path, 'PNG') 448 | 449 | 450 | def _generate_svg( 451 | weeks: list[list[tuple[Optional[DailyStats], Optional[datetime.date]]]], 452 | width: int, 453 | height: int, 454 | max_tokens: int, 455 | title: str 456 | ) -> str: 457 | """ 458 | Generate SVG markup for the heatmap. 459 | 460 | Args: 461 | weeks: List of weeks with daily stats 462 | width: SVG width in pixels 463 | height: SVG height in pixels 464 | max_tokens: Maximum token count for scaling 465 | title: Title text 466 | 467 | Returns: 468 | SVG markup as a string 469 | """ 470 | svg_parts = [ 471 | f'<svg width="{width}" height="{height}" xmlns="http://www.w3.org/2000/svg">', 472 | '<style>', 473 | f' .day-cell {{ stroke: {CLAUDE_BG}; stroke-width: 1; }}', 474 | f' .month-label {{ fill: {CLAUDE_TEXT_SECONDARY}; font: 12px -apple-system, sans-serif; }}', 475 | f' .day-label {{ fill: {CLAUDE_TEXT_SECONDARY}; font: 10px -apple-system, sans-serif; }}', 476 | f' .title {{ fill: {CLAUDE_TEXT}; font: bold 16px -apple-system, sans-serif; }}', 477 | f' .legend-text {{ fill: {CLAUDE_TEXT_SECONDARY}; font: 10px -apple-system, sans-serif; }}', 478 | '</style>', 479 | f'<rect width="{width}" height="{height}" fill="{CLAUDE_BG}"/>', 480 | ] 481 | 482 | # Draw Claude guy (Clawd) icon in SVG 483 | clawd_svg = _generate_clawd_svg(10, 10, 3) 484 | svg_parts.append(clawd_svg) 485 | 486 | # Title (positioned after Clawd icon) 487 | title_x = 10 + (8 * 3) + 8 # Icon width + gap 488 | svg_parts.append(f'<text x="{title_x}" y="25" class="title">{title}</text>') 489 | 490 | # Day labels (Y-axis) 491 | day_names = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"] 492 | for day_idx, day_name in enumerate(day_names): 493 | y = 60 + (day_idx * CELL_TOTAL) + (CELL_SIZE // 2) 494 | svg_parts.append(f'<text x="5" y="{y + 4}" class="day-label" text-anchor="start">{day_name}</text>') 495 | 496 | # Month labels (X-axis) 497 | last_month = None 498 | for week_idx, week in enumerate(weeks): 499 | for day_stats, date in week: 500 | if date is not None: 501 | month = date.month 502 | if month != last_month: 503 | x = 40 + (week_idx * CELL_TOTAL) 504 | month_name = date.strftime("%b") 505 | svg_parts.append(f'<text x="{x}" y="50" class="month-label">{month_name}</text>') 506 | last_month = month 507 | break 508 | 509 | # Heatmap cells 510 | today = datetime.now().date() 511 | for week_idx, week in enumerate(weeks): 512 | for day_idx, (day_stats, date) in enumerate(week): 513 | if date is None: 514 | # Skip padding cells 515 | continue 516 | 517 | x = 40 + (week_idx * CELL_TOTAL) 518 | y = 60 + (day_idx * CELL_TOTAL) 519 | 520 | color = _get_color(day_stats, max_tokens, date, today) 521 | 522 | # Add tooltip with date and stats 523 | if day_stats and day_stats.total_tokens > 0: 524 | tooltip = f"{date}: {day_stats.total_prompts} prompts, {day_stats.total_tokens:,} tokens" 525 | elif date > today: 526 | tooltip = f"{date}: Future" 527 | else: 528 | tooltip = f"{date}: No activity" 529 | 530 | svg_parts.append(f'<rect x="{x}" y="{y}" width="{CELL_SIZE}" height="{CELL_SIZE}" fill="{color}" class="day-cell"><title>{tooltip}</title></rect>') 531 | 532 | # Legend - show gradient from dark to bright orange 533 | legend_y = height - 20 534 | legend_x = 40 535 | svg_parts.append(f'<text x="{legend_x}" y="{legend_y}" class="legend-text">Less</text>') 536 | 537 | # Show 5 sample cells from gradient 538 | for i in range(5): 539 | intensity = 0.2 + (i / 4) * 0.8 540 | r = int(CLAUDE_ORANGE_RGB[0] * intensity) 541 | g = int(CLAUDE_ORANGE_RGB[1] * intensity) 542 | b = int(CLAUDE_ORANGE_RGB[2] * intensity) 543 | color = f"rgb({r},{g},{b})" 544 | x = legend_x + 35 + (i * (CELL_SIZE + 2)) 545 | svg_parts.append(f'<rect x="{x}" y="{legend_y - CELL_SIZE + 2}" width="{CELL_SIZE}" height="{CELL_SIZE}" fill="{color}" class="day-cell"/>') 546 | 547 | svg_parts.append(f'<text x="{legend_x + 35 + (5 * (CELL_SIZE + 2)) + 5}" y="{legend_y}" class="legend-text">More</text>') 548 | 549 | svg_parts.append('</svg>') 550 | 551 | return '\n'.join(svg_parts) 552 | 553 | 554 | def _get_limits_color( 555 | pct: int, 556 | base_color_rgb: tuple[int, int, int], 557 | date: datetime.date, 558 | today: datetime.date 559 | ) -> tuple[int, int, int]: 560 | """ 561 | Get diverging gradient color for limits percentage. 562 | 563 | 0% = base_color (blue/green), 100% = red, >100% = dark red/purple 564 | 565 | Args: 566 | pct: Usage percentage (0-100+) 567 | base_color_rgb: Base color (blue for week, green for opus) 568 | date: The date of this cell 569 | today: Today's date 570 | 571 | Returns: 572 | RGB color tuple 573 | """ 574 | # Future days: light grey 575 | if date > today: 576 | return _hex_to_rgb(CLAUDE_LIGHT_GREY) 577 | 578 | # Past days with no data: use dark grey 579 | if pct == 0: 580 | return _hex_to_rgb(CLAUDE_DARK_GREY) 581 | 582 | # >100%: dark red/purple 583 | if pct > 100: 584 | return CLAUDE_DARK_RED_RGB 585 | 586 | # 0-100%: interpolate from base_color to red 587 | ratio = pct / 100.0 588 | 589 | r = int(base_color_rgb[0] + (CLAUDE_RED_RGB[0] - base_color_rgb[0]) * ratio) 590 | g = int(base_color_rgb[1] + (CLAUDE_RED_RGB[1] - base_color_rgb[1]) * ratio) 591 | b = int(base_color_rgb[2] + (CLAUDE_RED_RGB[2] - base_color_rgb[2]) * ratio) 592 | 593 | return (r, g, b) 594 | 595 | 596 | def _get_color( 597 | day_stats: Optional[DailyStats], 598 | max_tokens: int, 599 | date: datetime.date, 600 | today: datetime.date 601 | ) -> str: 602 | """ 603 | Get the color for a day based on activity level using smooth gradient. 604 | 605 | Args: 606 | day_stats: Statistics for the day 607 | max_tokens: Maximum tokens for scaling 608 | date: The date of this cell 609 | today: Today's date 610 | 611 | Returns: 612 | RGB color string 613 | """ 614 | # Future days: light grey 615 | if date > today: 616 | return CLAUDE_LIGHT_GREY 617 | 618 | # Past days with no activity: dark grey 619 | if not day_stats or day_stats.total_tokens == 0: 620 | return CLAUDE_DARK_GREY 621 | 622 | # Calculate intensity ratio (0.0 to 1.0) 623 | ratio = day_stats.total_tokens / max_tokens if max_tokens > 0 else 0 624 | 625 | # Apply non-linear scaling to make differences more visible 626 | ratio = ratio ** 0.5 627 | 628 | # True continuous gradient from dark grey to orange 629 | dark_grey = _hex_to_rgb(CLAUDE_DARK_GREY) 630 | r = int(dark_grey[0] + (CLAUDE_ORANGE_RGB[0] - dark_grey[0]) * ratio) 631 | g = int(dark_grey[1] + (CLAUDE_ORANGE_RGB[1] - dark_grey[1]) * ratio) 632 | b = int(dark_grey[2] + (CLAUDE_ORANGE_RGB[2] - dark_grey[2]) * ratio) 633 | 634 | return f"rgb({r},{g},{b})" 635 | 636 | 637 | def _hex_to_rgb(hex_color: str) -> tuple[int, int, int]: 638 | """Convert hex color to RGB tuple.""" 639 | hex_color = hex_color.lstrip('#') 640 | return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4)) 641 | 642 | 643 | def _parse_rgb(rgb_str: str) -> tuple[int, int, int]: 644 | """Parse 'rgb(r,g,b)' string to tuple.""" 645 | rgb_str = rgb_str.replace('rgb(', '').replace(')', '') 646 | return tuple(map(int, rgb_str.split(','))) 647 | 648 | 649 | def _generate_clawd_svg(x: int, y: int, pixel_size: int) -> str: 650 | """ 651 | Generate SVG markup for the Claude guy (Clawd) pixel art icon. 652 | 653 | Based on ASCII art: 654 | ▐▛███▜▌ 655 | ▝▜█████▛▘ 656 | ▘▘ ▝▝ 657 | 658 | Args: 659 | x: X position (left) 660 | y: Y position (top) 661 | pixel_size: Size of each pixel block 662 | 663 | Returns: 664 | SVG markup string 665 | """ 666 | # Colors 667 | orange = f"rgb({CLAUDE_ORANGE_RGB[0]},{CLAUDE_ORANGE_RGB[1]},{CLAUDE_ORANGE_RGB[2]})" 668 | dark_grey = CLAUDE_DARK_GREY 669 | 670 | # Define the pixel grid (1 = orange, 0 = transparent, 2 = dark grey/eye) 671 | grid = [ 672 | [1, 1, 1, 1, 1, 1, 1, 1], # Row 0: top with ears 673 | [0, 1, 2, 1, 1, 2, 1, 0], # Row 1: eyes row 674 | [0, 1, 1, 1, 1, 1, 1, 0], # Row 2: bottom of head 675 | [0, 1, 1, 0, 0, 1, 1, 0], # Row 3: legs 676 | ] 677 | 678 | svg_parts = [] 679 | for row_idx, row in enumerate(grid): 680 | for col_idx, pixel_type in enumerate(row): 681 | if pixel_type == 0: 682 | continue # Skip transparent pixels 683 | 684 | color = orange if pixel_type == 1 else dark_grey 685 | px = x + (col_idx * pixel_size) 686 | py = y + (row_idx * pixel_size) 687 | 688 | svg_parts.append( 689 | f'<rect x="{px}" y="{py}" width="{pixel_size}" height="{pixel_size}" fill="{color}"/>' 690 | ) 691 | 692 | return '\n'.join(svg_parts) 693 | 694 | 695 | def _draw_claude_guy(draw, x: int, y: int, pixel_size: int) -> int: 696 | """ 697 | Draw the Claude guy pixel art icon. 698 | 699 | Based on ASCII art: 700 | ▐▛███▜▌ 701 | ▝▜█████▛▘ 702 | ▘▘ ▝▝ 703 | 704 | Args: 705 | draw: PIL ImageDraw object 706 | x: X position (left) 707 | y: Y position (top) 708 | pixel_size: Size of each pixel block 709 | 710 | Returns: 711 | Width of the icon in pixels 712 | """ 713 | # Colors 714 | orange = (203, 123, 93) # CLAUDE_ORANGE_RGB 715 | dark_grey = (60, 60, 58) # CLAUDE_DARK_GREY 716 | 717 | # Define the pixel grid (1 = orange, 0 = transparent, 2 = dark grey/eye) 718 | # 8 pixels wide, 4 pixels tall 719 | grid = [ 720 | [1, 1, 1, 1, 1, 1, 1, 1], # Row 0: ▐▛███▜▌ - top with ears 721 | [0, 1, 2, 1, 1, 2, 1, 0], # Row 1: ▝▜█████▛▘ - eyes row 722 | [0, 1, 1, 1, 1, 1, 1, 0], # Row 2: bottom of head 723 | [0, 1, 1, 0, 0, 1, 1, 0], # Row 3: ▘▘ ▝▝ - legs 724 | ] 725 | 726 | # Draw each pixel 727 | for row_idx, row in enumerate(grid): 728 | for col_idx, pixel_type in enumerate(row): 729 | if pixel_type == 0: 730 | continue # Skip transparent pixels 731 | 732 | color = orange if pixel_type == 1 else dark_grey 733 | px = x + (col_idx * pixel_size) 734 | py = y + (row_idx * pixel_size) 735 | 736 | draw.rectangle([ 737 | px, py, 738 | px + pixel_size, py + pixel_size 739 | ], fill=color) 740 | 741 | return 8 * pixel_size # Return width 742 | #endregion 743 | ``` -------------------------------------------------------------------------------- /src/storage/snapshot_db.py: -------------------------------------------------------------------------------- ```python 1 | #region Imports 2 | import sqlite3 3 | from datetime import datetime 4 | from pathlib import Path 5 | from typing import Optional 6 | 7 | from src.models.usage_record import UsageRecord 8 | #endregion 9 | 10 | 11 | #region Constants 12 | DEFAULT_DB_PATH = Path.home() / ".claude" / "usage" / "usage_history.db" 13 | #endregion 14 | 15 | 16 | #region Functions 17 | 18 | 19 | def init_database(db_path: Path = DEFAULT_DB_PATH) -> None: 20 | """ 21 | Initialize the SQLite database for historical snapshots. 22 | 23 | Creates tables if they don't exist: 24 | - daily_snapshots: Daily aggregated usage data 25 | - usage_records: Individual usage records for detailed analysis 26 | 27 | Args: 28 | db_path: Path to the SQLite database file 29 | 30 | Raises: 31 | sqlite3.Error: If database initialization fails 32 | """ 33 | db_path.parent.mkdir(parents=True, exist_ok=True) 34 | 35 | conn = sqlite3.connect(db_path) 36 | try: 37 | cursor = conn.cursor() 38 | 39 | # Table for daily aggregated snapshots 40 | cursor.execute(""" 41 | CREATE TABLE IF NOT EXISTS daily_snapshots ( 42 | date TEXT PRIMARY KEY, 43 | total_prompts INTEGER NOT NULL, 44 | total_responses INTEGER NOT NULL, 45 | total_sessions INTEGER NOT NULL, 46 | total_tokens INTEGER NOT NULL, 47 | input_tokens INTEGER NOT NULL, 48 | output_tokens INTEGER NOT NULL, 49 | cache_creation_tokens INTEGER NOT NULL, 50 | cache_read_tokens INTEGER NOT NULL, 51 | snapshot_timestamp TEXT NOT NULL 52 | ) 53 | """) 54 | 55 | # Table for detailed usage records 56 | cursor.execute(""" 57 | CREATE TABLE IF NOT EXISTS usage_records ( 58 | id INTEGER PRIMARY KEY AUTOINCREMENT, 59 | date TEXT NOT NULL, 60 | timestamp TEXT NOT NULL, 61 | session_id TEXT NOT NULL, 62 | message_uuid TEXT NOT NULL, 63 | message_type TEXT NOT NULL, 64 | model TEXT, 65 | folder TEXT NOT NULL, 66 | git_branch TEXT, 67 | version TEXT NOT NULL, 68 | input_tokens INTEGER NOT NULL, 69 | output_tokens INTEGER NOT NULL, 70 | cache_creation_tokens INTEGER NOT NULL, 71 | cache_read_tokens INTEGER NOT NULL, 72 | total_tokens INTEGER NOT NULL, 73 | UNIQUE(session_id, message_uuid) 74 | ) 75 | """) 76 | 77 | # Index for faster date-based queries 78 | cursor.execute(""" 79 | CREATE INDEX IF NOT EXISTS idx_usage_records_date 80 | ON usage_records(date) 81 | """) 82 | 83 | # Table for usage limits snapshots 84 | cursor.execute(""" 85 | CREATE TABLE IF NOT EXISTS limits_snapshots ( 86 | timestamp TEXT PRIMARY KEY, 87 | date TEXT NOT NULL, 88 | session_pct INTEGER, 89 | week_pct INTEGER, 90 | opus_pct INTEGER, 91 | session_reset TEXT, 92 | week_reset TEXT, 93 | opus_reset TEXT 94 | ) 95 | """) 96 | 97 | # Index for faster date-based queries on limits 98 | cursor.execute(""" 99 | CREATE INDEX IF NOT EXISTS idx_limits_snapshots_date 100 | ON limits_snapshots(date) 101 | """) 102 | 103 | # Table for model pricing 104 | cursor.execute(""" 105 | CREATE TABLE IF NOT EXISTS model_pricing ( 106 | model_name TEXT PRIMARY KEY, 107 | input_price_per_mtok REAL NOT NULL, 108 | output_price_per_mtok REAL NOT NULL, 109 | cache_write_price_per_mtok REAL NOT NULL, 110 | cache_read_price_per_mtok REAL NOT NULL, 111 | last_updated TEXT NOT NULL, 112 | notes TEXT 113 | ) 114 | """) 115 | 116 | # Populate pricing data for known models 117 | pricing_data = [ 118 | # Current models 119 | ('claude-opus-4-1-20250805', 15.00, 75.00, 18.75, 1.50, 'Current flagship model'), 120 | ('claude-sonnet-4-5-20250929', 3.00, 15.00, 3.75, 0.30, 'Current balanced model (≤200K tokens)'), 121 | ('claude-haiku-3-5-20241022', 0.80, 4.00, 1.00, 0.08, 'Current fast model'), 122 | 123 | # Legacy models (approximate pricing) 124 | ('claude-sonnet-4-20250514', 3.00, 15.00, 3.75, 0.30, 'Legacy Sonnet 4'), 125 | ('claude-opus-4-20250514', 15.00, 75.00, 18.75, 1.50, 'Legacy Opus 4'), 126 | ('claude-sonnet-3-7-20250219', 3.00, 15.00, 3.75, 0.30, 'Legacy Sonnet 3.7'), 127 | 128 | # Synthetic/test models 129 | ('<synthetic>', 0.00, 0.00, 0.00, 0.00, 'Test/synthetic model - no cost'), 130 | ] 131 | 132 | timestamp = datetime.now().isoformat() 133 | for model_name, input_price, output_price, cache_write, cache_read, notes in pricing_data: 134 | cursor.execute(""" 135 | INSERT OR REPLACE INTO model_pricing ( 136 | model_name, input_price_per_mtok, output_price_per_mtok, 137 | cache_write_price_per_mtok, cache_read_price_per_mtok, 138 | last_updated, notes 139 | ) VALUES (?, ?, ?, ?, ?, ?, ?) 140 | """, (model_name, input_price, output_price, cache_write, cache_read, timestamp, notes)) 141 | 142 | conn.commit() 143 | finally: 144 | conn.close() 145 | 146 | 147 | def save_snapshot(records: list[UsageRecord], db_path: Path = DEFAULT_DB_PATH, storage_mode: str = "aggregate") -> int: 148 | """ 149 | Save usage records to the database as a snapshot. 150 | 151 | Only saves records that don't already exist (based on session_id + message_uuid). 152 | Also updates daily_snapshots table with aggregated data. 153 | 154 | Args: 155 | records: List of usage records to save 156 | db_path: Path to the SQLite database file 157 | storage_mode: "aggregate" (daily totals only) or "full" (individual records) 158 | 159 | Returns: 160 | Number of new records saved 161 | 162 | Raises: 163 | sqlite3.Error: If database operation fails 164 | """ 165 | if not records: 166 | return 0 167 | 168 | init_database(db_path) 169 | 170 | conn = sqlite3.connect(db_path) 171 | saved_count = 0 172 | 173 | try: 174 | cursor = conn.cursor() 175 | 176 | # Save individual records only if in "full" mode 177 | if storage_mode == "full": 178 | for record in records: 179 | # Get token values (0 for user messages without token_usage) 180 | input_tokens = record.token_usage.input_tokens if record.token_usage else 0 181 | output_tokens = record.token_usage.output_tokens if record.token_usage else 0 182 | cache_creation_tokens = record.token_usage.cache_creation_tokens if record.token_usage else 0 183 | cache_read_tokens = record.token_usage.cache_read_tokens if record.token_usage else 0 184 | total_tokens = record.token_usage.total_tokens if record.token_usage else 0 185 | 186 | try: 187 | cursor.execute(""" 188 | INSERT INTO usage_records ( 189 | date, timestamp, session_id, message_uuid, message_type, 190 | model, folder, git_branch, version, 191 | input_tokens, output_tokens, 192 | cache_creation_tokens, cache_read_tokens, total_tokens 193 | ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 194 | """, ( 195 | record.date_key, 196 | record.timestamp.isoformat(), 197 | record.session_id, 198 | record.message_uuid, 199 | record.message_type, 200 | record.model, 201 | record.folder, 202 | record.git_branch, 203 | record.version, 204 | input_tokens, 205 | output_tokens, 206 | cache_creation_tokens, 207 | cache_read_tokens, 208 | total_tokens, 209 | )) 210 | saved_count += 1 211 | except sqlite3.IntegrityError: 212 | # Record already exists, skip it 213 | pass 214 | 215 | # Update daily snapshots (aggregate by date) 216 | if storage_mode == "full": 217 | # In full mode, only update dates that have records in usage_records 218 | # IMPORTANT: Never use REPLACE - it would delete old data when JSONL files age out 219 | # Instead, recalculate only for dates that currently have records 220 | timestamp = datetime.now().isoformat() 221 | 222 | # Get all dates that currently have usage_records 223 | cursor.execute("SELECT DISTINCT date FROM usage_records") 224 | dates_with_records = [row[0] for row in cursor.fetchall()] 225 | 226 | for date in dates_with_records: 227 | # Calculate totals for this date from usage_records 228 | cursor.execute(""" 229 | SELECT 230 | SUM(CASE WHEN message_type = 'user' THEN 1 ELSE 0 END) as total_prompts, 231 | SUM(CASE WHEN message_type = 'assistant' THEN 1 ELSE 0 END) as total_responses, 232 | COUNT(DISTINCT session_id) as total_sessions, 233 | SUM(total_tokens) as total_tokens, 234 | SUM(input_tokens) as input_tokens, 235 | SUM(output_tokens) as output_tokens, 236 | SUM(cache_creation_tokens) as cache_creation_tokens, 237 | SUM(cache_read_tokens) as cache_read_tokens 238 | FROM usage_records 239 | WHERE date = ? 240 | """, (date,)) 241 | 242 | row = cursor.fetchone() 243 | 244 | # Use INSERT OR REPLACE only for dates that currently have data 245 | # This preserves historical daily_snapshots for dates no longer in usage_records 246 | cursor.execute(""" 247 | INSERT OR REPLACE INTO daily_snapshots ( 248 | date, total_prompts, total_responses, total_sessions, total_tokens, 249 | input_tokens, output_tokens, cache_creation_tokens, 250 | cache_read_tokens, snapshot_timestamp 251 | ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 252 | """, ( 253 | date, 254 | row[0] or 0, 255 | row[1] or 0, 256 | row[2] or 0, 257 | row[3] or 0, 258 | row[4] or 0, 259 | row[5] or 0, 260 | row[6] or 0, 261 | row[7] or 0, 262 | timestamp, 263 | )) 264 | else: 265 | # In aggregate mode, compute from incoming records 266 | from collections import defaultdict 267 | daily_aggregates = defaultdict(lambda: { 268 | "prompts": 0, 269 | "responses": 0, 270 | "sessions": set(), 271 | "input_tokens": 0, 272 | "output_tokens": 0, 273 | "cache_creation_tokens": 0, 274 | "cache_read_tokens": 0, 275 | "total_tokens": 0, 276 | }) 277 | 278 | for record in records: 279 | date_key = record.date_key 280 | daily_aggregates[date_key]["sessions"].add(record.session_id) 281 | 282 | # Count message types 283 | if record.is_user_prompt: 284 | daily_aggregates[date_key]["prompts"] += 1 285 | elif record.is_assistant_response: 286 | daily_aggregates[date_key]["responses"] += 1 287 | 288 | # Token usage only on assistant messages 289 | if record.token_usage: 290 | daily_aggregates[date_key]["input_tokens"] += record.token_usage.input_tokens 291 | daily_aggregates[date_key]["output_tokens"] += record.token_usage.output_tokens 292 | daily_aggregates[date_key]["cache_creation_tokens"] += record.token_usage.cache_creation_tokens 293 | daily_aggregates[date_key]["cache_read_tokens"] += record.token_usage.cache_read_tokens 294 | daily_aggregates[date_key]["total_tokens"] += record.token_usage.total_tokens 295 | 296 | # Insert or update daily snapshots 297 | timestamp = datetime.now().isoformat() 298 | for date_key, agg in daily_aggregates.items(): 299 | # Get existing data for this date to merge with new data 300 | cursor.execute(""" 301 | SELECT total_prompts, total_responses, total_sessions, total_tokens, 302 | input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens 303 | FROM daily_snapshots WHERE date = ? 304 | """, (date_key,)) 305 | existing = cursor.fetchone() 306 | 307 | if existing: 308 | # Merge with existing (add to existing totals) 309 | cursor.execute(""" 310 | INSERT OR REPLACE INTO daily_snapshots ( 311 | date, total_prompts, total_responses, total_sessions, total_tokens, 312 | input_tokens, output_tokens, cache_creation_tokens, 313 | cache_read_tokens, snapshot_timestamp 314 | ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 315 | """, ( 316 | date_key, 317 | existing[0] + agg["prompts"], 318 | existing[1] + agg["responses"], 319 | existing[2] + len(agg["sessions"]), 320 | existing[3] + agg["total_tokens"], 321 | existing[4] + agg["input_tokens"], 322 | existing[5] + agg["output_tokens"], 323 | existing[6] + agg["cache_creation_tokens"], 324 | existing[7] + agg["cache_read_tokens"], 325 | timestamp, 326 | )) 327 | else: 328 | # New date, insert fresh 329 | cursor.execute(""" 330 | INSERT INTO daily_snapshots ( 331 | date, total_prompts, total_responses, total_sessions, total_tokens, 332 | input_tokens, output_tokens, cache_creation_tokens, 333 | cache_read_tokens, snapshot_timestamp 334 | ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 335 | """, ( 336 | date_key, 337 | agg["prompts"], 338 | agg["responses"], 339 | len(agg["sessions"]), 340 | agg["total_tokens"], 341 | agg["input_tokens"], 342 | agg["output_tokens"], 343 | agg["cache_creation_tokens"], 344 | agg["cache_read_tokens"], 345 | timestamp, 346 | )) 347 | saved_count += 1 348 | 349 | conn.commit() 350 | finally: 351 | conn.close() 352 | 353 | return saved_count 354 | 355 | 356 | def save_limits_snapshot( 357 | session_pct: int, 358 | week_pct: int, 359 | opus_pct: int, 360 | session_reset: str, 361 | week_reset: str, 362 | opus_reset: str, 363 | db_path: Path = DEFAULT_DB_PATH 364 | ) -> None: 365 | """ 366 | Save usage limits snapshot to the database. 367 | 368 | Args: 369 | session_pct: Session usage percentage 370 | week_pct: Week (all models) usage percentage 371 | opus_pct: Opus usage percentage 372 | session_reset: Session reset time 373 | week_reset: Week reset time 374 | opus_reset: Opus reset time 375 | db_path: Path to the SQLite database file 376 | 377 | Raises: 378 | sqlite3.Error: If database operation fails 379 | """ 380 | init_database(db_path) 381 | 382 | conn = sqlite3.connect(db_path) 383 | 384 | try: 385 | cursor = conn.cursor() 386 | timestamp = datetime.now().isoformat() 387 | date = datetime.now().strftime("%Y-%m-%d") 388 | 389 | cursor.execute(""" 390 | INSERT OR REPLACE INTO limits_snapshots ( 391 | timestamp, date, session_pct, week_pct, opus_pct, 392 | session_reset, week_reset, opus_reset 393 | ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 394 | """, ( 395 | timestamp, 396 | date, 397 | session_pct, 398 | week_pct, 399 | opus_pct, 400 | session_reset, 401 | week_reset, 402 | opus_reset, 403 | )) 404 | 405 | conn.commit() 406 | finally: 407 | conn.close() 408 | 409 | 410 | def load_historical_records( 411 | start_date: Optional[str] = None, 412 | end_date: Optional[str] = None, 413 | db_path: Path = DEFAULT_DB_PATH 414 | ) -> list[UsageRecord]: 415 | """ 416 | Load historical usage records from the database. 417 | 418 | Args: 419 | start_date: Optional start date in YYYY-MM-DD format (inclusive) 420 | end_date: Optional end date in YYYY-MM-DD format (inclusive) 421 | db_path: Path to the SQLite database file 422 | 423 | Returns: 424 | List of UsageRecord objects 425 | 426 | Raises: 427 | sqlite3.Error: If database query fails 428 | """ 429 | if not db_path.exists(): 430 | return [] 431 | 432 | conn = sqlite3.connect(db_path) 433 | 434 | try: 435 | cursor = conn.cursor() 436 | 437 | query = "SELECT * FROM usage_records WHERE 1=1" 438 | params = [] 439 | 440 | if start_date: 441 | query += " AND date >= ?" 442 | params.append(start_date) 443 | 444 | if end_date: 445 | query += " AND date <= ?" 446 | params.append(end_date) 447 | 448 | query += " ORDER BY date, timestamp" 449 | 450 | cursor.execute(query, params) 451 | 452 | records = [] 453 | for row in cursor.fetchall(): 454 | # Parse the row into a UsageRecord 455 | # Row columns: id, date, timestamp, session_id, message_uuid, message_type, 456 | # model, folder, git_branch, version, 457 | # input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, total_tokens 458 | from src.models.usage_record import TokenUsage 459 | 460 | # Only create TokenUsage if tokens exist (assistant messages) 461 | token_usage = None 462 | if row[10] > 0 or row[11] > 0: # if input_tokens or output_tokens exist 463 | token_usage = TokenUsage( 464 | input_tokens=row[10], 465 | output_tokens=row[11], 466 | cache_creation_tokens=row[12], 467 | cache_read_tokens=row[13], 468 | ) 469 | 470 | record = UsageRecord( 471 | timestamp=datetime.fromisoformat(row[2]), 472 | session_id=row[3], 473 | message_uuid=row[4], 474 | message_type=row[5], 475 | model=row[6], 476 | folder=row[7], 477 | git_branch=row[8], 478 | version=row[9], 479 | token_usage=token_usage, 480 | ) 481 | records.append(record) 482 | 483 | return records 484 | finally: 485 | conn.close() 486 | 487 | 488 | def get_text_analysis_stats(db_path: Path = DEFAULT_DB_PATH) -> dict: 489 | """ 490 | Analyze message content from JSONL files for text statistics. 491 | 492 | Returns: 493 | Dictionary with text analysis statistics 494 | """ 495 | from src.config.settings import get_claude_jsonl_files 496 | from src.data.jsonl_parser import parse_all_jsonl_files 497 | from src.utils.text_analysis import ( 498 | count_swears, 499 | count_perfect_phrases, 500 | count_absolutely_right_phrases, 501 | count_thank_phrases, 502 | count_please_phrases, 503 | ) 504 | 505 | try: 506 | # Get current JSONL files 507 | jsonl_files = get_claude_jsonl_files() 508 | if not jsonl_files: 509 | return { 510 | "user_swears": 0, 511 | "assistant_swears": 0, 512 | "perfect_count": 0, 513 | "absolutely_right_count": 0, 514 | "user_thanks": 0, 515 | "user_please": 0, 516 | "avg_user_prompt_chars": 0, 517 | "total_user_chars": 0, 518 | } 519 | 520 | # Parse all records 521 | records = parse_all_jsonl_files(jsonl_files) 522 | 523 | user_swears = 0 524 | assistant_swears = 0 525 | perfect_count = 0 526 | absolutely_right_count = 0 527 | user_thanks = 0 528 | user_please = 0 529 | total_user_chars = 0 530 | user_prompt_count = 0 531 | 532 | for record in records: 533 | if not record.content: 534 | continue 535 | 536 | if record.is_user_prompt: 537 | user_swears += count_swears(record.content) 538 | user_thanks += count_thank_phrases(record.content) 539 | user_please += count_please_phrases(record.content) 540 | total_user_chars += record.char_count 541 | user_prompt_count += 1 542 | elif record.is_assistant_response: 543 | assistant_swears += count_swears(record.content) 544 | perfect_count += count_perfect_phrases(record.content) 545 | absolutely_right_count += count_absolutely_right_phrases(record.content) 546 | 547 | avg_user_prompt_chars = total_user_chars / user_prompt_count if user_prompt_count > 0 else 0 548 | 549 | return { 550 | "user_swears": user_swears, 551 | "assistant_swears": assistant_swears, 552 | "perfect_count": perfect_count, 553 | "absolutely_right_count": absolutely_right_count, 554 | "user_thanks": user_thanks, 555 | "user_please": user_please, 556 | "avg_user_prompt_chars": round(avg_user_prompt_chars), 557 | "total_user_chars": total_user_chars, 558 | } 559 | except Exception: 560 | # Return zeros if analysis fails 561 | return { 562 | "user_swears": 0, 563 | "assistant_swears": 0, 564 | "perfect_count": 0, 565 | "absolutely_right_count": 0, 566 | "user_thanks": 0, 567 | "user_please": 0, 568 | "avg_user_prompt_chars": 0, 569 | "total_user_chars": 0, 570 | } 571 | 572 | 573 | def get_limits_data(db_path: Path = DEFAULT_DB_PATH) -> dict[str, dict[str, int]]: 574 | """ 575 | Get daily maximum limits percentages from the database. 576 | 577 | Returns a dictionary mapping dates to their max limits: 578 | { 579 | "2025-10-11": {"week_pct": 14, "opus_pct": 8}, 580 | ... 581 | } 582 | 583 | Args: 584 | db_path: Path to the SQLite database file 585 | 586 | Returns: 587 | Dictionary mapping dates to max week_pct and opus_pct for that day 588 | """ 589 | if not db_path.exists(): 590 | return {} 591 | 592 | conn = sqlite3.connect(db_path) 593 | 594 | try: 595 | cursor = conn.cursor() 596 | 597 | # Get max week_pct and opus_pct per day 598 | cursor.execute(""" 599 | SELECT 600 | date, 601 | MAX(week_pct) as max_week, 602 | MAX(opus_pct) as max_opus 603 | FROM limits_snapshots 604 | GROUP BY date 605 | ORDER BY date 606 | """) 607 | 608 | return { 609 | row[0]: { 610 | "week_pct": row[1] or 0, 611 | "opus_pct": row[2] or 0 612 | } 613 | for row in cursor.fetchall() 614 | } 615 | finally: 616 | conn.close() 617 | 618 | 619 | def get_latest_limits(db_path: Path = DEFAULT_DB_PATH) -> dict | None: 620 | """ 621 | Get the most recent limits snapshot from the database. 622 | 623 | Returns a dictionary with the latest limits data: 624 | { 625 | "session_pct": 14, 626 | "week_pct": 18, 627 | "opus_pct": 8, 628 | "session_reset": "Oct 16, 10:59am (Europe/Brussels)", 629 | "week_reset": "Oct 18, 3pm (Europe/Brussels)", 630 | "opus_reset": "Oct 18, 3pm (Europe/Brussels)", 631 | } 632 | 633 | Args: 634 | db_path: Path to the SQLite database file 635 | 636 | Returns: 637 | Dictionary with latest limits, or None if no data exists 638 | """ 639 | if not db_path.exists(): 640 | return None 641 | 642 | conn = sqlite3.connect(db_path) 643 | 644 | try: 645 | cursor = conn.cursor() 646 | 647 | # Get most recent limits snapshot 648 | cursor.execute(""" 649 | SELECT session_pct, week_pct, opus_pct, 650 | session_reset, week_reset, opus_reset 651 | FROM limits_snapshots 652 | ORDER BY timestamp DESC 653 | LIMIT 1 654 | """) 655 | 656 | row = cursor.fetchone() 657 | if not row: 658 | return None 659 | 660 | return { 661 | "session_pct": row[0] or 0, 662 | "week_pct": row[1] or 0, 663 | "opus_pct": row[2] or 0, 664 | "session_reset": row[3] or "", 665 | "week_reset": row[4] or "", 666 | "opus_reset": row[5] or "", 667 | } 668 | finally: 669 | conn.close() 670 | 671 | 672 | def get_database_stats(db_path: Path = DEFAULT_DB_PATH) -> dict: 673 | """ 674 | Get statistics about the historical database. 675 | 676 | Args: 677 | db_path: Path to the SQLite database file 678 | 679 | Returns: 680 | Dictionary with statistics including: 681 | - total_records, total_days, oldest_date, newest_date, newest_timestamp 682 | - total_tokens, total_prompts, total_sessions 683 | - tokens_by_model: dict of model -> token count 684 | - avg_tokens_per_session, avg_tokens_per_prompt 685 | """ 686 | if not db_path.exists(): 687 | return { 688 | "total_records": 0, 689 | "total_days": 0, 690 | "oldest_date": None, 691 | "newest_date": None, 692 | "newest_timestamp": None, 693 | "total_tokens": 0, 694 | "total_prompts": 0, 695 | "total_responses": 0, 696 | "total_sessions": 0, 697 | "tokens_by_model": {}, 698 | "cost_by_model": {}, 699 | "total_cost": 0.0, 700 | "avg_tokens_per_session": 0, 701 | "avg_tokens_per_response": 0, 702 | "avg_cost_per_session": 0.0, 703 | "avg_cost_per_response": 0.0, 704 | } 705 | 706 | conn = sqlite3.connect(db_path) 707 | 708 | try: 709 | cursor = conn.cursor() 710 | 711 | # Basic counts 712 | cursor.execute("SELECT COUNT(*) FROM usage_records") 713 | total_records = cursor.fetchone()[0] 714 | 715 | cursor.execute("SELECT COUNT(DISTINCT date) FROM usage_records") 716 | total_days = cursor.fetchone()[0] 717 | 718 | cursor.execute("SELECT MIN(date), MAX(date) FROM usage_records") 719 | oldest_date, newest_date = cursor.fetchone() 720 | 721 | # Get newest snapshot timestamp 722 | cursor.execute("SELECT MAX(snapshot_timestamp) FROM daily_snapshots") 723 | newest_timestamp = cursor.fetchone()[0] 724 | 725 | # Aggregate statistics from daily_snapshots 726 | cursor.execute(""" 727 | SELECT 728 | SUM(total_tokens) as total_tokens, 729 | SUM(total_prompts) as total_prompts, 730 | SUM(total_responses) as total_responses, 731 | SUM(total_sessions) as total_sessions 732 | FROM daily_snapshots 733 | """) 734 | row = cursor.fetchone() 735 | total_tokens = row[0] or 0 736 | total_prompts = row[1] or 0 737 | total_responses = row[2] or 0 738 | total_sessions = row[3] or 0 739 | 740 | # Tokens by model (only available if usage_records exist) 741 | tokens_by_model = {} 742 | if total_records > 0: 743 | cursor.execute(""" 744 | SELECT model, SUM(total_tokens) as tokens 745 | FROM usage_records 746 | GROUP BY model 747 | ORDER BY tokens DESC 748 | """) 749 | tokens_by_model = {row[0]: row[1] for row in cursor.fetchall() if row[0]} 750 | 751 | # Calculate costs by joining with pricing table 752 | total_cost = 0.0 753 | cost_by_model = {} 754 | 755 | if total_records > 0: 756 | cursor.execute(""" 757 | SELECT 758 | ur.model, 759 | SUM(ur.input_tokens) as total_input, 760 | SUM(ur.output_tokens) as total_output, 761 | SUM(ur.cache_creation_tokens) as total_cache_write, 762 | SUM(ur.cache_read_tokens) as total_cache_read, 763 | mp.input_price_per_mtok, 764 | mp.output_price_per_mtok, 765 | mp.cache_write_price_per_mtok, 766 | mp.cache_read_price_per_mtok 767 | FROM usage_records ur 768 | LEFT JOIN model_pricing mp ON ur.model = mp.model_name 769 | WHERE ur.model IS NOT NULL 770 | GROUP BY ur.model 771 | """) 772 | 773 | for row in cursor.fetchall(): 774 | model = row[0] 775 | input_tokens = row[1] or 0 776 | output_tokens = row[2] or 0 777 | cache_write_tokens = row[3] or 0 778 | cache_read_tokens = row[4] or 0 779 | 780 | # Pricing per million tokens 781 | input_price = row[5] or 0.0 782 | output_price = row[6] or 0.0 783 | cache_write_price = row[7] or 0.0 784 | cache_read_price = row[8] or 0.0 785 | 786 | # Calculate cost in dollars 787 | model_cost = ( 788 | (input_tokens / 1_000_000) * input_price + 789 | (output_tokens / 1_000_000) * output_price + 790 | (cache_write_tokens / 1_000_000) * cache_write_price + 791 | (cache_read_tokens / 1_000_000) * cache_read_price 792 | ) 793 | 794 | cost_by_model[model] = model_cost 795 | total_cost += model_cost 796 | 797 | # Calculate averages 798 | avg_tokens_per_session = total_tokens / total_sessions if total_sessions > 0 else 0 799 | avg_tokens_per_response = total_tokens / total_responses if total_responses > 0 else 0 800 | avg_cost_per_session = total_cost / total_sessions if total_sessions > 0 else 0 801 | avg_cost_per_response = total_cost / total_responses if total_responses > 0 else 0 802 | 803 | return { 804 | "total_records": total_records, 805 | "total_days": total_days, 806 | "oldest_date": oldest_date, 807 | "newest_date": newest_date, 808 | "newest_timestamp": newest_timestamp, 809 | "total_tokens": total_tokens, 810 | "total_prompts": total_prompts, 811 | "total_responses": total_responses, 812 | "total_sessions": total_sessions, 813 | "tokens_by_model": tokens_by_model, 814 | "cost_by_model": cost_by_model, 815 | "total_cost": total_cost, 816 | "avg_tokens_per_session": round(avg_tokens_per_session), 817 | "avg_tokens_per_response": round(avg_tokens_per_response), 818 | "avg_cost_per_session": round(avg_cost_per_session, 2), 819 | "avg_cost_per_response": round(avg_cost_per_response, 4), 820 | } 821 | finally: 822 | conn.close() 823 | #endregion 824 | ```