This is page 1 of 2. Use http://codebase.md/mryanmyn/task-manager-mcp?page={x} to view the full context. # Directory Structure ``` ├── app │ ├── __init__.py │ ├── api │ │ ├── __init__.py │ │ ├── api.py │ │ └── cli.py │ ├── core │ │ ├── __init__.py │ │ ├── plan_manager.py │ │ └── task_manager.py │ └── ui │ ├── __init__.py │ ├── input_handler.py │ ├── terminal_ui.py │ └── ui_components.py ├── img.png ├── main.py ├── mcp_guidelines │ ├── __init__.py │ └── llms_full.txt ├── mcp_server_fixed.py ├── MCP-README.md ├── pyproject.toml ├── README.md ├── setup.py └── tests ├── __init__.py └── test_mcp_server.py ``` # Files -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # Terminal Task Tracker A terminal-based task tracking application with a three-pane layout for managing tasks and project plans. # Image  ## Features - Three-pane terminal UI: - Task list (top left) - Task details (top right) - Project plan (bottom, full width) - Task management: - Create, view, edit, and delete tasks - Set priorities and status - Add detailed descriptions - Project plan management: - Define high-level project steps - Track step completion - Reorder steps - Complete API for programmatic access - Command-line interface for scripting - Data persistence ## Installation ```bash # Clone the repository git clone https://github.com/yourusername/terminal-task-tracker.git cd terminal-task-tracker # Install dependencies pip install -e . 
``` ## Usage ### Terminal UI To start the terminal UI: ```bash python main.py ``` Key bindings: - `Tab`: Cycle between windows - `Up/Down`: Navigate lists - `Enter`: Select task (in task list) - `n`: New item (in task list or plan) - `e`: Edit item - `d`: Delete item - `Space`: Toggle completion (in plan) - `Esc`: Exit ### Command-line Interface The CLI provides access to all functionality: ```bash # List all tasks python -m app.api.cli task list # Add a new task python -m app.api.cli task add "Implement feature X" --description "Details about feature X" --priority 2 # Mark a plan step as completed python -m app.api.cli plan toggle STEP_ID # Export data to JSON python -m app.api.cli export data.json ``` ### API Usage ```python from app.core.task_manager import TaskManager from app.core.plan_manager import PlanManager from app.api.api import TaskTrackerAPI # Initialize managers task_manager = TaskManager("tasks.json") plan_manager = PlanManager("plan.json") # Create API api = TaskTrackerAPI(task_manager, plan_manager) # Add a task task = api.add_task("Implement feature X", "Details about feature X", priority=2) # Add a plan step step = api.add_plan_step("Design architecture for shared operations module") # Mark step as completed api.toggle_plan_step(step["id"]) # Save data api.save_all() ``` ## Project Structure ``` terminal-task-tracker/ ├── app/ │ ├── __init__.py │ ├── core/ # Business logic │ │ ├── __init__.py │ │ ├── task_manager.py │ │ └── plan_manager.py │ ├── ui/ # Terminal UI │ │ ├── __init__.py │ │ ├── terminal_ui.py │ │ ├── ui_components.py │ │ └── input_handler.py │ └── api/ # API and CLI │ ├── __init__.py │ ├── api.py │ └── cli.py ├── main.py # Main application entry point └── README.md ``` ## Data Storage By default, data is stored in the `~/.tasktracker` directory: - `tasks.json`: Tasks data - `plan.json`: Project plan data - `notes.txt`: Notes data ## License MIT ``` --------------------------------------------------------------------------------
/mcp_guidelines/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /app/api/__init__.py: -------------------------------------------------------------------------------- ```python # API and CLI module ``` -------------------------------------------------------------------------------- /app/ui/__init__.py: -------------------------------------------------------------------------------- ```python # Terminal UI module ``` -------------------------------------------------------------------------------- /app/core/__init__.py: -------------------------------------------------------------------------------- ```python # Core business logic module ``` -------------------------------------------------------------------------------- /tests/__init__.py: -------------------------------------------------------------------------------- ```python """ Tests for TaskTracker MCP package. 
""" ``` -------------------------------------------------------------------------------- /app/__init__.py: -------------------------------------------------------------------------------- ```python # Terminal Task Tracker application package ``` -------------------------------------------------------------------------------- /setup.py: -------------------------------------------------------------------------------- ```python from setuptools import setup, find_packages setup( name="terminal-task-tracker", version="0.1.0", description="A terminal-based task tracking application with a three-pane layout", author="Your Name", author_email="[email protected]", packages=find_packages(), entry_points={ "console_scripts": [ "tasktracker=main:main", ], }, python_requires=">=3.6", classifiers=[ "Development Status :: 3 - Alpha", "Environment :: Console :: Curses", "Intended Audience :: Developers", "Intended Audience :: End Users/Desktop", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Topic :: Office/Business :: Scheduling", "Topic :: Utilities" ], ) ``` -------------------------------------------------------------------------------- /main.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Terminal Task Tracker - Main Application Entry Point A terminal-based task tracking application with a three-pane layout for managing tasks and project plans. 
def main():
    """Start the terminal UI and save all data once it returns.

    Returns:
        int: 0 when the UI ran and the data was saved, 1 when any exception
        was raised along the way (its message is printed to stderr).
    """
    try:
        # Make sure the ~/.tasktracker data directory is present.
        os.makedirs(
            os.path.join(os.path.expanduser("~"), ".tasktracker"),
            exist_ok=True,
        )

        # Build the API on top of managers that use their default file paths.
        api = TaskTrackerAPI(TaskManager(), PlanManager())

        # Run the terminal UI.
        TerminalUI(api).run()

        # Write tasks and plan back to disk on the way out.
        api.save_all()
    except Exception as exc:
        print(f"Error: {str(exc)}", file=sys.stderr)
        return 1
    return 0
typeCheckingMode = "basic" reportMissingTypeStubs = false [tool.pytest.ini_options] testpaths = ["tests"] python_files = "test_*.py" [project.scripts] tasktracker-mcp = "mcp_server:main" ``` -------------------------------------------------------------------------------- /tests/test_mcp_server.py: -------------------------------------------------------------------------------- ```python """ Tests for the TaskTracker MCP server. """ import json import pytest from unittest.mock import MagicMock, patch # Import the MCP server import sys import os # Add the parent directory to the path sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) import mcp_server @pytest.fixture def mock_api(): """Create a mock TaskTrackerAPI for testing.""" mock = MagicMock() # Set up return values for get_all_tasks mock.get_all_tasks.return_value = [ { "id": "test-task-1", "title": "Test Task 1", "description": "Description for test task 1", "priority": 1, "status": "not_started", "created_at": "2025-04-08T00:00:00", "updated_at": "2025-04-08T00:00:00" } ] # Set up return values for get_task mock.get_task.return_value = { "id": "test-task-1", "title": "Test Task 1", "description": "Description for test task 1", "priority": 1, "status": "not_started", "created_at": "2025-04-08T00:00:00", "updated_at": "2025-04-08T00:00:00" } # Set up return values for add_task mock.add_task.return_value = { "id": "new-task-1", "title": "New Task", "description": "Description for new task", "priority": 1, "status": "not_started", "created_at": "2025-04-08T00:00:00", "updated_at": "2025-04-08T00:00:00" } return mock @patch("mcp_server.api") def test_get_all_tasks(mock_api_module, mock_api): """Test the get_all_tasks resource.""" # Set the mock API mock_api_module.get_all_tasks.return_value = mock_api.get_all_tasks.return_value # Call the function result = mcp_server.get_all_tasks() # Assert the result expected = json.dumps(mock_api.get_all_tasks.return_value, indent=2) assert 
class TaskManager:
    """Persist tasks as a JSON list and notes as a plain-text file.

    Each task is a dict with the keys ``id``, ``title``, ``description``,
    ``priority``, ``status``, ``created_at`` and ``updated_at``.  add_task,
    update_task and delete_task write the whole task list back to
    ``file_path``; save_notes writes ``notes_file_path``.
    """

    # Statuses accepted by add_task(); any other value becomes "not_started".
    VALID_STATUSES = ("not_started", "in_progress", "completed")

    def __init__(self, file_path=None, notes_file_path=None):
        """Initialize the TaskManager with optional file paths.

        Args:
            file_path: JSON file holding the task list.  Defaults to
                ``~/.tasktracker/tasks.json``.
            notes_file_path: Text file holding the notes.  Defaults to
                ``~/.tasktracker/notes.txt``.
        """
        # Only touch the home directory when a default path is actually used.
        if file_path is None or notes_file_path is None:
            task_dir = os.path.join(os.path.expanduser("~"), ".tasktracker")
            os.makedirs(task_dir, exist_ok=True)
            if file_path is None:
                file_path = os.path.join(task_dir, "tasks.json")
            if notes_file_path is None:
                notes_file_path = os.path.join(task_dir, "notes.txt")

        self.file_path = file_path
        self.notes_file_path = notes_file_path
        self.tasks = self._load_tasks()
        self.notes = self._load_notes()

    def _load_tasks(self):
        """Load tasks from the file.

        Returns:
            list: The stored tasks, or an empty list when the file is
            missing, unreadable, not valid JSON, or does not contain a list.
        """
        try:
            with open(self.file_path, 'r') as f:
                data = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return []
        # Every other method treats self.tasks as a list (append/remove).
        return data if isinstance(data, list) else []

    def reload_tasks(self):
        """Reload tasks from file (for external changes like MCP)."""
        self.tasks = self._load_tasks()

    def _load_notes(self):
        """Load notes from the file.

        Returns:
            str: The notes text, or an empty string when the file is missing
            or cannot be read or decoded.
        """
        try:
            with open(self.notes_file_path, 'r') as f:
                return f.read()
        except (OSError, UnicodeDecodeError):
            return ""

    def reload_notes(self):
        """Reload notes from file (for external changes like MCP)."""
        self.notes = self._load_notes()

    def save_tasks(self):
        """Write the current task list to ``file_path`` as indented JSON."""
        with open(self.file_path, 'w') as f:
            json.dump(self.tasks, f, indent=2)

    def save_notes(self, notes_text):
        """Replace the notes with *notes_text* and write them to disk."""
        self.notes = notes_text
        with open(self.notes_file_path, 'w') as f:
            f.write(notes_text)

    def get_notes(self):
        """Return the notes text currently held in memory."""
        return self.notes

    def get_all_tasks(self):
        """Return the list of all tasks (the live list, not a copy)."""
        return self.tasks

    def get_task(self, task_id):
        """Return the task whose ``id`` equals *task_id*, or None."""
        for task in self.tasks:
            if task["id"] == task_id:
                return task
        return None

    def add_task(self, title, description="", priority=1, status="not_started"):
        """Create a task, append it to the list and save.

        Args:
            title: Task title.
            description: Optional longer description.
            priority: Priority value stored as given.
            status: One of VALID_STATUSES; any other value is replaced by
                "not_started".

        Returns:
            dict: The newly created task.
        """
        if status not in self.VALID_STATUSES:
            status = "not_started"

        # One timestamp for both fields so a new task reads as "never updated".
        now = datetime.now().isoformat()
        task = {
            "id": str(uuid.uuid4()),
            "title": title,
            "description": description,
            "priority": priority,
            "status": status,
            "created_at": now,
            "updated_at": now,
        }
        self.tasks.append(task)
        self.save_tasks()
        return task

    def update_task(self, task_id, **kwargs):
        """Update fields of an existing task and save.

        Only keys already present on the task are changed; ``id`` and
        ``created_at`` are never overwritten.

        Returns:
            dict | None: The updated task, or None if *task_id* is unknown.
        """
        task = self.get_task(task_id)
        if task is None:
            return None

        for key, value in kwargs.items():
            if key in task and key not in ("id", "created_at"):
                task[key] = value
        task["updated_at"] = datetime.now().isoformat()
        self.save_tasks()
        return task

    def delete_task(self, task_id):
        """Delete a task by ID and save.

        Returns:
            bool: True if a task was removed, False if *task_id* is unknown.
        """
        task = self.get_task(task_id)
        if task is None:
            return False

        self.tasks.remove(task)
        self.save_tasks()
        return True
