# Directory Structure
```
├── app
│   ├── __init__.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── api.py
│   │   └── cli.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── plan_manager.py
│   │   └── task_manager.py
│   └── ui
│       ├── __init__.py
│       ├── input_handler.py
│       ├── terminal_ui.py
│       └── ui_components.py
├── img.png
├── main.py
├── mcp_guidelines
│   ├── __init__.py
│   └── llms_full.txt
├── mcp_server_fixed.py
├── MCP-README.md
├── pyproject.toml
├── README.md
├── setup.py
└── tests
    ├── __init__.py
    └── test_mcp_server.py
```
# Files
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Terminal Task Tracker
A terminal-based task tracking application with a three-pane layout for managing tasks and project plans.
## Features
- Three-pane terminal UI:
  - Task list (top left)
  - Task details (top right)
  - Project plan (bottom, full width)
- Task management:
  - Create, view, edit, and delete tasks
  - Set priorities and status
  - Add detailed descriptions
- Project plan management:
  - Define high-level project steps
  - Track step completion
  - Reorder steps
- Complete API for programmatic access
- Command-line interface for scripting
- Data persistence
## Installation
```bash
# Clone the repository
git clone https://github.com/yourusername/terminal-task-tracker.git
cd terminal-task-tracker
# Install dependencies
pip install -e .
```
## Usage
### Terminal UI
To start the terminal UI:
```bash
python main.py
```
Key bindings:
- `Tab`: Cycle between windows
- `Up/Down`: Navigate lists
- `Enter`: Select task (in task list)
- `n`: New item (in task list or plan)
- `e`: Edit item
- `d`: Delete item
- `Space`: Toggle completion (in plan)
- `Esc`: Exit
### Command-line Interface
The CLI provides access to all functionality:
```bash
# List all tasks
python -m app.api.cli task list
# Add a new task
python -m app.api.cli task add "Implement feature X" --description "Details about feature X" --priority 2
# Mark a plan step as completed
python -m app.api.cli plan toggle STEP_ID
# Export data to JSON
python -m app.api.cli export data.json
```
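For scripting, it can help to see how a command line such as `task add` is parsed. The following is a minimal, self-contained sketch of nested `argparse` subcommands that mirrors the `task add` options shown above. It is not the project's `cli.py`; `build_parser` and the program name `tasktracker-sketch` are hypothetical names used only for illustration.

```python
import argparse


def build_parser():
    """Build a tiny parser with a `task add` subcommand (illustrative only)."""
    parser = argparse.ArgumentParser(prog="tasktracker-sketch")
    commands = parser.add_subparsers(dest="command")

    # First level: the "task" command group
    task = commands.add_parser("task", help="Task operations")
    task_commands = task.add_subparsers(dest="subcommand")

    # Second level: "task add" with a positional title and optional flags
    add = task_commands.add_parser("add", help="Add a new task")
    add.add_argument("title")
    add.add_argument("--description", "-d", default="")
    add.add_argument("--priority", "-p", type=int, choices=[1, 2, 3], default=1)
    return parser


args = build_parser().parse_args(
    ["task", "add", "Implement feature X", "--priority", "2"]
)
print(args.command, args.subcommand, args.title, args.priority)
```

The `dest` values (`command`, `subcommand`) are what a dispatcher would branch on after parsing.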
### API Usage
```python
from app.core.task_manager import TaskManager
from app.core.plan_manager import PlanManager
from app.api.api import TaskTrackerAPI
# Initialize managers
task_manager = TaskManager("tasks.json")
plan_manager = PlanManager("plan.json")
# Create API
api = TaskTrackerAPI(task_manager, plan_manager)
# Add a task
task = api.add_task("Implement feature X", "Details about feature X", priority=2)
# Add a plan step
step = api.add_plan_step("Design architecture for shared operations module")
# Mark step as completed
api.toggle_plan_step(step["id"])
# Save data
api.save_all()
```
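The value returned by `add_task` is a plain dictionary. As a rough sketch of its shape, assuming the fields and status values used by `TaskManager.add_task` in `app/core/task_manager.py` (the helper name `make_task` is hypothetical and not part of the package):

```python
import uuid
from datetime import datetime

# Status values accepted by the task manager; anything else falls back
# to "not_started".
VALID_STATUSES = ("not_started", "in_progress", "completed")


def make_task(title, description="", priority=1, status="not_started"):
    """Build a task dict with the same keys the task manager stores."""
    if status not in VALID_STATUSES:
        status = "not_started"
    now = datetime.now().isoformat()
    return {
        "id": str(uuid.uuid4()),
        "title": title,
        "description": description,
        "priority": priority,
        "status": status,
        "created_at": now,
        "updated_at": now,
    }


task = make_task("Implement feature X", "Details about feature X", priority=2)
print(task["title"], task["status"])
```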
## Project Structure
```
terminal-task-tracker/
├── app/
│   ├── __init__.py
│   ├── core/               # Business logic
│   │   ├── __init__.py
│   │   ├── task_manager.py
│   │   └── plan_manager.py
│   ├── ui/                 # Terminal UI
│   │   ├── __init__.py
│   │   ├── terminal_ui.py
│   │   ├── ui_components.py
│   │   └── input_handler.py
│   └── api/                # API and CLI
│       ├── __init__.py
│       ├── api.py
│       └── cli.py
├── main.py                 # Main application entry point
└── README.md
```
## Data Storage
By default, data is stored in the `~/.tasktracker` directory:
- `tasks.json`: Tasks data
- `plan.json`: Project plan data
- `notes.txt`: Notes data (plain text)
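To inspect the stored data from a script without going through the API, something like the following may work. This is a minimal sketch, assuming the default file names used in `app/core/task_manager.py` and `app/core/plan_manager.py` (tasks and plan as JSON lists, notes as plain text in `notes.txt`); `default_data_paths` and `load_json_list` are hypothetical helper names.

```python
import json
import os


def default_data_paths(home=None):
    """Return the default data file paths under ~/.tasktracker."""
    home = home or os.path.expanduser("~")
    data_dir = os.path.join(home, ".tasktracker")
    return {
        "tasks": os.path.join(data_dir, "tasks.json"),
        "plan": os.path.join(data_dir, "plan.json"),
        "notes": os.path.join(data_dir, "notes.txt"),
    }


def load_json_list(path):
    """Return the JSON list stored at path, or [] if missing or invalid."""
    if not os.path.exists(path):
        return []
    try:
        with open(path, "r") as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError):
        return []
    return data if isinstance(data, list) else []


paths = default_data_paths()
tasks = load_json_list(paths["tasks"])
print(f"{len(tasks)} task(s) in {paths['tasks']}")
```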
## License
MIT
```
--------------------------------------------------------------------------------
/mcp_guidelines/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/app/api/__init__.py:
--------------------------------------------------------------------------------
```python
# API and CLI module
```
--------------------------------------------------------------------------------
/app/ui/__init__.py:
--------------------------------------------------------------------------------
```python
# Terminal UI module
```
--------------------------------------------------------------------------------
/app/core/__init__.py:
--------------------------------------------------------------------------------
```python
# Core business logic module
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Tests for TaskTracker MCP package.
"""
```
--------------------------------------------------------------------------------
/app/__init__.py:
--------------------------------------------------------------------------------
```python
# Terminal Task Tracker application package
```
--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------
```python
from setuptools import setup, find_packages

setup(
    name="terminal-task-tracker",
    version="0.1.0",
    description="A terminal-based task tracking application with a three-pane layout",
    author="Your Name",
    author_email="[email protected]",
    packages=find_packages(),
    entry_points={
        "console_scripts": [
            "tasktracker=main:main",
        ],
    },
    python_requires=">=3.6",
    classifiers=[
        "Development Status :: 3 - Alpha",
        "Environment :: Console :: Curses",
        "Intended Audience :: Developers",
        "Intended Audience :: End Users/Desktop",
        "License :: OSI Approved :: MIT License",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.6",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Topic :: Office/Business :: Scheduling",
        "Topic :: Utilities"
    ],
)
```
--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Terminal Task Tracker - Main Application Entry Point

A terminal-based task tracking application with a three-pane layout
for managing tasks and project plans.
"""
import os
import sys

from app.core.task_manager import TaskManager
from app.core.plan_manager import PlanManager
from app.api.api import TaskTrackerAPI
from app.ui.terminal_ui import TerminalUI


def main():
    """Main application entry point."""
    try:
        # Create data directory if it doesn't exist
        home_dir = os.path.expanduser("~")
        data_dir = os.path.join(home_dir, ".tasktracker")
        os.makedirs(data_dir, exist_ok=True)

        # Initialize managers
        task_manager = TaskManager()
        plan_manager = PlanManager()

        # Create API
        api = TaskTrackerAPI(task_manager, plan_manager)

        # Run terminal UI
        ui = TerminalUI(api)
        ui.run()

        # Save data on exit
        api.save_all()
        return 0
    except Exception as e:
        print(f"Error: {str(e)}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    sys.exit(main())
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "tasktracker-mcp"
version = "0.1.0"
description = "A terminal-based task tracking application with MCP integration"
readme = "README.md"
authors = [
{name = "Your Name", email = "[email protected]"},
]
license = {text = "MIT"}
requires-python = ">=3.8"
classifiers = [
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"mcp>=0.1.0",
]
[project.optional-dependencies]
dev = [
"ruff>=0",
"pytest>=7.0.0",
"pytest-cov>=4.0.0",
"pyright>=0",
]
[tool.hatch.build.targets.wheel]
packages = ["app"]
[tool.ruff]
line-length = 88
target-version = "py38"
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"F", # pyflakes
"I", # isort
]
[tool.pyright]
include = ["app", "mcp_server.py"]
typeCheckingMode = "basic"
reportMissingTypeStubs = false
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = "test_*.py"
[project.scripts]
tasktracker-mcp = "mcp_server:main"
```
--------------------------------------------------------------------------------
/tests/test_mcp_server.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the TaskTracker MCP server.
"""
import json
import pytest
from unittest.mock import MagicMock, patch

# Import the MCP server
import sys
import os

# Add the parent directory to the path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import mcp_server


@pytest.fixture
def mock_api():
    """Create a mock TaskTrackerAPI for testing."""
    mock = MagicMock()

    # Set up return values for get_all_tasks
    mock.get_all_tasks.return_value = [
        {
            "id": "test-task-1",
            "title": "Test Task 1",
            "description": "Description for test task 1",
            "priority": 1,
            "status": "not_started",
            "created_at": "2025-04-08T00:00:00",
            "updated_at": "2025-04-08T00:00:00"
        }
    ]

    # Set up return values for get_task
    mock.get_task.return_value = {
        "id": "test-task-1",
        "title": "Test Task 1",
        "description": "Description for test task 1",
        "priority": 1,
        "status": "not_started",
        "created_at": "2025-04-08T00:00:00",
        "updated_at": "2025-04-08T00:00:00"
    }

    # Set up return values for add_task
    mock.add_task.return_value = {
        "id": "new-task-1",
        "title": "New Task",
        "description": "Description for new task",
        "priority": 1,
        "status": "not_started",
        "created_at": "2025-04-08T00:00:00",
        "updated_at": "2025-04-08T00:00:00"
    }

    return mock


@patch("mcp_server.api")
def test_get_all_tasks(mock_api_module, mock_api):
    """Test the get_all_tasks resource."""
    # Set the mock API
    mock_api_module.get_all_tasks.return_value = mock_api.get_all_tasks.return_value

    # Call the function
    result = mcp_server.get_all_tasks()

    # Assert the result
    expected = json.dumps(mock_api.get_all_tasks.return_value, indent=2)
    assert result == expected
    mock_api_module.get_all_tasks.assert_called_once()


@patch("mcp_server.api")
def test_get_task(mock_api_module, mock_api):
    """Test the get_task resource."""
    # Set the mock API
    mock_api_module.get_task.return_value = mock_api.get_task.return_value

    # Call the function
    result = mcp_server.get_task("test-task-1")

    # Assert the result
    expected = json.dumps(mock_api.get_task.return_value, indent=2)
    assert result == expected
    mock_api_module.get_task.assert_called_once_with("test-task-1")


@patch("mcp_server.api")
def test_add_task(mock_api_module, mock_api):
    """Test the add_task tool."""
    # Set the mock API
    mock_api_module.add_task.return_value = mock_api.add_task.return_value

    # Call the function
    result = mcp_server.add_task(
        title="New Task",
        description="Description for new task",
        priority=1,
        status="not_started"
    )

    # Assert the result
    assert result == mock_api.add_task.return_value
    mock_api_module.add_task.assert_called_once_with(
        "New Task", "Description for new task", 1, "not_started"
    )
    mock_api_module.save_all.assert_called_once()


def test_add_task_prompt():
    """Test the add_task_prompt."""
    result = mcp_server.add_task_prompt(
        title="Test Task",
        description="Task description"
    )
    assert "Test Task" in result
    assert "Task description" in result


def test_create_plan_prompt():
    """Test the create_plan_prompt."""
    result = mcp_server.create_plan_prompt()
    assert "project plan" in result.lower()
    assert "clear steps" in result.lower()
--------------------------------------------------------------------------------
/app/core/task_manager.py:
--------------------------------------------------------------------------------
```python
import json
import os
import uuid
from datetime import datetime


class TaskManager:
    def __init__(self, file_path=None, notes_file_path=None):
        """Initialize the TaskManager with optional file paths."""
        home_dir = os.path.expanduser("~")
        task_dir = os.path.join(home_dir, ".tasktracker")
        os.makedirs(task_dir, exist_ok=True)

        if file_path is None:
            file_path = os.path.join(task_dir, "tasks.json")
        if notes_file_path is None:
            notes_file_path = os.path.join(task_dir, "notes.txt")

        self.file_path = file_path
        self.notes_file_path = notes_file_path
        self.tasks = self._load_tasks()
        self.notes = self._load_notes()

    def _load_tasks(self):
        """Load tasks from the file or return an empty list if file doesn't exist."""
        if os.path.exists(self.file_path):
            try:
                with open(self.file_path, 'r') as f:
                    return json.load(f)
            except json.JSONDecodeError:
                return []
        return []

    def reload_tasks(self):
        """Reload tasks from file (for external changes like MCP)."""
        self.tasks = self._load_tasks()

    def _load_notes(self):
        """Load notes from file or return empty string if file doesn't exist."""
        if os.path.exists(self.notes_file_path):
            try:
                with open(self.notes_file_path, 'r') as f:
                    return f.read()
            except (OSError, UnicodeDecodeError):
                # Unreadable or undecodable notes file: fall back to empty notes
                return ""
        return ""

    def reload_notes(self):
        """Reload notes from file (for external changes like MCP)."""
        self.notes = self._load_notes()

    def save_tasks(self):
        """Save tasks to the file."""
        with open(self.file_path, 'w') as f:
            json.dump(self.tasks, f, indent=2)

    def save_notes(self, notes_text):
        """Save notes to the file."""
        self.notes = notes_text
        with open(self.notes_file_path, 'w') as f:
            f.write(notes_text)

    def get_notes(self):
        """Get the current notes."""
        return self.notes

    def get_all_tasks(self):
        """Return all tasks."""
        return self.tasks

    def get_task(self, task_id):
        """Get a task by ID."""
        for task in self.tasks:
            if task["id"] == task_id:
                return task
        return None

    def add_task(self, title, description="", priority=1, status="not_started"):
        """Add a new task."""
        # Validate status; unknown values fall back to "not_started"
        valid_statuses = ["not_started", "in_progress", "completed"]
        if status not in valid_statuses:
            status = "not_started"

        task = {
            "id": str(uuid.uuid4()),
            "title": title,
            "description": description,
            "priority": priority,
            "status": status,
            "created_at": datetime.now().isoformat(),
            "updated_at": datetime.now().isoformat()
        }
        self.tasks.append(task)
        self.save_tasks()
        return task

    def update_task(self, task_id, **kwargs):
        """Update a task by ID."""
        task = self.get_task(task_id)
        if task:
            for key, value in kwargs.items():
                # Only existing fields may change; "id" and "created_at" are fixed
                if key in task and key not in ["id", "created_at"]:
                    task[key] = value
            task["updated_at"] = datetime.now().isoformat()
            self.save_tasks()
            return task
        return None

    def delete_task(self, task_id):
        """Delete a task by ID."""
        task = self.get_task(task_id)
        if task:
            self.tasks.remove(task)
            self.save_tasks()
            return True
        return False
--------------------------------------------------------------------------------
/app/api/api.py:
--------------------------------------------------------------------------------
```python
import json

from app.core.task_manager import TaskManager
from app.core.plan_manager import PlanManager


class TaskTrackerAPI:
    def __init__(self, task_manager=None, plan_manager=None):
        """Initialize the TaskTrackerAPI with task and plan managers."""
        self.task_manager = task_manager or TaskManager()
        self.plan_manager = plan_manager or PlanManager()

    # Task methods
    def get_all_tasks(self):
        """Get all tasks."""
        return self.task_manager.get_all_tasks()

    def get_task(self, task_id):
        """Get a task by ID."""
        return self.task_manager.get_task(task_id)

    def add_task(self, title, description="", priority=1, status="not_started"):
        """Add a new task."""
        # The default matches TaskManager.add_task, which only accepts
        # "not_started", "in_progress", or "completed".
        return self.task_manager.add_task(title, description, priority, status)

    def update_task(self, task_id, **kwargs):
        """Update a task by ID."""
        return self.task_manager.update_task(task_id, **kwargs)

    def delete_task(self, task_id):
        """Delete a task by ID."""
        return self.task_manager.delete_task(task_id)

    # Plan methods
    def get_all_plan_steps(self):
        """Get all plan steps."""
        return self.plan_manager.get_all_steps()

    def get_plan_step(self, step_id):
        """Get a plan step by ID."""
        return self.plan_manager.get_step(step_id)

    def add_plan_step(self, name, description="", details="", order=None, completed=False):
        """Add a new plan step."""
        return self.plan_manager.add_step(name, description, details, order, completed)

    def update_plan_step(self, step_id, **kwargs):
        """Update a plan step by ID."""
        return self.plan_manager.update_step(step_id, **kwargs)

    def toggle_plan_step(self, step_id):
        """Toggle the completion status of a plan step."""
        return self.plan_manager.toggle_step(step_id)

    def delete_plan_step(self, step_id):
        """Delete a plan step by ID."""
        return self.plan_manager.delete_step(step_id)

    def reorder_plan_steps(self):
        """Reorder plan steps to ensure consistent ordering."""
        return self.plan_manager.reorder_steps()

    # Notes methods
    def get_notes(self):
        """Get the notes."""
        return self.task_manager.get_notes()

    def save_notes(self, notes_text):
        """Save notes."""
        self.task_manager.save_notes(notes_text)
        return True

    # Data management
    def save_all(self):
        """Save all data to files."""
        self.task_manager.save_tasks()
        self.plan_manager.save_plan()
        return True

    def reload_all(self):
        """Reload all data from files (for external changes like MCP)."""
        self.task_manager.reload_tasks()
        self.task_manager.reload_notes()
        self.plan_manager.reload_plan()
        return True

    def export_data(self, file_path):
        """Export all data to a single JSON file."""
        data = {
            "tasks": self.get_all_tasks(),
            "plan": self.get_all_plan_steps(),
            "notes": self.get_notes()
        }
        with open(file_path, 'w') as f:
            json.dump(data, f, indent=2)
        return True

    def import_data(self, file_path):
        """Import data from a JSON file."""
        try:
            with open(file_path, 'r') as f:
                data = json.load(f)

            # Replace existing tasks and plan steps with the imported ones
            self.task_manager.tasks = data.get("tasks", [])
            self.plan_manager.plan_steps = data.get("plan", [])

            # Import notes if available
            if "notes" in data:
                self.task_manager.save_notes(data["notes"])

            # Save imported data
            self.save_all()
            return True
        except (json.JSONDecodeError, FileNotFoundError) as e:
            print(f"Error importing data: {e}")
            return False
--------------------------------------------------------------------------------
/MCP-README.md:
--------------------------------------------------------------------------------
```markdown
# TaskTracker MCP Server
TaskTracker MCP is a Model Context Protocol compatible version of the Terminal Task Tracker application. It exposes task and project plan management capabilities through MCP, allowing Language Models to interact with your task tracking data.
## Features
- **Resources**: Access tasks, plans, and notes data
- **Tools**: Create, update, and delete tasks and plan steps
- **Prompts**: Templates for common task management activities
## Installation
```bash
# Using uv (recommended)
uv add -e .
# Or with pip
pip install -e .
```
## Usage with Claude Desktop
```bash
# Install in Claude Desktop
mcp install mcp_server_fixed.py
# Run with MCP Inspector
mcp dev mcp_server_fixed.py
```
NOTE: If you encounter errors with `mcp_server.py`, please use the fixed version `mcp_server_fixed.py` instead. The fixed version uses a global API instance for resources and properly handles context parameters for tools.
## Running Directly
```bash
# Run server directly
python mcp_server_fixed.py
# Or using MCP CLI
mcp run mcp_server_fixed.py
```
## Resources
TaskTracker exposes the following resources:
- `tasks://all` - List all tasks
- `tasks://{task_id}` - Get a specific task
- `plan://all` - List all plan steps
- `plan://{step_id}` - Get a specific plan step
- `notes://all` - Get all notes
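Each resource URI has a scheme (`tasks`, `plan`, or `notes`) followed by either `all` or an item ID. The sketch below only illustrates that layout; it is not how the MCP SDK or this server routes requests, and `parse_resource_uri` and `describe_resource` are hypothetical helper names.

```python
KNOWN_SCHEMES = ("tasks", "plan", "notes")


def parse_resource_uri(uri):
    """Split a URI such as "tasks://all" into (scheme, target)."""
    scheme, separator, target = uri.partition("://")
    if not separator or not scheme or not target:
        raise ValueError(f"Not a resource URI: {uri!r}")
    return scheme, target


def describe_resource(uri):
    """Return a short human-readable description of a resource URI."""
    scheme, target = parse_resource_uri(uri)
    if scheme not in KNOWN_SCHEMES:
        raise ValueError(f"Unknown resource scheme: {scheme!r}")
    if target == "all":
        return f"all {scheme}"
    return f"{scheme} item {target}"


print(describe_resource("tasks://all"))
print(describe_resource("plan://550e8400-e29b-41d4-a716-446655440001"))
```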
## Tools
TaskTracker provides the following tools:
### Task Tools
- `get_all_tasks_tool` - Get all tasks
- `get_task_tool` - Get a specific task by ID
- `add_task` - Create a new task
- `update_task` - Update an existing task
- `delete_task` - Delete a task
### Plan Tools
- `get_all_plan_steps_tool` - Get all plan steps
- `get_plan_step_tool` - Get a specific plan step by ID
- `add_plan_step` - Create a new plan step
- `update_plan_step` - Update an existing plan step
- `delete_plan_step` - Delete a plan step
- `toggle_plan_step` - Toggle completion status
### Notes and Data Tools
- `get_notes_tool` - Get all notes
- `save_notes` - Save notes
- `export_data` - Export all data to JSON
- `import_data` - Import data from JSON
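The export file is a single JSON object. As a minimal sketch of that layout, assuming the `tasks`, `plan`, and `notes` keys written by `TaskTrackerAPI.export_data` in `app/api/api.py` (the function names `export_bundle` and `import_bundle` are hypothetical, and defaulting missing notes to an empty string is a choice made for this sketch):

```python
import json
import os
import tempfile


def export_bundle(tasks, plan, notes, file_path):
    """Write tasks, plan steps, and notes to one JSON file."""
    data = {"tasks": tasks, "plan": plan, "notes": notes}
    with open(file_path, "w") as f:
        json.dump(data, f, indent=2)
    return data


def import_bundle(file_path):
    """Read a bundle back; missing keys fall back to empty values."""
    with open(file_path, "r") as f:
        data = json.load(f)
    return data.get("tasks", []), data.get("plan", []), data.get("notes", "")


# Round trip through a temporary file
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data.json")
    export_bundle([{"id": "t1", "title": "Demo"}], [], "remember the milk", path)
    tasks, plan, notes = import_bundle(path)

print(tasks, plan, notes)
```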
## Prompts
TaskTracker includes the following prompts:
- `add_task_prompt` - Template for adding a new task
- `create_plan_prompt` - Template for creating a project plan
## Example Interactions
### Adding and Retrieving Tasks
```
> call-tool add_task "Implement login feature" "Create authentication endpoints for user login" 1 "not_started"
{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "title": "Implement login feature",
  "description": "Create authentication endpoints for user login",
  "priority": 1,
  "status": "not_started",
  "created_at": "2025-04-08T14:32:15.123456",
  "updated_at": "2025-04-08T14:32:15.123456"
}

> call-tool get_all_tasks_tool
[
  {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "title": "Implement login feature",
    "description": "Create authentication endpoints for user login",
    "priority": 1,
    "status": "not_started",
    "created_at": "2025-04-08T14:32:15.123456",
    "updated_at": "2025-04-08T14:32:15.123456"
  }
]

> read-resource tasks://all
[
  {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "title": "Implement login feature",
    "description": "Create authentication endpoints for user login",
    "priority": 1,
    "status": "not_started",
    "created_at": "2025-04-08T14:32:15.123456",
    "updated_at": "2025-04-08T14:32:15.123456"
  }
]
```
### Managing Project Plans
```
> call-tool add_plan_step "Design API endpoints" "Create OpenAPI specification for endpoints" "Include authentication routes" 0 false
{
  "id": "550e8400-e29b-41d4-a716-446655440001",
  "name": "Design API endpoints",
  "description": "Create OpenAPI specification for endpoints",
  "details": "Include authentication routes",
  "order": 0,
  "completed": false,
  "created_at": "2025-04-08T14:33:15.123456",
  "updated_at": "2025-04-08T14:33:15.123456"
}

> call-tool get_all_plan_steps_tool
[
  {
    "id": "550e8400-e29b-41d4-a716-446655440001",
    "name": "Design API endpoints",
    "description": "Create OpenAPI specification for endpoints",
    "details": "Include authentication routes",
    "order": 0,
    "completed": false,
    "created_at": "2025-04-08T14:33:15.123456",
    "updated_at": "2025-04-08T14:33:15.123456"
  }
]
```
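The `order` field in the responses above is a zero-based position. A minimal sketch of how positions can be kept contiguous, assuming the sort-then-renumber approach of `PlanManager.reorder_steps` in `app/core/plan_manager.py` (this standalone function and the sample steps are illustrative only):

```python
def reorder_steps(steps):
    """Sort steps by "order" and renumber them 0..n-1 in place."""
    # list.sort is stable, so steps with equal "order" keep their relative order
    steps.sort(key=lambda step: step.get("order", 0))
    for position, step in enumerate(steps):
        step["order"] = position
    return steps


steps = [
    {"name": "Write tests", "order": 5},
    {"name": "Design API endpoints", "order": 0},
    {"name": "Implement endpoints", "order": 2},
]
reorder_steps(steps)
print([(step["order"], step["name"]) for step in steps])
```

After the call, gaps in the requested positions are closed, so the stored `order` always matches the step's index in the list.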
## License
MIT
```
--------------------------------------------------------------------------------
/app/core/plan_manager.py:
--------------------------------------------------------------------------------
```python
import json
import os
import uuid
from datetime import datetime


class PlanManager:
    def __init__(self, file_path=None):
        """Initialize the PlanManager with an optional file path."""
        if file_path is None:
            home_dir = os.path.expanduser("~")
            plan_dir = os.path.join(home_dir, ".tasktracker")
            os.makedirs(plan_dir, exist_ok=True)
            file_path = os.path.join(plan_dir, "plan.json")

        self.file_path = file_path
        self.plan_steps = self._load_plan()

    def _load_plan(self):
        """Load plan steps from the file or return an empty list if file doesn't exist."""
        if os.path.exists(self.file_path):
            try:
                with open(self.file_path, 'r') as f:
                    data = json.load(f)
                # Ensure we always return a list
                if isinstance(data, list):
                    return data
                elif isinstance(data, dict) and "steps" in data:
                    return data["steps"]
                else:
                    # If it's not a list or doesn't have steps key, return empty list
                    return []
            except (json.JSONDecodeError, IOError):
                return []
        return []

    def reload_plan(self):
        """Reload plan steps from file (for external changes like MCP)."""
        self.plan_steps = self._load_plan()

    def save_plan(self):
        """Save plan steps to the file."""
        # Ensure plan_steps is a list before saving
        if not isinstance(self.plan_steps, list):
            self.plan_steps = []

        with open(self.file_path, 'w') as f:
            json.dump(self.plan_steps, f, indent=2)

    def get_all_steps(self):
        """Return all plan steps."""
        # Ensure we always return a list
        if not isinstance(self.plan_steps, list):
            self.plan_steps = []
        return self.plan_steps

    def get_step(self, step_id):
        """Get a plan step by ID."""
        # Ensure plan_steps is a list
        if not isinstance(self.plan_steps, list):
            self.plan_steps = []
            return None

        for step in self.plan_steps:
            if step["id"] == step_id:
                return step
        return None

    def add_step(self, name, description="", details="", order=None, completed=False):
        """Add a new plan step."""
        # Ensure plan_steps is a list
        if not isinstance(self.plan_steps, list):
            self.plan_steps = []

        if order is None:
            # Place at the end by default
            order = len(self.plan_steps)

        step = {
            "id": str(uuid.uuid4()),
            "name": name,
            "description": description,
            "details": details,
            "order": order,
            "completed": completed,
            "created_at": datetime.now().isoformat(),
            "updated_at": datetime.now().isoformat()
        }

        # Append the step, then let reorder_steps() sort by "order" and renumber
        self.plan_steps.append(step)
        self.reorder_steps()
        self.save_plan()
        return step

    def update_step(self, step_id, **kwargs):
        """Update a plan step by ID."""
        step = self.get_step(step_id)
        if step:
            for key, value in kwargs.items():
                # Only existing fields may change; "id" and "created_at" are fixed
                if key in step and key not in ["id", "created_at"]:
                    step[key] = value
            step["updated_at"] = datetime.now().isoformat()

            # If order changed, reorder all steps
            if "order" in kwargs:
                self.reorder_steps()

            self.save_plan()
            return step
        return None

    def toggle_step(self, step_id):
        """Toggle the completion status of a plan step."""
        step = self.get_step(step_id)
        if step:
            step["completed"] = not step["completed"]
            step["updated_at"] = datetime.now().isoformat()
            self.save_plan()
            return step
        return None

    def delete_step(self, step_id):
        """Delete a plan step by ID."""
        # Ensure plan_steps is a list
        if not isinstance(self.plan_steps, list):
            self.plan_steps = []
            return False

        step = self.get_step(step_id)
        if step:
            self.plan_steps.remove(step)
            self.reorder_steps()
            self.save_plan()
            return True
        return False

    def reorder_steps(self):
        """Reorder steps to ensure consistent ordering."""
        # Ensure plan_steps is a list
        if not isinstance(self.plan_steps, list):
            self.plan_steps = []
            return

        # Sort by the current order
        self.plan_steps.sort(key=lambda x: x.get("order", 0))

        # Update order field to match actual position
        for i, step in enumerate(self.plan_steps):
            step["order"] = i
--------------------------------------------------------------------------------
/app/api/cli.py:
--------------------------------------------------------------------------------
```python
import argparse
import sys
import os

from app.api.api import TaskTrackerAPI
from app.core.task_manager import TaskManager
from app.core.plan_manager import PlanManager


def create_parser():
    """Create the command line argument parser."""
    parser = argparse.ArgumentParser(description='Terminal Task Tracker CLI')
    subparsers = parser.add_subparsers(dest='command', help='Command to execute')

    # Task commands
    task_parser = subparsers.add_parser('task', help='Task operations')
    task_subparsers = task_parser.add_subparsers(dest='subcommand', help='Task subcommand')

    # task list
    task_list_parser = task_subparsers.add_parser('list', help='List all tasks')

    # task show
    task_show_parser = task_subparsers.add_parser('show', help='Show task details')
    task_show_parser.add_argument('task_id', help='Task ID')

    # task add
    task_add_parser = task_subparsers.add_parser('add', help='Add a new task')
    task_add_parser.add_argument('title', help='Task title')
    task_add_parser.add_argument('--description', '-d', help='Task description')
    task_add_parser.add_argument('--priority', '-p', type=int, choices=[1, 2, 3], default=1,
                                 help='Task priority (1=Low, 2=Medium, 3=High)')
    task_add_parser.add_argument('--status', '-s',
                                 choices=['not_started', 'in_progress', 'completed'],
                                 default='not_started', help='Task status')

    # task update
    task_update_parser = task_subparsers.add_parser('update', help='Update a task')
    task_update_parser.add_argument('task_id', help='Task ID')
    task_update_parser.add_argument('--title', '-t', help='Task title')
    task_update_parser.add_argument('--description', '-d', help='Task description')
    task_update_parser.add_argument('--priority', '-p', type=int, choices=[1, 2, 3],
                                    help='Task priority (1=Low, 2=Medium, 3=High)')
    task_update_parser.add_argument('--status', '-s',
                                    choices=['not_started', 'in_progress', 'completed'],
                                    help='Task status')

    # task delete
    task_delete_parser = task_subparsers.add_parser('delete', help='Delete a task')
    task_delete_parser.add_argument('task_id', help='Task ID')

    # Plan commands
    plan_parser = subparsers.add_parser('plan', help='Plan operations')
    plan_subparsers = plan_parser.add_subparsers(dest='subcommand', help='Plan subcommand')

    # plan list
    plan_list_parser = plan_subparsers.add_parser('list', help='List all plan steps')

    # plan show
    plan_show_parser = plan_subparsers.add_parser('show', help='Show plan step details')
    plan_show_parser.add_argument('step_id', help='Step ID')

    # plan add
    plan_add_parser = plan_subparsers.add_parser('add', help='Add a new plan step')
    plan_add_parser.add_argument('name', help='Step name')
    plan_add_parser.add_argument('--description', '-d', help='Brief description')
    plan_add_parser.add_argument('--details', '-D', help='Detailed information')
    plan_add_parser.add_argument('--order', '-o', type=int, help='Step order (position in plan)')
    plan_add_parser.add_argument('--completed', '-c', action='store_true',
                                 help='Mark step as completed')

    # plan update
    plan_update_parser = plan_subparsers.add_parser('update', help='Update a plan step')
    plan_update_parser.add_argument('step_id', help='Step ID')
    plan_update_parser.add_argument('--name', '-n', help='Step name')
    plan_update_parser.add_argument('--description', '-d', help='Brief description')
    plan_update_parser.add_argument('--details', '-D', help='Detailed information')
    plan_update_parser.add_argument('--order', '-o', type=int, help='Step order (position in plan)')

    # plan toggle
    plan_toggle_parser = plan_subparsers.add_parser('toggle',
                                                    help='Toggle completion status of a plan step')
    plan_toggle_parser.add_argument('step_id', help='Step ID')

    # plan delete
    plan_delete_parser = plan_subparsers.add_parser('delete', help='Delete a plan step')
    plan_delete_parser.add_argument('step_id', help='Step ID')

    # Export/Import commands
    export_parser = subparsers.add_parser('export', help='Export data to JSON file')
    export_parser.add_argument('file_path', help='Path to export file')

    import_parser = subparsers.add_parser('import', help='Import data from JSON file')
    import_parser.add_argument('file_path', help='Path to import file')

    return parser


def main():
    """Main CLI entry point."""
    parser = create_parser()
    args = parser.parse_args()

    # Initialize API
    api = TaskTrackerAPI()

    if not args.command:
        parser.print_help()
        return

    try:
        # Task commands
        if args.command == 'task':
            if args.subcommand == 'list':
                tasks = api.get_all_tasks()
                if not tasks:
                    print("No tasks found.")
                else:
                    print(f"{'ID':<36} {'Title':<30} {'Priority':<8} {'Status':<12}")
                    print("-" * 90)
                    for task in tasks:
                        print(f"{task['id']:<36} {task['title'][:30]:<30} {task['priority']:<8} {task['status']:<12}")

            elif args.subcommand == 'show':
                task = api.get_task(args.task_id)
                if task:
                    print(f"ID: {task['id']}")
                    print(f"Title: {task['title']}")
                    print(f"Description: {task['description']}")
                    print(f"Priority: {task['priority']}")
                    print(f"Status: {task['status']}")
                    print(f"Created: {task['created_at']}")
                    print(f"Updated: {task['updated_at']}")
                else:
                    print(f"Task not found: {args.task_id}")

            elif args.subcommand == 'add':
                task = api.add_task(
                    args.title,
                    args.description or "",
                    args.priority,
                    args.status
                )
                print(f"Task added: {task['id']}")

            elif args.subcommand == 'update':
                # Collect the fields to update
                update_fields = {}
                if args.title:
                    update_fields['title'] = args.title
                if args.description:
                    update_fields['description'] = args.description
                if args.priority:
                    update_fields['priority'] = args.priority
                if args.status:
                    update_fields['status'] = args.status

                task = api.update_task(args.task_id, **update_fields)
                if task:
                    print(f"Task updated: {task['id']}")
                else:
                    print(f"Task not found: {args.task_id}")

            elif args.subcommand == 'delete':
                result = api.delete_task(args.task_id)
                if result:
                    print(f"Task deleted: {args.task_id}")
                else:
                    print(f"Task not found: {args.task_id}")

            else:
                parser.print_help()

        # Plan commands
        elif args.command == 'plan':
            if args.subcommand == 'list':
                steps = api.get_all_plan_steps()
                if not steps:
                    print("No plan steps found.")
                else:
                    print(f"{'Order':<6} {'Completed':<10} {'ID':<36} {'Description'}")
                    print("-" * 90)
                    for step in steps:
                        completed = "[x]" if step['completed'] else "[ ]"
                        print(f"{step['order']:<6} {completed:<10} {step['id']:<36} {step['description']}")

            elif args.subcommand == 'show':
                step = api.get_plan_step(args.step_id)
                if step:
                    completed = "Yes" if step['completed'] else "No"
                    print(f"ID: {step['id']}")
                    print(f"Name: {step.get('name', 'N/A')}")
                    print(f"Description: {step.get('description', '')}")
                    print(f"Order: {step.get('order', 0)}")
                    print(f"Completed: {completed}")

                    # Print details if available
                    details = step.get('details', '')
                    if details:
                        print("\nDetails:")
                        print(details)

                    print(f"\nCreated: {step.get('created_at', 'N/A')}")
                    print(f"Updated: {step.get('updated_at', 'N/A')}")
                else:
                    print(f"Plan step not found: {args.step_id}")

            elif args.subcommand == 'add':
                step = api.add_plan_step(
                    args.name,
                    args.description or "",
                    args.details or "",
                    args.order,
                    args.completed
                )
                print(f"Plan step added: {step['id']}")

            elif args.subcommand == 'update':
                # Collect the fields to update
                update_fields = {}
                if args.name:
                    update_fields['name'] = args.name
                if args.description:
                    update_fields['description'] = args.description
                if args.details:
                    update_fields['details'] = args.details
                if args.order is not None:
                    update_fields['order'] = args.order

                step = api.update_plan_step(args.step_id, **update_fields)
                if step:
                    print(f"Plan step updated: {step['id']}")
                else:
                    print(f"Plan step not found: {args.step_id}")

            elif args.subcommand == 'toggle':
                step = api.toggle_plan_step(args.step_id)
                if step:
                    completed = "completed" if step['completed'] else "not completed"
                    print(f"Plan step {args.step_id} marked as {completed}")
else:
print(f"Plan step not found: {args.step_id}")
elif args.subcommand == 'delete':
result = api.delete_plan_step(args.step_id)
if result:
print(f"Plan step deleted: {args.step_id}")
else:
print(f"Plan step not found: {args.step_id}")
else:
parser.print_help()
# Export/Import commands
elif args.command == 'export':
result = api.export_data(args.file_path)
if result:
print(f"Data exported to {args.file_path}")
else:
print("Error exporting data")
elif args.command == 'import':
result = api.import_data(args.file_path)
if result:
print(f"Data imported from {args.file_path}")
else:
print("Error importing data")
else:
parser.print_help()
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
```
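The `update` branches above forward only the options the user supplied. Below is a minimal sketch of that pattern on a two-level `argparse` parser. The program name `tracker-sketch` and the helper `collect_update_fields` are hypothetical and not part of the project. Unlike the CLI above, which tests string options for truthiness, this sketch tests `is not None`, so an explicitly empty value such as `--description ""` is kept. That is one possible design choice, not the project's behavior.

```python
import argparse


def build_parser():
    """Build a tiny two-level parser: <command> <subcommand> [options]."""
    parser = argparse.ArgumentParser(prog="tracker-sketch")  # hypothetical name
    commands = parser.add_subparsers(dest="command")

    task = commands.add_parser("task")
    task_sub = task.add_subparsers(dest="subcommand")

    update = task_sub.add_parser("update")
    update.add_argument("task_id")
    # No defaults: an option that was not passed stays None.
    update.add_argument("--title")
    update.add_argument("--description")
    update.add_argument("--priority", type=int)
    return parser


def collect_update_fields(args, names):
    """Return only the options the user actually supplied (hypothetical helper)."""
    return {
        name: getattr(args, name)
        for name in names
        if getattr(args, name, None) is not None
    }


args = build_parser().parse_args(["task", "update", "abc", "--description", ""])
fields = collect_update_fields(args, ["title", "description", "priority"])
print(fields)  # {'description': ''}
```

The resulting dictionary can be passed on as keyword arguments, in the same way the CLI calls `api.update_task(args.task_id, **update_fields)`.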
--------------------------------------------------------------------------------
/app/ui/terminal_ui.py:
--------------------------------------------------------------------------------
```python
import curses
import traceback
import os
import time
from app.ui.ui_components import TaskListWindow, TaskDetailWindow, PlanWindow, NotesWindow, InputDialog, ConfirmDialog
from app.ui.input_handler import InputHandler, FocusArea
class TerminalUI:
def __init__(self, api):
"""Initialize the terminal UI with a reference to the API."""
self.api = api
self.stdscr = None
self.task_list_win = None
self.task_detail_win = None
self.plan_win = None
self.notes_win = None
self.input_handler = None
self.notes_visible = True # Flag to control notes visibility
# File modification tracking
self.last_tasks_mtime = 0
self.last_plan_mtime = 0
self.last_notes_mtime = 0
self.last_check_time = 0
self.file_check_interval = 1.0 # Check for file changes every second
def run(self):
"""Run the terminal UI."""
try:
# Start curses application
curses.wrapper(self._main)
except Exception as e:
# If an error occurs, restore terminal and show traceback
if self.stdscr:
# Reset timeout to blocking before exiting to prevent potential issues
self.stdscr.timeout(-1)
curses.endwin()
print(f"An error occurred: {str(e)}")
traceback.print_exc()
def _main(self, stdscr):
"""Main function for the curses application."""
self.stdscr = stdscr
# Set up curses
curses.curs_set(0) # Hide cursor
stdscr.clear()
# Set up input handler
self.input_handler = InputHandler(self)
# Create initial layout
self._create_layout()
# Initial data load
try:
self.refresh_tasks()
self.refresh_plan()
self.refresh_notes()
# Initialize last modified times after initial load
task_file = self.api.task_manager.file_path
plan_file = self.api.plan_manager.file_path
notes_file = self.api.task_manager.notes_file_path
if os.path.exists(task_file):
self.last_tasks_mtime = os.path.getmtime(task_file)
if os.path.exists(plan_file):
self.last_plan_mtime = os.path.getmtime(plan_file)
if os.path.exists(notes_file):
self.last_notes_mtime = os.path.getmtime(notes_file)
self.last_check_time = time.time()
except Exception as e:
self.show_message(f"Error loading data: {str(e)}")
# Main event loop
while True:
# Check for external file changes (e.g., from MCP)
self.check_file_changes()
# Update the screen
stdscr.refresh()
# Configure timeout for getch to allow polling for file changes
stdscr.timeout(100) # 100ms timeout
# Get input (returns -1 if no input available)
key = stdscr.getch()
# Reset timeout to blocking mode if we actually got a key
if key != -1:
stdscr.timeout(-1)
# Handle input (exit if handler returns False)
if not self.input_handler.handle_input(key):
break
else:
# No input, just continue the loop to check for file changes
continue
def _create_layout(self):
"""Create the initial window layout."""
screen_height, screen_width = self.stdscr.getmaxyx()
# Calculate dimensions for initial layout
top_height = screen_height // 2
main_width = screen_width - 30 # Reserve 30 cols for notes on the right
task_width = main_width // 2
detail_width = main_width - task_width
plan_height = screen_height - top_height
notes_width = screen_width - main_width
# Create windows
self.task_list_win = TaskListWindow(
self.stdscr, top_height, task_width, 0, 0, "Tasks"
)
self.task_detail_win = TaskDetailWindow(
self.stdscr, top_height, detail_width, 0, task_width, "Task Details"
)
self.plan_win = PlanWindow(
self.stdscr, plan_height, main_width, top_height, 0, "Project Plan"
)
self.notes_win = NotesWindow(
self.stdscr, screen_height, notes_width, 0, main_width, "Notes"
)
# Initial refresh
self.task_list_win.refresh()
self.task_detail_win.refresh()
self.plan_win.refresh()
if self.notes_visible:
self.notes_win.refresh()
def toggle_notes_visibility(self):
"""Toggle the visibility of the notes window."""
self.notes_visible = not self.notes_visible
# If hiding and notes is the active focus, change focus to tasks
if not self.notes_visible and self.input_handler.focus == FocusArea.NOTES:
self.input_handler.focus = FocusArea.TASKS
self.update_focus(FocusArea.TASKS)
# Redraw layout (this will resize all windows accordingly)
self._resize_layout()
# If notes are hidden, make sure we redraw all other windows
if not self.notes_visible:
# Ensure each window is refreshed with its contents
self.task_list_win.refresh_content()
self.task_detail_win.refresh_content()
self.plan_win.refresh_content()
# Refresh the stdscr to ensure proper redraw of everything
self.stdscr.refresh()
return self.notes_visible
def _resize_layout(self):
"""Resize the window layout."""
screen_height, screen_width = self.stdscr.getmaxyx()
# Calculate dimensions based on notes visibility
if self.notes_visible:
main_width = screen_width - 30 # Reserve 30 cols for notes on the right
notes_width = screen_width - main_width
else:
main_width = screen_width # Use full width when notes are hidden
notes_width = 0
top_height = screen_height // 2
task_width = main_width // 2
detail_width = main_width - task_width
plan_height = screen_height - top_height
# Resize windows
self.task_list_win.resize(top_height, task_width, 0, 0)
self.task_detail_win.resize(top_height, detail_width, 0, task_width)
self.plan_win.resize(plan_height, main_width, top_height, 0)
# Only resize notes window if visible
if self.notes_visible:
self.notes_win.resize(screen_height, notes_width, 0, main_width)
# Refresh content
self.task_list_win.refresh_content()
self.task_detail_win.refresh_content()
self.plan_win.refresh_content()
# Only refresh notes if visible
if self.notes_visible:
self.notes_win.refresh_content()
def refresh_tasks(self):
"""Refresh task list and details."""
tasks = self.api.get_all_tasks()
# Sort tasks by numeric priority, descending (3 first), then by status
tasks.sort(key=lambda x: (-x['priority'], x['status']))
self.task_list_win.set_tasks(tasks)
# Update task details if there's a selected task
selected_task = self.task_list_win.get_selected_task()
self.task_detail_win.set_task(selected_task)
def refresh_plan(self):
"""Refresh the project plan."""
try:
steps = self.api.get_all_plan_steps()
# Validate steps before setting them
if steps is None:
steps = []
# Steps are already sorted by order in the API
self.plan_win.set_steps(steps)
except Exception as e:
# Handle errors gracefully
self.plan_win.set_steps([])
raise Exception(f"Failed to load plan: {str(e)}") from e
def refresh_notes(self):
"""Load and refresh the notes content."""
notes = self.api.get_notes()
self.notes_win.set_notes(notes)
def save_notes(self):
"""Save the current notes content."""
notes_text = self.notes_win.get_notes()
self.api.save_notes(notes_text)
def check_file_changes(self):
"""Check whether any data file was modified externally (e.g., by the MCP server)."""
try:
current_time = time.time()
# Only check periodically to reduce file system access
if current_time - self.last_check_time < self.file_check_interval:
return False
self.last_check_time = current_time
changes_detected = False
# Get file paths from the API's managers
task_file = self.api.task_manager.file_path
plan_file = self.api.plan_manager.file_path
notes_file = self.api.task_manager.notes_file_path
# Check if any data files have been modified
tasks_changed = os.path.exists(task_file) and os.path.getmtime(task_file) > self.last_tasks_mtime
plan_changed = os.path.exists(plan_file) and os.path.getmtime(plan_file) > self.last_plan_mtime
notes_changed = os.path.exists(notes_file) and os.path.getmtime(notes_file) > self.last_notes_mtime
if tasks_changed or plan_changed or notes_changed:
# Update last modified times
if tasks_changed:
self.last_tasks_mtime = os.path.getmtime(task_file)
if plan_changed:
self.last_plan_mtime = os.path.getmtime(plan_file)
if notes_changed:
self.last_notes_mtime = os.path.getmtime(notes_file)
try:
# Reload all data from files
self.api.reload_all()
# Refresh only the panes whose files changed; each refresh is guarded
# separately so one failure does not block the others
try:
if tasks_changed:
self.refresh_tasks()
except Exception:
# Ignore task refresh errors so the UI keeps running
pass
try:
if plan_changed:
self.refresh_plan()
except Exception:
# Ignore plan refresh errors so the UI keeps running
pass
try:
if notes_changed:
self.refresh_notes()
except Exception:
# Ignore notes refresh errors so the UI keeps running
pass
changes_detected = True
except Exception:
# If reload fails, continue without crashing
pass
return changes_detected
except Exception:
# Fail silently if file checking itself fails
return False
def update_focus(self, focus):
"""Update the UI focus."""
# Reset all titles first
self.task_list_win.set_title("Tasks")
self.task_detail_win.set_title("Task Details")
self.plan_win.set_title("Project Plan")
if self.notes_visible:
self.notes_win.set_title("Notes")
# Highlight the active window by changing its title
if focus == FocusArea.TASKS:
self.task_list_win.set_title("Tasks [Active]")
elif focus == FocusArea.DETAILS:
self.task_detail_win.set_title("Task Details [Active]")
elif focus == FocusArea.PLAN:
self.plan_win.set_title("Project Plan [Active]")
elif focus == FocusArea.NOTES and self.notes_visible:
self.notes_win.set_title("Notes [Active]")
# Clear screen to remove artifacts
self.stdscr.erase()
self.stdscr.refresh()
# Refresh the content of all windows
self.task_list_win.refresh_content()
self.task_detail_win.refresh_content()
self.plan_win.refresh_content()
# Only refresh notes if visible
if self.notes_visible:
self.notes_win.refresh_content()
def show_input_dialog(self, title, prompts, initial_values=None):
"""Show an input dialog and return the entered values or None if canceled."""
dialog = InputDialog(self.stdscr, title, prompts, initial_values)
result = dialog.show()
# Redraw the entire screen after dialog closes
self.stdscr.clear()
self.stdscr.refresh()
self._resize_layout()
return result
def show_confirm_dialog(self, title, message):
"""Show a confirmation dialog and return True if confirmed, False otherwise."""
dialog = ConfirmDialog(self.stdscr, title, message)
result = dialog.show()
# Redraw the entire screen after dialog closes
self.stdscr.clear()
self.stdscr.refresh()
self._resize_layout()
return result
def show_message(self, message):
"""Show a temporary message at the bottom of the screen."""
screen_height, screen_width = self.stdscr.getmaxyx()
# Create a small window for the message, wide enough for the prompt line
# (a window narrower than the prompt would make addstr raise curses.error)
prompt = "Press any key to continue"
msg_height = 3
msg_width = min(max(len(message), len(prompt)) + 4, screen_width - 4)
msg_y = (screen_height - msg_height) // 2
msg_x = (screen_width - msg_width) // 2
# Create message window
msg_win = self.stdscr.subwin(msg_height, msg_width, msg_y, msg_x)
msg_win.box()
msg_win.addstr(1, 2, message[:msg_width - 4])
msg_win.addstr(msg_height - 1, 2, prompt[:msg_width - 4])
msg_win.refresh()
# Wait for a key press
self.stdscr.getch()
# Redraw the entire screen
self.stdscr.clear()
self.stdscr.refresh()
self._resize_layout()
```
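The external-change detection in `check_file_changes` comes down to rate-limited polling of file modification times. A minimal, standalone sketch of that idea follows. The class `FileChangeWatcher` and its `poll` method are hypothetical names, not part of the project, and the optional `now` argument exists only to make the rate limit easy to exercise.

```python
import os
import time


class FileChangeWatcher:
    """Poll a set of files and report which ones changed since the last poll."""

    def __init__(self, paths, interval=1.0):
        self.interval = interval      # minimum seconds between real checks
        self.last_check = 0.0
        # Record the current mtime (0.0 if the file is missing) as the baseline.
        self.mtimes = {path: self._mtime(path) for path in paths}

    @staticmethod
    def _mtime(path):
        try:
            return os.path.getmtime(path)
        except OSError:
            return 0.0

    def poll(self, now=None):
        """Return the paths whose mtime advanced; [] if polled too soon."""
        now = time.time() if now is None else now
        if now - self.last_check < self.interval:
            return []
        self.last_check = now
        changed = []
        for path, seen in self.mtimes.items():
            current = self._mtime(path)
            if current > seen:
                self.mtimes[path] = current
                changed.append(path)
        return changed
```

A caller would run `poll()` once per event-loop iteration and refresh only the panes whose files are in the returned list.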
--------------------------------------------------------------------------------
/app/ui/input_handler.py:
--------------------------------------------------------------------------------
```python
import curses
from enum import Enum
class FocusArea(Enum):
TASKS = 0
DETAILS = 1
PLAN = 2
NOTES = 3
class InputHandler:
def __init__(self, terminal_ui):
"""Initialize the input handler with a reference to the terminal UI."""
self.terminal_ui = terminal_ui
self.focus = FocusArea.TASKS
def handle_input(self, key):
"""
Handle keyboard input and dispatch to appropriate handlers.
Returns True if the application should continue, False if it should exit.
"""
# If notes is in edit mode, we need special handling
if self.focus == FocusArea.NOTES and self.terminal_ui.notes_win.edit_mode:
# Escape exits edit mode in notes
if key == 27: # Escape
self.terminal_ui.notes_win.toggle_edit_mode()
self.terminal_ui.save_notes()
return True
# All other keys are processed by the notes window
try:
self.terminal_ui.notes_win.handle_key(key)
return True
except Exception as e:
self.terminal_ui.show_message(f"Error in notes edit: {str(e)}")
return True
# Global keys (work in any context)
if key == 27: # Escape
return self._handle_escape()
elif key == 9: # Tab
self._cycle_focus()
return True
elif key == 24: # Ctrl+X (ASCII 24): toggle notes visibility
self.terminal_ui.toggle_notes_visibility()
return True
# Focus-specific input handling
if self.focus == FocusArea.TASKS:
return self._handle_tasks_input(key)
elif self.focus == FocusArea.DETAILS:
return self._handle_details_input(key)
elif self.focus == FocusArea.PLAN:
return self._handle_plan_input(key)
elif self.focus == FocusArea.NOTES:
return self._handle_notes_input(key)
return True
def _handle_escape(self):
"""Handle the escape key - confirm exit."""
# Reset timeout to blocking for the confirmation dialog
self.terminal_ui.stdscr.timeout(-1)
confirm = self.terminal_ui.show_confirm_dialog(
"Exit Confirmation",
"Are you sure you want to exit? Any unsaved changes will be lost."
)
return not confirm # Return False to exit if confirmed
def _cycle_focus(self):
"""Cycle through the focus areas."""
focus_order = list(FocusArea)
# Skip Notes focus if it's not visible
if not self.terminal_ui.notes_visible:
focus_order = [f for f in focus_order if f != FocusArea.NOTES]
# Advance to the next area; fall back to the first one if the current
# focus was filtered out of the list
if self.focus in focus_order:
next_idx = (focus_order.index(self.focus) + 1) % len(focus_order)
else:
next_idx = 0
self.focus = focus_order[next_idx]
# Update UI with new focus
self.terminal_ui.update_focus(self.focus)
# Force a complete UI redraw to fix rendering artifacts
self.terminal_ui._resize_layout()
def _handle_tasks_input(self, key):
"""Handle input while focused on the task list."""
if key == curses.KEY_UP:
self.terminal_ui.task_list_win.select_prev()
self._update_task_details()
elif key == curses.KEY_DOWN:
self.terminal_ui.task_list_win.select_next()
self._update_task_details()
elif key in (10, 13, curses.KEY_ENTER): # Enter (different codes)
# Toggle completion status when Enter is pressed
self._toggle_selected_task()
self._update_task_details()
elif key == ord(' '): # Space
# Toggle completion status when Space is pressed
self._toggle_selected_task()
self._update_task_details()
elif key == ord('n'): # New task
self._new_task()
elif key == ord('e'): # Edit task
self._edit_task()
elif key == ord('d'): # Delete task
self._delete_task()
return True
def _handle_details_input(self, key):
"""Handle input while focused on the task details."""
# The details pane is read-only for now; scrolling for long
# descriptions is not implemented yet
return True
def _handle_notes_input(self, key):
"""Handle input while focused on the notes."""
if key == ord('e'): # Edit notes
self.terminal_ui.notes_win.toggle_edit_mode()
return True
return True
def _handle_plan_input(self, key):
"""Handle input while focused on the project plan."""
if key == curses.KEY_UP:
self.terminal_ui.plan_win.select_prev()
elif key == curses.KEY_DOWN:
self.terminal_ui.plan_win.select_next()
elif key in (10, 13, curses.KEY_ENTER): # Enter (different codes)
# Toggle completion when Enter is pressed
self._toggle_plan_step()
elif key == ord(' '): # Toggle completion with Space
self._toggle_plan_step()
elif key == ord('d'): # Toggle details view
self.terminal_ui.plan_win.toggle_details()
elif key == ord('n'): # New plan step
self._new_plan_step()
elif key == ord('e'): # Edit plan step
self._edit_plan_step()
elif key == ord('D'): # Delete plan step (capital D to avoid conflict with details)
self._delete_plan_step()
return True
def _update_task_details(self):
"""Update the task details window with the selected task."""
task = self.terminal_ui.task_list_win.get_selected_task()
self.terminal_ui.task_detail_win.set_task(task)
def _new_task(self):
"""Create a new task."""
prompts = ["Title", "Description", "Priority (1-3)"]
values = self.terminal_ui.show_input_dialog("New Task", prompts)
if values:
title, description, priority_str = values
# Validate priority
try:
priority = int(priority_str) if priority_str else 1
if priority < 1 or priority > 3:
priority = 1
except ValueError:
priority = 1
# Add the task
task = self.terminal_ui.api.add_task(title, description, priority)
# Refresh task list
self.terminal_ui.refresh_tasks()
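# Find and select the new task, using the same ordering as refresh_tasks()
# so the index matches the list as displayed
tasks = sorted(self.terminal_ui.api.get_all_tasks(), key=lambda x: (-x['priority'], x['status']))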
for i, t in enumerate(tasks):
if t["id"] == task["id"]:
self.terminal_ui.task_list_win.selected_index = i
self.terminal_ui.task_list_win.adjust_selection()
self.terminal_ui.task_list_win.refresh_content()
self._update_task_details()
break
def _edit_task(self):
"""Edit the selected task."""
task = self.terminal_ui.task_list_win.get_selected_task()
if not task:
return
# Set up the edit dialog with current values
prompts = ["Title", "Description", "Priority (1-3)", "Status"]
values = [
task["title"],
task["description"],
str(task["priority"]),
task["status"]
]
new_values = self.terminal_ui.show_input_dialog("Edit Task", prompts, values)
if new_values:
title, description, priority_str, status = new_values
# Validate priority
try:
priority = int(priority_str) if priority_str else task["priority"]
if priority < 1 or priority > 3:
priority = task["priority"]
except ValueError:
priority = task["priority"]
# Validate status
valid_statuses = ["not_started", "in_progress", "completed"]
if status not in valid_statuses:
status = task["status"]
# Update the task
self.terminal_ui.api.update_task(
task["id"],
title=title,
description=description,
priority=priority,
status=status
)
# Refresh task list and details
self.terminal_ui.refresh_tasks()
self._update_task_details()
def _delete_task(self):
"""Delete the selected task."""
task = self.terminal_ui.task_list_win.get_selected_task()
if not task:
return
confirm = self.terminal_ui.show_confirm_dialog(
"Delete Task",
f"Are you sure you want to delete the task '{task['title']}'?"
)
if confirm:
# Delete the task
self.terminal_ui.api.delete_task(task["id"])
# Refresh task list
self.terminal_ui.refresh_tasks()
self._update_task_details()
def _new_plan_step(self):
"""Create a new plan step."""
prompts = ["Name", "Description", "Details"]
values = self.terminal_ui.show_input_dialog("New Plan Step", prompts)
try:
if values and values[0]: # At least the name should be provided
# Add the plan step
name = values[0]
description = values[1] if len(values) > 1 else ""
details = values[2] if len(values) > 2 else ""
step = self.terminal_ui.api.add_plan_step(
name=name,
description=description,
details=details
)
# Refresh plan
self.terminal_ui.refresh_plan()
# Only try to find and select the new step if it was successfully created
if step and isinstance(step, dict) and "id" in step:
steps = self.terminal_ui.api.get_all_plan_steps()
for i, s in enumerate(steps):
if s["id"] == step["id"]:
self.terminal_ui.plan_win.selected_index = i
self.terminal_ui.plan_win.adjust_selection()
self.terminal_ui.plan_win.refresh_content()
break
except Exception as e:
self.terminal_ui.show_message(f"Error creating plan step: {str(e)}")
def _edit_plan_step(self):
"""Edit the selected plan step."""
step = self.terminal_ui.plan_win.get_selected_step()
if not step:
return
# Set up the edit dialog with current values
prompts = ["Name", "Description", "Details", "Order"]
values = [
step.get("name", step.get("description", "")),
step.get("description", ""),
step.get("details", ""),
str(step.get("order", 0))
]
new_values = self.terminal_ui.show_input_dialog("Edit Plan Step", prompts, values)
if new_values:
# Extract and validate values
name = new_values[0] if len(new_values) > 0 else ""
description = new_values[1] if len(new_values) > 1 else ""
details = new_values[2] if len(new_values) > 2 else ""
order_str = new_values[3] if len(new_values) > 3 else ""
# Validate order
try:
order = int(order_str) if order_str else step.get("order", 0)
if order < 0:
order = step.get("order", 0)
except ValueError:
order = step.get("order", 0)
# Update the plan step
self.terminal_ui.api.update_plan_step(
step["id"],
name=name,
description=description,
details=details,
order=order
)
# Refresh plan
self.terminal_ui.refresh_plan()
def _delete_plan_step(self):
"""Delete the selected plan step."""
step = self.terminal_ui.plan_win.get_selected_step()
if not step:
return
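# Prefer the step name; fall back to the description for steps that have no name
label = step.get("name") or step.get("description", "")
confirm = self.terminal_ui.show_confirm_dialog(
"Delete Plan Step",
f"Are you sure you want to delete the plan step '{label}'?"
)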
if confirm:
# Delete the plan step
self.terminal_ui.api.delete_plan_step(step["id"])
# Refresh plan
self.terminal_ui.refresh_plan()
def _toggle_selected_task(self):
"""Cycle through task statuses (not_started -> in_progress -> completed -> not_started)."""
task = self.terminal_ui.task_list_win.get_selected_task()
if not task:
return
# Cycle through the statuses
current_status = task.get("status", "not_started")
# If it's an old "pending" status, treat it as "not_started"
if current_status == "pending":
current_status = "not_started"
status_cycle = {
"not_started": "in_progress",
"in_progress": "completed",
"completed": "not_started"
}
new_status = status_cycle.get(current_status, "not_started")
# Update the task
self.terminal_ui.api.update_task(
task["id"],
status=new_status
)
# Refresh task list
self.terminal_ui.refresh_tasks()
def _toggle_plan_step(self):
"""Toggle the completion status of the selected plan step."""
step = self.terminal_ui.plan_win.get_selected_step()
if not step:
return
# Toggle the plan step
self.terminal_ui.api.toggle_plan_step(step["id"])
# Refresh plan
self.terminal_ui.refresh_plan()
```
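The focus cycling in `_cycle_focus` can be expressed as a small pure function, which makes the wrap-around and the hidden-notes case easy to check in isolation. The sketch below redefines `FocusArea` only to stay self-contained; `next_focus` is a hypothetical helper, not part of the project.

```python
from enum import Enum


class FocusArea(Enum):
    TASKS = 0
    DETAILS = 1
    PLAN = 2
    NOTES = 3


def next_focus(current, notes_visible=True):
    """Return the focus area that follows `current`, skipping NOTES when hidden."""
    order = [f for f in FocusArea if notes_visible or f is not FocusArea.NOTES]
    if current not in order:
        # The current area is hidden, so restart from the first visible one.
        return order[0]
    return order[(order.index(current) + 1) % len(order)]
```

With notes visible the cycle is TASKS, DETAILS, PLAN, NOTES and back to TASKS; with notes hidden, PLAN wraps directly to TASKS.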
--------------------------------------------------------------------------------
/mcp_server_fixed.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
MCP-compatible server for the Terminal Task Tracker
This server exposes the task tracker functionality through the Model Context Protocol (MCP).
"""
import json
import os
import logging
from typing import Dict, List, Optional, Any, Union
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from mcp.server.fastmcp import FastMCP, Context, Image
from app.core.task_manager import TaskManager
from app.core.plan_manager import PlanManager
from app.api.api import TaskTrackerAPI
# Set up logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Initialize data files
home_dir = os.path.expanduser("~")
data_dir = os.path.join(home_dir, ".tasktracker")
os.makedirs(data_dir, exist_ok=True)
task_file = os.path.join(data_dir, "tasks.json")
plan_file = os.path.join(data_dir, "plan.json")
notes_file = os.path.join(data_dir, "notes.txt")
# Global variable for API access from resources without URI parameters
global_api = None
# Set up lifespan context manager for the MCP server
@asynccontextmanager
async def lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
"""
Initialize and manage the task and plan managers for the server's lifespan.
This ensures that we have a single consistent instance of the managers
throughout the server's lifecycle.
"""
logger.info(f"Starting TaskTracker server with data directory: {data_dir}")
# Initialize managers with explicit file paths
task_manager = TaskManager(task_file, notes_file)
plan_manager = PlanManager(plan_file)
# Create API
api = TaskTrackerAPI(task_manager, plan_manager)
# Set global API for resources without URI parameters
global global_api
global_api = api
try:
# Yield the API instance to the server
yield {"api": api}
finally:
# Ensure all data is saved on shutdown
logger.info("Shutting down TaskTracker server, saving all data")
api.save_all()
# Create an MCP server with the lifespan manager
mcp = FastMCP("TaskTracker", lifespan=lifespan)
# === Resources ===
@mcp.resource("tasks://all")
def get_all_tasks() -> str:
"""Get all tasks in the system as JSON."""
# Reload data from files first to ensure we have latest changes
global_api.reload_all()
tasks = global_api.get_all_tasks()
return json.dumps(tasks, indent=2)
@mcp.resource("tasks://{task_id}")
def get_task(task_id: str) -> str:
"""Get a specific task by ID."""
# Reload data from files first to ensure we have latest changes
global_api.reload_all()
task = global_api.get_task(task_id)
if task:
return json.dumps(task, indent=2)
return "Task not found"
@mcp.resource("plan://all")
def get_all_plan_steps() -> str:
"""Get all plan steps in the system as JSON."""
# Reload data from files first to ensure we have latest changes
global_api.reload_all()
steps = global_api.get_all_plan_steps()
return json.dumps(steps, indent=2)
@mcp.resource("plan://{step_id}")
def get_plan_step(step_id: str) -> str:
"""Get a specific plan step by ID."""
# Reload data from files first to ensure we have latest changes
global_api.reload_all()
step = global_api.get_plan_step(step_id)
if step:
return json.dumps(step, indent=2)
return "Plan step not found"
@mcp.resource("notes://all")
def get_notes() -> str:
"""Get all notes in the system."""
# Reload data from files first to ensure we have latest changes
global_api.reload_all()
return global_api.get_notes()
# === Tools ===
@mcp.tool()
def get_all_tasks_tool(ctx: Context) -> List[Dict[str, Any]]:
"""
Get all tasks in the system.
Returns:
List of all tasks
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
return api.get_all_tasks()
@mcp.tool()
def get_task_tool(task_id: str, ctx: Context) -> Dict[str, Any]:
"""
Get a specific task by ID.
Args:
task_id: The ID of the task to retrieve
Returns:
The task or an error message if not found
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
task = api.get_task(task_id)
if task:
return task
return {"error": "Task not found"}
@mcp.tool()
def get_all_plan_steps_tool(ctx: Context) -> List[Dict[str, Any]]:
"""
Get all plan steps in the system.
Returns:
List of all plan steps
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
return api.get_all_plan_steps()
@mcp.tool()
def get_plan_step_tool(step_id: str, ctx: Context) -> Dict[str, Any]:
"""
Get a specific plan step by ID.
Args:
step_id: The ID of the plan step to retrieve
Returns:
The plan step or an error message if not found
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
step = api.get_plan_step(step_id)
if step:
return step
return {"error": "Plan step not found"}
@mcp.tool()
def get_notes_tool(ctx: Context) -> str:
"""
Get all notes in the system.
Returns:
The notes text
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
return api.get_notes()
@mcp.tool()
def add_task(title: str, ctx: Context, description: str = "", priority: int = 1,
status: str = "not_started") -> Dict[str, Any]:
"""
Add a new task to the system.
Args:
title: The title of the task
ctx: The MCP context object
description: A detailed description of the task
priority: Priority level (1-3, with 1 being highest)
status: Current status (not_started, in_progress, completed)
Returns:
The newly created task
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
task = api.add_task(title, description, priority, status)
api.save_all()
logger.info(f"Added task: {title} (ID: {task['id']})")
return task
@mcp.tool()
def update_task(task_id: str, ctx: Context, title: Optional[str] = None,
description: Optional[str] = None, priority: Optional[int] = None,
status: Optional[str] = None) -> Dict[str, Any]:
"""
Update an existing task.
Args:
task_id: The ID of the task to update
title: New title (optional)
description: New description (optional)
priority: New priority (optional)
status: New status (optional)
Returns:
The updated task, or an error dict if the task is not found
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
kwargs = {}
if title is not None:
kwargs["title"] = title
if description is not None:
kwargs["description"] = description
if priority is not None:
kwargs["priority"] = priority
if status is not None:
kwargs["status"] = status
task = api.update_task(task_id, **kwargs)
if task:
api.save_all()
logger.info(f"Updated task ID: {task_id}")
else:
logger.warning(f"Failed to update task: {task_id} - Not found")
return task or {"error": "Task not found"}
@mcp.tool()
def delete_task(task_id: str, ctx: Context) -> Dict[str, Any]:
"""
Delete a task.
Args:
task_id: The ID of the task to delete
Returns:
Success or failure message
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
result = api.delete_task(task_id)
if result:
api.save_all()
logger.info(f"Deleted task ID: {task_id}")
return {"success": True, "message": "Task deleted successfully"}
logger.warning(f"Failed to delete task: {task_id} - Not found")
return {"success": False, "message": "Task not found"}
@mcp.tool()
def add_plan_step(name: str, ctx: Context, description: str = "", details: str = "",
order: Optional[int] = None, completed: bool = False) -> Dict[str, Any]:
"""
Add a new plan step.
Args:
name: The name of the plan step
description: A brief description
details: Detailed information about the step
order: Position in the plan (optional)
completed: Whether the step is completed
Returns:
The newly created plan step
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
step = api.add_plan_step(name, description, details, order, completed)
api.save_all()
logger.info(f"Added plan step: {name} (ID: {step['id']})")
return step
@mcp.tool()
def update_plan_step(step_id: str, ctx: Context, name: Optional[str] = None,
description: Optional[str] = None, details: Optional[str] = None,
order: Optional[int] = None, completed: Optional[bool] = None) -> Dict[str, Any]:
"""
Update an existing plan step.
Args:
step_id: The ID of the step to update
name: New name (optional)
description: New description (optional)
details: New details (optional)
order: New order (optional)
completed: New completion status (optional)
Returns:
The updated plan step, or {"error": "Plan step not found"} if no step has that ID
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
kwargs = {}
if name is not None:
kwargs["name"] = name
if description is not None:
kwargs["description"] = description
if details is not None:
kwargs["details"] = details
if order is not None:
kwargs["order"] = order
if completed is not None:
kwargs["completed"] = completed
step = api.update_plan_step(step_id, **kwargs)
if step:
api.save_all()
logger.info(f"Updated plan step ID: {step_id}")
else:
logger.warning(f"Failed to update plan step: {step_id} - Not found")
return step or {"error": "Plan step not found"}
@mcp.tool()
def delete_plan_step(step_id: str, ctx: Context) -> Dict[str, Any]:
"""
Delete a plan step.
Args:
step_id: The ID of the step to delete
Returns:
Success or failure message
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
result = api.delete_plan_step(step_id)
if result:
api.save_all()
logger.info(f"Deleted plan step ID: {step_id}")
return {"success": True, "message": "Plan step deleted successfully"}
logger.warning(f"Failed to delete plan step: {step_id} - Not found")
return {"success": False, "message": "Plan step not found"}
@mcp.tool()
def toggle_plan_step(step_id: str, ctx: Context) -> Dict[str, Any]:
"""
Toggle the completion status of a plan step.
Args:
step_id: The ID of the step to toggle
Returns:
The updated plan step, or {"error": "Plan step not found"} if no step has that ID
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
step = api.toggle_plan_step(step_id)
if step:
api.save_all()
logger.info(f"Toggled completion status of plan step ID: {step_id} to {step['completed']}")
else:
logger.warning(f"Failed to toggle plan step: {step_id} - Not found")
return step or {"error": "Plan step not found"}
@mcp.tool()
def save_notes(notes_text: str, ctx: Context) -> Dict[str, Any]:
"""
Save notes to the system.
Args:
notes_text: The notes text to save
Returns:
Success message
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
api.save_notes(notes_text)
api.save_all()
logger.info("Notes saved")
return {"success": True, "message": "Notes saved successfully"}
@mcp.tool()
def export_data(file_path: str, ctx: Context) -> Dict[str, Any]:
"""
Export all data to a JSON file.
Args:
file_path: Path to save the exported data
Returns:
Success or failure message
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
try:
api.export_data(file_path)
logger.info(f"Data exported to {file_path}")
return {"success": True, "message": f"Data exported to {file_path}"}
except Exception as e:
logger.error(f"Export failed: {str(e)}")
return {"success": False, "message": f"Export failed: {str(e)}"}
@mcp.tool()
def import_data(file_path: str, ctx: Context) -> Dict[str, Any]:
"""
Import data from a JSON file.
Args:
file_path: Path to the file containing the data to import
Returns:
Success or failure message
"""
api = ctx.request_context.lifespan_context["api"]
# Reload data from files first to ensure we have latest changes
api.reload_all()
try:
result = api.import_data(file_path)
if result:
logger.info(f"Data imported from {file_path}")
return {"success": True, "message": "Data imported successfully"}
logger.warning(f"Import failed from {file_path}")
return {"success": False, "message": "Import failed"}
except Exception as e:
logger.error(f"Import failed: {str(e)}")
return {"success": False, "message": f"Import failed: {str(e)}"}
# === Prompts ===
@mcp.prompt()
def add_task_prompt(title: str = "", description: str = "") -> str:
"""Create a prompt to add a new task."""
return f"""Please add a new task with the following details:
Title: {title}
Description: {description}
Please provide any missing information and set the priority and status.
"""
@mcp.prompt()
def create_plan_prompt() -> str:
"""Create a prompt to help create a new project plan."""
return """I need to create a new project plan. Please help me break down this project into clear steps.
For each step, I need:
1. A clear name
2. A brief description
3. Any detailed information needed to complete the step
4. The logical order of the steps
Please ask me about my project goals so you can help create an appropriate plan.
"""
# Define a main function for entry point
def main():
"""Run the MCP server."""
mcp.run()
# Run the server if executed directly
if __name__ == "__main__":
main()
```
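The `update_task` and `update_plan_step` tools above both build a `kwargs` dict from only those arguments that are not `None`, so omitted fields are left untouched. A minimal sketch of that pattern as a reusable function follows; `collect_updates` is a hypothetical name used for illustration and is not part of the repository.

```python
from typing import Any, Dict


def collect_updates(**fields: Any) -> Dict[str, Any]:
    """Return only the fields whose value is not None.

    Hypothetical helper (not in the repository) mirroring the
    `if x is not None: kwargs["x"] = x` chains in the update tools.
    """
    return {name: value for name, value in fields.items() if value is not None}


if __name__ == "__main__":
    # Only None is dropped; falsy but valid values such as False or 0 are kept.
    print(collect_updates(title="Write docs", description=None, priority=2))
    print(collect_updates(name=None, completed=False, order=0))
```

Testing against `None` rather than truthiness matters here: a caller must be able to set `completed=False` or `order=0` explicitly.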
--------------------------------------------------------------------------------
/app/ui/ui_components.py:
--------------------------------------------------------------------------------
```python
import curses
class Window:
def __init__(self, stdscr, height, width, y, x, title=""):
"""Initialize a window with a border and optional title."""
self.win = stdscr.subwin(height, width, y, x)
self.height = height
self.width = width
self.title = title
self.win.box()
self.set_title(title)
self.content_window = self.win.derwin(height - 2, width - 2, 1, 1)
self.selected_index = 0
self.scroll_offset = 0
self.max_visible_items = height - 2
    def set_title(self, title):
        """Set the window title."""
        self.title = title
        if title:
            title_str = f" {title} "
            x = max(1, (self.width - len(title_str)) // 2)
            try:
                self.win.addstr(0, x, title_str)
            except curses.error:
                # Fall back to a simpler title if the full one cannot be drawn
                try:
                    self.win.addstr(0, 1, "Window")
                except curses.error:
                    pass
def clear(self):
"""Clear the content window."""
self.content_window.clear()
def refresh(self):
"""Refresh the window and its content."""
self.win.box()
self.set_title(self.title)
self.win.refresh()
self.content_window.refresh()
def get_content_dimensions(self):
"""Get the usable dimensions of the content window."""
return self.height - 2, self.width - 2
def resize(self, height, width, y, x):
"""Resize and move the window."""
self.height = height
self.width = width
self.win.resize(height, width)
self.win.mvwin(y, x)
self.content_window = self.win.derwin(height - 2, width - 2, 1, 1)
self.max_visible_items = height - 2
self.refresh()
def display_message(self, message):
"""Display a message in the content window."""
self.clear()
self.content_window.addstr(0, 0, message)
self.refresh()
class TaskListWindow(Window):
def __init__(self, stdscr, height, width, y, x, title="Tasks"):
"""Initialize a task list window."""
super().__init__(stdscr, height, width, y, x, title)
self.tasks = []
def set_tasks(self, tasks):
"""Set the tasks to display."""
self.tasks = tasks
self.adjust_selection()
self.refresh_content()
def adjust_selection(self):
"""Adjust selection index and scroll offset to valid values."""
if not self.tasks:
self.selected_index = 0
self.scroll_offset = 0
return
# Ensure selected_index is in valid range
if self.selected_index >= len(self.tasks):
self.selected_index = len(self.tasks) - 1
# Adjust scroll offset to keep selected item visible
if self.selected_index < self.scroll_offset:
self.scroll_offset = self.selected_index
elif self.selected_index >= self.scroll_offset + self.max_visible_items:
self.scroll_offset = self.selected_index - self.max_visible_items + 1
    def refresh_content(self):
        """Refresh the task list content."""
        self.clear()
        content_height, content_width = self.get_content_dimensions()
        if not self.tasks:
            self.content_window.addstr(0, 0, "No tasks")
            self.refresh()
            return
        # Display visible tasks
        for i in range(min(self.max_visible_items, len(self.tasks))):
            idx = i + self.scroll_offset
            if idx >= len(self.tasks):
                break
            task = self.tasks[idx]
            # Highlight selected task
            if idx == self.selected_index:
                self.content_window.attron(curses.A_REVERSE)
            # Format priority indicator
            priority_markers = ["!", "!!", "!!!"]
            priority_str = priority_markers[task['priority'] - 1] if 1 <= task['priority'] <= 3 else ""
            # Format status with ASCII markers to avoid Unicode display issues
            status_map = {
                "not_started": "[ ]",  # Not started
                "in_progress": "[>]",  # In progress
                "completed": "[x]",    # Completed
                # For backward compatibility
                "pending": "[ ]"
            }
            status_str = status_map.get(task['status'], "[ ]")
            # Truncate title if needed
            max_title_width = content_width - len(priority_str) - len(status_str) - 2
            title = task['title']
            if len(title) > max_title_width:
                # Clamp at 0 so a very narrow window cannot produce a negative slice
                title = title[:max(0, max_title_width - 3)] + "..."
            # Display task line with error handling
            task_str = f"{status_str} {title} {priority_str}"
            try:
                self.content_window.addstr(i, 0, task_str)
            except curses.error:
                # Handle display errors gracefully
                try:
                    # Try with a simpler string
                    self.content_window.addstr(i, 0, f"Task {i+1}")
                except curses.error:
                    pass
            if idx == self.selected_index:
                self.content_window.attroff(curses.A_REVERSE)
        self.refresh()
def select_next(self):
"""Select the next task if available."""
if self.tasks and self.selected_index < len(self.tasks) - 1:
self.selected_index += 1
self.adjust_selection()
self.refresh_content()
def select_prev(self):
"""Select the previous task if available."""
if self.tasks and self.selected_index > 0:
self.selected_index -= 1
self.adjust_selection()
self.refresh_content()
def get_selected_task(self):
"""Get the currently selected task."""
if self.tasks and 0 <= self.selected_index < len(self.tasks):
return self.tasks[self.selected_index]
return None
class TaskDetailWindow(Window):
def __init__(self, stdscr, height, width, y, x, title="Task Details"):
"""Initialize a task detail window."""
super().__init__(stdscr, height, width, y, x, title)
self.task = None
def set_task(self, task):
"""Set the task to display details for."""
self.task = task
self.refresh_content()
def refresh_content(self):
"""Refresh the task detail content."""
self.clear()
if not self.task:
self.content_window.addstr(0, 0, "No task selected")
self.refresh()
return
# Display task properties
content_height, content_width = self.get_content_dimensions()
# Map priority and status to more readable forms
priority_map = {1: "Low", 2: "Medium", 3: "High"}
status_map = {
"not_started": "Not Started",
"in_progress": "In Progress",
"completed": "Completed",
# For backward compatibility
"pending": "Not Started"
}
priority = priority_map.get(self.task['priority'], "Unknown")
status = status_map.get(self.task['status'], "Unknown")
# Display task details
y = 0
self.content_window.addstr(y, 0, f"Title: {self.task['title']}")
y += 2
self.content_window.addstr(y, 0, f"Status: {status}")
y += 1
self.content_window.addstr(y, 0, f"Priority: {priority}")
y += 2
# Display description with word wrapping
self.content_window.addstr(y, 0, "Description:")
y += 1
description = self.task['description'] or "No description provided."
words = description.split()
if words:
line = ""
for word in words:
# Check if adding this word would exceed width
if len(line) + len(word) + 1 > content_width:
self.content_window.addstr(y, 0, line)
y += 1
line = word
else:
if line:
line += " " + word
else:
line = word
# Add the last line if it has content
if line:
self.content_window.addstr(y, 0, line)
y += 1
# Display created/updated timestamps
y += 1
created = self.task.get('created_at', '').split('T')[0]
updated = self.task.get('updated_at', '').split('T')[0]
if created:
self.content_window.addstr(y, 0, f"Created: {created}")
y += 1
if updated and updated != created:
self.content_window.addstr(y, 0, f"Updated: {updated}")
self.refresh()
class NotesWindow(Window):
def __init__(self, stdscr, height, width, y, x, title="Notes"):
"""Initialize a notes window."""
super().__init__(stdscr, height, width, y, x, title)
self.notes = ""
self.edit_mode = False
self.cursor_pos = 0
self.scroll_offset = 0
# Enable keypad for special key handling
self.content_window.keypad(True)
def set_notes(self, notes):
"""Set the notes to display."""
self.notes = notes if notes else ""
self.refresh_content()
def get_notes(self):
"""Get the current notes."""
return self.notes
def toggle_edit_mode(self):
"""Toggle between view and edit mode."""
self.edit_mode = not self.edit_mode
if self.edit_mode:
curses.curs_set(1) # Show cursor
else:
curses.curs_set(0) # Hide cursor
self.refresh_content()
return self.edit_mode
def handle_key(self, key):
"""Handle keyboard input in edit mode."""
if not self.edit_mode:
return False
try:
# Simple key handling - safer approach
if key in (10, 13, curses.KEY_ENTER): # Enter
# Add a newline at end for simplicity
self.notes += "\n"
elif key in (curses.KEY_BACKSPACE, 127, 8): # Backspace
# Remove last character if there are any
if len(self.notes) > 0:
self.notes = self.notes[:-1]
elif 32 <= key <= 126: # Printable ASCII characters
# Add character to the end
self.notes += chr(key)
# Refresh after any change
self.refresh_content()
return True
except Exception as e:
# Log error by adding to notes
self.notes += f"\nError: {str(e)}\n"
self.refresh_content()
return True
def adjust_scroll(self):
"""Adjust scroll offset to keep cursor visible."""
content_height, content_width = self.get_content_dimensions()
# Count lines up to cursor
lines_to_cursor = self.notes[:self.cursor_pos].count('\n')
# Adjust scroll if cursor is off screen
if lines_to_cursor < self.scroll_offset:
self.scroll_offset = lines_to_cursor
elif lines_to_cursor >= self.scroll_offset + content_height:
self.scroll_offset = lines_to_cursor - content_height + 1
    def refresh_content(self):
        """Refresh the notes content."""
        try:
            self.clear()
            content_height, content_width = self.get_content_dimensions()
            # Number of note lines drawn; stays 0 when there are no notes
            max_lines = 0
            if not self.notes:
                if self.edit_mode:
                    self.content_window.addstr(0, 0, "Type to add notes...")
                else:
                    self.content_window.addstr(0, 0, "No notes. Press 'e' to edit.")
            else:
                # Display only the most recent part of the notes (last few lines)
                lines = self.notes.split('\n')
                # Display only what fits in the window
                max_lines = min(content_height - 1, len(lines))
                start_line = max(0, len(lines) - max_lines)
                for i in range(max_lines):
                    line_idx = start_line + i
                    if line_idx < len(lines):
                        # Truncate line if needed
                        display_line = lines[line_idx]
                        if len(display_line) > content_width - 1:
                            display_line = display_line[:content_width - 1]
                        self.content_window.addstr(i, 0, display_line)
            # Add help text at bottom
            if self.edit_mode and content_height > 1:
                help_text = "Esc: Save & exit edit mode"
                if len(help_text) > content_width - 1:
                    help_text = help_text[:content_width - 1]
                self.content_window.addstr(content_height - 1, 0, help_text)
            # In edit mode, move the cursor to the start of the last displayed line
            if self.edit_mode:
                line_count = min(max_lines, content_height - 1)
                if line_count > 0:
                    self.content_window.move(line_count - 1, 0)
                else:
                    self.content_window.move(0, 0)
            self.refresh()
        except Exception:
            # If there's an error, try a minimal refresh
            try:
                self.clear()
                self.content_window.addstr(0, 0, "Notes")
                self.refresh()
            except curses.error:
                pass
class PlanWindow(Window):
def __init__(self, stdscr, height, width, y, x, title="Project Plan"):
"""Initialize a project plan window."""
super().__init__(stdscr, height, width, y, x, title)
self.steps = []
self.selected_index = 0
self.scroll_offset = 0
self.show_details = False # Flag to control if details are shown
def set_steps(self, steps):
"""Set the plan steps to display."""
self.steps = steps
self.adjust_selection()
self.refresh_content()
def adjust_selection(self):
"""Adjust selection index and scroll offset to valid values."""
if not self.steps:
self.selected_index = 0
self.scroll_offset = 0
return
# Ensure selected_index is in valid range
if self.selected_index < 0:
self.selected_index = 0
elif self.selected_index >= len(self.steps):
self.selected_index = max(0, len(self.steps) - 1)
# Adjust scroll offset to keep selected item visible
if self.selected_index < self.scroll_offset:
self.scroll_offset = self.selected_index
elif self.selected_index >= self.scroll_offset + self.max_visible_items:
self.scroll_offset = max(0, self.selected_index - self.max_visible_items + 1)
    def refresh_content(self):
        """Refresh the plan content."""
        self.clear()
        content_height, content_width = self.get_content_dimensions()
        if not self.steps:
            self.content_window.addstr(0, 0, "No plan steps")
            self.refresh()
            return
        selected_step = self.get_selected_step()
        # If showing details for the selected step
        if self.show_details and selected_step:
            self._display_step_details(selected_step, content_height, content_width)
            return
        # Otherwise display the list of steps
        list_height = min(content_height, len(self.steps))
        # Display visible steps
        for i in range(min(self.max_visible_items, len(self.steps))):
            idx = i + self.scroll_offset
            if idx >= len(self.steps):
                break
            try:
                step = self.steps[idx]
                # Highlight selected step
                if idx == self.selected_index:
                    self.content_window.attron(curses.A_REVERSE)
                # Format step with order and completion status
                completion_status = "[x]" if step['completed'] else "[ ]"
                # Get name or fall back to description for backward compatibility
                name = step.get('name', step.get('description', 'Unnamed step'))
                # Truncate name if needed
                max_name_width = content_width - 10
                if len(name) > max_name_width:
                    name = name[:max(0, max_name_width - 3)] + "..."
                # Display step line; 'order' defaults to 0 when missing
                order = step.get('order', 0)
                step_str = f"{order + 1:2d}. {completion_status} {name}"
                try:
                    self.content_window.addstr(i, 0, step_str)
                except curses.error:
                    # Handle display errors gracefully
                    try:
                        # Try with a simpler string
                        self.content_window.addstr(i, 0, f"Step {order + 1}")
                    except curses.error:
                        pass
                if idx == self.selected_index:
                    self.content_window.attroff(curses.A_REVERSE)
            except (IndexError, KeyError) as e:
                # Handle malformed step data gracefully
                self.content_window.addstr(i, 0, f"Error displaying step: {str(e)}")
        # Add a help line at the bottom if there's space
        if content_height > list_height + 1:
            help_text = "Enter/Space: Toggle completion | D: Show/hide details"
            try:
                self.content_window.addstr(content_height - 1, 0, help_text)
            except curses.error:
                pass  # Skip help text if it doesn't fit
        self.refresh()
def _display_step_details(self, step, height, width):
"""Display detailed information for a plan step."""
y = 0
# Display step name
name = step.get('name', 'Unnamed step')
self.content_window.addstr(y, 0, f"Name: {name}")
y += 2
# Display completion status
completed = "Completed" if step.get('completed', False) else "Not completed"
self.content_window.addstr(y, 0, f"Status: {completed}")
y += 2
# Display description
description = step.get('description', '')
if description:
self.content_window.addstr(y, 0, "Description:")
y += 1
# Word wrap description
words = description.split()
line = ""
for word in words:
if len(line) + len(word) + 1 > width:
self.content_window.addstr(y, 0, line)
y += 1
line = word
else:
if line:
line += " " + word
else:
line = word
if line:
self.content_window.addstr(y, 0, line)
y += 1
y += 1
# Display detailed information
details = step.get('details', '')
if details:
self.content_window.addstr(y, 0, "Details:")
y += 1
# Word wrap details
words = details.split()
line = ""
for word in words:
if len(line) + len(word) + 1 > width:
self.content_window.addstr(y, 0, line)
y += 1
line = word
else:
if line:
line += " " + word
else:
line = word
if line:
self.content_window.addstr(y, 0, line)
y += 1
# Add a help line at the bottom
help_text = "D: Return to plan list"
self.content_window.addstr(height - 1, 0, help_text)
self.refresh()
def select_next(self):
"""Select the next step if available."""
if self.steps and self.selected_index < len(self.steps) - 1:
self.selected_index += 1
self.adjust_selection()
self.refresh_content()
def select_prev(self):
"""Select the previous step if available."""
if self.steps and self.selected_index > 0:
self.selected_index -= 1
self.adjust_selection()
self.refresh_content()
def get_selected_step(self):
"""Get the currently selected plan step."""
try:
if self.steps and 0 <= self.selected_index < len(self.steps):
return self.steps[self.selected_index]
except (IndexError, KeyError):
pass
return None
def toggle_details(self):
"""Toggle between displaying the step list and the details of the selected step."""
if self.get_selected_step():
self.show_details = not self.show_details
self.refresh_content()
return True
return False
class InputDialog:
def __init__(self, stdscr, title, prompts, initial_values=None):
"""
Initialize an input dialog with multiple fields.
Args:
stdscr: The main curses window
title: Dialog title
prompts: List of field prompts
initial_values: List of initial values for fields (optional)
"""
self.stdscr = stdscr
self.title = title
self.prompts = prompts
# Initialize with empty values or provided initial values
if initial_values is None:
self.values = ["" for _ in range(len(prompts))]
else:
self.values = initial_values.copy()
# Dialog dimensions
screen_height, screen_width = stdscr.getmaxyx()
self.width = min(60, screen_width - 4)
self.height = len(prompts) * 2 + 4 # 2 lines per field + borders + buttons
# Center dialog
self.y = (screen_height - self.height) // 2
self.x = (screen_width - self.width) // 2
# Create window
self.win = stdscr.subwin(self.height, self.width, self.y, self.x)
self.win.keypad(True) # Enable keypad mode for special keys
self.current_field = 0
self.cursor_pos = len(self.values[0]) if self.values and self.values[0] else 0
def show(self):
"""Show the dialog and handle input."""
curses.curs_set(1) # Show cursor
# Enable special keys like backspace
self.win.keypad(True)
# Main input loop
while True:
self.draw()
key = self.win.getch()
if key == curses.KEY_ENTER or key == 10 or key == 13: # Enter (different codes)
curses.curs_set(0) # Hide cursor
return self.values
elif key == 27: # Escape
curses.curs_set(0) # Hide cursor
return None
elif key == curses.KEY_UP and self.current_field > 0:
self.current_field -= 1
self.cursor_pos = len(self.values[self.current_field])
elif key == curses.KEY_DOWN and self.current_field < len(self.prompts) - 1:
self.current_field += 1
self.cursor_pos = len(self.values[self.current_field])
elif key == 9: # Tab
self.current_field = (self.current_field + 1) % len(self.prompts)
self.cursor_pos = len(self.values[self.current_field])
elif key == curses.KEY_LEFT and self.cursor_pos > 0:
self.cursor_pos -= 1
elif key == curses.KEY_RIGHT and self.cursor_pos < len(self.values[self.current_field]):
self.cursor_pos += 1
elif key in (curses.KEY_BACKSPACE, 127, 8): # Different backspace codes
if self.cursor_pos > 0:
self.values[self.current_field] = (
self.values[self.current_field][:self.cursor_pos - 1] +
self.values[self.current_field][self.cursor_pos:]
)
self.cursor_pos -= 1
elif key == curses.KEY_DC: # Delete
if self.cursor_pos < len(self.values[self.current_field]):
self.values[self.current_field] = (
self.values[self.current_field][:self.cursor_pos] +
self.values[self.current_field][self.cursor_pos + 1:]
)
elif 32 <= key <= 126: # Printable characters
self.values[self.current_field] = (
self.values[self.current_field][:self.cursor_pos] +
chr(key) +
self.values[self.current_field][self.cursor_pos:]
)
self.cursor_pos += 1
def draw(self):
"""Draw the dialog box and input fields."""
self.win.clear()
self.win.box()
# Draw title
if self.title:
title_str = f" {self.title} "
x = max(1, (self.width - len(title_str)) // 2)
self.win.addstr(0, x, title_str)
# Draw input fields
for i, prompt in enumerate(self.prompts):
y = i * 2 + 1
self.win.addstr(y, 2, f"{prompt}:")
# Draw input field
field_x = 2
field_y = y + 1
field_width = self.width - 4
field_value = self.values[i]
# Draw input value
self.win.addstr(field_y, field_x, field_value)
# Draw cursor if this is the active field
if i == self.current_field:
self.win.move(field_y, field_x + self.cursor_pos)
# Draw instructions
self.win.addstr(self.height - 1, 2, "Enter: Save | Esc: Cancel")
self.win.refresh()
class ConfirmDialog:
def __init__(self, stdscr, title, message):
"""Initialize a confirmation dialog."""
self.stdscr = stdscr
self.title = title
self.message = message
# Dialog dimensions
screen_height, screen_width = stdscr.getmaxyx()
self.width = min(50, screen_width - 4)
# Calculate height based on message length
message_lines = (len(message) // (self.width - 4)) + 1
self.height = message_lines + 4 # Message + borders + buttons
# Center dialog
self.y = (screen_height - self.height) // 2
self.x = (screen_width - self.width) // 2
# Create window
self.win = stdscr.subwin(self.height, self.width, self.y, self.x)
self.selected = 0 # 0 = No, 1 = Yes
def show(self):
"""Show the dialog and handle input."""
# Enable keypad mode for special keys
self.win.keypad(True)
# Main input loop
while True:
self.draw()
key = self.win.getch()
if key == curses.KEY_ENTER or key == 10 or key == 13: # Enter (different codes)
return self.selected == 1 # Return True if "Yes" selected
elif key == 27: # Escape
return False
elif key == curses.KEY_LEFT or key == curses.KEY_RIGHT:
self.selected = 1 - self.selected # Toggle between 0 and 1
# Add handling for y/n keys
elif key in (ord('y'), ord('Y')):
return True
elif key in (ord('n'), ord('N')):
return False
def draw(self):
"""Draw the confirmation dialog."""
self.win.clear()
self.win.box()
# Draw title
if self.title:
title_str = f" {self.title} "
x = max(1, (self.width - len(title_str)) // 2)
self.win.addstr(0, x, title_str)
# Draw message
message_words = self.message.split()
line = ""
y = 1
for word in message_words:
if len(line) + len(word) + 1 <= self.width - 4:
if line:
line += " " + word
else:
line = word
else:
self.win.addstr(y, 2, line)
y += 1
line = word
if line:
self.win.addstr(y, 2, line)
# Draw buttons
button_y = self.height - 2
no_x = self.width // 3 - 2
yes_x = 2 * self.width // 3 - 2
if self.selected == 0:
self.win.attron(curses.A_REVERSE)
self.win.addstr(button_y, no_x, " No ")
if self.selected == 0:
self.win.attroff(curses.A_REVERSE)
if self.selected == 1:
self.win.attron(curses.A_REVERSE)
self.win.addstr(button_y, yes_x, " Yes ")
if self.selected == 1:
self.win.attroff(curses.A_REVERSE)
self.win.refresh()
```
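The same greedy word-wrap loop appears three times in this file (`TaskDetailWindow.refresh_content`, `PlanWindow._display_step_details`, and `ConfirmDialog.draw`). A minimal sketch of how it could be factored into one function is shown below; `wrap_words` is a hypothetical name, not something the repository defines, and unlike the inline loops it never emits an empty line when the first word alone is wider than the window.

```python
from typing import List


def wrap_words(text: str, width: int) -> List[str]:
    """Greedy word wrap: keep adding words to a line until the next one would not fit.

    Hypothetical helper (not in the repository). Words longer than
    `width` are left unbroken on their own line.
    """
    lines: List[str] = []
    line = ""
    for word in text.split():
        candidate = f"{line} {word}" if line else word
        if line and len(candidate) > width:
            # The current line is full: emit it and start a new one with this word
            lines.append(line)
            line = word
        else:
            line = candidate
    if line:
        lines.append(line)
    return lines


if __name__ == "__main__":
    for row in wrap_words("Define high-level project steps and track completion", 20):
        print(row)
```

A caller would then draw each returned line with `addstr(y, 0, line)` and increment `y`, as the existing loops do. The standard library's `textwrap.wrap` offers similar behavior and may be preferable if breaking over-long words is also needed.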