# Directory Structure ``` ├── GCalendar │ ├── README.md │ ├── requirements.txt │ └── src │ ├── calendar_service.py │ ├── create_token.py │ ├── list_past_events.py │ ├── mcp_client.py │ ├── mcp_server.py │ └── renew_token.py ├── README.md ├── requirements.txt └── src ├── calendar_service.py ├── create_token.py ├── list_past_events.py ├── mcp_client.py ├── mcp_server.py └── renew_token.py ``` # Files -------------------------------------------------------------------------------- /GCalendar/README.md: -------------------------------------------------------------------------------- ```markdown # Google Calendar MCP Server - Python Installation Guide 📘 **Overview** A Model Context Protocol server that provides access to Google Calendar API with support for asynchronous operations. This implementation enables efficient calendar management through a standardized interface. 🔑 **Key Components** ### API Tools - list: Query calendar events (past 2 years to 1 year future) - create-event: Create new calendar entries - delete-duplicates: Remove duplicate events - delete-event: Delete specific calendar events ### Core Resources - Token Management System (credentials/*.json) - OAuth 2.0 authentication flow - Automatic token refresh handling - Runtime token discovery - Logging System (logs/*.log) - Operation logging with timestamps - Error tracking and debugging - Performance metrics collection 💡 **Implementation Steps** ### Prerequisites ```bash # Install required Python packages pip install -r requirements.txt ``` Dependencies: ```python google-auth-oauthlib==1.0.0 google-auth-httplib2==0.1.0 google-api-python-client==2.108.0 aiohttp==3.8.5 asyncio==3.4.3 ``` ### Initial Setup ```bash # Clone repository git clone https://github.com/yourusername/GCalendar.git cd GCalendar # Create required directories mkdir -p credentials logs ``` ### Authentication Setup 1. Create Google Cloud Console project 2. Enable Google Calendar API 3. Download credentials.json to credentials/ 4. Run create_token.py: ```bash python src/create_token.py ``` ⚙️ **Technical Configuration** ### Server Configuration Add to `claude_desktop_config.json`: ```json { "mcpServers": { "gcalendar": { "command": "YOUR_CONDA_PATH/envs/mcp-gcalendar/bin/python", "args": [ "YOUR_PATH/GCalendar/src/mcp_server.py" ] } } } ``` ### Path Configuration Replace placeholders: • YOUR_CONDA_PATH options: - M1/M2 Mac: `/opt/homebrew/Caskroom/miniforge/base` - Miniconda: `~/miniconda3` or `/opt/miniconda3` • YOUR_PATH: Full repository clone path ### Error Handling - Automatic token refresh on expiry - 60-second operation timeout - Comprehensive error logging - Auto-retry on transient errors ### Project Structure ``` GCalendar/ ├── credentials/ │ ├── credentials.json # From Google Cloud Console │ └── token.json # Generated by create_token.py ├── logs/ │ └── calendar_service.log ├── src/ │ ├── calendar_service.py # Core calendar operations │ ├── create_token.py # Token generation │ ├── list_past_events.py # Event listing utility │ ├── mcp_client.py # MCP client implementation │ ├── mcp_server.py # Main server implementation │ └── renew_token.py # Token renewal utility ├── requirements.txt └── README.md ``` ### System Requirements - Python 3.9 or higher - Google Calendar API enabled - Valid OAuth 2.0 credentials - Internet connectivity - Asia/Bangkok timezone ## License This project is licensed under the MIT License. See LICENSE file for details. ``` -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` google-auth-oauthlib==1.0.0 google-auth-httplib2==0.1.0 google-api-python-client==2.108.0 aiohttp==3.8.5 asyncio==3.4.3 mcp ``` -------------------------------------------------------------------------------- /GCalendar/requirements.txt: -------------------------------------------------------------------------------- ``` google-auth-oauthlib==1.0.0 google-auth-httplib2==0.1.0 google-api-python-client==2.108.0 aiohttp==3.8.5 asyncio==3.4.3 tzdata==2023.3 # For timezone database ``` -------------------------------------------------------------------------------- /GCalendar/src/mcp_client.py: -------------------------------------------------------------------------------- ```python from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client import asyncio import logging import os # Set up logging logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) async def run(): # Create server parameters server_params = StdioServerParameters( command="python", args=[os.path.join(os.path.dirname(__file__), "mcp_server.py")] ) try: async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # Initialize connection await session.initialize() logger.info("Initialization successful") # List calendar events using tool result = await session.call_tool("list", {}) if result and hasattr(result, "content"): for content_item in result.content: if hasattr(content_item, "text"): print("\nCalendar Events:\n") print(content_item.text) else: logger.warning("No events data received") except Exception as e: logger.error(f"Error: {e}", exc_info=True) if __name__ == "__main__": try: asyncio.run(run()) except KeyboardInterrupt: logger.info("Client shutdown requested") ``` -------------------------------------------------------------------------------- /src/mcp_client.py: -------------------------------------------------------------------------------- ```python from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client import asyncio import logging import os # Set up logging logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) async def run(): # Create server parameters server_params = StdioServerParameters( command="python", args=[os.path.join(os.path.dirname(__file__), "mcp_server.py")] ) try: async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # Initialize connection await session.initialize() logger.info("Initialization successful") # List calendar events using tool result = await session.call_tool("list", {}) if result and hasattr(result, "content"): for content_item in result.content: if hasattr(content_item, "text"): print("\nCalendar Events:\n") print(content_item.text) else: logger.warning("No events data received") except Exception as e: logger.error(f"Error: {e}", exc_info=True) if __name__ == "__main__": try: asyncio.run(run()) except KeyboardInterrupt: logger.info("Client shutdown requested") ``` -------------------------------------------------------------------------------- /GCalendar/src/mcp_server.py: -------------------------------------------------------------------------------- ```python from mcp.server import Server, NotificationOptions from mcp.server.models import InitializationOptions import mcp.server.stdio import mcp.types as types import logging import asyncio from calendar_service import CalendarService import os import signal from datetime import datetime # Set up logging logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # Create a server instance server = Server("gcalendar-server") calendar_service = None # Signal handler for graceful shutdown def signal_handler(signum, frame): logger.info(f"Received signal {signum}") if calendar_service: calendar_service.close() raise KeyboardInterrupt signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @server.list_tools() async def handle_list_tools() -> list[types.Tool]: return [ types.Tool( name="list", description="List calendar events", arguments={}, inputSchema={ "type": "object", "properties": {} } ), types.Tool( name="create-event", description="Create a new calendar event", arguments={}, inputSchema={ "type": "object", "properties": { "summary": { "type": "string", "description": "Event title" }, "start_time": { "type": "string", "description": "Start time in YYYY-MM-DDTHH:MM:SS format or date in YYYY-MM-DD format" }, "end_time": { "type": "string", "description": "End time (optional). If not provided, event will be 1 hour long" }, "description": { "type": "string", "description": "Event description (optional)" } }, "required": ["summary", "start_time"] } ), types.Tool( name="delete-duplicates", description="Delete duplicate events on a specific date", arguments={}, inputSchema={ "type": "object", "properties": { "target_date": { "type": "string", "description": "Target date in YYYY-MM-DD format" }, "event_summary": { "type": "string", "description": "Event title to match" } }, "required": ["target_date", "event_summary"] } ), types.Tool( name="delete-event", description="Delete a single calendar event", arguments={}, inputSchema={ "type": "object", "properties": { "event_time": { "type": "string", "description": "Event time from list output" }, "event_summary": { "type": "string", "description": "Event title to match" } }, "required": ["event_time", "event_summary"] } ) ] @server.call_tool() async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]: try: if name == "list": formatted_text = await asyncio.wait_for( calendar_service.list_events(), timeout=60 ) return [types.TextContent( type="text", text=formatted_text )] elif name == "create-event": if not arguments.get("summary"): return [types.TextContent( type="text", text="Error: Event title (summary) is required" )] if not arguments.get("start_time"): return [types.TextContent( type="text", text="Error: Start time is required" )] result = await asyncio.wait_for( calendar_service.create_event( summary=arguments["summary"], start_time=arguments["start_time"], end_time=arguments.get("end_time"), description=arguments.get("description") ), timeout=60 ) return [types.TextContent( type="text", text=result )] elif name == "delete-duplicates": if not arguments.get("target_date"): return [types.TextContent( type="text", text="Error: Target date is required" )] if not arguments.get("event_summary"): return [types.TextContent( type="text", text="Error: Event title is required" )] result = await asyncio.wait_for( calendar_service.delete_duplicate_events( target_date=arguments["target_date"], event_summary=arguments["event_summary"] ), timeout=60 ) return [types.TextContent( type="text", text=result )] elif name == "delete-event": if not arguments.get("event_time"): return [types.TextContent( type="text", text="Error: Event time is required" )] if not arguments.get("event_summary"): return [types.TextContent( type="text", text="Error: Event title is required" )] result = await asyncio.wait_for( calendar_service.delete_single_event( event_time=arguments["event_time"], event_summary=arguments["event_summary"] ), timeout=60 ) return [types.TextContent( type="text", text=result )] else: return [types.TextContent( type="text", text=f"Unknown tool: {name}" )] except asyncio.TimeoutError: error_msg = f"Operation timed out while executing {name}" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] except Exception as e: error_msg = f"Error executing {name}: {str(e)}" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] @server.list_resources() async def handle_list_resources() -> list[types.Resource]: return [] @server.list_prompts() async def handle_list_prompts() -> list[types.Prompt]: return [] async def run(): global calendar_service try: # Initialize calendar service logger.info("Starting Calendar Server...") script_dir = os.path.dirname(os.path.abspath(__file__)) calendar_service = CalendarService( credentials_path=os.path.join(script_dir, "..", "credentials", "credentials.json"), token_path=os.path.join(script_dir, "..", "credentials", "token.json") ) # Authentication with longer timeout try: await asyncio.wait_for(calendar_service.authenticate(), timeout=60) logger.info("Authentication successful") except asyncio.TimeoutError: logger.error("Authentication timed out") raise # Run the server logger.info("Calendar Server is ready") async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="gcalendar", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ) ) ) except ConnectionError as e: logger.error(f"Connection error: {str(e)}") raise except IOError as e: logger.error(f"IO error: {str(e)}") raise except Exception as e: logger.error(f"Error in run: {str(e)}") raise finally: if calendar_service: logger.info("Closing calendar service...") calendar_service.close() async def main(): try: await run() except KeyboardInterrupt: logger.info("Server shutdown requested") except Exception as e: logger.error(f"Unexpected error: {str(e)}") finally: if calendar_service: calendar_service.close() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: pass # Handle graceful shutdown ``` -------------------------------------------------------------------------------- /src/mcp_server.py: -------------------------------------------------------------------------------- ```python from mcp.server import Server, NotificationOptions from mcp.server.models import InitializationOptions import mcp.server.stdio import mcp.types as types import logging import asyncio from calendar_service import CalendarService import os import signal from datetime import datetime # Set up logging logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # Create a server instance server = Server("gcalendar-server") calendar_service = None # Signal handler for graceful shutdown def signal_handler(signum, frame): logger.info(f"Received signal {signum}") if calendar_service: calendar_service.close() raise KeyboardInterrupt signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @server.list_tools() async def handle_list_tools() -> list[types.Tool]: return [ types.Tool( name="list", description="List calendar events", arguments={}, inputSchema={ "type": "object", "properties": {} } ), types.Tool( name="create-event", description="Create a new calendar event", arguments={}, inputSchema={ "type": "object", "properties": { "summary": { "type": "string", "description": "Event title" }, "start_time": { "type": "string", "description": "Start time in YYYY-MM-DDTHH:MM:SS format or date in YYYY-MM-DD format" }, "end_time": { "type": "string", "description": "End time (optional). If not provided, event will be 1 hour long" }, "description": { "type": "string", "description": "Event description (optional)" } }, "required": ["summary", "start_time"] } ), types.Tool( name="delete-duplicates", description="Delete duplicate events on a specific date", arguments={}, inputSchema={ "type": "object", "properties": { "target_date": { "type": "string", "description": "Target date in YYYY-MM-DD format" }, "event_summary": { "type": "string", "description": "Event title to match" } }, "required": ["target_date", "event_summary"] } ), types.Tool( name="delete-event", description="Delete a single calendar event", arguments={}, inputSchema={ "type": "object", "properties": { "event_time": { "type": "string", "description": "Event time from list output" }, "event_summary": { "type": "string", "description": "Event title to match" } }, "required": ["event_time", "event_summary"] } ) ] @server.call_tool() async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]: try: if name == "list": formatted_text = await asyncio.wait_for( calendar_service.list_events(), timeout=60 ) return [types.TextContent( type="text", text=formatted_text )] elif name == "create-event": if not arguments.get("summary"): return [types.TextContent( type="text", text="Error: Event title (summary) is required" )] if not arguments.get("start_time"): return [types.TextContent( type="text", text="Error: Start time is required" )] result = await asyncio.wait_for( calendar_service.create_event( summary=arguments["summary"], start_time=arguments["start_time"], end_time=arguments.get("end_time"), description=arguments.get("description") ), timeout=60 ) return [types.TextContent( type="text", text=result )] elif name == "delete-duplicates": if not arguments.get("target_date"): return [types.TextContent( type="text", text="Error: Target date is required" )] if not arguments.get("event_summary"): return [types.TextContent( type="text", text="Error: Event title is required" )] result = await asyncio.wait_for( calendar_service.delete_duplicate_events( target_date=arguments["target_date"], event_summary=arguments["event_summary"] ), timeout=60 ) return [types.TextContent( type="text", text=result )] elif name == "delete-event": if not arguments.get("event_time"): return [types.TextContent( type="text", text="Error: Event time is required" )] if not arguments.get("event_summary"): return [types.TextContent( type="text", text="Error: Event title is required" )] result = await asyncio.wait_for( calendar_service.delete_single_event( event_time=arguments["event_time"], event_summary=arguments["event_summary"] ), timeout=60 ) return [types.TextContent( type="text", text=result )] else: return [types.TextContent( type="text", text=f"Unknown tool: {name}" )] except asyncio.TimeoutError: error_msg = f"Operation timed out while executing {name}" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] except Exception as e: error_msg = f"Error executing {name}: {str(e)}" logger.error(error_msg) return [types.TextContent( type="text", text=error_msg )] @server.list_resources() async def handle_list_resources() -> list[types.Resource]: return [] @server.list_prompts() async def handle_list_prompts() -> list[types.Prompt]: return [] async def run(): global calendar_service try: # Initialize calendar service logger.info("Starting Calendar Server...") script_dir = os.path.dirname(os.path.abspath(__file__)) calendar_service = CalendarService( credentials_path=os.path.join(script_dir, "..", "credentials", "credentials.json"), token_path=os.path.join(script_dir, "..", "credentials", "token.json") ) # Authentication with longer timeout try: await asyncio.wait_for(calendar_service.authenticate(), timeout=60) logger.info("Authentication successful") except asyncio.TimeoutError: logger.error("Authentication timed out") raise # Run the server logger.info("Calendar Server is ready") async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="gcalendar", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ) ) ) except ConnectionError as e: logger.error(f"Connection error: {str(e)}") raise except IOError as e: logger.error(f"IO error: {str(e)}") raise except Exception as e: logger.error(f"Error in run: {str(e)}") raise finally: if calendar_service: logger.info("Closing calendar service...") calendar_service.close() async def main(): try: await run() except KeyboardInterrupt: logger.info("Server shutdown requested") except Exception as e: logger.error(f"Unexpected error: {str(e)}") finally: if calendar_service: calendar_service.close() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: pass # Handle graceful shutdown ``` -------------------------------------------------------------------------------- /src/calendar_service.py: -------------------------------------------------------------------------------- ```python from google.oauth2.credentials import Credentials from google.auth.transport.requests import Request from googleapiclient.discovery import build from googleapiclient.errors import HttpError from datetime import datetime, timedelta import logging import asyncio import os import json from zoneinfo import ZoneInfo # Set up logging log_formatter = logging.Formatter( '[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Console handler console_handler = logging.StreamHandler() console_handler.setFormatter(log_formatter) # File handler log_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'GCalendar/logs', 'calendar_service.log') # Create logs directory if it doesn't exist log_dir = os.path.dirname(log_file) os.makedirs(log_dir, exist_ok=True) file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') file_handler.setFormatter(log_formatter) # Set up logger logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.addHandler(console_handler) logger.addHandler(file_handler) class CalendarService: SCOPES = ['https://www.googleapis.com/auth/calendar'] TIMEZONE = 'Asia/Bangkok' def __init__(self, credentials_path: str, token_path: str): self.credentials_path = credentials_path self.token_path = token_path self.creds = None self.service = None self.tz = ZoneInfo(self.TIMEZONE) self.events_cache = {} # Initialize events cache async def authenticate(self): """Authenticate with Google Calendar API""" try: self.creds = Credentials.from_authorized_user_file(self.token_path, self.SCOPES) # Check if credentials are expired and refresh if needed if self.creds and self.creds.expired and self.creds.refresh_token: logger.info("Token expired, refreshing...") self.creds.refresh(Request()) # Save refreshed credentials with open(self.token_path, 'w') as token: token.write(self.creds.to_json()) logger.info("Token refreshed and saved") self.service = build('calendar', 'v3', credentials=self.creds) logger.info("Authentication successful") return True except Exception as e: logger.error(f"Authentication error: {str(e)}") raise async def list_events(self, max_results: int = 1000): """List calendar events and cache their IDs""" try: if not self.service: await self.authenticate() logger.info("Fetching calendar events...") now = datetime.now(self.tz) two_years_ago = now - timedelta(days=730) one_year_later = now + timedelta(days=365) events_result = await asyncio.get_event_loop().run_in_executor( None, lambda: self.service.events().list( calendarId='primary', timeMin=two_years_ago.isoformat(), timeMax=one_year_later.isoformat(), maxResults=max_results, singleEvents=True, orderBy='startTime', timeZone=self.TIMEZONE ).execute() ) events = events_result.get('items', []) logger.info(f"Found {len(events)} events") if not events: return "No events found." # Reset and update cache self.events_cache = {} formatted_text = "" for event in events: start_time = self._format_event_time(event) summary = event.get('summary', 'No title') cache_key = f"{start_time} {summary}" formatted_text += f"{cache_key}\n" self.events_cache[cache_key] = event['id'] return formatted_text except HttpError as error: error_msg = f"An error occurred: {str(error)}" logger.error(error_msg) return error_msg def _format_event_time(self, event: dict) -> str: """Format event time consistently with timezone""" start = event['start'].get('dateTime', event['start'].get('date')) if 'T' in start: # This is a datetime dt = datetime.fromisoformat(start) if dt.tzinfo is None: # Add timezone if not present dt = dt.replace(tzinfo=self.tz) formatted_time = dt.strftime('%Y-%m-%dT%H:%M:%S%z') else: # This is a date formatted_time = start return formatted_time async def delete_single_event(self, event_time: str, event_summary: str): """ Delete a specific event by its time and summary Parameters: - event_time: Event time in format matching list output - event_summary: Event title to match Returns: - Status message """ try: if not self.service: await self.authenticate() # First update the cache await self.list_events() # Get event ID from cache cache_key = f"{event_time} {event_summary}" event_id = self.events_cache.get(cache_key) if not event_id: return f"Event not found: {cache_key}" # Delete the event await asyncio.get_event_loop().run_in_executor( None, lambda: self.service.events().delete( calendarId='primary', eventId=event_id ).execute() ) logger.info(f"Deleted event: {event_id} - {cache_key}") return f"Successfully deleted event: {cache_key}" except HttpError as error: error_msg = f"An error occurred: {str(error)}" logger.error(error_msg) return error_msg except Exception as error: error_msg = f"Unexpected error: {str(error)}" logger.error(error_msg) return error_msg async def create_event(self, summary: str, start_time: str, end_time: str = None, description: str = None): """ Create a new calendar event Parameters: - summary: Event title - start_time: Start time in YYYY-MM-DDTHH:MM:SS format or date in YYYY-MM-DD format - end_time: End time (optional). If not provided, event will be 1 hour long - description: Event description (optional) Returns: - Status message """ try: if not self.service: await self.authenticate() # Parse start time try: # Check if time is included if 'T' in start_time: start_dt = datetime.fromisoformat(start_time) is_datetime = True else: start_dt = datetime.strptime(start_time, '%Y-%m-%d') is_datetime = False except ValueError: return f"Invalid start time format: {start_time}. Use YYYY-MM-DDTHH:MM:SS or YYYY-MM-DD" # Handle end time if end_time: try: if 'T' in end_time: end_dt = datetime.fromisoformat(end_time) else: end_dt = datetime.strptime(end_time, '%Y-%m-%d') if is_datetime: # If start has time but end doesn't, make end time 23:59:59 end_dt = end_dt.replace(hour=23, minute=59, second=59) except ValueError: return f"Invalid end time format: {end_time}. Use YYYY-MM-DDTHH:MM:SS or YYYY-MM-DD" else: # Default to 1 hour duration for datetime events, or same day for date events if is_datetime: end_dt = start_dt + timedelta(hours=1) else: end_dt = start_dt # Create event body event_body = { 'summary': summary, 'start': { 'dateTime' if is_datetime else 'date': start_dt.isoformat(), 'timeZone': self.TIMEZONE if is_datetime else None }, 'end': { 'dateTime' if is_datetime else 'date': end_dt.isoformat(), 'timeZone': self.TIMEZONE if is_datetime else None } } # Add optional description if provided if description: event_body['description'] = description # Create the event created_event = await asyncio.get_event_loop().run_in_executor( None, lambda: self.service.events().insert( calendarId='primary', body=event_body ).execute() ) logger.info(f"Created event: {created_event['id']} - {summary}") return f"Successfully created event: {summary}" except HttpError as error: error_msg = f"An error occurred: {str(error)}" logger.error(error_msg) return error_msg except Exception as error: error_msg = f"Unexpected error: {str(error)}" logger.error(error_msg) return error_msg async def delete_duplicate_events(self, target_date: str, event_summary: str): """ Delete duplicate events on a specific date Parameters: - target_date: Target date in YYYY-MM-DD format - event_summary: Event title to match Returns: - Status message """ try: if not self.service: await self.authenticate() # Parse target date try: target_dt = datetime.strptime(target_date, '%Y-%m-%d') except ValueError: return f"Invalid date format: {target_date}. Use YYYY-MM-DD" # Set time range for the target date start_dt = target_dt.replace(hour=0, minute=0, second=0, tzinfo=self.tz) end_dt = target_dt.replace(hour=23, minute=59, second=59, tzinfo=self.tz) # Get events for the target date events_result = await asyncio.get_event_loop().run_in_executor( None, lambda: self.service.events().list( calendarId='primary', timeMin=start_dt.isoformat(), timeMax=end_dt.isoformat(), singleEvents=True, orderBy='startTime', timeZone=self.TIMEZONE ).execute() ) events = events_result.get('items', []) # Find duplicate events duplicate_ids = [] seen_times = set() for event in events: if event.get('summary') == event_summary: start_time = self._format_event_time(event) if start_time in seen_times: duplicate_ids.append(event['id']) else: seen_times.add(start_time) if not duplicate_ids: return f"No duplicate events found for '{event_summary}' on {target_date}" # Delete duplicate events deleted_count = 0 for event_id in duplicate_ids: try: await asyncio.get_event_loop().run_in_executor( None, lambda: self.service.events().delete( calendarId='primary', eventId=event_id ).execute() ) deleted_count += 1 except HttpError as error: logger.error(f"Error deleting event {event_id}: {str(error)}") logger.info(f"Deleted {deleted_count} duplicate events for '{event_summary}' on {target_date}") return f"Successfully deleted {deleted_count} duplicate events for '{event_summary}' on {target_date}" except HttpError as error: error_msg = f"An error occurred: {str(error)}" logger.error(error_msg) return error_msg except Exception as error: error_msg = f"Unexpected error: {str(error)}" logger.error(error_msg) return error_msg async def renew_token(self): """Renew the authentication token""" try: if not self.creds: self.creds = Credentials.from_authorized_user_file(self.token_path, self.SCOPES) if self.creds and self.creds.expired and self.creds.refresh_token: logger.info("Token expired, refreshing...") self.creds.refresh(Request()) # Save refreshed credentials with open(self.token_path, 'w') as token: token.write(self.creds.to_json()) logger.info("Token refreshed and saved") return "Token renewed successfully" else: return "Token is still valid" except Exception as e: error_msg = f"Error renewing token: {str(e)}" logger.error(error_msg) return error_msg def close(self): """Clean up resources""" if self.service: logger.info("Closing calendar service") self.service.close() ``` -------------------------------------------------------------------------------- /GCalendar/src/calendar_service.py: -------------------------------------------------------------------------------- ```python from google.oauth2.credentials import Credentials from google.auth.transport.requests import Request from googleapiclient.discovery import build from googleapiclient.errors import HttpError from datetime import datetime, timedelta import logging import asyncio import os import json from zoneinfo import ZoneInfo import sys # Set up logging log_formatter = logging.Formatter( '[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Console handler console_handler = logging.StreamHandler() console_handler.setFormatter(log_formatter) # File handler log_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs') os.makedirs(log_dir, exist_ok=True) # Create logs directory if it doesn't exist log_file = os.path.join(log_dir, 'calendar_service.log') file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') file_handler.setFormatter(log_formatter) # Set up logger logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.addHandler(console_handler) logger.addHandler(file_handler) class CalendarService: SCOPES = ['https://www.googleapis.com/auth/calendar'] # Try to use Bangkok timezone, but fallback to others if not available TIMEZONE = 'Asia/Bangkok' @classmethod def get_available_timezone(cls): """Get an available timezone closest to Bangkok, with fallbacks""" timezones_to_try = [ 'Asia/Bangkok', # First choice (UTC+7) 'Asia/Jakarta', # Second choice (UTC+7) 'Asia/Singapore', # Third choice (UTC+8) 'Asia/Kolkata', # Fourth choice (UTC+5:30) 'UTC' # Last resort ] for tz in timezones_to_try: try: ZoneInfo(tz) logger.info(f"Using timezone: {tz}") return tz except Exception as e: logger.warning(f"Timezone {tz} not available: {str(e)}") logger.error("No suitable timezone found. Please install tzdata package with: pip install tzdata") sys.exit(1) def __init__(self, credentials_path: str, token_path: str): self.credentials_path = credentials_path self.token_path = token_path self.creds = None self.service = None # Use the first available timezone self.TIMEZONE = self.get_available_timezone() self.tz = ZoneInfo(self.TIMEZONE) self.events_cache = {} # Initialize events cache async def authenticate(self): """Authenticate with Google Calendar API""" try: self.creds = Credentials.from_authorized_user_file(self.token_path, self.SCOPES) # Check if credentials are expired and refresh if needed if self.creds and self.creds.expired and self.creds.refresh_token: logger.info("Token expired, refreshing...") self.creds.refresh(Request()) # Save refreshed credentials with open(self.token_path, 'w') as token: token.write(self.creds.to_json()) logger.info("Token refreshed and saved") self.service = build('calendar', 'v3', credentials=self.creds) logger.info("Authentication successful") return True except Exception as e: logger.error(f"Authentication error: {str(e)}") raise async def list_events(self, max_results: int = 1000): """List calendar events and cache their IDs""" try: if not self.service: await self.authenticate() logger.info("Fetching calendar events...") now = datetime.now(self.tz) two_years_ago = now - timedelta(days=730) one_year_later = now + timedelta(days=365) events_result = await asyncio.get_event_loop().run_in_executor( None, lambda: self.service.events().list( calendarId='primary', timeMin=two_years_ago.isoformat(), timeMax=one_year_later.isoformat(), maxResults=max_results, singleEvents=True, orderBy='startTime', timeZone=self.TIMEZONE ).execute() ) events = events_result.get('items', []) logger.info(f"Found {len(events)} events") if not events: return "No events found." # Reset and update cache self.events_cache = {} formatted_text = "" for event in events: start_time = self._format_event_time(event) summary = event.get('summary', 'No title') cache_key = f"{start_time} {summary}" formatted_text += f"{cache_key}\n" self.events_cache[cache_key] = event['id'] return formatted_text except HttpError as error: error_msg = f"An error occurred: {str(error)}" logger.error(error_msg) return error_msg def _format_event_time(self, event: dict) -> str: """Format event time consistently with timezone""" start = event['start'].get('dateTime', event['start'].get('date')) if 'T' in start: # This is a datetime dt = datetime.fromisoformat(start) if dt.tzinfo is None: # Add timezone if not present dt = dt.replace(tzinfo=self.tz) formatted_time = dt.strftime('%Y-%m-%dT%H:%M:%S%z') else: # This is a date formatted_time = start return formatted_time async def delete_single_event(self, event_time: str, event_summary: str): """ Delete a specific event by its time and summary Parameters: - event_time: Event time in format matching list output - event_summary: Event title to match Returns: - Status message """ try: if not self.service: await self.authenticate() # First update the cache await self.list_events() # Get event ID from cache cache_key = f"{event_time} {event_summary}" event_id = self.events_cache.get(cache_key) if not event_id: return f"Event not found: {cache_key}" # Delete the event await asyncio.get_event_loop().run_in_executor( None, lambda: self.service.events().delete( calendarId='primary', eventId=event_id ).execute() ) logger.info(f"Deleted event: {event_id} - {cache_key}") return f"Successfully deleted event: {cache_key}" except HttpError as error: error_msg = f"An error occurred: {str(error)}" logger.error(error_msg) return error_msg except Exception as error: error_msg = f"Unexpected error: {str(error)}" logger.error(error_msg) return error_msg async def create_event(self, summary: str, start_time: str, end_time: str = None, description: str = None): """ Create a new calendar event Parameters: - summary: Event title - start_time: Start time in YYYY-MM-DDTHH:MM:SS format or date in YYYY-MM-DD format - end_time: End time (optional). If not provided, event will be 1 hour long - description: Event description (optional) Returns: - Status message """ try: if not self.service: await self.authenticate() # Parse start time try: # Check if time is included if 'T' in start_time: start_dt = datetime.fromisoformat(start_time) is_datetime = True else: start_dt = datetime.strptime(start_time, '%Y-%m-%d') is_datetime = False except ValueError: return f"Invalid start time format: {start_time}. Use YYYY-MM-DDTHH:MM:SS or YYYY-MM-DD" # Handle end time if end_time: try: if 'T' in end_time: end_dt = datetime.fromisoformat(end_time) else: end_dt = datetime.strptime(end_time, '%Y-%m-%d') if is_datetime: # If start has time but end doesn't, make end time 23:59:59 end_dt = end_dt.replace(hour=23, minute=59, second=59) except ValueError: return f"Invalid end time format: {end_time}. Use YYYY-MM-DDTHH:MM:SS or YYYY-MM-DD" else: # Default to 1 hour duration for datetime events, or same day for date events if is_datetime: end_dt = start_dt + timedelta(hours=1) else: end_dt = start_dt # Create event body event_body = { 'summary': summary, 'start': { 'dateTime' if is_datetime else 'date': start_dt.isoformat(), 'timeZone': self.TIMEZONE if is_datetime else None }, 'end': { 'dateTime' if is_datetime else 'date': end_dt.isoformat(), 'timeZone': self.TIMEZONE if is_datetime else None } } # Add optional description if provided if description: event_body['description'] = description # Create the event created_event = await asyncio.get_event_loop().run_in_executor( None, lambda: self.service.events().insert( calendarId='primary', body=event_body ).execute() ) logger.info(f"Created event: {created_event['id']} - {summary}") return f"Successfully created event: {summary}" except HttpError as error: error_msg = f"An error occurred: {str(error)}" logger.error(error_msg) return error_msg except Exception as error: error_msg = f"Unexpected error: {str(error)}" logger.error(error_msg) return error_msg async def delete_duplicate_events(self, target_date: str, event_summary: str): """ Delete duplicate events on a specific date Parameters: - target_date: Target date in YYYY-MM-DD format - event_summary: Event title to match Returns: - Status message """ try: if not self.service: await self.authenticate() # Parse target date try: target_dt = datetime.strptime(target_date, '%Y-%m-%d') except ValueError: return f"Invalid date format: {target_date}. Use YYYY-MM-DD" # Set time range for the target date start_dt = target_dt.replace(hour=0, minute=0, second=0, tzinfo=self.tz) end_dt = target_dt.replace(hour=23, minute=59, second=59, tzinfo=self.tz) # Get events for the target date events_result = await asyncio.get_event_loop().run_in_executor( None, lambda: self.service.events().list( calendarId='primary', timeMin=start_dt.isoformat(), timeMax=end_dt.isoformat(), singleEvents=True, orderBy='startTime', timeZone=self.TIMEZONE ).execute() ) events = events_result.get('items', []) # Find duplicate events duplicate_ids = [] seen_times = set() for event in events: if event.get('summary') == event_summary: start_time = self._format_event_time(event) if start_time in seen_times: duplicate_ids.append(event['id']) else: seen_times.add(start_time) if not duplicate_ids: return f"No duplicate events found for '{event_summary}' on {target_date}" # Delete duplicate events deleted_count = 0 for event_id in duplicate_ids: try: await asyncio.get_event_loop().run_in_executor( None, lambda: self.service.events().delete( calendarId='primary', eventId=event_id ).execute() ) deleted_count += 1 except HttpError as error: logger.error(f"Error deleting event {event_id}: {str(error)}") logger.info(f"Deleted {deleted_count} duplicate events for '{event_summary}' on {target_date}") return f"Successfully deleted {deleted_count} duplicate events for '{event_summary}' on {target_date}" except HttpError as error: error_msg = f"An error occurred: {str(error)}" logger.error(error_msg) return error_msg except Exception as error: error_msg = f"Unexpected error: {str(error)}" logger.error(error_msg) return error_msg async def renew_token(self): """Renew the authentication token""" try: if not self.creds: self.creds = Credentials.from_authorized_user_file(self.token_path, self.SCOPES) if self.creds and self.creds.expired and self.creds.refresh_token: logger.info("Token expired, refreshing...") self.creds.refresh(Request()) # Save refreshed credentials with open(self.token_path, 'w') as token: token.write(self.creds.to_json()) logger.info("Token refreshed and saved") return "Token renewed successfully" else: return "Token is still valid" except Exception as e: error_msg = f"Error renewing token: {str(e)}" logger.error(error_msg) return error_msg def close(self): """Clean up resources""" if self.service: logger.info("Closing calendar service") self.service.close() ```