order, completed) def update_plan_step(self, step_id, **kwargs): """Update a plan step by ID.""" return self.plan_manager.update_step(step_id, **kwargs) def toggle_plan_step(self, step_id): """Toggle the completion status of a plan step.""" return self.plan_manager.toggle_step(step_id) def delete_plan_step(self, step_id): """Delete a plan step by ID.""" return self.plan_manager.delete_step(step_id) def reorder_plan_steps(self): """Reorder plan steps to ensure consistent ordering.""" return self.plan_manager.reorder_steps() # Notes methods def get_notes(self): """Get the notes.""" return self.task_manager.get_notes() def save_notes(self, notes_text): """Save notes.""" self.task_manager.save_notes(notes_text) return True # Data management def save_all(self): """Save all data to files.""" self.task_manager.save_tasks() self.plan_manager.save_plan() return True def reload_all(self): """Reload all data from files (for external changes like MCP).""" self.task_manager.reload_tasks() self.task_manager.reload_notes() self.plan_manager.reload_plan() return True def export_data(self, file_path): """Export all data to a single JSON file.""" import json data = { "tasks": self.get_all_tasks(), "plan": self.get_all_plan_steps(), "notes": self.get_notes() } with open(file_path, 'w') as f: json.dump(data, f, indent=2) return True def import_data(self, file_path): """Import data from a JSON file.""" import json try: with open(file_path, 'r') as f: data = json.load(f) # Clear existing data self.task_manager.tasks = data.get("tasks", []) self.plan_manager.plan_steps = data.get("plan", []) # Import notes if available if "notes" in data: self.task_manager.save_notes(data["notes"]) # Save imported data self.save_all() return True except (json.JSONDecodeError, FileNotFoundError) as e: print(f"Error importing data: {e}") return False ``` -------------------------------------------------------------------------------- /MCP-README.md: 
-------------------------------------------------------------------------------- ```markdown # TaskTracker MCP Server TaskTracker MCP is a Model Context Protocol compatible version of the Terminal Task Tracker application. It exposes task and project plan management capabilities through MCP, allowing Language Models to interact with your task tracking data. ## Features - **Resources**: Access tasks, plans, and notes data - **Tools**: Create, update, and delete tasks and plan steps - **Prompts**: Templates for common task management activities ## Installation ```bash # Using uv (recommended) uv add -e . # Or with pip pip install -e . ``` ## Usage with Claude Desktop ```bash # Install in Claude Desktop mcp install mcp_server_fixed.py # Run with MCP Inspector mcp dev mcp_server_fixed.py ``` NOTE: If you encounter errors with `mcp_server.py`, please use the fixed version `mcp_server_fixed.py` instead. The fixed version uses a global API instance for resources and properly handles context parameters for tools. 
## Running Directly ```bash # Run server directly python mcp_server_fixed.py # Or using MCP CLI mcp run mcp_server_fixed.py ``` ## Resources TaskTracker exposes the following resources: - `tasks://all` - List all tasks - `tasks://{task_id}` - Get a specific task - `plan://all` - List all plan steps - `plan://{step_id}` - Get a specific plan step - `notes://all` - Get all notes ## Tools TaskTracker provides the following tools: ### Task Tools - `get_all_tasks_tool` - Get all tasks - `get_task_tool` - Get a specific task by ID - `add_task` - Create a new task - `update_task` - Update an existing task - `delete_task` - Delete a task ### Plan Tools - `get_all_plan_steps_tool` - Get all plan steps - `get_plan_step_tool` - Get a specific plan step by ID - `add_plan_step` - Create a new plan step - `update_plan_step` - Update an existing plan step - `delete_plan_step` - Delete a plan step - `toggle_plan_step` - Toggle completion status ### Notes and Data Tools - `get_notes_tool` - Get all notes - `save_notes` - Save notes - `export_data` - Export all data to JSON - `import_data` - Import data from JSON ## Prompts TaskTracker includes the following prompts: - `add_task_prompt` - Template for adding a new task - `create_plan_prompt` - Template for creating a project plan ## Example Interactions ### Adding and Retrieving Tasks ``` > call-tool add_task "Implement login feature" "Create authentication endpoints for user login" 1 "not_started" { "id": "550e8400-e29b-41d4-a716-446655440000", "title": "Implement login feature", "description": "Create authentication endpoints for user login", "priority": 1, "status": "not_started", "created_at": "2025-04-08T14:32:15.123456", "updated_at": "2025-04-08T14:32:15.123456" } > call-tool get_all_tasks_tool [ { "id": "550e8400-e29b-41d4-a716-446655440000", "title": "Implement login feature", "description": "Create authentication endpoints for user login", "priority": 1, "status": "not_started", "created_at": "2025-04-08T14:32:15.123456", 
class PlanManager:
    """Persist an ordered list of project-plan steps as JSON.

    Each step is a dict with the keys ``id``, ``name``, ``description``,
    ``details``, ``order``, ``completed``, ``created_at`` and ``updated_at``.
    After every add, move or delete the ``order`` field of each step equals
    its index in ``plan_steps``.
    """

    def __init__(self, file_path=None):
        """Initialize the PlanManager with an optional file path.

        Args:
            file_path: JSON file holding the plan.  Defaults to
                ``~/.tasktracker/plan.json`` (the directory is created).
        """
        if file_path is None:
            plan_dir = os.path.join(os.path.expanduser("~"), ".tasktracker")
            os.makedirs(plan_dir, exist_ok=True)
            file_path = os.path.join(plan_dir, "plan.json")

        self.file_path = file_path
        self.plan_steps = self._load_plan()

    def _load_plan(self):
        """Load plan steps from the file.

        Accepts either a bare JSON list or an object with a ``steps`` list.

        Returns:
            list: The stored steps, or an empty list when the file is
            missing, unreadable, not valid JSON, or has another shape.
        """
        try:
            with open(self.file_path, 'r') as f:
                data = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return []

        if isinstance(data, dict):
            data = data.get("steps")
        return data if isinstance(data, list) else []

    def _ensure_list(self):
        """Return plan_steps, resetting it to [] if it is not a list.

        plan_steps is a public attribute that other code assigns directly,
        so its type is re-checked before every use.
        """
        if not isinstance(self.plan_steps, list):
            self.plan_steps = []
        return self.plan_steps

    def _renumber(self):
        """Set each step's ``order`` to its current index in the list."""
        for index, step in enumerate(self.plan_steps):
            step["order"] = index

    def _place_step(self, step, order):
        """Put *step* at position *order* and renumber all steps.

        The other steps keep their relative order (sorted by their current
        ``order`` value).  *order* is clamped to the valid index range, so
        negative values mean "first" and large values mean "last".
        """
        position = int(order)  # fail before touching the list
        steps = self._ensure_list()
        others = [s for s in steps if s is not step]
        others.sort(key=lambda s: s.get("order", 0))
        others.insert(max(0, min(position, len(others))), step)
        # Slice-assign so callers holding the list object see the new order.
        steps[:] = others
        self._renumber()

    def reload_plan(self):
        """Reload plan steps from file (for external changes like MCP)."""
        self.plan_steps = self._load_plan()

    def save_plan(self):
        """Write the current plan steps to ``file_path`` as indented JSON."""
        with open(self.file_path, 'w') as f:
            json.dump(self._ensure_list(), f, indent=2)

    def get_all_steps(self):
        """Return the list of all plan steps (the live list, not a copy)."""
        return self._ensure_list()

    def get_step(self, step_id):
        """Return the step whose ``id`` equals *step_id*, or None."""
        for step in self._ensure_list():
            if step.get("id") == step_id:
                return step
        return None

    def add_step(self, name, description="", details="", order=None, completed=False):
        """Create a plan step, insert it at *order* and save.

        Args:
            name: Step name.
            description: Brief description.
            details: Detailed information.
            order: Zero-based position for the new step; None appends it.
                Steps at or after that position shift down by one.
            completed: Initial completion flag.

        Returns:
            dict: The newly created step (its ``order`` is the final index).
        """
        steps = self._ensure_list()
        if order is None:
            # Place at the end by default
            order = len(steps)

        # One timestamp for both fields so a new step reads as "never updated".
        now = datetime.now().isoformat()
        step = {
            "id": str(uuid.uuid4()),
            "name": name,
            "description": description,
            "details": details,
            "order": order,
            "completed": completed,
            "created_at": now,
            "updated_at": now,
        }
        self._place_step(step, order)
        self.save_plan()
        return step

    def update_step(self, step_id, **kwargs):
        """Update fields of an existing step and save.

        Only keys already present on the step are changed; ``id`` and
        ``created_at`` are never overwritten.  Passing ``order`` moves the
        step to that zero-based position.

        Returns:
            dict | None: The updated step, or None if *step_id* is unknown.
        """
        step = self.get_step(step_id)
        if step is None:
            return None

        for key, value in kwargs.items():
            if key in step and key not in ("id", "created_at"):
                step[key] = value
        step["updated_at"] = datetime.now().isoformat()

        if "order" in kwargs:
            if "order" in step:
                self._place_step(step, kwargs["order"])
            else:
                # The step carries no order field to move by; just normalise.
                self.reorder_steps()

        self.save_plan()
        return step

    def toggle_step(self, step_id):
        """Flip the ``completed`` flag of a step and save.

        Returns:
            dict | None: The updated step, or None if *step_id* is unknown.
        """
        step = self.get_step(step_id)
        if step is None:
            return None

        step["completed"] = not step.get("completed", False)
        step["updated_at"] = datetime.now().isoformat()
        self.save_plan()
        return step

    def delete_step(self, step_id):
        """Delete a step by ID, renumber the rest and save.

        Returns:
            bool: True if a step was removed, False if *step_id* is unknown.
        """
        step = self.get_step(step_id)
        if step is None:
            return False

        self.plan_steps.remove(step)
        self.reorder_steps()
        self.save_plan()
        return True

    def reorder_steps(self):
        """Sort steps by their ``order`` field and renumber them from 0.

        The sort is stable, so steps with equal ``order`` keep their current
        relative position.
        """
        self._ensure_list().sort(key=lambda s: s.get("order", 0))
        self._renumber()
