This is page 2 of 2. Use http://codebase.md/data-goblin/claude-goblin?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .gitignore ├── .python-version ├── CHANGELOG.md ├── docs │ ├── commands.md │ ├── images │ │ ├── dashboard.png │ │ ├── heatmap.png │ │ └── status-bar.png │ └── versions │ ├── 0.1.0.md │ ├── 0.1.1.md │ ├── 0.1.2.md │ └── 0.1.3.md ├── LICENSE ├── pyproject.toml ├── README.md └── src ├── __init__.py ├── aggregation │ ├── __init__.py │ ├── daily_stats.py │ └── usage_limits.py ├── cli.py ├── commands │ ├── __init__.py │ ├── delete_usage.py │ ├── export.py │ ├── help.py │ ├── limits.py │ ├── restore_backup.py │ ├── stats.py │ ├── status_bar.py │ ├── update_usage.py │ └── usage.py ├── config │ ├── __init__.py │ ├── settings.py │ └── user_config.py ├── data │ ├── __init__.py │ └── jsonl_parser.py ├── hooks │ ├── __init__.py │ ├── audio_tts.py │ ├── audio.py │ ├── manager.py │ ├── png.py │ ├── scripts │ │ └── audio_tts_hook.sh │ └── usage.py ├── models │ ├── __init__.py │ └── usage_record.py ├── storage │ ├── __init__.py │ └── snapshot_db.py ├── utils │ ├── __init__.py │ ├── _system.py │ └── text_analysis.py └── visualization ├── __init__.py ├── activity_graph.py ├── dashboard.py ├── export.py └── usage_bars.py ``` # Files -------------------------------------------------------------------------------- /src/visualization/export.py: -------------------------------------------------------------------------------- ```python #region Imports from datetime import datetime, timedelta from pathlib import Path from typing import Optional from src.aggregation.daily_stats import AggregatedStats, DailyStats #endregion #region Constants # Claude UI color scheme CLAUDE_BG = "#262624" CLAUDE_TEXT = "#FAF9F5" CLAUDE_TEXT_SECONDARY = "#C2C0B7" CLAUDE_DARK_GREY = "#3C3C3A" # Past days with no activity CLAUDE_LIGHT_GREY = "#6B6B68" # Future days CLAUDE_ORANGE_RGB = (203, 123, 93) # #CB7B5D # Diverging gradients for limits (matching orange tone) CLAUDE_BLUE_RGB = (93, 150, 203) # Blue with similar tone to orange CLAUDE_GREEN_RGB = (93, 203, 123) # Green with similar tone to orange CLAUDE_RED_RGB = (203, 93, 93) # Red for 100% CLAUDE_DARK_RED_RGB = (120, 40, 80) # Dark red/purple for >100% # Export at higher resolution for sharp output SCALE_FACTOR = 3 # 3x resolution CELL_SIZE = 12 * SCALE_FACTOR CELL_GAP = 3 * SCALE_FACTOR CELL_TOTAL = CELL_SIZE + CELL_GAP #endregion #region Functions def export_heatmap_svg( stats: AggregatedStats, output_path: Path, title: Optional[str] = None, year: Optional[int] = None ) -> None: """ Export the activity heatmap as an SVG file. Args: stats: Aggregated statistics to visualize output_path: Path where SVG file will be saved title: Optional title for the graph year: Year to display (defaults to current year) Raises: IOError: If file cannot be written """ # Show full year: Jan 1 to Dec 31 today = datetime.now().date() display_year = year if year is not None else today.year start_date = datetime(display_year, 1, 1).date() end_date = datetime(display_year, 12, 31).date() # Build weeks structure jan1_day = (start_date.weekday() + 1) % 7 weeks: list[list[tuple[Optional[DailyStats], Optional[datetime.date]]]] = [] current_week: list[tuple[Optional[DailyStats], Optional[datetime.date]]] = [] # Pad first week with None for _ in range(jan1_day): current_week.append((None, None)) # Add all days from Jan 1 to Dec 31 current_date = start_date while current_date <= end_date: date_key = current_date.strftime("%Y-%m-%d") day_stats = stats.daily_stats.get(date_key) current_week.append((day_stats, current_date)) if len(current_week) == 7: weeks.append(current_week) current_week = [] current_date += timedelta(days=1) # Pad final week with None if current_week: while len(current_week) < 7: current_week.append((None, None)) weeks.append(current_week) # Calculate dimensions num_weeks = len(weeks) width = (num_weeks * CELL_TOTAL) + 120 # Extra space for labels height = (7 * CELL_TOTAL) + 80 # Extra space for title and legend # Calculate max tokens for scaling max_tokens = max( (s.total_tokens for s in stats.daily_stats.values()), default=1 ) if stats.daily_stats else 1 # Generate SVG with dynamic title default_title = f"Your Claude Code activity in {display_year}" svg = _generate_svg(weeks, width, height, max_tokens, title or default_title) # Write to file output_path.write_text(svg, encoding="utf-8") def export_heatmap_png( stats: AggregatedStats, output_path: Path, limits_data: Optional[dict[str, dict[str, int]]] = None, title: Optional[str] = None, year: Optional[int] = None, tracking_mode: str = "both" ) -> None: """ Export activity heatmaps as a PNG file: tokens, week %, and/or opus %. Requires Pillow: pip install pillow Args: stats: Aggregated statistics to visualize output_path: Path where PNG file will be saved limits_data: Dictionary mapping dates to {"week_pct": int, "opus_pct": int} title: Optional title for the graph year: Year to display (defaults to current year) tracking_mode: One of "both", "tokens", or "limits" Raises: ImportError: If Pillow is not installed IOError: If file cannot be written """ if limits_data is None: limits_data = {} try: from PIL import Image, ImageDraw, ImageFont except ImportError: raise ImportError( "PNG export requires Pillow. " "Install with: pip install pillow" ) # Build weeks structure (same as SVG) today = datetime.now().date() display_year = year if year is not None else today.year start_date = datetime(display_year, 1, 1).date() end_date = datetime(display_year, 12, 31).date() jan1_day = (start_date.weekday() + 1) % 7 weeks: list[list[tuple[Optional[DailyStats], Optional[datetime.date]]]] = [] current_week: list[tuple[Optional[DailyStats], Optional[datetime.date]]] = [] for _ in range(jan1_day): current_week.append((None, None)) current_date = start_date while current_date <= end_date: date_key = current_date.strftime("%Y-%m-%d") day_stats = stats.daily_stats.get(date_key) current_week.append((day_stats, current_date)) if len(current_week) == 7: weeks.append(current_week) current_week = [] current_date += timedelta(days=1) if current_week: while len(current_week) < 7: current_week.append((None, None)) weeks.append(current_week) # Calculate dimensions based on tracking mode num_weeks = len(weeks) # Base grid dimensions (one heatmap) grid_width = num_weeks * CELL_TOTAL grid_height = 7 * CELL_TOTAL # Layout: Vertical stack with titles and legends for each base_padding = int(40 * SCALE_FACTOR * 0.66) day_label_space = 35 * SCALE_FACTOR heatmap_vertical_gap = 40 * SCALE_FACTOR # Gap between vertically stacked heatmaps heatmap_title_space = 20 * SCALE_FACTOR # Space for individual heatmap titles month_label_space = 12 * SCALE_FACTOR # Space for month labels above each grid legend_height = CELL_SIZE + (8 * SCALE_FACTOR) # Legend squares + small buffer # Main title at the top main_title_height = 20 * SCALE_FACTOR main_title_to_first_heatmap = 25 * SCALE_FACTOR # Each heatmap section includes: title + month labels + grid + legend single_heatmap_section_height = heatmap_title_space + month_label_space + grid_height + legend_height # Calculate number of heatmaps based on tracking mode if tracking_mode == "tokens": num_heatmaps = 1 elif tracking_mode == "limits": num_heatmaps = 2 # Week and Opus else: # "both" num_heatmaps = 3 # Total height top_padding = base_padding + main_title_height + main_title_to_first_heatmap content_height = (num_heatmaps * single_heatmap_section_height) + ((num_heatmaps - 1) * heatmap_vertical_gap) bottom_padding = base_padding width = base_padding + day_label_space + grid_width + base_padding height = top_padding + content_height + bottom_padding # Calculate max tokens max_tokens = max( (s.total_tokens for s in stats.daily_stats.values()), default=1 ) if stats.daily_stats else 1 # Create image img = Image.new('RGB', (width, height), _hex_to_rgb(CLAUDE_BG)) draw = ImageDraw.Draw(img) # Try to load a system font with scaled sizes (cross-platform) try: # Try common font paths across different systems font_paths = [ "/System/Library/Fonts/Helvetica.ttc", # macOS "C:\\Windows\\Fonts\\arial.ttf", # Windows "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", # Linux "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf", # Linux alternative ] title_font = None label_font = None for font_path in font_paths: try: title_font = ImageFont.truetype(font_path, 16 * SCALE_FACTOR) label_font = ImageFont.truetype(font_path, 10 * SCALE_FACTOR) break except: continue if title_font is None: raise Exception("No system font found") except: title_font = ImageFont.load_default() label_font = ImageFont.load_default() # Calculate common X positions day_label_x = base_padding grid_x = base_padding + day_label_space # Calculate Y positions for each heatmap section dynamically heatmap_y_positions = [] current_y = top_padding for i in range(num_heatmaps): heatmap_y_positions.append(current_y) current_y += single_heatmap_section_height + heatmap_vertical_gap # Draw main title and icon at the very top title_x = base_padding title_y = base_padding pixel_size = int(SCALE_FACTOR * 4) icon_width = _draw_claude_guy(draw, title_x, title_y, pixel_size) title_text_x = title_x + icon_width + (8 * SCALE_FACTOR) default_title = f"Your Claude Code activity in {display_year}" draw.text((title_text_x, title_y), title or default_title, fill=_hex_to_rgb(CLAUDE_TEXT), font=title_font) corner_radius = 2 * SCALE_FACTOR day_names = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"] # Helper function to draw one complete heatmap section def draw_heatmap_section(section_y_start, heatmap_title, gradient_func, is_tokens=True, base_color_rgb=None, reset_info=None): # Positions within this section title_y = section_y_start month_y = title_y + heatmap_title_space grid_y = month_y + month_label_space legend_y = grid_y + grid_height + (CELL_GAP * 2) legend_square_y = legend_y - (CELL_SIZE // 4) # Parse reset date if provided reset_date = None if reset_info: import re # Parse "Oct 16, 10:59am (Europe/Brussels)" format match = re.search(r'([A-Za-z]+)\s+(\d+)', reset_info) if match: month_name = match.group(1) day = int(match.group(2)) # Convert month name to number from datetime import datetime month_num = datetime.strptime(month_name, "%b").month reset_date = datetime(display_year, month_num, day).date() # Draw heatmap title draw.text((grid_x, title_y), heatmap_title, fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font) # Draw day labels (vertically centered with row) for day_idx, day_name in enumerate(day_names): y = grid_y + (day_idx * CELL_TOTAL) + (CELL_SIZE // 2) draw.text((day_label_x, y), day_name, fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font, anchor="lm") # left-middle anchor # Draw month labels last_month = None for week_idx, week in enumerate(weeks): for day_stats, date in week: if date is not None: month = date.month if month != last_month: x = grid_x + (week_idx * CELL_TOTAL) month_name = date.strftime("%b") draw.text((x, month_y), month_name, fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font) last_month = month break # Draw heatmap cells for week_idx, week in enumerate(weeks): for day_idx, (day_stats, date) in enumerate(week): if date is None: continue x = grid_x + (week_idx * CELL_TOTAL) y = grid_y + (day_idx * CELL_TOTAL) color = gradient_func(day_stats, date) draw.rounded_rectangle([x, y, x + CELL_SIZE, y + CELL_SIZE], radius=corner_radius, fill=color, outline=_hex_to_rgb(CLAUDE_BG)) # Draw orange dot on reset day if reset_date and date == reset_date: dot_radius = int(CELL_SIZE * 0.2) # 20% of cell size dot_center_x = x + (CELL_SIZE // 2) dot_center_y = y + (CELL_SIZE // 2) draw.ellipse([ dot_center_x - dot_radius, dot_center_y - dot_radius, dot_center_x + dot_radius, dot_center_y + dot_radius ], fill=CLAUDE_ORANGE_RGB) # Draw legend draw.text((grid_x, legend_y), "Less" if is_tokens else "0%", fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font) text_bbox = draw.textbbox((grid_x, legend_y), "Less" if is_tokens else "0%", font=label_font) text_width = text_bbox[2] - text_bbox[0] legend_extra_gap = int(CELL_GAP * 0.3) legend_square_spacing = CELL_SIZE + CELL_GAP + legend_extra_gap squares_start = grid_x + text_width + (CELL_GAP * 2) if is_tokens: # Token legend: dark grey + orange gradient draw.rounded_rectangle([squares_start, legend_square_y, squares_start + CELL_SIZE, legend_square_y + CELL_SIZE], radius=corner_radius, fill=_hex_to_rgb(CLAUDE_DARK_GREY)) for i in range(1, 5): intensity = 0.2 + ((i - 1) / 3) * 0.8 r = int(CLAUDE_ORANGE_RGB[0] * intensity) g = int(CLAUDE_ORANGE_RGB[1] * intensity) b = int(CLAUDE_ORANGE_RGB[2] * intensity) x = squares_start + (i * legend_square_spacing) draw.rounded_rectangle([x, legend_square_y, x + CELL_SIZE, legend_square_y + CELL_SIZE], radius=corner_radius, fill=(r, g, b)) end_text = "More" else: # Limits legend: base_color → red → dark red for i in range(5): if i == 0: color = base_color_rgb elif i == 4: color = CLAUDE_DARK_RED_RGB else: ratio = i / 3.0 r = int(base_color_rgb[0] + (CLAUDE_RED_RGB[0] - base_color_rgb[0]) * ratio) g = int(base_color_rgb[1] + (CLAUDE_RED_RGB[1] - base_color_rgb[1]) * ratio) b = int(base_color_rgb[2] + (CLAUDE_RED_RGB[2] - base_color_rgb[2]) * ratio) color = (r, g, b) x = squares_start + (i * legend_square_spacing) draw.rounded_rectangle([x, legend_square_y, x + CELL_SIZE, legend_square_y + CELL_SIZE], radius=corner_radius, fill=color) end_text = "100%+" more_x = squares_start + (5 * legend_square_spacing) + CELL_GAP draw.text((more_x, legend_y), end_text, fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font) # Add reset dot legend if applicable if reset_info: reset_legend_x = more_x + 100 * SCALE_FACTOR # Space after "100%+" dot_radius = int(CELL_SIZE * 0.2) dot_center_x = reset_legend_x + dot_radius + (CELL_GAP) dot_center_y = legend_square_y + (CELL_SIZE // 2) # Draw orange dot draw.ellipse([ dot_center_x - dot_radius, dot_center_y - dot_radius, dot_center_x + dot_radius, dot_center_y + dot_radius ], fill=CLAUDE_ORANGE_RGB) # Draw label reset_text = f"Resets: {reset_info}" reset_text_x = dot_center_x + dot_radius + (CELL_GAP) draw.text((reset_text_x, legend_y), reset_text, fill=_hex_to_rgb(CLAUDE_TEXT_SECONDARY), font=label_font) # Define gradient functions for each heatmap def tokens_gradient(day_stats, date): color_str = _get_color(day_stats, max_tokens, date, today) return _parse_rgb(color_str) if color_str.startswith('rgb(') else _hex_to_rgb(color_str) def week_gradient(day_stats, date): date_key = date.strftime("%Y-%m-%d") week_pct = limits_data.get(date_key, {}).get("week_pct", 0) return _get_limits_color(week_pct, CLAUDE_BLUE_RGB, date, today) def opus_gradient(day_stats, date): date_key = date.strftime("%Y-%m-%d") opus_pct = limits_data.get(date_key, {}).get("opus_pct", 0) return _get_limits_color(opus_pct, CLAUDE_GREEN_RGB, date, today) # Get most recent reset info from limits_data week_reset_info = None opus_reset_info = None if limits_data: # Get the most recent limits snapshot to extract reset times from src.storage.snapshot_db import DEFAULT_DB_PATH import sqlite3 try: conn = sqlite3.connect(DEFAULT_DB_PATH) cursor = conn.cursor() cursor.execute("SELECT week_reset, opus_reset FROM limits_snapshots ORDER BY timestamp DESC LIMIT 1") result = cursor.fetchone() if result: week_reset_info = result[0] opus_reset_info = result[1] conn.close() except: pass # Draw heatmap sections based on tracking mode heatmap_idx = 0 if tracking_mode in ["both", "tokens"]: draw_heatmap_section(heatmap_y_positions[heatmap_idx], "Token Usage", tokens_gradient, is_tokens=True) heatmap_idx += 1 if tracking_mode in ["both", "limits"]: draw_heatmap_section(heatmap_y_positions[heatmap_idx], "Week Limit %", week_gradient, is_tokens=False, base_color_rgb=CLAUDE_BLUE_RGB, reset_info=week_reset_info) heatmap_idx += 1 draw_heatmap_section(heatmap_y_positions[heatmap_idx], "Opus Limit %", opus_gradient, is_tokens=False, base_color_rgb=CLAUDE_GREEN_RGB, reset_info=opus_reset_info) # Save image img.save(output_path, 'PNG') def _generate_svg( weeks: list[list[tuple[Optional[DailyStats], Optional[datetime.date]]]], width: int, height: int, max_tokens: int, title: str ) -> str: """ Generate SVG markup for the heatmap. Args: weeks: List of weeks with daily stats width: SVG width in pixels height: SVG height in pixels max_tokens: Maximum token count for scaling title: Title text Returns: SVG markup as a string """ svg_parts = [ f'<svg width="{width}" height="{height}" xmlns="http://www.w3.org/2000/svg">', '<style>', f' .day-cell {{ stroke: {CLAUDE_BG}; stroke-width: 1; }}', f' .month-label {{ fill: {CLAUDE_TEXT_SECONDARY}; font: 12px -apple-system, sans-serif; }}', f' .day-label {{ fill: {CLAUDE_TEXT_SECONDARY}; font: 10px -apple-system, sans-serif; }}', f' .title {{ fill: {CLAUDE_TEXT}; font: bold 16px -apple-system, sans-serif; }}', f' .legend-text {{ fill: {CLAUDE_TEXT_SECONDARY}; font: 10px -apple-system, sans-serif; }}', '</style>', f'<rect width="{width}" height="{height}" fill="{CLAUDE_BG}"/>', ] # Draw Claude guy (Clawd) icon in SVG clawd_svg = _generate_clawd_svg(10, 10, 3) svg_parts.append(clawd_svg) # Title (positioned after Clawd icon) title_x = 10 + (8 * 3) + 8 # Icon width + gap svg_parts.append(f'<text x="{title_x}" y="25" class="title">{title}</text>') # Day labels (Y-axis) day_names = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"] for day_idx, day_name in enumerate(day_names): y = 60 + (day_idx * CELL_TOTAL) + (CELL_SIZE // 2) svg_parts.append(f'<text x="5" y="{y + 4}" class="day-label" text-anchor="start">{day_name}</text>') # Month labels (X-axis) last_month = None for week_idx, week in enumerate(weeks): for day_stats, date in week: if date is not None: month = date.month if month != last_month: x = 40 + (week_idx * CELL_TOTAL) month_name = date.strftime("%b") svg_parts.append(f'<text x="{x}" y="50" class="month-label">{month_name}</text>') last_month = month break # Heatmap cells today = datetime.now().date() for week_idx, week in enumerate(weeks): for day_idx, (day_stats, date) in enumerate(week): if date is None: # Skip padding cells continue x = 40 + (week_idx * CELL_TOTAL) y = 60 + (day_idx * CELL_TOTAL) color = _get_color(day_stats, max_tokens, date, today) # Add tooltip with date and stats if day_stats and day_stats.total_tokens > 0: tooltip = f"{date}: {day_stats.total_prompts} prompts, {day_stats.total_tokens:,} tokens" elif date > today: tooltip = f"{date}: Future" else: tooltip = f"{date}: No activity" svg_parts.append(f'<rect x="{x}" y="{y}" width="{CELL_SIZE}" height="{CELL_SIZE}" fill="{color}" class="day-cell"><title>{tooltip}</title></rect>') # Legend - show gradient from dark to bright orange legend_y = height - 20 legend_x = 40 svg_parts.append(f'<text x="{legend_x}" y="{legend_y}" class="legend-text">Less</text>') # Show 5 sample cells from gradient for i in range(5): intensity = 0.2 + (i / 4) * 0.8 r = int(CLAUDE_ORANGE_RGB[0] * intensity) g = int(CLAUDE_ORANGE_RGB[1] * intensity) b = int(CLAUDE_ORANGE_RGB[2] * intensity) color = f"rgb({r},{g},{b})" x = legend_x + 35 + (i * (CELL_SIZE + 2)) svg_parts.append(f'<rect x="{x}" y="{legend_y - CELL_SIZE + 2}" width="{CELL_SIZE}" height="{CELL_SIZE}" fill="{color}" class="day-cell"/>') svg_parts.append(f'<text x="{legend_x + 35 + (5 * (CELL_SIZE + 2)) + 5}" y="{legend_y}" class="legend-text">More</text>') svg_parts.append('</svg>') return '\n'.join(svg_parts) def _get_limits_color( pct: int, base_color_rgb: tuple[int, int, int], date: datetime.date, today: datetime.date ) -> tuple[int, int, int]: """ Get diverging gradient color for limits percentage. 0% = base_color (blue/green), 100% = red, >100% = dark red/purple Args: pct: Usage percentage (0-100+) base_color_rgb: Base color (blue for week, green for opus) date: The date of this cell today: Today's date Returns: RGB color tuple """ # Future days: light grey if date > today: return _hex_to_rgb(CLAUDE_LIGHT_GREY) # Past days with no data: use dark grey if pct == 0: return _hex_to_rgb(CLAUDE_DARK_GREY) # >100%: dark red/purple if pct > 100: return CLAUDE_DARK_RED_RGB # 0-100%: interpolate from base_color to red ratio = pct / 100.0 r = int(base_color_rgb[0] + (CLAUDE_RED_RGB[0] - base_color_rgb[0]) * ratio) g = int(base_color_rgb[1] + (CLAUDE_RED_RGB[1] - base_color_rgb[1]) * ratio) b = int(base_color_rgb[2] + (CLAUDE_RED_RGB[2] - base_color_rgb[2]) * ratio) return (r, g, b) def _get_color( day_stats: Optional[DailyStats], max_tokens: int, date: datetime.date, today: datetime.date ) -> str: """ Get the color for a day based on activity level using smooth gradient. Args: day_stats: Statistics for the day max_tokens: Maximum tokens for scaling date: The date of this cell today: Today's date Returns: RGB color string """ # Future days: light grey if date > today: return CLAUDE_LIGHT_GREY # Past days with no activity: dark grey if not day_stats or day_stats.total_tokens == 0: return CLAUDE_DARK_GREY # Calculate intensity ratio (0.0 to 1.0) ratio = day_stats.total_tokens / max_tokens if max_tokens > 0 else 0 # Apply non-linear scaling to make differences more visible ratio = ratio ** 0.5 # True continuous gradient from dark grey to orange dark_grey = _hex_to_rgb(CLAUDE_DARK_GREY) r = int(dark_grey[0] + (CLAUDE_ORANGE_RGB[0] - dark_grey[0]) * ratio) g = int(dark_grey[1] + (CLAUDE_ORANGE_RGB[1] - dark_grey[1]) * ratio) b = int(dark_grey[2] + (CLAUDE_ORANGE_RGB[2] - dark_grey[2]) * ratio) return f"rgb({r},{g},{b})" def _hex_to_rgb(hex_color: str) -> tuple[int, int, int]: """Convert hex color to RGB tuple.""" hex_color = hex_color.lstrip('#') return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4)) def _parse_rgb(rgb_str: str) -> tuple[int, int, int]: """Parse 'rgb(r,g,b)' string to tuple.""" rgb_str = rgb_str.replace('rgb(', '').replace(')', '') return tuple(map(int, rgb_str.split(','))) def _generate_clawd_svg(x: int, y: int, pixel_size: int) -> str: """ Generate SVG markup for the Claude guy (Clawd) pixel art icon. Based on ASCII art: ▐▛███▜▌ ▝▜█████▛▘ ▘▘ ▝▝ Args: x: X position (left) y: Y position (top) pixel_size: Size of each pixel block Returns: SVG markup string """ # Colors orange = f"rgb({CLAUDE_ORANGE_RGB[0]},{CLAUDE_ORANGE_RGB[1]},{CLAUDE_ORANGE_RGB[2]})" dark_grey = CLAUDE_DARK_GREY # Define the pixel grid (1 = orange, 0 = transparent, 2 = dark grey/eye) grid = [ [1, 1, 1, 1, 1, 1, 1, 1], # Row 0: top with ears [0, 1, 2, 1, 1, 2, 1, 0], # Row 1: eyes row [0, 1, 1, 1, 1, 1, 1, 0], # Row 2: bottom of head [0, 1, 1, 0, 0, 1, 1, 0], # Row 3: legs ] svg_parts = [] for row_idx, row in enumerate(grid): for col_idx, pixel_type in enumerate(row): if pixel_type == 0: continue # Skip transparent pixels color = orange if pixel_type == 1 else dark_grey px = x + (col_idx * pixel_size) py = y + (row_idx * pixel_size) svg_parts.append( f'<rect x="{px}" y="{py}" width="{pixel_size}" height="{pixel_size}" fill="{color}"/>' ) return '\n'.join(svg_parts) def _draw_claude_guy(draw, x: int, y: int, pixel_size: int) -> int: """ Draw the Claude guy pixel art icon. Based on ASCII art: ▐▛███▜▌ ▝▜█████▛▘ ▘▘ ▝▝ Args: draw: PIL ImageDraw object x: X position (left) y: Y position (top) pixel_size: Size of each pixel block Returns: Width of the icon in pixels """ # Colors orange = (203, 123, 93) # CLAUDE_ORANGE_RGB dark_grey = (60, 60, 58) # CLAUDE_DARK_GREY # Define the pixel grid (1 = orange, 0 = transparent, 2 = dark grey/eye) # 8 pixels wide, 4 pixels tall grid = [ [1, 1, 1, 1, 1, 1, 1, 1], # Row 0: ▐▛███▜▌ - top with ears [0, 1, 2, 1, 1, 2, 1, 0], # Row 1: ▝▜█████▛▘ - eyes row [0, 1, 1, 1, 1, 1, 1, 0], # Row 2: bottom of head [0, 1, 1, 0, 0, 1, 1, 0], # Row 3: ▘▘ ▝▝ - legs ] # Draw each pixel for row_idx, row in enumerate(grid): for col_idx, pixel_type in enumerate(row): if pixel_type == 0: continue # Skip transparent pixels color = orange if pixel_type == 1 else dark_grey px = x + (col_idx * pixel_size) py = y + (row_idx * pixel_size) draw.rectangle([ px, py, px + pixel_size, py + pixel_size ], fill=color) return 8 * pixel_size # Return width #endregion ``` -------------------------------------------------------------------------------- /src/storage/snapshot_db.py: -------------------------------------------------------------------------------- ```python #region Imports import sqlite3 from datetime import datetime from pathlib import Path from typing import Optional from src.models.usage_record import UsageRecord #endregion #region Constants DEFAULT_DB_PATH = Path.home() / ".claude" / "usage" / "usage_history.db" #endregion #region Functions def init_database(db_path: Path = DEFAULT_DB_PATH) -> None: """ Initialize the SQLite database for historical snapshots. Creates tables if they don't exist: - daily_snapshots: Daily aggregated usage data - usage_records: Individual usage records for detailed analysis Args: db_path: Path to the SQLite database file Raises: sqlite3.Error: If database initialization fails """ db_path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(db_path) try: cursor = conn.cursor() # Table for daily aggregated snapshots cursor.execute(""" CREATE TABLE IF NOT EXISTS daily_snapshots ( date TEXT PRIMARY KEY, total_prompts INTEGER NOT NULL, total_responses INTEGER NOT NULL, total_sessions INTEGER NOT NULL, total_tokens INTEGER NOT NULL, input_tokens INTEGER NOT NULL, output_tokens INTEGER NOT NULL, cache_creation_tokens INTEGER NOT NULL, cache_read_tokens INTEGER NOT NULL, snapshot_timestamp TEXT NOT NULL ) """) # Table for detailed usage records cursor.execute(""" CREATE TABLE IF NOT EXISTS usage_records ( id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT NOT NULL, timestamp TEXT NOT NULL, session_id TEXT NOT NULL, message_uuid TEXT NOT NULL, message_type TEXT NOT NULL, model TEXT, folder TEXT NOT NULL, git_branch TEXT, version TEXT NOT NULL, input_tokens INTEGER NOT NULL, output_tokens INTEGER NOT NULL, cache_creation_tokens INTEGER NOT NULL, cache_read_tokens INTEGER NOT NULL, total_tokens INTEGER NOT NULL, UNIQUE(session_id, message_uuid) ) """) # Index for faster date-based queries cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_usage_records_date ON usage_records(date) """) # Table for usage limits snapshots cursor.execute(""" CREATE TABLE IF NOT EXISTS limits_snapshots ( timestamp TEXT PRIMARY KEY, date TEXT NOT NULL, session_pct INTEGER, week_pct INTEGER, opus_pct INTEGER, session_reset TEXT, week_reset TEXT, opus_reset TEXT ) """) # Index for faster date-based queries on limits cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_limits_snapshots_date ON limits_snapshots(date) """) # Table for model pricing cursor.execute(""" CREATE TABLE IF NOT EXISTS model_pricing ( model_name TEXT PRIMARY KEY, input_price_per_mtok REAL NOT NULL, output_price_per_mtok REAL NOT NULL, cache_write_price_per_mtok REAL NOT NULL, cache_read_price_per_mtok REAL NOT NULL, last_updated TEXT NOT NULL, notes TEXT ) """) # Populate pricing data for known models pricing_data = [ # Current models ('claude-opus-4-1-20250805', 15.00, 75.00, 18.75, 1.50, 'Current flagship model'), ('claude-sonnet-4-5-20250929', 3.00, 15.00, 3.75, 0.30, 'Current balanced model (≤200K tokens)'), ('claude-haiku-3-5-20241022', 0.80, 4.00, 1.00, 0.08, 'Current fast model'), # Legacy models (approximate pricing) ('claude-sonnet-4-20250514', 3.00, 15.00, 3.75, 0.30, 'Legacy Sonnet 4'), ('claude-opus-4-20250514', 15.00, 75.00, 18.75, 1.50, 'Legacy Opus 4'), ('claude-sonnet-3-7-20250219', 3.00, 15.00, 3.75, 0.30, 'Legacy Sonnet 3.7'), # Synthetic/test models ('<synthetic>', 0.00, 0.00, 0.00, 0.00, 'Test/synthetic model - no cost'), ] timestamp = datetime.now().isoformat() for model_name, input_price, output_price, cache_write, cache_read, notes in pricing_data: cursor.execute(""" INSERT OR REPLACE INTO model_pricing ( model_name, input_price_per_mtok, output_price_per_mtok, cache_write_price_per_mtok, cache_read_price_per_mtok, last_updated, notes ) VALUES (?, ?, ?, ?, ?, ?, ?) """, (model_name, input_price, output_price, cache_write, cache_read, timestamp, notes)) conn.commit() finally: conn.close() def save_snapshot(records: list[UsageRecord], db_path: Path = DEFAULT_DB_PATH, storage_mode: str = "aggregate") -> int: """ Save usage records to the database as a snapshot. Only saves records that don't already exist (based on session_id + message_uuid). Also updates daily_snapshots table with aggregated data. Args: records: List of usage records to save db_path: Path to the SQLite database file storage_mode: "aggregate" (daily totals only) or "full" (individual records) Returns: Number of new records saved Raises: sqlite3.Error: If database operation fails """ if not records: return 0 init_database(db_path) conn = sqlite3.connect(db_path) saved_count = 0 try: cursor = conn.cursor() # Save individual records only if in "full" mode if storage_mode == "full": for record in records: # Get token values (0 for user messages without token_usage) input_tokens = record.token_usage.input_tokens if record.token_usage else 0 output_tokens = record.token_usage.output_tokens if record.token_usage else 0 cache_creation_tokens = record.token_usage.cache_creation_tokens if record.token_usage else 0 cache_read_tokens = record.token_usage.cache_read_tokens if record.token_usage else 0 total_tokens = record.token_usage.total_tokens if record.token_usage else 0 try: cursor.execute(""" INSERT INTO usage_records ( date, timestamp, session_id, message_uuid, message_type, model, folder, git_branch, version, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, total_tokens ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( record.date_key, record.timestamp.isoformat(), record.session_id, record.message_uuid, record.message_type, record.model, record.folder, record.git_branch, record.version, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, total_tokens, )) saved_count += 1 except sqlite3.IntegrityError: # Record already exists, skip it pass # Update daily snapshots (aggregate by date) if storage_mode == "full": # In full mode, only update dates that have records in usage_records # IMPORTANT: Never use REPLACE - it would delete old data when JSONL files age out # Instead, recalculate only for dates that currently have records timestamp = datetime.now().isoformat() # Get all dates that currently have usage_records cursor.execute("SELECT DISTINCT date FROM usage_records") dates_with_records = [row[0] for row in cursor.fetchall()] for date in dates_with_records: # Calculate totals for this date from usage_records cursor.execute(""" SELECT SUM(CASE WHEN message_type = 'user' THEN 1 ELSE 0 END) as total_prompts, SUM(CASE WHEN message_type = 'assistant' THEN 1 ELSE 0 END) as total_responses, COUNT(DISTINCT session_id) as total_sessions, SUM(total_tokens) as total_tokens, SUM(input_tokens) as input_tokens, SUM(output_tokens) as output_tokens, SUM(cache_creation_tokens) as cache_creation_tokens, SUM(cache_read_tokens) as cache_read_tokens FROM usage_records WHERE date = ? """, (date,)) row = cursor.fetchone() # Use INSERT OR REPLACE only for dates that currently have data # This preserves historical daily_snapshots for dates no longer in usage_records cursor.execute(""" INSERT OR REPLACE INTO daily_snapshots ( date, total_prompts, total_responses, total_sessions, total_tokens, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, snapshot_timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( date, row[0] or 0, row[1] or 0, row[2] or 0, row[3] or 0, row[4] or 0, row[5] or 0, row[6] or 0, row[7] or 0, timestamp, )) else: # In aggregate mode, compute from incoming records from collections import defaultdict daily_aggregates = defaultdict(lambda: { "prompts": 0, "responses": 0, "sessions": set(), "input_tokens": 0, "output_tokens": 0, "cache_creation_tokens": 0, "cache_read_tokens": 0, "total_tokens": 0, }) for record in records: date_key = record.date_key daily_aggregates[date_key]["sessions"].add(record.session_id) # Count message types if record.is_user_prompt: daily_aggregates[date_key]["prompts"] += 1 elif record.is_assistant_response: daily_aggregates[date_key]["responses"] += 1 # Token usage only on assistant messages if record.token_usage: daily_aggregates[date_key]["input_tokens"] += record.token_usage.input_tokens daily_aggregates[date_key]["output_tokens"] += record.token_usage.output_tokens daily_aggregates[date_key]["cache_creation_tokens"] += record.token_usage.cache_creation_tokens daily_aggregates[date_key]["cache_read_tokens"] += record.token_usage.cache_read_tokens daily_aggregates[date_key]["total_tokens"] += record.token_usage.total_tokens # Insert or update daily snapshots timestamp = datetime.now().isoformat() for date_key, agg in daily_aggregates.items(): # Get existing data for this date to merge with new data cursor.execute(""" SELECT total_prompts, total_responses, total_sessions, total_tokens, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens FROM daily_snapshots WHERE date = ? """, (date_key,)) existing = cursor.fetchone() if existing: # Merge with existing (add to existing totals) cursor.execute(""" INSERT OR REPLACE INTO daily_snapshots ( date, total_prompts, total_responses, total_sessions, total_tokens, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, snapshot_timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( date_key, existing[0] + agg["prompts"], existing[1] + agg["responses"], existing[2] + len(agg["sessions"]), existing[3] + agg["total_tokens"], existing[4] + agg["input_tokens"], existing[5] + agg["output_tokens"], existing[6] + agg["cache_creation_tokens"], existing[7] + agg["cache_read_tokens"], timestamp, )) else: # New date, insert fresh cursor.execute(""" INSERT INTO daily_snapshots ( date, total_prompts, total_responses, total_sessions, total_tokens, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, snapshot_timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( date_key, agg["prompts"], agg["responses"], len(agg["sessions"]), agg["total_tokens"], agg["input_tokens"], agg["output_tokens"], agg["cache_creation_tokens"], agg["cache_read_tokens"], timestamp, )) saved_count += 1 conn.commit() finally: conn.close() return saved_count def save_limits_snapshot( session_pct: int, week_pct: int, opus_pct: int, session_reset: str, week_reset: str, opus_reset: str, db_path: Path = DEFAULT_DB_PATH ) -> None: """ Save usage limits snapshot to the database. Args: session_pct: Session usage percentage week_pct: Week (all models) usage percentage opus_pct: Opus usage percentage session_reset: Session reset time week_reset: Week reset time opus_reset: Opus reset time db_path: Path to the SQLite database file Raises: sqlite3.Error: If database operation fails """ init_database(db_path) conn = sqlite3.connect(db_path) try: cursor = conn.cursor() timestamp = datetime.now().isoformat() date = datetime.now().strftime("%Y-%m-%d") cursor.execute(""" INSERT OR REPLACE INTO limits_snapshots ( timestamp, date, session_pct, week_pct, opus_pct, session_reset, week_reset, opus_reset ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( timestamp, date, session_pct, week_pct, opus_pct, session_reset, week_reset, opus_reset, )) conn.commit() finally: conn.close() def load_historical_records( start_date: Optional[str] = None, end_date: Optional[str] = None, db_path: Path = DEFAULT_DB_PATH ) -> list[UsageRecord]: """ Load historical usage records from the database. Args: start_date: Optional start date in YYYY-MM-DD format (inclusive) end_date: Optional end date in YYYY-MM-DD format (inclusive) db_path: Path to the SQLite database file Returns: List of UsageRecord objects Raises: sqlite3.Error: If database query fails """ if not db_path.exists(): return [] conn = sqlite3.connect(db_path) try: cursor = conn.cursor() query = "SELECT * FROM usage_records WHERE 1=1" params = [] if start_date: query += " AND date >= ?" params.append(start_date) if end_date: query += " AND date <= ?" params.append(end_date) query += " ORDER BY date, timestamp" cursor.execute(query, params) records = [] for row in cursor.fetchall(): # Parse the row into a UsageRecord # Row columns: id, date, timestamp, session_id, message_uuid, message_type, # model, folder, git_branch, version, # input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, total_tokens from src.models.usage_record import TokenUsage # Only create TokenUsage if tokens exist (assistant messages) token_usage = None if row[10] > 0 or row[11] > 0: # if input_tokens or output_tokens exist token_usage = TokenUsage( input_tokens=row[10], output_tokens=row[11], cache_creation_tokens=row[12], cache_read_tokens=row[13], ) record = UsageRecord( timestamp=datetime.fromisoformat(row[2]), session_id=row[3], message_uuid=row[4], message_type=row[5], model=row[6], folder=row[7], git_branch=row[8], version=row[9], token_usage=token_usage, ) records.append(record) return records finally: conn.close() def get_text_analysis_stats(db_path: Path = DEFAULT_DB_PATH) -> dict: """ Analyze message content from JSONL files for text statistics. Returns: Dictionary with text analysis statistics """ from src.config.settings import get_claude_jsonl_files from src.data.jsonl_parser import parse_all_jsonl_files from src.utils.text_analysis import ( count_swears, count_perfect_phrases, count_absolutely_right_phrases, count_thank_phrases, count_please_phrases, ) try: # Get current JSONL files jsonl_files = get_claude_jsonl_files() if not jsonl_files: return { "user_swears": 0, "assistant_swears": 0, "perfect_count": 0, "absolutely_right_count": 0, "user_thanks": 0, "user_please": 0, "avg_user_prompt_chars": 0, "total_user_chars": 0, } # Parse all records records = parse_all_jsonl_files(jsonl_files) user_swears = 0 assistant_swears = 0 perfect_count = 0 absolutely_right_count = 0 user_thanks = 0 user_please = 0 total_user_chars = 0 user_prompt_count = 0 for record in records: if not record.content: continue if record.is_user_prompt: user_swears += count_swears(record.content) user_thanks += count_thank_phrases(record.content) user_please += count_please_phrases(record.content) total_user_chars += record.char_count user_prompt_count += 1 elif record.is_assistant_response: assistant_swears += count_swears(record.content) perfect_count += count_perfect_phrases(record.content) absolutely_right_count += count_absolutely_right_phrases(record.content) avg_user_prompt_chars = total_user_chars / user_prompt_count if user_prompt_count > 0 else 0 return { "user_swears": user_swears, "assistant_swears": assistant_swears, "perfect_count": perfect_count, "absolutely_right_count": absolutely_right_count, "user_thanks": user_thanks, "user_please": user_please, "avg_user_prompt_chars": round(avg_user_prompt_chars), "total_user_chars": total_user_chars, } except Exception: # Return zeros if analysis fails return { "user_swears": 0, "assistant_swears": 0, "perfect_count": 0, "absolutely_right_count": 0, "user_thanks": 0, "user_please": 0, "avg_user_prompt_chars": 0, "total_user_chars": 0, } def get_limits_data(db_path: Path = DEFAULT_DB_PATH) -> dict[str, dict[str, int]]: """ Get daily maximum limits percentages from the database. Returns a dictionary mapping dates to their max limits: { "2025-10-11": {"week_pct": 14, "opus_pct": 8}, ... } Args: db_path: Path to the SQLite database file Returns: Dictionary mapping dates to max week_pct and opus_pct for that day """ if not db_path.exists(): return {} conn = sqlite3.connect(db_path) try: cursor = conn.cursor() # Get max week_pct and opus_pct per day cursor.execute(""" SELECT date, MAX(week_pct) as max_week, MAX(opus_pct) as max_opus FROM limits_snapshots GROUP BY date ORDER BY date """) return { row[0]: { "week_pct": row[1] or 0, "opus_pct": row[2] or 0 } for row in cursor.fetchall() } finally: conn.close() def get_latest_limits(db_path: Path = DEFAULT_DB_PATH) -> dict | None: """ Get the most recent limits snapshot from the database. Returns a dictionary with the latest limits data: { "session_pct": 14, "week_pct": 18, "opus_pct": 8, "session_reset": "Oct 16, 10:59am (Europe/Brussels)", "week_reset": "Oct 18, 3pm (Europe/Brussels)", "opus_reset": "Oct 18, 3pm (Europe/Brussels)", } Args: db_path: Path to the SQLite database file Returns: Dictionary with latest limits, or None if no data exists """ if not db_path.exists(): return None conn = sqlite3.connect(db_path) try: cursor = conn.cursor() # Get most recent limits snapshot cursor.execute(""" SELECT session_pct, week_pct, opus_pct, session_reset, week_reset, opus_reset FROM limits_snapshots ORDER BY timestamp DESC LIMIT 1 """) row = cursor.fetchone() if not row: return None return { "session_pct": row[0] or 0, "week_pct": row[1] or 0, "opus_pct": row[2] or 0, "session_reset": row[3] or "", "week_reset": row[4] or "", "opus_reset": row[5] or "", } finally: conn.close() def get_database_stats(db_path: Path = DEFAULT_DB_PATH) -> dict: """ Get statistics about the historical database. Args: db_path: Path to the SQLite database file Returns: Dictionary with statistics including: - total_records, total_days, oldest_date, newest_date, newest_timestamp - total_tokens, total_prompts, total_sessions - tokens_by_model: dict of model -> token count - avg_tokens_per_session, avg_tokens_per_prompt """ if not db_path.exists(): return { "total_records": 0, "total_days": 0, "oldest_date": None, "newest_date": None, "newest_timestamp": None, "total_tokens": 0, "total_prompts": 0, "total_responses": 0, "total_sessions": 0, "tokens_by_model": {}, "cost_by_model": {}, "total_cost": 0.0, "avg_tokens_per_session": 0, "avg_tokens_per_response": 0, "avg_cost_per_session": 0.0, "avg_cost_per_response": 0.0, } conn = sqlite3.connect(db_path) try: cursor = conn.cursor() # Basic counts cursor.execute("SELECT COUNT(*) FROM usage_records") total_records = cursor.fetchone()[0] cursor.execute("SELECT COUNT(DISTINCT date) FROM usage_records") total_days = cursor.fetchone()[0] cursor.execute("SELECT MIN(date), MAX(date) FROM usage_records") oldest_date, newest_date = cursor.fetchone() # Get newest snapshot timestamp cursor.execute("SELECT MAX(snapshot_timestamp) FROM daily_snapshots") newest_timestamp = cursor.fetchone()[0] # Aggregate statistics from daily_snapshots cursor.execute(""" SELECT SUM(total_tokens) as total_tokens, SUM(total_prompts) as total_prompts, SUM(total_responses) as total_responses, SUM(total_sessions) as total_sessions FROM daily_snapshots """) row = cursor.fetchone() total_tokens = row[0] or 0 total_prompts = row[1] or 0 total_responses = row[2] or 0 total_sessions = row[3] or 0 # Tokens by model (only available if usage_records exist) tokens_by_model = {} if total_records > 0: cursor.execute(""" SELECT model, SUM(total_tokens) as tokens FROM usage_records GROUP BY model ORDER BY tokens DESC """) tokens_by_model = {row[0]: row[1] for row in cursor.fetchall() if row[0]} # Calculate costs by joining with pricing table total_cost = 0.0 cost_by_model = {} if total_records > 0: cursor.execute(""" SELECT ur.model, SUM(ur.input_tokens) as total_input, SUM(ur.output_tokens) as total_output, SUM(ur.cache_creation_tokens) as total_cache_write, SUM(ur.cache_read_tokens) as total_cache_read, mp.input_price_per_mtok, mp.output_price_per_mtok, mp.cache_write_price_per_mtok, mp.cache_read_price_per_mtok FROM usage_records ur LEFT JOIN model_pricing mp ON ur.model = mp.model_name WHERE ur.model IS NOT NULL GROUP BY ur.model """) for row in cursor.fetchall(): model = row[0] input_tokens = row[1] or 0 output_tokens = row[2] or 0 cache_write_tokens = row[3] or 0 cache_read_tokens = row[4] or 0 # Pricing per million tokens input_price = row[5] or 0.0 output_price = row[6] or 0.0 cache_write_price = row[7] or 0.0 cache_read_price = row[8] or 0.0 # Calculate cost in dollars model_cost = ( (input_tokens / 1_000_000) * input_price + (output_tokens / 1_000_000) * output_price + (cache_write_tokens / 1_000_000) * cache_write_price + (cache_read_tokens / 1_000_000) * cache_read_price ) cost_by_model[model] = model_cost total_cost += model_cost # Calculate averages avg_tokens_per_session = total_tokens / total_sessions if total_sessions > 0 else 0 avg_tokens_per_response = total_tokens / total_responses if total_responses > 0 else 0 avg_cost_per_session = total_cost / total_sessions if total_sessions > 0 else 0 avg_cost_per_response = total_cost / total_responses if total_responses > 0 else 0 return { "total_records": total_records, "total_days": total_days, "oldest_date": oldest_date, "newest_date": newest_date, "newest_timestamp": newest_timestamp, "total_tokens": total_tokens, "total_prompts": total_prompts, "total_responses": total_responses, "total_sessions": total_sessions, "tokens_by_model": tokens_by_model, "cost_by_model": cost_by_model, "total_cost": total_cost, "avg_tokens_per_session": round(avg_tokens_per_session), "avg_tokens_per_response": round(avg_tokens_per_response), "avg_cost_per_session": round(avg_cost_per_session, 2), "avg_cost_per_response": round(avg_cost_per_response, 4), } finally: conn.close() #endregion ```