task_show_parser.add_argument('task_id', help='Task ID') # task add task_add_parser = task_subparsers.add_parser('add', help='Add a new task') task_add_parser.add_argument('title', help='Task title') task_add_parser.add_argument('--description', '-d', help='Task description') task_add_parser.add_argument('--priority', '-p', type=int, choices=[1, 2, 3], default=1, help='Task priority (1=Low, 2=Medium, 3=High)') task_add_parser.add_argument('--status', '-s', choices=['not_started', 'in_progress', 'completed'], default='not_started', help='Task status') # task update task_update_parser = task_subparsers.add_parser('update', help='Update a task') task_update_parser.add_argument('task_id', help='Task ID') task_update_parser.add_argument('--title', '-t', help='Task title') task_update_parser.add_argument('--description', '-d', help='Task description') task_update_parser.add_argument('--priority', '-p', type=int, choices=[1, 2, 3], help='Task priority (1=Low, 2=Medium, 3=High)') task_update_parser.add_argument('--status', '-s', choices=['not_started', 'in_progress', 'completed'], help='Task status') # task delete task_delete_parser = task_subparsers.add_parser('delete', help='Delete a task') task_delete_parser.add_argument('task_id', help='Task ID') # Plan commands plan_parser = subparsers.add_parser('plan', help='Plan operations') plan_subparsers = plan_parser.add_subparsers(dest='subcommand', help='Plan subcommand') # plan list plan_list_parser = plan_subparsers.add_parser('list', help='List all plan steps') # plan show plan_show_parser = plan_subparsers.add_parser('show', help='Show plan step details') plan_show_parser.add_argument('step_id', help='Step ID') # plan add plan_add_parser = plan_subparsers.add_parser('add', help='Add a new plan step') plan_add_parser.add_argument('name', help='Step name') plan_add_parser.add_argument('--description', '-d', help='Brief description') plan_add_parser.add_argument('--details', '-D', help='Detailed information') 
def main():
    """Main CLI entry point.

    Parses the command line with create_parser() and dispatches the
    ``task``, ``plan``, ``export`` and ``import`` commands to a
    TaskTrackerAPI instance, printing the result of each command.

    Returns:
        int: 0 on success (including when only help is printed), 1 if a
        command raised an exception.
    """
    parser = create_parser()
    args = parser.parse_args()

    # Without a command there is nothing to run, so show help before the API
    # (and the data files its managers load) is touched.
    if not args.command:
        parser.print_help()
        return 0

    # Initialize API
    api = TaskTrackerAPI()

    try:
        # Task commands
        if args.command == 'task':
            if args.subcommand == 'list':
                tasks = api.get_all_tasks()
                if not tasks:
                    print("No tasks found.")
                else:
                    print(f"{'ID':<36} {'Title':<30} {'Priority':<8} {'Status':<12}")
                    print("-" * 90)
                    for task in tasks:
                        print(f"{task['id']:<36} {task['title'][:30]:<30} {task['priority']:<8} {task['status']:<12}")

            elif args.subcommand == 'show':
                task = api.get_task(args.task_id)
                if task:
                    print(f"ID: {task['id']}")
                    print(f"Title: {task['title']}")
                    print(f"Description: {task['description']}")
                    print(f"Priority: {task['priority']}")
                    print(f"Status: {task['status']}")
                    print(f"Created: {task['created_at']}")
                    print(f"Updated: {task['updated_at']}")
                else:
                    print(f"Task not found: {args.task_id}")

            elif args.subcommand == 'add':
                task = api.add_task(
                    args.title,
                    args.description or "",
                    args.priority,
                    args.status
                )
                print(f"Task added: {task['id']}")

            elif args.subcommand == 'update':
                # Collect the fields to update
                update_fields = {}
                if args.title:
                    update_fields['title'] = args.title
                # "is not None" so that --description "" clears the field
                # instead of being ignored.
                if args.description is not None:
                    update_fields['description'] = args.description
                if args.priority:
                    update_fields['priority'] = args.priority
                if args.status:
                    update_fields['status'] = args.status

                task = api.update_task(args.task_id, **update_fields)
                if task:
                    print(f"Task updated: {task['id']}")
                else:
                    print(f"Task not found: {args.task_id}")

            elif args.subcommand == 'delete':
                result = api.delete_task(args.task_id)
                if result:
                    print(f"Task deleted: {args.task_id}")
                else:
                    print(f"Task not found: {args.task_id}")

            else:
                parser.print_help()

        # Plan commands
        elif args.command == 'plan':
            if args.subcommand == 'list':
                steps = api.get_all_plan_steps()
                if not steps:
                    print("No plan steps found.")
                else:
                    print(f"{'Order':<6} {'Completed':<10} {'ID':<36} {'Description'}")
                    print("-" * 90)
                    for step in steps:
                        completed = "[x]" if step['completed'] else "[ ]"
                        print(f"{step['order']:<6} {completed:<10} {step['id']:<36} {step['description']}")

            elif args.subcommand == 'show':
                step = api.get_plan_step(args.step_id)
                if step:
                    completed = "Yes" if step['completed'] else "No"
                    print(f"ID: {step['id']}")
                    print(f"Name: {step.get('name', 'N/A')}")
                    print(f"Description: {step.get('description', '')}")
                    print(f"Order: {step.get('order', 0)}")
                    print(f"Completed: {completed}")

                    # Print details if available
                    details = step.get('details', '')
                    if details:
                        print("\nDetails:")
                        print(details)

                    print(f"\nCreated: {step.get('created_at', 'N/A')}")
                    print(f"Updated: {step.get('updated_at', 'N/A')}")
                else:
                    print(f"Plan step not found: {args.step_id}")

            elif args.subcommand == 'add':
                step = api.add_plan_step(
                    args.name,
                    args.description or "",
                    args.details or "",
                    args.order,
                    args.completed
                )
                print(f"Plan step added: {step['id']}")

            elif args.subcommand == 'update':
                # Collect the fields to update
                update_fields = {}
                if args.name:
                    update_fields['name'] = args.name
                # "is not None" so that an explicit empty string clears the
                # field instead of being ignored.
                if args.description is not None:
                    update_fields['description'] = args.description
                if args.details is not None:
                    update_fields['details'] = args.details
                if args.order is not None:
                    update_fields['order'] = args.order

                step = api.update_plan_step(args.step_id, **update_fields)
                if step:
                    print(f"Plan step updated: {step['id']}")
                else:
                    print(f"Plan step not found: {args.step_id}")

            elif args.subcommand == 'toggle':
                step = api.toggle_plan_step(args.step_id)
                if step:
                    completed = "completed" if step['completed'] else "not completed"
                    print(f"Plan step {args.step_id} marked as {completed}")
                else:
                    print(f"Plan step not found: {args.step_id}")

            elif args.subcommand == 'delete':
                result = api.delete_plan_step(args.step_id)
                if result:
                    print(f"Plan step deleted: {args.step_id}")
                else:
                    print(f"Plan step not found: {args.step_id}")

            else:
                parser.print_help()

        # Export/Import commands
        elif args.command == 'export':
            result = api.export_data(args.file_path)
            if result:
                print(f"Data exported to {args.file_path}")
            else:
                print("Error exporting data")

        elif args.command == 'import':
            result = api.import_data(args.file_path)
            if result:
                print(f"Data imported from {args.file_path}")
            else:
                print("Error importing data")

        else:
            parser.print_help()

    except Exception as e:
        print(f"Error: {str(e)}")
        return 1

    return 0
TaskDetailWindow, PlanWindow, NotesWindow, InputDialog, ConfirmDialog from app.ui.input_handler import InputHandler, FocusArea class TerminalUI: def __init__(self, api): """Initialize the terminal UI with a reference to the API.""" self.api = api self.stdscr = None self.task_list_win = None self.task_detail_win = None self.plan_win = None self.notes_win = None self.input_handler = None self.notes_visible = True # Flag to control notes visibility # File modification tracking self.last_tasks_mtime = 0 self.last_plan_mtime = 0 self.last_notes_mtime = 0 self.last_check_time = 0 self.file_check_interval = 1.0 # Check for file changes every second def run(self): """Run the terminal UI.""" try: # Start curses application curses.wrapper(self._main) except Exception as e: # If an error occurs, restore terminal and show traceback if self.stdscr: # Reset timeout to blocking before exiting to prevent potential issues self.stdscr.timeout(-1) curses.endwin() print(f"An error occurred: {str(e)}") traceback.print_exc() def _main(self, stdscr): """Main function for the curses application.""" self.stdscr = stdscr # Set up curses curses.curs_set(0) # Hide cursor stdscr.clear() # Set up input handler self.input_handler = InputHandler(self) # Create initial layout self._create_layout() # Initial data load try: self.refresh_tasks() self.refresh_plan() self.refresh_notes() # Initialize last modified times after initial load task_file = self.api.task_manager.file_path plan_file = self.api.plan_manager.file_path notes_file = self.api.task_manager.notes_file_path if os.path.exists(task_file): self.last_tasks_mtime = os.path.getmtime(task_file) if os.path.exists(plan_file): self.last_plan_mtime = os.path.getmtime(plan_file) if os.path.exists(notes_file): self.last_notes_mtime = os.path.getmtime(notes_file) self.last_check_time = time.time() except Exception as e: self.show_message(f"Error loading data: {str(e)}") # Main event loop while True: # Check for external file changes (e.g., from 
MCP) self.check_file_changes() # Update the screen stdscr.refresh() # Configure timeout for getch to allow polling for file changes stdscr.timeout(100) # 100ms timeout # Get input (returns -1 if no input available) key = stdscr.getch() # Reset timeout to blocking mode if we actually got a key if key != -1: stdscr.timeout(-1) # Handle input (exit if handler returns False) if not self.input_handler.handle_input(key): break else: # No input, just continue the loop to check for file changes continue def _create_layout(self): """Create the initial window layout.""" screen_height, screen_width = self.stdscr.getmaxyx() # Calculate dimensions for initial layout top_height = screen_height // 2 main_width = screen_width - 30 # Reserve 30 cols for notes on the right task_width = main_width // 2 detail_width = main_width - task_width plan_height = screen_height - top_height notes_width = screen_width - main_width # Create windows self.task_list_win = TaskListWindow( self.stdscr, top_height, task_width, 0, 0, "Tasks" ) self.task_detail_win = TaskDetailWindow( self.stdscr, top_height, detail_width, 0, task_width, "Task Details" ) self.plan_win = PlanWindow( self.stdscr, plan_height, main_width, top_height, 0, "Project Plan" ) self.notes_win = NotesWindow( self.stdscr, screen_height, notes_width, 0, main_width, "Notes" ) # Initial refresh self.task_list_win.refresh() self.task_detail_win.refresh() self.plan_win.refresh() if self.notes_visible: self.notes_win.refresh() def toggle_notes_visibility(self): """Toggle the visibility of the notes window.""" self.notes_visible = not self.notes_visible # If hiding and notes is the active focus, change focus to tasks if not self.notes_visible and self.input_handler.focus == FocusArea.NOTES: self.input_handler.focus = FocusArea.TASKS self.update_focus(FocusArea.TASKS) # Redraw layout (this will resize all windows accordingly) self._resize_layout() # If notes are hidden, make sure we redraw all other windows if not self.notes_visible: # 
Ensure each window is refreshed with its contents self.task_list_win.refresh_content() self.task_detail_win.refresh_content() self.plan_win.refresh_content() # Refresh the stdscr to ensure proper redraw of everything self.stdscr.refresh() return self.notes_visible def _resize_layout(self): """Resize the window layout.""" screen_height, screen_width = self.stdscr.getmaxyx() # Calculate dimensions based on notes visibility if self.notes_visible: main_width = screen_width - 30 # Reserve 30 cols for notes on the right notes_width = screen_width - main_width else: main_width = screen_width # Use full width when notes are hidden notes_width = 0 top_height = screen_height // 2 task_width = main_width // 2 detail_width = main_width - task_width plan_height = screen_height - top_height # Resize windows self.task_list_win.resize(top_height, task_width, 0, 0) self.task_detail_win.resize(top_height, detail_width, 0, task_width) self.plan_win.resize(plan_height, main_width, top_height, 0) # Only resize notes window if visible if self.notes_visible: self.notes_win.resize(screen_height, notes_width, 0, main_width) # Refresh content self.task_list_win.refresh_content() self.task_detail_win.refresh_content() self.plan_win.refresh_content() # Only refresh notes if visible if self.notes_visible: self.notes_win.refresh_content() def refresh_tasks(self): """Refresh task list and details.""" tasks = self.api.get_all_tasks() # Sort tasks by priority (high to low) and then by status tasks.sort(key=lambda x: (-x['priority'], x['status'])) self.task_list_win.set_tasks(tasks) # Update task details if there's a selected task selected_task = self.task_list_win.get_selected_task() self.task_detail_win.set_task(selected_task) def refresh_plan(self): """Refresh the project plan.""" try: steps = self.api.get_all_plan_steps() # Validate steps before setting them if steps is None: steps = [] # Steps are already sorted by order in the API self.plan_win.set_steps(steps) except Exception as e: # Handle 
errors gracefully self.plan_win.set_steps([]) raise Exception(f"Failed to load plan: {str(e)}") def refresh_notes(self): """Load and refresh the notes content.""" notes = self.api.get_notes() self.notes_win.set_notes(notes) def save_notes(self): """Save the current notes content.""" notes_text = self.notes_win.get_notes() self.api.save_notes(notes_text) def check_file_changes(self): """Check if any data files have been modified externally (like by MCP).""" try: current_time = time.time() # Only check periodically to reduce file system access if current_time - self.last_check_time < self.file_check_interval: return False self.last_check_time = current_time changes_detected = False # Get file paths from the API's managers task_file = self.api.task_manager.file_path plan_file = self.api.plan_manager.file_path notes_file = self.api.task_manager.notes_file_path # Check if any data files have been modified tasks_changed = os.path.exists(task_file) and os.path.getmtime(task_file) > self.last_tasks_mtime plan_changed = os.path.exists(plan_file) and os.path.getmtime(plan_file) > self.last_plan_mtime notes_changed = os.path.exists(notes_file) and os.path.getmtime(notes_file) > self.last_notes_mtime if tasks_changed or plan_changed or notes_changed: # Update last modified times if tasks_changed: self.last_tasks_mtime = os.path.getmtime(task_file) if plan_changed: self.last_plan_mtime = os.path.getmtime(plan_file) if notes_changed: self.last_notes_mtime = os.path.getmtime(notes_file) try: # Reload all data from files self.api.reload_all() # Refresh UI components with individual try-except blocks try: if tasks_changed: self.refresh_tasks() except Exception as e: # Silently handle task refresh error pass try: if plan_changed: self.refresh_plan() except Exception as e: # Silently handle plan refresh error pass try: if notes_changed: self.refresh_notes() except Exception as e: # Silently handle notes refresh error pass changes_detected = True except Exception as e: # If reload 
fails, try to continue without crashing pass return changes_detected except Exception as e: # Fail silently if file checking itself fails return False def update_focus(self, focus): """Update the UI focus.""" # Reset all titles first self.task_list_win.set_title("Tasks") self.task_detail_win.set_title("Task Details") self.plan_win.set_title("Project Plan") if self.notes_visible: self.notes_win.set_title("Notes") # Highlight the active window by changing its title if focus == FocusArea.TASKS: self.task_list_win.set_title("Tasks [Active]") elif focus == FocusArea.DETAILS: self.task_detail_win.set_title("Task Details [Active]") elif focus == FocusArea.PLAN: self.plan_win.set_title("Project Plan [Active]") elif focus == FocusArea.NOTES and self.notes_visible: self.notes_win.set_title("Notes [Active]") # Clear screen to remove artifacts self.stdscr.erase() self.stdscr.refresh() # Refresh the content of all windows self.task_list_win.refresh_content() self.task_detail_win.refresh_content() self.plan_win.refresh_content() # Only refresh notes if visible if self.notes_visible: self.notes_win.refresh_content() def show_input_dialog(self, title, prompts, initial_values=None): """Show an input dialog and return the entered values or None if canceled.""" dialog = InputDialog(self.stdscr, title, prompts, initial_values) result = dialog.show() # Redraw the entire screen after dialog closes self.stdscr.clear() self.stdscr.refresh() self._resize_layout() return result def show_confirm_dialog(self, title, message): """Show a confirmation dialog and return True if confirmed, False otherwise.""" dialog = ConfirmDialog(self.stdscr, title, message) result = dialog.show() # Redraw the entire screen after dialog closes self.stdscr.clear() self.stdscr.refresh() self._resize_layout() return result def show_message(self, message): """Show a temporary message at the bottom of the screen.""" screen_height, screen_width = self.stdscr.getmaxyx() # Create a small window for the message 
msg_height = 3 msg_width = min(len(message) + 4, screen_width - 4) msg_y = (screen_height - msg_height) // 2 msg_x = (screen_width - msg_width) // 2 # Create message window msg_win = self.stdscr.subwin(msg_height, msg_width, msg_y, msg_x) msg_win.box() msg_win.addstr(1, 2, message[:msg_width - 4]) msg_win.addstr(msg_height - 1, 2, "Press any key to continue") msg_win.refresh() # Wait for a key press self.stdscr.getch() # Redraw the entire screen self.stdscr.clear() self.stdscr.refresh() self._resize_layout() ``` -------------------------------------------------------------------------------- /app/ui/input_handler.py: -------------------------------------------------------------------------------- ```python import curses from enum import Enum class FocusArea(Enum): TASKS = 0 DETAILS = 1 PLAN = 2 NOTES = 3 class InputHandler: def __init__(self, terminal_ui): """Initialize the input handler with a reference to the terminal UI.""" self.terminal_ui = terminal_ui self.focus = FocusArea.TASKS def handle_input(self, key): """ Handle keyboard input and dispatch to appropriate handlers. Returns True if the application should continue, False if it should exit. 
""" # If notes is in edit mode, we need special handling if self.focus == FocusArea.NOTES and self.terminal_ui.notes_win.edit_mode: # Escape exits edit mode in notes if key == 27: # Escape self.terminal_ui.notes_win.toggle_edit_mode() self.terminal_ui.save_notes() return True # All other keys are processed by the notes window try: self.terminal_ui.notes_win.handle_key(key) return True except Exception as e: self.terminal_ui.show_message(f"Error in notes edit: {str(e)}") return True # Global keys (work in any context) if key == 27: # Escape return self._handle_escape() elif key == 9: # Tab self._cycle_focus() return True elif key == 24: # Ctrl+X - toggle notes visibility (ASCII 24 is Ctrl+X) self.terminal_ui.toggle_notes_visibility() return True # Focus-specific input handling if self.focus == FocusArea.TASKS: return self._handle_tasks_input(key) elif self.focus == FocusArea.DETAILS: return self._handle_details_input(key) elif self.focus == FocusArea.PLAN: return self._handle_plan_input(key) elif self.focus == FocusArea.NOTES: return self._handle_notes_input(key) return True def _handle_escape(self): """Handle the escape key - confirm exit.""" # Reset timeout to blocking for the confirmation dialog self.terminal_ui.stdscr.timeout(-1) confirm = self.terminal_ui.show_confirm_dialog( "Exit Confirmation", "Are you sure you want to exit? Any unsaved changes will be lost." 
) return not confirm # Return False to exit if confirmed def _cycle_focus(self): """Cycle through the focus areas.""" focus_order = list(FocusArea) current_idx = focus_order.index(self.focus) # Skip Notes focus if it's not visible if not self.terminal_ui.notes_visible: # Create a filtered list without the NOTES enum focus_order = [f for f in focus_order if f != FocusArea.NOTES] # Find the next focus in our (potentially filtered) list next_idx = (focus_order.index(self.focus) + 1) % len(focus_order) self.focus = focus_order[next_idx] # Update UI with new focus self.terminal_ui.update_focus(self.focus) # Force a complete UI redraw to fix rendering artifacts self.terminal_ui._resize_layout() def _handle_tasks_input(self, key): """Handle input while focused on the task list.""" if key == curses.KEY_UP: self.terminal_ui.task_list_win.select_prev() self._update_task_details() elif key == curses.KEY_DOWN: self.terminal_ui.task_list_win.select_next() self._update_task_details() elif key in (10, 13, curses.KEY_ENTER): # Enter (different codes) # Toggle completion status when Enter is pressed self._toggle_selected_task() self._update_task_details() elif key == ord(' '): # Space # Toggle completion status when Space is pressed self._toggle_selected_task() self._update_task_details() elif key == ord('n'): # New task self._new_task() elif key == ord('e'): # Edit task self._edit_task() elif key == ord('d'): # Delete task self._delete_task() return True def _handle_details_input(self, key): """Handle input while focused on the task details.""" # There's not much to do in the details view except view # Maybe implement scrolling for long descriptions later return True def _handle_notes_input(self, key): """Handle input while focused on the notes.""" if key == ord('e'): # Edit notes self.terminal_ui.notes_win.toggle_edit_mode() return True return True def _handle_plan_input(self, key): """Handle input while focused on the project plan.""" if key == curses.KEY_UP: 
self.terminal_ui.plan_win.select_prev() elif key == curses.KEY_DOWN: self.terminal_ui.plan_win.select_next() elif key in (10, 13, curses.KEY_ENTER): # Enter (different codes) # Toggle completion when Enter is pressed self._toggle_plan_step() elif key == ord(' '): # Toggle completion with Space self._toggle_plan_step() elif key == ord('d'): # Toggle details view self.terminal_ui.plan_win.toggle_details() elif key == ord('n'): # New plan step self._new_plan_step() elif key == ord('e'): # Edit plan step self._edit_plan_step() elif key == ord('D'): # Delete plan step (capital D to avoid conflict with details) self._delete_plan_step() return True def _update_task_details(self): """Update the task details window with the selected task.""" task = self.terminal_ui.task_list_win.get_selected_task() self.terminal_ui.task_detail_win.set_task(task) def _new_task(self): """Create a new task.""" prompts = ["Title", "Description", "Priority (1-3)"] values = self.terminal_ui.show_input_dialog("New Task", prompts) if values: title, description, priority_str = values # Validate priority try: priority = int(priority_str) if priority_str else 1 if priority < 1 or priority > 3: priority = 1 except ValueError: priority = 1 # Add the task task = self.terminal_ui.api.add_task(title, description, priority) # Refresh task list self.terminal_ui.refresh_tasks() # Find and select the new task tasks = self.terminal_ui.api.get_all_tasks() for i, t in enumerate(tasks): if t["id"] == task["id"]: self.terminal_ui.task_list_win.selected_index = i self.terminal_ui.task_list_win.adjust_selection() self.terminal_ui.task_list_win.refresh_content() self._update_task_details() break def _edit_task(self): """Edit the selected task.""" task = self.terminal_ui.task_list_win.get_selected_task() if not task: return # Set up the edit dialog with current values prompts = ["Title", "Description", "Priority (1-3)", "Status"] values = [ task["title"], task["description"], str(task["priority"]), task["status"] ] 
new_values = self.terminal_ui.show_input_dialog("Edit Task", prompts, values) if new_values: title, description, priority_str, status = new_values # Validate priority try: priority = int(priority_str) if priority_str else task["priority"] if priority < 1 or priority > 3: priority = task["priority"] except ValueError: priority = task["priority"] # Validate status valid_statuses = ["not_started", "in_progress", "completed"] if status not in valid_statuses: status = task["status"] # Update the task self.terminal_ui.api.update_task( task["id"], title=title, description=description, priority=priority, status=status ) # Refresh task list and details self.terminal_ui.refresh_tasks() self._update_task_details() def _delete_task(self): """Delete the selected task.""" task = self.terminal_ui.task_list_win.get_selected_task() if not task: return confirm = self.terminal_ui.show_confirm_dialog( "Delete Task", f"Are you sure you want to delete the task '{task['title']}'?" ) if confirm: # Delete the task self.terminal_ui.api.delete_task(task["id"]) # Refresh task list self.terminal_ui.refresh_tasks() self._update_task_details() def _new_plan_step(self): """Create a new plan step.""" prompts = ["Name", "Description", "Details"] values = self.terminal_ui.show_input_dialog("New Plan Step", prompts) try: if values and values[0]: # At least the name should be provided # Add the plan step name = values[0] description = values[1] if len(values) > 1 else "" details = values[2] if len(values) > 2 else "" step = self.terminal_ui.api.add_plan_step( name=name, description=description, details=details ) # Refresh plan self.terminal_ui.refresh_plan() # Only try to find and select the new step if it was successfully created if step and isinstance(step, dict) and "id" in step: steps = self.terminal_ui.api.get_all_plan_steps() for i, s in enumerate(steps): if s["id"] == step["id"]: self.terminal_ui.plan_win.selected_index = i self.terminal_ui.plan_win.adjust_selection() 
self.terminal_ui.plan_win.refresh_content() break except Exception as e: self.terminal_ui.show_message(f"Error creating plan step: {str(e)}") def _edit_plan_step(self): """Edit the selected plan step.""" step = self.terminal_ui.plan_win.get_selected_step() if not step: return # Set up the edit dialog with current values prompts = ["Name", "Description", "Details", "Order"] values = [ step.get("name", step.get("description", "")), step.get("description", ""), step.get("details", ""), str(step.get("order", 0)) ] new_values = self.terminal_ui.show_input_dialog("Edit Plan Step", prompts, values) if new_values: # Extract and validate values name = new_values[0] if len(new_values) > 0 else "" description = new_values[1] if len(new_values) > 1 else "" details = new_values[2] if len(new_values) > 2 else "" order_str = new_values[3] if len(new_values) > 3 else "" # Validate order try: order = int(order_str) if order_str else step.get("order", 0) if order < 0: order = step.get("order", 0) except ValueError: order = step.get("order", 0) # Update the plan step self.terminal_ui.api.update_plan_step( step["id"], name=name, description=description, details=details, order=order ) # Refresh plan self.terminal_ui.refresh_plan() def _delete_plan_step(self): """Delete the selected plan step.""" step = self.terminal_ui.plan_win.get_selected_step() if not step: return confirm = self.terminal_ui.show_confirm_dialog( "Delete Plan Step", f"Are you sure you want to delete the plan step '{step['description']}'?" 
) if confirm: # Delete the plan step self.terminal_ui.api.delete_plan_step(step["id"]) # Refresh plan self.terminal_ui.refresh_plan() def _toggle_selected_task(self): """Cycle through task statuses (not_started -> in_progress -> completed -> not_started).""" task = self.terminal_ui.task_list_win.get_selected_task() if not task: return # Cycle through the statuses current_status = task.get("status", "not_started") # If it's an old "pending" status, treat it as "not_started" if current_status == "pending": current_status = "not_started" status_cycle = { "not_started": "in_progress", "in_progress": "completed", "completed": "not_started" } new_status = status_cycle.get(current_status, "not_started") # Update the task self.terminal_ui.api.update_task( task["id"], status=new_status ) # Refresh task list self.terminal_ui.refresh_tasks() def _toggle_plan_step(self): """Toggle the completion status of the selected plan step.""" step = self.terminal_ui.plan_win.get_selected_step() if not step: return # Toggle the plan step self.terminal_ui.api.toggle_plan_step(step["id"]) # Refresh plan self.terminal_ui.refresh_plan() ``` -------------------------------------------------------------------------------- /mcp_server_fixed.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ MCP-compatible server for the Terminal Task Tracker This server exposes the task tracker functionality through the Model Context Protocol (MCP). 
""" import json import os import logging from typing import Dict, List, Optional, Any, Union, AsyncIterator from contextlib import asynccontextmanager from collections.abc import AsyncIterator from mcp.server.fastmcp import FastMCP, Context, Image from app.core.task_manager import TaskManager from app.core.plan_manager import PlanManager from app.api.api import TaskTrackerAPI # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Initialize data files home_dir = os.path.expanduser("~") data_dir = os.path.join(home_dir, ".tasktracker") os.makedirs(data_dir, exist_ok=True) task_file = os.path.join(data_dir, "tasks.json") plan_file = os.path.join(data_dir, "plan.json") notes_file = os.path.join(data_dir, "notes.txt") # Global variable for API access from resources without URI parameters global_api = None # Set up lifespan context manager for the MCP server @asynccontextmanager async def lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]: """ Initialize and manage the task and plan managers for the server's lifespan. This ensures that we have a single consistent instance of the managers throughout the server's lifecycle. 
""" logger.info(f"Starting TaskTracker server with data directory: {data_dir}") # Initialize managers with explicit file paths task_manager = TaskManager(task_file, notes_file) plan_manager = PlanManager(plan_file) # Create API api = TaskTrackerAPI(task_manager, plan_manager) # Set global API for resources without URI parameters global global_api global_api = api try: # Yield the API instance to the server yield {"api": api} finally: # Ensure all data is saved on shutdown logger.info("Shutting down TaskTracker server, saving all data") api.save_all() # Create an MCP server with the lifespan manager mcp = FastMCP("TaskTracker", lifespan=lifespan) # === Resources === @mcp.resource("tasks://all") def get_all_tasks() -> str: """Get all tasks in the system as JSON.""" # Reload data from files first to ensure we have latest changes global_api.reload_all() tasks = global_api.get_all_tasks() return json.dumps(tasks, indent=2) @mcp.resource("tasks://{task_id}") def get_task(task_id: str) -> str: """Get a specific task by ID.""" # Reload data from files first to ensure we have latest changes global_api.reload_all() task = global_api.get_task(task_id) if task: return json.dumps(task, indent=2) return "Task not found" @mcp.resource("plan://all") def get_all_plan_steps() -> str: """Get all plan steps in the system as JSON.""" # Reload data from files first to ensure we have latest changes global_api.reload_all() steps = global_api.get_all_plan_steps() return json.dumps(steps, indent=2) @mcp.resource("plan://{step_id}") def get_plan_step(step_id: str) -> str: """Get a specific plan step by ID.""" # Reload data from files first to ensure we have latest changes global_api.reload_all() step = global_api.get_plan_step(step_id) if step: return json.dumps(step, indent=2) return "Plan step not found" @mcp.resource("notes://all") def get_notes() -> str: """Get all notes in the system.""" # Reload data from files first to ensure we have latest changes global_api.reload_all() return 
global_api.get_notes() # === Tools === @mcp.tool() def get_all_tasks_tool(ctx: Context) -> List[Dict[str, Any]]: """ Get all tasks in the system. Returns: List of all tasks """ api = ctx.request_context.lifespan_context["api"] # Reload data from files first to ensure we have latest changes api.reload_all() return api.get_all_tasks() @mcp.tool() def get_task_tool(task_id: str, ctx: Context) -> Dict[str, Any]: """ Get a specific task by ID. Args: task_id: The ID of the task to retrieve Returns: The task or an error message if not found """ api = ctx.request_context.lifespan_context["api"] # Reload data from files first to ensure we have latest changes api.reload_all() task = api.get_task(task_id) if task: return task return {"error": "Task not found"} @mcp.tool() def get_all_plan_steps_tool(ctx: Context) -> List[Dict[str, Any]]: """ Get all plan steps in the system. Returns: List of all plan steps """ api = ctx.request_context.lifespan_context["api"] # Reload data from files first to ensure we have latest changes api.reload_all() return api.get_all_plan_steps() @mcp.tool() def get_plan_step_tool(step_id: str, ctx: Context) -> Dict[str, Any]: """ Get a specific plan step by ID. Args: step_id: The ID of the plan step to retrieve Returns: The plan step or an error message if not found """ api = ctx.request_context.lifespan_context["api"] # Reload data from files first to ensure we have latest changes api.reload_all() step = api.get_plan_step(step_id) if step: return step return {"error": "Plan step not found"} @mcp.tool() def get_notes_tool(ctx: Context) -> str: """ Get all notes in the system. Returns: The notes text """ api = ctx.request_context.lifespan_context["api"] # Reload data from files first to ensure we have latest changes api.reload_all() return api.get_notes() @mcp.tool() def add_task(title: str, ctx: Context, description: str = "", priority: int = 1, status: str = "not_started") -> Dict[str, Any]: """ Add a new task to the system. 
Args: title: The title of the task ctx: The MCP context object description: A detailed description of the task priority: Priority level (1-3, with 1 being highest) status: Current status (not_started, in_progress, completed) Returns: The newly created task """ api = ctx.request_context.lifespan_context["api"] # Reload data from files first to ensure we have latest changes api.reload_all() task = api.add_task(title, description, priority, status) api.save_all() logger.info(f"Added task: {title} (ID: {task['id']})") return task @mcp.tool() def update_task(task_id: str, ctx: Context, title: Optional[str] = None, description: Optional[str] = None, priority: Optional[int] = None, status: Optional[str] = None) -> Dict[str, Any]: """ Update an existing task. Args: task_id: The ID of the task to update title: New title (optional) description: New description (optional) priority: New priority (optional) status: New status (optional) Returns: The updated task or None if task not found """ api = ctx.request_context.lifespan_context["api"] # Reload data from files first to ensure we have latest changes api.reload_all() kwargs = {} if title is not None: kwargs["title"] = title if description is not None: kwargs["description"] = description if priority is not None: kwargs["priority"] = priority if status is not None: kwargs["status"] = status task = api.update_task(task_id, **kwargs) if task: api.save_all() logger.info(f"Updated task ID: {task_id}") else: logger.warning(f"Failed to update task: {task_id} - Not found") return task or {"error": "Task not found"} @mcp.tool() def delete_task(task_id: str, ctx: Context) -> Dict[str, Any]: """ Delete a task. 
Args: task_id: The ID of the task to delete Returns: Success or failure message """ api = ctx.request_context.lifespan_context["api"] # Reload data from files first to ensure we have latest changes api.reload_all() result = api.delete_task(task_id) if result: api.save_all() logger.info(f"Deleted task ID: {task_id}") return {"success": True, "message": "Task deleted successfully"} logger.warning(f"Failed to delete task: {task_id} - Not found") return {"success": False, "message": "Task not found"} @mcp.tool() def add_plan_step(name: str, ctx: Context, description: str = "", details: str = "", order: Optional[int] = None, completed: bool = False) -> Dict[str, Any]: """ Add a new plan step. Args: name: The name of the plan step description: A brief description details: Detailed information about the step order: Position in the plan (optional) completed: Whether the step is completed Returns: The newly created plan step """ api = ctx.request_context.lifespan_context["api"] # Reload data from files first to ensure we have latest changes api.reload_all() step = api.add_plan_step(name, description, details, order, completed) api.save_all() logger.info(f"Added plan step: {name} (ID: {step['id']})") return step @mcp.tool() def update_plan_step(step_id: str, ctx: Context, name: Optional[str] = None, description: Optional[str] = None, details: Optional[str] = None, order: Optional[int] = None, completed: Optional[bool] = None) -> Dict[str, Any]: """ Update an existing plan step. 
Args: step_id: The ID of the step to update name: New name (optional) description: New description (optional) details: New details (optional) order: New order (optional) completed: New completion status (optional) Returns: The updated plan step or None if not found """ api = ctx.request_context.lifespan_context["api"] # Reload data from files first to ensure we have latest changes api.reload_all() kwargs = {} if name is not None: kwargs["name"] = name if description is not None: kwargs["description"] = description if details is not None: kwargs["details"] = details if order is not None: kwargs["order"] = order if completed is not None: kwargs["completed"] = completed step = api.update_plan_step(step_id, **kwargs) if step: api.save_all() logger.info(f"Updated plan step ID: {step_id}") else: logger.warning(f"Failed to update plan step: {step_id} - Not found") return step or {"error": "Plan step not found"} @mcp.tool() def delete_plan_step(step_id: str, ctx: Context) -> Dict[str, Any]: """ Delete a plan step. Args: step_id: The ID of the step to delete Returns: Success or failure message """ api = ctx.request_context.lifespan_context["api"] # Reload data from files first to ensure we have latest changes api.reload_all() result = api.delete_plan_step(step_id) if result: api.save_all() logger.info(f"Deleted plan step ID: {step_id}") return {"success": True, "message": "Plan step deleted successfully"} logger.warning(f"Failed to delete plan step: {step_id} - Not found") return {"success": False, "message": "Plan step not found"} @mcp.tool() def toggle_plan_step(step_id: str, ctx: Context) -> Dict[str, Any]: """ Toggle the completion status of a plan step. 
@mcp.tool()
def import_data(file_path: str, ctx: Context) -> Dict[str, Any]:
    """
    Load tracker data from a JSON file.

    Args:
        file_path: Path to the file containing the data to import
        ctx: Request context; the tracker API is read from its lifespan context

    Returns:
        A dict with a boolean "success" flag and a human-readable "message"
    """
    tracker = ctx.request_context.lifespan_context["api"]

    # Re-read the persisted data before importing on top of it.
    tracker.reload_all()

    try:
        # A falsy result from the API is reported as a plain failure.
        if not tracker.import_data(file_path):
            logger.warning(f"Import failed from {file_path}")
            return {"success": False, "message": "Import failed"}

        logger.info(f"Data imported from {file_path}")
        return {"success": True, "message": "Data imported successfully"}
    except Exception as exc:
        # Tool boundary: report the error to the caller instead of raising.
        logger.error(f"Import failed: {str(exc)}")
        return {"success": False, "message": f"Import failed: {str(exc)}"}
class Window:
    """Bordered curses pane with a title on its top border and an inner content area.

    The outer window (``self.win``) carries the border and title; all text is
    drawn into ``self.content_window``, which sits one cell inside the border
    on every side.
    """

    def __init__(self, stdscr, height, width, y, x, title=""):
        """Initialize a window with a border and optional title.

        Args:
            stdscr: Parent curses window the pane is created from.
            height: Total pane height in rows, border included.
            width: Total pane width in columns, border included.
            y: Row of the pane's top-left corner.
            x: Column of the pane's top-left corner.
            title: Optional text drawn on the top border.
        """
        self.win = stdscr.subwin(height, width, y, x)
        self.height = height
        self.width = width
        self.title = title
        self.win.box()
        self.set_title(title)
        self.content_window = self.win.derwin(height - 2, width - 2, 1, 1)
        self.selected_index = 0
        self.scroll_offset = 0
        self.max_visible_items = height - 2

    def set_title(self, title):
        """Set the window title and draw it centered on the top border.

        If the title cannot be drawn, a short generic label is tried instead;
        if that fails too the border is left untitled.
        """
        self.title = title
        if not title:
            return

        title_str = f" {title} "
        x = max(1, (self.width - len(title_str)) // 2)
        try:
            self.win.addstr(0, x, title_str)
        except curses.error:
            # Fall back to a simpler title if there's an error
            try:
                self.win.addstr(0, 1, "Window")
            except curses.error:
                # Only drawing failures are ignored; KeyboardInterrupt and
                # SystemExit must still propagate.
                pass

    def clear(self):
        """Clear the content window."""
        self.content_window.clear()

    def refresh(self):
        """Redraw the border and title, then refresh the pane and its content."""
        self.win.box()
        self.set_title(self.title)
        self.win.refresh()
        self.content_window.refresh()

    def get_content_dimensions(self):
        """Return ``(rows, columns)`` available inside the border."""
        return self.height - 2, self.width - 2

    def resize(self, height, width, y, x):
        """Resize and move the window, rebuilding the inner content window."""
        self.height = height
        self.width = width
        self.win.resize(height, width)
        self.win.mvwin(y, x)
        self.content_window = self.win.derwin(height - 2, width - 2, 1, 1)
        self.max_visible_items = height - 2
        self.refresh()

    def display_message(self, message):
        """Replace the content with a single message starting at the top-left.

        A message that does not fit is tolerated instead of crashing the UI,
        matching how an oversized title is handled.
        """
        self.clear()
        try:
            self.content_window.addstr(0, 0, message)
        except curses.error:
            # curses raises when text runs past the end of the window; the
            # part that fit is typically already drawn.
            pass
        self.refresh()
title="Tasks"): """Initialize a task list window.""" super().__init__(stdscr, height, width, y, x, title) self.tasks = [] def set_tasks(self, tasks): """Set the tasks to display.""" self.tasks = tasks self.adjust_selection() self.refresh_content() def adjust_selection(self): """Adjust selection index and scroll offset to valid values.""" if not self.tasks: self.selected_index = 0 self.scroll_offset = 0 return # Ensure selected_index is in valid range if self.selected_index >= len(self.tasks): self.selected_index = len(self.tasks) - 1 # Adjust scroll offset to keep selected item visible if self.selected_index < self.scroll_offset: self.scroll_offset = self.selected_index elif self.selected_index >= self.scroll_offset + self.max_visible_items: self.scroll_offset = self.selected_index - self.max_visible_items + 1 def refresh_content(self): """Refresh the task list content.""" self.clear() content_height, content_width = self.get_content_dimensions() if not self.tasks: self.content_window.addstr(0, 0, "No tasks") self.refresh() return # Display visible tasks for i in range(min(self.max_visible_items, len(self.tasks))): idx = i + self.scroll_offset if idx >= len(self.tasks): break task = self.tasks[idx] # Highlight selected task if idx == self.selected_index: self.content_window.attron(curses.A_REVERSE) # Format priority indicator priority_markers = ["!", "!!", "!!!"] priority_str = priority_markers[task['priority'] - 1] if 1 <= task['priority'] <= 3 else "" # Format status with icons (using ASCII only to avoid Unicode display issues) status_map = { "not_started": "[ ]", # Empty square "in_progress": "[>]", # Right arrow (in progress) - ASCII version "completed": "[x]", # Checkmark - ASCII version # For backward compatibility "pending": "[ ]" } status_str = status_map.get(task['status'], "[ ]") # Truncate title if needed max_title_width = content_width - len(priority_str) - len(status_str) - 2 title = task['title'] if len(title) > max_title_width: title = 
class TaskDetailWindow(Window):
    """Pane that shows the fields of the currently selected task."""

    def __init__(self, stdscr, height, width, y, x, title="Task Details"):
        """Initialize a task detail window with no task selected."""
        super().__init__(stdscr, height, width, y, x, title)
        self.task = None

    def set_task(self, task):
        """Set the task to display details for and redraw the pane.

        Args:
            task: Task dict, or a falsy value to show the
                "No task selected" placeholder.
        """
        self.task = task
        self.refresh_content()

    def _put_line(self, row, text):
        """Write *text* on *row* of the content window, clipped to the pane.

        Rows outside the content area are skipped and text is cut to the
        content width, so oversized content is truncated rather than raising.
        """
        content_height, content_width = self.get_content_dimensions()
        if not text or content_width <= 0 or not 0 <= row < content_height:
            return
        try:
            self.content_window.addstr(row, 0, text[:content_width])
        except curses.error:
            # Filling the last cell of the bottom row makes curses raise even
            # though the text was drawn; there is nothing further to do.
            pass

    def refresh_content(self):
        """Redraw the task details.

        Layout: title, blank, status, priority, blank, "Description:" followed
        by the word-wrapped description, blank, then the created/updated
        dates. Content that does not fit the pane is truncated.
        """
        # Function-scope import keeps this block self-contained.
        import textwrap

        self.clear()

        if not self.task:
            self._put_line(0, "No task selected")
            self.refresh()
            return

        _, content_width = self.get_content_dimensions()

        # Map priority and status to more readable forms
        priority_map = {1: "Low", 2: "Medium", 3: "High"}
        status_map = {
            "not_started": "Not Started",
            "in_progress": "In Progress",
            "completed": "Completed",
            # For backward compatibility
            "pending": "Not Started",
        }
        priority = priority_map.get(self.task['priority'], "Unknown")
        status = status_map.get(self.task['status'], "Unknown")

        lines = [
            f"Title: {self.task['title']}",
            "",
            f"Status: {status}",
            f"Priority: {priority}",
            "",
            "Description:",
        ]

        # Collapse whitespace first so wrapping matches word-by-word layout;
        # textwrap also breaks words wider than the pane.
        description = self.task['description'] or "No description provided."
        lines.extend(
            textwrap.wrap(" ".join(description.split()), width=max(1, content_width))
        )
        lines.append("")

        # Only the date part of the ISO-style timestamps is shown; a stored
        # None is treated like a missing value.
        created = (self.task.get('created_at') or '').split('T')[0]
        updated = (self.task.get('updated_at') or '').split('T')[0]
        if created:
            lines.append(f"Created: {created}")
        if updated and updated != created:
            lines.append(f"Updated: {updated}")

        for row, text in enumerate(lines):
            self._put_line(row, text)

        self.refresh()
newline at end for simplicity self.notes += "\n" elif key in (curses.KEY_BACKSPACE, 127, 8): # Backspace # Remove last character if there are any if len(self.notes) > 0: self.notes = self.notes[:-1] elif 32 <= key <= 126: # Printable ASCII characters # Add character to the end self.notes += chr(key) # Refresh after any change self.refresh_content() return True except Exception as e: # Log error by adding to notes self.notes += f"\nError: {str(e)}\n" self.refresh_content() return True def adjust_scroll(self): """Adjust scroll offset to keep cursor visible.""" content_height, content_width = self.get_content_dimensions() # Count lines up to cursor lines_to_cursor = self.notes[:self.cursor_pos].count('\n') # Adjust scroll if cursor is off screen if lines_to_cursor < self.scroll_offset: self.scroll_offset = lines_to_cursor elif lines_to_cursor >= self.scroll_offset + content_height: self.scroll_offset = lines_to_cursor - content_height + 1 def refresh_content(self): """Refresh the notes content.""" try: self.clear() content_height, content_width = self.get_content_dimensions() # Simplified content display if not self.notes: if self.edit_mode: self.content_window.addstr(0, 0, "Type to add notes...") else: self.content_window.addstr(0, 0, "No notes. 
class PlanWindow(Window):
    """Pane that lists the project plan steps and can show one step in detail."""

    def __init__(self, stdscr, height, width, y, x, title="Project Plan"):
        """Initialize a project plan window with an empty step list."""
        super().__init__(stdscr, height, width, y, x, title)
        self.steps = []
        self.selected_index = 0
        self.scroll_offset = 0
        self.show_details = False  # True while the detail view replaces the list

    def set_steps(self, steps):
        """Set the plan steps to display and redraw."""
        self.steps = steps
        self.adjust_selection()
        self.refresh_content()

    def adjust_selection(self):
        """Clamp the selection to the step list and keep it scrolled into view."""
        if not self.steps:
            self.selected_index = 0
            self.scroll_offset = 0
            return

        # Ensure selected_index is in valid range
        self.selected_index = min(max(self.selected_index, 0), len(self.steps) - 1)

        # Adjust scroll offset to keep selected item visible
        if self.selected_index < self.scroll_offset:
            self.scroll_offset = self.selected_index
        elif self.selected_index >= self.scroll_offset + self.max_visible_items:
            self.scroll_offset = max(0, self.selected_index - self.max_visible_items + 1)

    def _put_line(self, row, text):
        """Write *text* on *row* of the content window, clipped to the pane.

        Rows outside the content area are skipped and text is cut to the
        content width, so oversized content is truncated rather than raising.
        """
        content_height, content_width = self.get_content_dimensions()
        if not text or content_width <= 0 or not 0 <= row < content_height:
            return
        try:
            self.content_window.addstr(row, 0, text[:content_width])
        except curses.error:
            # Filling the last cell of the bottom row makes curses raise even
            # though the text was drawn; there is nothing further to do.
            pass

    @staticmethod
    def _wrap(text, width):
        """Return *text* word-wrapped to *width* columns as a list of lines."""
        # Function-scope import keeps this block self-contained.
        import textwrap

        return textwrap.wrap(" ".join(text.split()), width=max(1, width))

    @staticmethod
    def _format_step(step, content_width):
        """Build the one-line list entry: order number, checkbox, name."""
        # A step without a 'completed' key is shown as not completed, the
        # same default the detail view uses.
        completion_status = "[x]" if step.get('completed', False) else "[ ]"

        # Get name or fallback to description for backward compatibility
        name = step.get('name', step.get('description', 'Unnamed step'))

        # Truncate name if needed
        max_name_width = content_width - 10
        if len(name) > max_name_width:
            name = name[:max(0, max_name_width - 3)] + "..."

        order = step.get('order', 0)
        return f"{order + 1:2d}. {completion_status} {name}"

    def refresh_content(self):
        """Redraw the pane: the step list, or the selected step's details."""
        self.clear()
        content_height, content_width = self.get_content_dimensions()

        if not self.steps:
            self._put_line(0, "No plan steps")
            self.refresh()
            return

        selected_step = self.get_selected_step()

        # If showing details for the selected step
        if self.show_details and selected_step:
            self._display_step_details(selected_step, content_height, content_width)
            return

        # Otherwise display the visible slice of the step list
        first = self.scroll_offset
        last = first + max(0, self.max_visible_items)
        for row, step in enumerate(self.steps[first:last]):
            is_selected = (first + row) == self.selected_index
            if is_selected:
                self.content_window.attron(curses.A_REVERSE)
            try:
                self._put_line(row, self._format_step(step, content_width))
            except (KeyError, TypeError, ValueError) as e:
                # Malformed step data: show the problem on that row only.
                self._put_line(row, f"Error displaying step: {str(e)}")
            finally:
                # Always undo the highlight so a bad row cannot leave the
                # remaining rows drawn in reverse video.
                if is_selected:
                    self.content_window.attroff(curses.A_REVERSE)

        # Add a help line at the bottom if there's space
        list_height = min(content_height, len(self.steps))
        if content_height > list_height + 1:
            help_text = "Enter/Space: Toggle completion | D: Show/hide details"
            self._put_line(content_height - 1, help_text)

        self.refresh()

    def _display_step_details(self, step, height, width):
        """Display detailed information for a plan step.

        Args:
            step: Step dict; 'name', 'completed', 'description' and 'details'
                are read, all optional.
            height: Number of content rows available.
            width: Number of content columns available.

        The bottom row is reserved for the help line; text that does not fit
        above it is truncated.
        """
        completed = "Completed" if step.get('completed', False) else "Not completed"
        lines = [
            f"Name: {step.get('name', 'Unnamed step')}",
            "",
            f"Status: {completed}",
            "",
        ]

        description = step.get('description', '')
        if description:
            lines.append("Description:")
            lines.extend(self._wrap(description, width))
            lines.append("")

        details = step.get('details', '')
        if details:
            lines.append("Details:")
            lines.extend(self._wrap(details, width))

        for row, text in enumerate(lines[:max(0, height - 1)]):
            self._put_line(row, text)

        # Add a help line at the bottom
        help_text = "D: Return to plan list"
        self._put_line(height - 1, help_text)

        self.refresh()

    def select_next(self):
        """Select the next step if available."""
        if self.steps and self.selected_index < len(self.steps) - 1:
            self.selected_index += 1
            self.adjust_selection()
            self.refresh_content()

    def select_prev(self):
        """Select the previous step if available."""
        if self.steps and self.selected_index > 0:
            self.selected_index -= 1
            self.adjust_selection()
            self.refresh_content()

    def get_selected_step(self):
        """Return the currently selected plan step, or None if there is none."""
        if self.steps and 0 <= self.selected_index < len(self.steps):
            return self.steps[self.selected_index]
        return None

    def toggle_details(self):
        """Switch between the step list and the selected step's details.

        Returns:
            True if the view was toggled, False when no step is selected.
        """
        if self.get_selected_step():
            self.show_details = not self.show_details
            self.refresh_content()
            return True
        return False
Args: stdscr: The main curses window title: Dialog title prompts: List of field prompts initial_values: List of initial values for fields (optional) """ self.stdscr = stdscr self.title = title self.prompts = prompts # Initialize with empty values or provided initial values if initial_values is None: self.values = ["" for _ in range(len(prompts))] else: self.values = initial_values.copy() # Dialog dimensions screen_height, screen_width = stdscr.getmaxyx() self.width = min(60, screen_width - 4) self.height = len(prompts) * 2 + 4 # 2 lines per field + borders + buttons # Center dialog self.y = (screen_height - self.height) // 2 self.x = (screen_width - self.width) // 2 # Create window self.win = stdscr.subwin(self.height, self.width, self.y, self.x) self.win.keypad(True) # Enable keypad mode for special keys self.current_field = 0 self.cursor_pos = len(self.values[0]) if self.values and self.values[0] else 0 def show(self): """Show the dialog and handle input.""" curses.curs_set(1) # Show cursor # Enable special keys like backspace self.win.keypad(True) # Main input loop while True: self.draw() key = self.win.getch() if key == curses.KEY_ENTER or key == 10 or key == 13: # Enter (different codes) curses.curs_set(0) # Hide cursor return self.values elif key == 27: # Escape curses.curs_set(0) # Hide cursor return None elif key == curses.KEY_UP and self.current_field > 0: self.current_field -= 1 self.cursor_pos = len(self.values[self.current_field]) elif key == curses.KEY_DOWN and self.current_field < len(self.prompts) - 1: self.current_field += 1 self.cursor_pos = len(self.values[self.current_field]) elif key == 9: # Tab self.current_field = (self.current_field + 1) % len(self.prompts) self.cursor_pos = len(self.values[self.current_field]) elif key == curses.KEY_LEFT and self.cursor_pos > 0: self.cursor_pos -= 1 elif key == curses.KEY_RIGHT and self.cursor_pos < len(self.values[self.current_field]): self.cursor_pos += 1 elif key in (curses.KEY_BACKSPACE, 127, 8): # 
class ConfirmDialog:
    """Modal Yes/No confirmation box centered on the screen."""

    def __init__(self, stdscr, title, message):
        """Initialize a confirmation dialog.

        Args:
            stdscr: The main curses window; used for sizing and as parent.
            title: Text drawn on the top border (may be empty).
            message: Question shown to the user; word-wrapped to the dialog.
        """
        # Function-scope import keeps this block self-contained.
        import textwrap

        self.stdscr = stdscr
        self.title = title
        self.message = message

        # Dialog dimensions
        screen_height, screen_width = stdscr.getmaxyx()
        self.width = min(50, screen_width - 4)

        # Wrap once, here, and size the dialog from the real line count so
        # the message can never run into the button row or the border.
        text_width = max(1, self.width - 4)
        self._message_lines = (
            textwrap.wrap(" ".join(message.split()), width=text_width) or [""]
        )

        # Message + borders + spacer + buttons, capped at the screen height.
        self.height = max(5, min(len(self._message_lines) + 4, screen_height))

        # Center dialog
        self.y = max(0, (screen_height - self.height) // 2)
        self.x = (screen_width - self.width) // 2

        # Create window
        self.win = stdscr.subwin(self.height, self.width, self.y, self.x)

        self.selected = 0  # 0 = No, 1 = Yes

    def show(self):
        """Show the dialog and handle input.

        Returns:
            True when the user confirms (Enter on "Yes", or y/Y); False on
            Enter with "No" selected, Escape, or n/N.
        """
        # Enable keypad mode for special keys
        self.win.keypad(True)

        # Main input loop
        while True:
            self.draw()

            key = self.win.getch()

            if key == curses.KEY_ENTER or key == 10 or key == 13:  # Enter (different codes)
                return self.selected == 1  # Return True if "Yes" selected
            elif key == 27:  # Escape
                return False
            elif key == curses.KEY_LEFT or key == curses.KEY_RIGHT:
                self.selected = 1 - self.selected  # Toggle between 0 and 1
            # Add handling for y/n keys
            elif key in (ord('y'), ord('Y')):
                return True
            elif key in (ord('n'), ord('N')):
                return False

    def _addstr(self, y, x, text):
        """Draw *text* at (y, x), ignoring curses errors on tiny screens."""
        if not text:
            return
        try:
            self.win.addstr(y, x, text)
        except curses.error:
            pass

    def draw(self):
        """Draw the confirmation dialog: border, title, message and buttons."""
        self.win.clear()
        self.win.box()

        # Draw title, clipped so it stays inside the border
        if self.title:
            title_str = f" {self.title} "[:max(0, self.width - 2)]
            x = max(1, (self.width - len(title_str)) // 2)
            self._addstr(0, x, title_str)

        # Draw message; rows that would collide with the buttons are dropped
        # (only possible when the screen is too short for the full message).
        visible_rows = max(0, self.height - 4)
        for row, line in enumerate(self._message_lines[:visible_rows], start=1):
            self._addstr(row, 2, line)

        # Draw buttons, highlighting the selected one
        button_y = self.height - 2
        buttons = (
            (self.width // 3 - 2, " No "),
            (2 * self.width // 3 - 2, " Yes "),
        )
        for index, (button_x, label) in enumerate(buttons):
            highlighted = self.selected == index
            if highlighted:
                self.win.attron(curses.A_REVERSE)
            self._addstr(button_y, button_x, label)
            if highlighted:
                self.win.attroff(curses.A_REVERSE)

        self.win.refresh()