# Directory Structure ``` ├── .gitignore ├── .python-version ├── LICENSE ├── pyproject.toml ├── README.md ├── src │ └── flights-mcp │ ├── main.py │ └── proposal.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 1 | 3.12 2 | ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Python-generated files 2 | __pycache__/ 3 | *.py[oc] 4 | build/ 5 | dist/ 6 | wheels/ 7 | *.egg-info 8 | 9 | # Virtual environments 10 | .venv 11 | .env 12 | *.ipynb 13 | ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # ✈️ Flights MCP Server 2 | 3 | A Model Context Protocol (MCP) server that provides flight search capabilities using the Aviasales Flight Search API. This server allows you to search for flights, filter results, get detailed flight information, and generate booking links. 4 | 5 | https://github.com/user-attachments/assets/87d79d54-c4ab-4938-9792-18572315f1ba 6 | 7 | ## How to use 8 | 9 | You can either use the **remote MCP server** or deploy your own instance: 10 | 11 | - **Remote MCP** 12 | A public instance is available at: 13 | `https://findflights.me/sse` 14 | This server uses the **SSE** transport protocol and is ready to use without setup. 15 | > ⚠️ **Important:** Currently not all LLM clients support remote MCP connections. For example, Claude.ai supports remote MCP integrations only on **Pro+ plans**. 16 | 17 | - **Self-Hosted Deployment** 18 | If you prefer to run your own server, follow the instructions in the [Installation](#installation) section. 
19 | > **Note**: To deploy your own server, you must obtain an Aviasales API Key and Marker ID. 20 | 21 | ## Features 22 | 23 | - **Flight Search**: Search for one-way, round-trip, and multi-city flights 24 | - **Advanced Filtering**: Filter results by price, duration, airlines, departure/arrival times, and number of stops 25 | - **Multiple Sorting Options**: Sort by price, departure time, arrival time, or duration 26 | - **Detailed Flight Information**: Get comprehensive flight details including baggage allowances and airline information 27 | - **Booking Links**: Generate booking links for selected flights 28 | - **Result Caching**: Search results are cached in memory for 10 minutes, so the LLM can revisit a recent search without waiting 29 | - **Multiple MCP Transport Options**: Supports stdio, HTTP, and SSE transports 30 | 31 | ## Installation 32 | 33 | ### Prerequisites 34 | 35 | - Aviasales API token and marker ID 36 | - Python 3.12 or higher 37 | - UV package manager 38 | 39 | ### Setup 40 | 41 | 1. Clone the repository: 42 | ```bash 43 | git clone <repository-url> 44 | cd flights-mcp 45 | ``` 46 | 47 | 2. Set up environment variables (see [Environment Variables](#environment-variables) section) 48 | 49 | 3. Run the server: 50 | ```bash 51 | uv run src/flights-mcp/main.py 52 | ``` 53 | 54 | With the `streamable_http` or `sse` transport, the server binds to 0.0.0.0, making it accessible on all network interfaces of the host machine. 
55 | 56 | ## Environment Variables 57 | 58 | The server is configured through the following environment variables: 59 | 60 | - **`FLIGHTS_AVIASALES_API_TOKEN`** *(required)*: Your Aviasales API token 61 | 62 | - **`FLIGHTS_AVIASALES_MARKER`** *(required)*: Your Aviasales marker ID 63 | 64 | - **`FLIGHTS_TRANSPORT`** *(optional)*: Transport protocol to use 65 | - Options: `stdio` (default), `streamable_http`, `sse` 66 | 67 | - **`FLIGHTS_HTTP_PORT`** *(optional)*: Port for HTTP/SSE transport 68 | - Only used when `FLIGHTS_TRANSPORT` is `streamable_http` or `sse` 69 | - Default: `4200` 70 | 71 | - **`FLIGHTS_HTTP_PATH`** *(optional)*: URI path for the endpoint 72 | - Only used when `FLIGHTS_TRANSPORT` is `streamable_http` or `sse` 73 | - Default: `/mcp` 74 | 75 | ## MCP Tools 76 | 77 | The server provides the following MCP tools: 78 | 79 | | Tool | Description | 80 | |-------------------------|--------------------------------------------------------------------------------------------------------------------| 81 | | `search_flights` | Searches for flights using the Aviasales Flight Search API. Returns a search description with a `search_id` and a summary of the options found. | 82 | | `get_flight_options` | Retrieves, filters, and sorts flight options from a previous search. Returns a paginated list of filtered flight options. | 83 | | `get_flight_option_details` | Returns detailed flight information including segments, pricing, baggage allowances, and agency terms. | 84 | | `request_booking_link` | Generates a booking link for a specific flight option. | 85 | 86 | 87 | ## Typical Usage Pattern 88 | 89 | 1. **Search for flights** using `search_flights()` - Call multiple times for flexible dates 90 | 2. **Filter and browse options** using `get_flight_options()` - Lightweight tool, call multiple times with different filters and sorting options 91 | 3. **Get detailed information** using `get_flight_option_details()` - For user's preferred options 92 | 4. 
**Generate booking link** using `request_booking_link()` - Only when user confirms booking intent 93 | 94 | ## Support 95 | 96 | For issues related to: 97 | - **MCP Server**: Open an issue in this repository 98 | - **MCP Protocol**: See [Model Context Protocol Documentation](https://modelcontextprotocol.io/) 99 | ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [project] 2 | name = "flights-mcp" 3 | version = "0.1.0" 4 | description = "Add your description here" 5 | readme = "README.md" 6 | requires-python = ">=3.12" 7 | dependencies = [ 8 | "cachetools>=6.0.0", 9 | "fastmcp>=2.5.2", 10 | ] 11 | ``` -------------------------------------------------------------------------------- /src/flights-mcp/main.py: -------------------------------------------------------------------------------- ```python 1 | import hashlib 2 | import os 3 | from typing import Any, Dict, List 4 | import sys 5 | import httpx 6 | import asyncio 7 | from datetime import datetime, timedelta 8 | from pydantic import BaseModel, Field 9 | from fastmcp import FastMCP, Context 10 | from fastmcp.exceptions import ToolError 11 | from proposal import * 12 | import json 13 | from cachetools import TTLCache 14 | 15 | 16 | API_TOKEN: str = os.getenv("FLIGHTS_AVIASALES_API_TOKEN") 17 | MARKER: str = os.getenv("FLIGHTS_AVIASALES_MARKER") 18 | if not API_TOKEN: 19 | raise ValueError("FLIGHTS_AVIASALES_API_TOKEN environment variable is not set.") 20 | if not MARKER: 21 | raise ValueError("FLIGHTS_AVIASALES_MARKER environment variable is not set.") 22 | 23 | SEARCH_URL = "https://api.travelpayouts.com/v1/flight_search" 24 | RESULTS_URL = "https://api.travelpayouts.com/v1/flight_search_results" 25 | 26 | search_results_cache = TTLCache( 27 | maxsize=10000, # Maximum number of cached items 28 | ttl=10 * 60, # Time to live for each cached item (10 minutes) 29 | ) 30 | 31 | 
def _collect_values_sorted(obj: Any) -> List[str]: 32 | """Return *primitive* values from *obj* following Travelpayouts ordering.""" 33 | 34 | if obj is None: 35 | return [] 36 | if isinstance(obj, (str, int, float, bool)): 37 | return [str(obj)] 38 | if isinstance(obj, dict): 39 | values: List[str] = [] 40 | for key in sorted(obj.keys()): # alphabetical keys 41 | values.extend(_collect_values_sorted(obj[key])) 42 | return values 43 | if isinstance(obj, (list, tuple)): 44 | values = [] 45 | for item in obj: # *preserve* list order 46 | values.extend(_collect_values_sorted(item)) 47 | return values 48 | # Unsupported types – ignore 49 | return [] 50 | 51 | 52 | def _generate_signature(token: str, body_without_sig: Dict[str, Any]) -> str: 53 | """Compute MD5 signature per Travelpayouts Flight Search spec.""" 54 | 55 | ordered_values = _collect_values_sorted(body_without_sig) 56 | base_string = ":".join([token] + ordered_values) 57 | # Do not log base_string: it starts with the API token. 58 | return hashlib.md5(base_string.encode()).hexdigest() 59 | 60 | class SearchRequestSegmentModel(BaseModel): 61 | origin: str = Field(..., description='Origin IATA (this can be airport IATA or in case city has multiple airports better to use city IATA). The IATA code is shown in uppercase letters LON or MOW') 62 | destination: str = Field(..., description='Destination IATA (this can be airport IATA or in case city has multiple airports better to use city IATA). The IATA code is shown in uppercase letters LON or MOW') 63 | date: str = Field(..., description="Departure date in YYYY-MM-DD format") 64 | 65 | class SearchRequestModel(BaseModel): 66 | """Search request model for Travelpayouts Flight Search API.""" 67 | segments: List[SearchRequestSegmentModel] = Field(..., description='''List of CONNECTED flight segments for the same journey. Each segment represents one leg of a multi-city trip or round trip. 
68 | IMPORTANT: Do NOT use multiple segments for alternative dates of the same route. For flexible dates, perform separate searches. 69 | 70 | Examples: 71 | - One way: [{'origin': 'SFO', 'destination': 'LAX', 'date': '2023-10-01'}] 72 | - Round trip: [{'origin': 'SFO', 'destination': 'LAX', 'date': '2023-10-01'}, {'origin': 'LAX', 'destination': 'SFO', 'date': '2023-10-15'}] 73 | - Multi-city: [{'origin': 'SFO', 'destination': 'LAX', 'date': '2023-10-01'}, {'origin': 'LAX', 'destination': 'JFK', 'date': '2023-10-05'}] 74 | 75 | For alternative dates (e.g., 'July 13 OR July 14'), use separate calls of this tool.''') 76 | adults: int = Field(1, ge=1, le=9, description="Number of adult passengers (12 years old and older)") 77 | children: int = Field(0, ge=0, le=6, description="Number of children (2-11 years old)") 78 | infants: int = Field(0, ge=0, le=6, description="Number of infants (under 2 years old)") 79 | trip_class: str = Field("Y", description="Trip class - single letter: Y for economy, C for business. Default is Y (economy class)") 80 | currency: str = Field("USD", description="Currency code (default is USD)") 81 | locale: str = Field("en", description="Locale for the response (default is en). These are the supported locales: en-us, en-gb, ru, de, es, fr, pl") 82 | 83 | mcp = FastMCP("Flights Search", 84 | description="This MCP allows you to search for flights using the Aviasales Flight Search API. " \ 85 | "You can specify flight segments, number of passengers, trip class, currency, and locale, apply various filters, and retrieve detailed flight options. " \ 86 | "Typical usage pattern:\n1. search_flights() - Initial broad search (call multiple times if dates are flexible)\n2. get_flight_options() - Filter and sort results (very lightweight tool, call multiple times with different filters)\n3. get_flight_option_details() - Get full details for user's preferred options\n4. 
request_booking_link() - Only when user confirms booking intent\n\n") 87 | 88 | @mcp.tool( 89 | description="Search for flights using the Aviasales Flight Search API. " \ 90 | "This tool performs search based on the provided flight segments, number of passengers, trip class, currency, and locale. " \ 91 | "It provides search_id and description of search results and saves found options internally. " \ 92 | "After receiving the result, the client can use the `get_flight_options` tool to retrieve the found options with more granular filters. " \ 93 | "IMPORTANT: All times are local to departure/arrival locations and use HH:MM 24-hour format. " \ 94 | "IMPORTANT: Call this tool as many times as needed to find the best flight options." 95 | ) 96 | async def search_flights( 97 | request: SearchRequestModel, 98 | ctx: Context 99 | ) -> Dict[str, Any]: 100 | """Search for flights using Travelpayouts Flight Search API.""" 101 | 102 | request_body = request.model_dump() 103 | request_body["token"] = API_TOKEN 104 | request_body["marker"] = MARKER 105 | request_body["passengers"] = { 106 | "adults": request.adults, 107 | "children": request.children, 108 | "infants": request.infants 109 | } 110 | del request_body["adults"] 111 | del request_body["children"] 112 | del request_body["infants"] 113 | 114 | signature = _generate_signature(API_TOKEN, request_body) 115 | request_body["signature"] = signature 116 | 117 | async with httpx.AsyncClient(timeout=40) as client: 118 | init_resp = await client.post(SEARCH_URL, json=request_body) 119 | if init_resp.status_code != 200: 120 | raise ToolError(f"Aviasales API returned non-200 status code: {init_resp.status_code}, raw text: {init_resp.text}") 121 | 122 | init_data = init_resp.json() 123 | search_id = init_data["search_id"] 124 | set_currency_rates(init_data["currency_rates"]) 125 | 126 | deadline = datetime.now() + timedelta(seconds=90) 127 | batch_proposals = None 128 | 129 | while datetime.now() < deadline: 130 | await asyncio.sleep(5) 131 
| res_r = await client.get(f"{RESULTS_URL}?uuid={search_id}") 132 | res_r.raise_for_status() 133 | res_json = res_r.json() 134 | 135 | # Defensive: ensure we got a *list* per the API spec. 136 | if not isinstance(res_json, list): 137 | raise ToolError("Unexpected response format: expected a list of results") 138 | 139 | # Aggregate proposals from every object that contains them. 140 | for obj in res_json: 141 | if isinstance(obj, dict) and obj.get("proposals"): 142 | try: 143 | if not batch_proposals: 144 | batch_proposals = parse_proposals_batch(obj) 145 | else: 146 | batch_proposals = batch_proposals.combine_with(parse_proposals_batch(obj)) 147 | except Exception: 148 | print(f"Error parsing proposals: \n {json.dumps(obj, indent=2)}", file=sys.stderr) 149 | raise 150 | found = len(batch_proposals.proposals) if batch_proposals else 0 # nothing may have arrived yet 151 | await ctx.report_progress(progress=found, total=None, message=f"Found {found} options so far...") 152 | if res_json and isinstance(res_json[-1], dict) and set(res_json[-1].keys()) == {"search_id"}: # last object holds only search_id 153 | search_results_cache[search_id] = batch_proposals 154 | return batch_proposals.get_description() if batch_proposals else "The search finished without any flight options." 155 | 156 | search_results_cache[search_id] = batch_proposals 157 | return batch_proposals.get_description() if batch_proposals else "No proposals found until the search timed out." 158 | 159 | @mcp.tool( 160 | description="Get flight options from the previously performed search. " \ 161 | "This tool allows you to filter the found flight options by price, departure and arrival times, and airlines. " \ 162 | "It returns a paginated list of flight options that match the specified filters and sorting option. " \ 163 | "IMPORTANT: This is a very cheap operation, so you can call it as many times as needed to find the best flight options." 164 | ) 165 | def get_flight_options( 166 | search_id: str, 167 | filters: FiltersModel, 168 | page: int = Field(0, description="Page number for pagination. Default is 0."), 169 | page_size: int = Field(10, description="Number of results per page. 
Default is 10.") 170 | ): 171 | batch = search_results_cache.get(search_id) 172 | if not batch: 173 | raise ToolError(f"No search results found for search_id: {search_id}. " \ 174 | "It may have expired after 10 minutes or not been performed yet. " \ 175 | "Please perform a search first using the `search_flights` tool.") 176 | filtered_batch = batch.apply_filters(filters) 177 | 178 | if not filtered_batch.proposals: 179 | raise ToolError(f"No flight options found for search_id: {search_id} with the specified filters.") 180 | 181 | total_results = len(filtered_batch.proposals) 182 | start_index = page * page_size 183 | end_index = start_index + page_size 184 | paginated_results = filtered_batch.proposals[start_index:end_index] 185 | result = f'Retrieved {len(paginated_results)} flight options for search_id: {search_id} (zero-based page {page}; {(total_results + page_size - 1) // page_size} pages in total)\n\n' 186 | 187 | for i, proposal in enumerate(paginated_results): 188 | result += proposal.get_short_description() 189 | if i < len(paginated_results) - 1: 190 | result += "\n---\n" 191 | return result 192 | 193 | @mcp.tool(description="Retrieve detailed information about a specific flight option from the search results. " \ 194 | "This tool provides detailed information about a flight option, including its segments, price, baggage info. " \ 195 | "It is useful for getting more granular information about a specific flight option.") 196 | def get_flight_option_details( 197 | search_id: str = Field(..., description="Search ID from the previous search_flights tool."), 198 | offer_id: str = Field(..., description="Offer ID of the flight option for which to retrieve details."), 199 | ) -> str: 200 | """Get detailed information about a specific flight option from the search results.""" 201 | 202 | batch = search_results_cache.get(search_id) 203 | if not batch: 204 | raise ToolError(f"No search results found for search_id: {search_id}. " \ 205 | "It may have expired after 10 minutes. 
" \ 206 | "Please perform a search first using the `search_flights` tool.") 207 | 208 | proposal = batch.get_proposal_by_id(offer_id) 209 | if not proposal: 210 | raise ToolError(f"No flight details found for offer_id: {offer_id} in search_id: {search_id}.") 211 | 212 | return proposal.get_full_description() 213 | 214 | @mcp.tool( 215 | description="Request link for booking a flight option. " \ 216 | "This tool generates a booking link for a specific flight option." \ 217 | "This tool is recommended to be used after the user expressed intention to book the flight option." \ 218 | ) 219 | async def request_booking_link( 220 | search_id: str = Field(..., description="Search ID from the previous search_flights tool."), 221 | offer_id: str = Field(..., description="Offer ID of the flight option for which to request a booking link."), 222 | agency_id: str = Field(..., description="Internal agency ID for generating booking link.") 223 | ) -> str: 224 | """Request a booking link for a specific flight option.""" 225 | 226 | batch = search_results_cache.get(search_id) 227 | if not batch: 228 | raise ToolError(f"No search results found for search_id: {search_id}. " \ 229 | "It may have expired after 10 minutes. 
" \ 230 | "Please perform a search first using the `search_flights` tool.") 231 | 232 | proposal = batch.get_proposal_by_id(offer_id) 233 | if not proposal: 234 | raise ToolError(f"No flight details found for offer_id: {offer_id} in search_id: {search_id}.") 235 | 236 | terms = proposal.terms[agency_id] 237 | 238 | get_book_link_api_url = f"https://api.travelpayouts.com/v1/flight_searches/{search_id}/clicks/{terms.url}.json?marker={MARKER}" 239 | async with httpx.AsyncClient(timeout=40) as client: 240 | response = await client.get(get_book_link_api_url) 241 | if response.status_code != 200: 242 | raise ToolError(f"Aviasales API returned non-200 status code: {response.status_code}", raw_response=response.text) 243 | data = response.json() 244 | if not data or "url" not in data: 245 | raise ToolError("Booking link not found in the response from Aviasales API.") 246 | book_link = data["url"] 247 | agency_name = batch.gates_info.get(agency_id).label if batch.gates_info.get(agency_id) else '' 248 | return f"Booking link on {agency_name}: {book_link}" 249 | 250 | return 251 | 252 | if __name__ == "__main__": 253 | use_streamable_http = os.getenv("FLIGHTS_TRANSPORT", '').lower() == 'streamable_http' 254 | use_sse = os.getenv("FLIGHTS_TRANSPORT", '').lower() == 'sse' 255 | if use_streamable_http: 256 | mcp.run( 257 | transport="streamable-http", 258 | host="0.0.0.0", 259 | port=int(os.getenv("FLIGHTS_HTTP_PORT", 4200)), 260 | path=os.getenv("FLIGHTS_HTTP_PATH", "/mcp"), 261 | log_level="debug", 262 | ) 263 | elif use_sse: 264 | mcp.run( 265 | transport="sse", 266 | host="0.0.0.0", 267 | port=int(os.getenv("FLIGHTS_HTTP_PORT", 4200)), 268 | path=os.getenv("FLIGHTS_HTTP_PATH", "/mcp"), 269 | log_level="debug", 270 | ) 271 | else: 272 | mcp.run(transport="stdio") 273 | 274 | ``` -------------------------------------------------------------------------------- /src/flights-mcp/proposal.py: -------------------------------------------------------------------------------- 
```python 1 | from datetime import datetime, time 2 | from typing import Dict, List, Optional, Any, Union 3 | from pydantic import BaseModel, Field, field_validator, model_validator 4 | from typing_extensions import Self 5 | from enum import Enum 6 | 7 | currency_rates: Dict[str, float] = {} 8 | def set_currency_rates(rates: Dict[str, float]) -> None: 9 | global currency_rates 10 | currency_rates = rates 11 | def get_currency_rate(currency: str) -> float: 12 | if currency not in currency_rates: 13 | raise ValueError(f"Currency rate for {currency} not found in price rates.") 14 | return currency_rates[currency] 15 | 16 | def convert_unified_price_to_user(unified_price: int, currency: str) -> int: 17 | user_currency_rate = get_currency_rate(currency) 18 | if user_currency_rate is None: 19 | raise ValueError(f"Currency rate for {currency} not found in price rates.") 20 | return int(unified_price / user_currency_rate) 21 | 22 | def convert_user_price_to_unified(user_price: int, currency: str) -> int: 23 | user_currency_rate = get_currency_rate(currency) 24 | if user_currency_rate is None: 25 | raise ValueError(f"Currency rate for {currency} not found in price rates.") 26 | return int(user_price * user_currency_rate) 27 | 28 | def format_duration(minutes): 29 | hours = minutes // 60 30 | mins = minutes % 60 31 | return f"{hours:02d}h {mins:02d}m" 32 | 33 | class SortingMethod(str, Enum): 34 | CHEAP_FIRST = "cheap_first" 35 | EARLY_DEPARTURE_FIRST = "early_departure_first" 36 | EARLY_ARRIVAL_FIRST = "early_arrival_first" 37 | MINIMAL_DURATION_FIRST = "minimal_duration_first" 38 | 39 | 40 | class TimeRange(BaseModel): 41 | """Time range for filtering departure/arrival times""" 42 | start_time: Optional[str] = Field(None, description="Start time in HH:MM format (e.g., '08:00')") 43 | end_time: Optional[str] = Field(None, description="End time in HH:MM format (e.g., '22:00')") 44 | 45 | @field_validator('start_time', 'end_time') 46 | def validate_time_format(cls, v): 47 | 
if v is None: 48 | return v 49 | try: 50 | datetime.strptime(v, '%H:%M') 51 | return v 52 | except ValueError: 53 | raise ValueError(f"Time must be in HH:MM format, got: {v}") 54 | 55 | 56 | class SegmentTimeFilter(BaseModel): 57 | """Time filters for a specific segment""" 58 | departure_time_range: Optional[TimeRange] = None 59 | arrival_time_range: Optional[TimeRange] = None 60 | 61 | 62 | class FiltersModel(BaseModel): 63 | """Model for filtering flight proposals""" 64 | max_total_duration: Optional[int] = Field(None, description="Maximum total duration of whole trip in minutes") 65 | max_price: Optional[int] = Field(None, description="Maximum price filter") 66 | allowed_airlines: Optional[List[str]] = Field(None, description="List of allowed airline IATA codes") 67 | segment_time_filters: Optional[List[SegmentTimeFilter]] = Field(None, description="Time filters for each segment") 68 | max_stops: Optional[int] = Field(None, description="Maximum number of stops allowed") 69 | sorting: Optional[SortingMethod] = Field(SortingMethod.CHEAP_FIRST, description="Sorting method") 70 | 71 | 72 | class TransferTerms(BaseModel): 73 | is_virtual_interline: bool 74 | 75 | class FlightAdditionalTariffInfo(BaseModel): 76 | return_before_flight: Optional[bool] = None 77 | return_after_flight: Optional[bool] = None 78 | change_before_flight: Optional[bool] = None 79 | change_after_flight: Optional[bool] = None 80 | 81 | @field_validator('*', mode='before') 82 | def extract_available(cls, value): 83 | if isinstance(value, dict): 84 | return value.get('available') 85 | return value 86 | 87 | 88 | class Flight(BaseModel): 89 | aircraft: Optional[str] = None 90 | arrival: str 91 | arrival_date: str 92 | arrival_time: str 93 | arrival_timestamp: int 94 | delay: int 95 | departure: str 96 | departure_date: str 97 | departure_time: str 98 | departure_timestamp: int 99 | duration: int 100 | equipment: Optional[str] = None 101 | local_arrival_timestamp: int 102 | 
local_departure_timestamp: int 103 | marketing_carrier: Optional[str] = None 104 | number: str 105 | operating_carrier: str 106 | operated_by: str 107 | rating: Optional[int] 108 | technical_stops: Optional[Any] 109 | trip_class: str 110 | # Baggage and tariff info moved from Terms to Flight level 111 | # baggage: Union[bool, str] = Field(description="Baggage allowance for this flight") 112 | # handbag: Union[bool, str] = Field(description="Handbag allowance for this flight") 113 | # additional_tariff_info: Optional[FlightAdditionalTariffInfo] = Field(description="Tariff information for this flight") 114 | # baggage_source: int = Field(description="Source of baggage information") 115 | # handbag_source: int = Field(description="Source of handbag information") 116 | 117 | def get_full_flight_number(self) -> str: 118 | if self.marketing_carrier is not None: 119 | return f"{self.marketing_carrier} {self.number}" 120 | return f"{self.operating_carrier} {self.number}" 121 | 122 | class Terms(BaseModel): 123 | currency: str 124 | price: int 125 | unified_price: int 126 | url: int 127 | transfer_terms: List[List[TransferTerms]] 128 | flights_baggage: List[List[Union[bool, str]]] 129 | flights_handbags: List[List[Union[bool, str]]] 130 | # Removed flights_baggage, flights_handbags, flight_additional_tariff_infos 131 | # as they are now part of Flight model 132 | 133 | 134 | class SegmentRating(BaseModel): 135 | total: float 136 | 137 | 138 | class VisaRules(BaseModel): 139 | required: bool 140 | 141 | 142 | class Duration(BaseModel): 143 | seconds: int 144 | 145 | 146 | class Transfer(BaseModel): 147 | at: str 148 | to: str 149 | airports: List[str] 150 | airlines: List[str] 151 | country_code: str 152 | city_code: str 153 | visa_rules: VisaRules 154 | night_transfer: bool 155 | tags: Optional[List[str]] = None 156 | duration_seconds: int 157 | duration: Duration 158 | 159 | 160 | class Segment(BaseModel): 161 | flight: List[Flight] 162 | rating: Optional[SegmentRating] = 
None 163 | transfers: Optional[List[Transfer]] = None 164 | 165 | @model_validator(mode='after') 166 | def check_transfers_present(self) -> Self: 167 | if len(self.flight) > 1: 168 | if not self.transfers: 169 | raise ValueError("Transfers must be provided for segments with multiple flights") 170 | return self 171 | 172 | class Proposal(BaseModel): 173 | terms: Dict[str, Terms] 174 | xterms: Dict[str, Dict[str, Terms]] 175 | segment: List[Segment] 176 | total_duration: int 177 | stops_airports: List[str] 178 | is_charter: bool 179 | max_stops: int 180 | max_stop_duration: Optional[int] = None 181 | min_stop_duration: Optional[int] = None 182 | carriers: List[str] 183 | segment_durations: List[int] 184 | segments_time: List[List[int]] 185 | segments_airports: List[List[str]] 186 | sign: str 187 | is_direct: bool 188 | flight_weight: float 189 | popularity: int 190 | segments_rating: Optional[float] = None 191 | tags: Optional[List[str]] = None 192 | validating_carrier: str 193 | 194 | batch_ref: 'ProposalsBatchModel' = Field(default=None, exclude=True) 195 | 196 | def merge_terms(self, other: 'Proposal') -> None: 197 | """ 198 | Merge terms from another Proposal into this one. 199 | 200 | Args: 201 | other: Another Proposal instance to merge terms from 202 | """ 203 | for gate_id, terms in other.terms.items(): 204 | if gate_id not in self.terms: 205 | self.terms[gate_id] = terms 206 | else: 207 | pass # If terms already exist, we assume they are the same and do not merge 208 | 209 | # Merge xterms similarly if needed 210 | for gate_id, xterms in other.xterms.items(): 211 | if gate_id not in self.xterms: 212 | self.xterms[gate_id] = xterms 213 | else: 214 | pass # If terms already exist, we assume they are the same and do not merge 215 | 216 | def get_cheapest_unified_price(self) -> int: 217 | """ 218 | Get the cheapest unified price across all available terms. 
219 | 220 | Returns: 221 | The lowest unified price as an integer 222 | """ 223 | min_unified_price = float('inf') 224 | 225 | for terms in self.terms.values(): 226 | if terms.unified_price < min_unified_price: 227 | min_unified_price = terms.unified_price 228 | 229 | return int(min_unified_price) 230 | 231 | def get_earliest_departure_time(self) -> datetime: 232 | """Get the earliest departure time across all segments""" 233 | earliest_timestamp = min( 234 | flight.departure_timestamp 235 | for segment in self.segment 236 | for flight in segment.flight 237 | ) 238 | return datetime.fromtimestamp(earliest_timestamp) 239 | 240 | def get_latest_arrival_time(self) -> datetime: 241 | """Get the latest arrival time across all segments""" 242 | latest_timestamp = max( 243 | flight.arrival_timestamp 244 | for segment in self.segment 245 | for flight in segment.flight 246 | ) 247 | return datetime.fromtimestamp(latest_timestamp) 248 | 249 | def get_short_description(self, include_cheapest_price: bool = True, include_aircraft: bool = False) -> str: 250 | """ 251 | Generate a short description of the proposal. 
252 | 253 | Returns: 254 | A string summarizing the proposal 255 | """ 256 | cheapest_price = self.get_cheapest_unified_price() 257 | formatted_price = f"{convert_unified_price_to_user(cheapest_price, self.batch_ref.currency)} {self.batch_ref.currency.upper()}" 258 | 259 | description_parts = [] 260 | description_parts.append(f"* **Offer ID**: {self.sign}") 261 | if include_cheapest_price: 262 | description_parts.append(f"* **Price**: {formatted_price}") 263 | for segment in self.segment: 264 | layover_count = len(segment.transfers) if segment.transfers else 0 265 | layovers_text = f"{layover_count} layover{'s' if layover_count != 1 else ''}" if layover_count > 0 else "direct flight" 266 | description_parts.append(f"* From {segment.flight[0].departure} to {segment.flight[-1].arrival} ({layovers_text})") 267 | 268 | for i, flight in enumerate(segment.flight): 269 | departure_airport = self.batch_ref.airports.get(flight.departure, flight.departure) 270 | arrival_airport = self.batch_ref.airports.get(flight.arrival, flight.arrival) 271 | airline_name = self.batch_ref.airlines.get(flight.operating_carrier).name if flight.operating_carrier in self.batch_ref.airlines else flight.operating_carrier 272 | aircraft_name = f", aircraft: {flight.aircraft}" if include_aircraft and flight.aircraft else '' 273 | description_parts.append(f" * From {departure_airport.name} ({flight.departure}) on {flight.departure_date} at {flight.departure_time} -> " \ 274 | f"to {arrival_airport.name} ({flight.arrival}) on {flight.arrival_date} at {flight.arrival_time} " \ 275 | f"(duration: {format_duration(flight.duration)}), flight number: {flight.get_full_flight_number()}{aircraft_name}, operated by {airline_name})") 276 | 277 | 278 | transfer = segment.transfers[i] if segment.transfers and i < len(segment.transfers) else None 279 | if transfer: 280 | if transfer.at != transfer.to: 281 | description_parts.append(f" * Airport change from {transfer.at} to {transfer.to}") 282 | else: 283 | 
transfer_duration = f'{transfer.duration_seconds // 60 // 60}h {transfer.duration_seconds // 60 % 60}m' 284 | description_parts.append(f" * Layover at {transfer.at} for {transfer_duration}") 285 | 286 | description_parts.append(f"* Total Duration: {format_duration(self.total_duration)}") 287 | return "\n".join(description_parts) 288 | 289 | def get_full_description(self) -> str: 290 | description_parts = [self.get_short_description(include_cheapest_price=False, include_aircraft=True)] 291 | if self.is_charter: 292 | description_parts.append("* This is a charter flight.") 293 | description_parts.append(f"* This ticket is offered by {len(self.terms)} agencies with the following terms:") 294 | for term_id, terms in self.terms.items(): 295 | agency = self.batch_ref.gates_info.get(term_id) 296 | if agency: 297 | description_parts.append(f"* Agency {agency.label} (internal agency ID: {term_id})") 298 | else: 299 | description_parts.append(f"* Agency (internal agency ID: {term_id})") 300 | description_parts.append(f" * **Price:** {terms.price} {terms.currency.upper()} " \ 301 | f"(in user currency: {convert_unified_price_to_user(terms.unified_price, self.batch_ref.currency)} {self.batch_ref.currency.upper()})") 302 | 303 | description_parts.append(f" * Baggage info:") 304 | for segment_idx, segment in enumerate(self.segment): 305 | for flight_idx, flight in enumerate(segment.flight): 306 | baggage = terms.flights_baggage[segment_idx][flight_idx] 307 | 308 | handbag = terms.flights_handbags[segment_idx][flight_idx] 309 | description_parts.append(f" * Flight {flight.get_full_flight_number()}: \n" \ 310 | f" * {parse_baggage_string(baggage)} \n" \ 311 | f" * {parse_carry_on_string(handbag)}") 312 | if self.tags: 313 | description_parts.append(f"* Tags: {', '.join(self.tags)}") 314 | 315 | return "\n".join(description_parts) 316 | 317 | def parse_baggage_string(baggage_str: bool | str, ) -> Union[bool, str]: 318 | """ 319 | "" — there is no information about baggage 320 | 
false - baggage is not included in the price 321 | 0PC - without baggage 322 | {int}PC{int} - number of bags, then the weight in kilograms. For example, 2PC23 means two baggage pieces of 23 kg 323 | {int}PC - number of bags without weight information. For example, 1PC means one piece of luggage. 324 | {int} - number of bags does not matter, the total weight is limited 325 | """ 326 | if baggage_str is False: 327 | return "Baggage is not included in the price" 328 | elif baggage_str == '': 329 | return "No baggage information available" 330 | elif baggage_str == "0PC": 331 | return "Baggage is not included in the price" 332 | elif isinstance(baggage_str, str) and "PC" in baggage_str: # guard: "in" raises TypeError on non-string values such as True 333 | parts = baggage_str.split("PC") 334 | if parts[0].isdigit(): 335 | return f"{parts[0]} piece(s) of baggage" 336 | else: 337 | return "no baggage information" 338 | return "no baggage information" 339 | 340 | def parse_carry_on_string(baggage_str: bool | str, ) -> Union[bool, str]: 341 | """ 342 | "" - there is no information about baggage 343 | false - baggage is not included in the price 344 | 0PC - without baggage 345 | {int}PC{int} - number of bags, then the weight in kilograms. For example, 2PC23 means two baggage pieces of 23 kg 346 | {int}PC - number of bags without weight information. For example, 1PC means one piece of luggage. 
347 | {int} - number of bags does not matter, the total weight is limited 348 | """ 349 | if baggage_str is False: 350 | return "Carry-on piece is not included in the price" 351 | elif baggage_str == '': 352 | return "No carry-on information available" 353 | elif baggage_str == "0PC": 354 | return "Carry-on piece is not included in the price" 355 | elif isinstance(baggage_str, str) and "PC" in baggage_str: # guard: "in" raises TypeError on non-string values such as True 356 | parts = baggage_str.split("PC") 357 | if parts[0].isdigit(): 358 | return f"{parts[0]} carry-on piece(s)" 359 | else: 360 | return "no carry-on information" 361 | return "no carry-on information" 362 | 363 | class Coordinates(BaseModel): 364 | lat: float 365 | lon: float 366 | 367 | 368 | class Airport(BaseModel): 369 | name: str 370 | city: str 371 | city_code: str 372 | country: str 373 | country_code: str 374 | time_zone: str 375 | coordinates: Coordinates 376 | 377 | 378 | class GateError(BaseModel): 379 | code: int 380 | tos: str 381 | 382 | 383 | class Gate(BaseModel): 384 | count: int 385 | good_count: int 386 | bad_count: Dict[str, Any] 387 | duration: float 388 | id: int 389 | gate_label: str 390 | merged_codeshares: int 391 | error: GateError 392 | created_at: int 393 | server_name: str 394 | cache: bool 395 | cache_search_uuid: str 396 | 397 | 398 | class Meta(BaseModel): 399 | uuid: str 400 | gates: List[Gate] 401 | 402 | 403 | class Airline(BaseModel): 404 | id: Optional[int] = None 405 | iata: str 406 | lowcost: Optional[bool] = False 407 | average_rate: Optional[float] = None 408 | rates: Optional[int] = None 409 | name: Optional[str] = None 410 | brand_color: Optional[str] = None 411 | 412 | 413 | class GateInfo(BaseModel): 414 | id: int 415 | label: str 416 | payment_methods: List[str] 417 | mobile_version: Optional[bool] = None 418 | productivity: int 419 | currency_code: str 420 | 421 | 422 | class SegmentInfo(BaseModel): 423 | origin: str 424 | origin_country: str 425 | original_origin: str 426 | destination: 
str 427 | destination_country: str 428 | original_destination: str 429 | date: str 430 | depart_date: str 431 | 432 | 433 | class ProposalsBatchModel(BaseModel): 434 | proposals: List[Proposal] 435 | airports: Dict[str, Airport] 436 | search_id: str 437 | chunk_id: str 438 | meta: Meta 439 | airlines: Dict[str, Airline] 440 | gates_info: Dict[str, GateInfo] 441 | flight_info: Dict[str, Any] 442 | segments: List[SegmentInfo] 443 | market: str 444 | clean_marker: str 445 | open_jaw: bool 446 | currency: str 447 | initiated_at: str 448 | 449 | def combine_with(self, other: 'ProposalsBatchModel') -> 'ProposalsBatchModel': 450 | """ 451 | Combine this batch with another batch of proposals. 452 | 453 | Args: 454 | other: Another ProposalsBatchModel to combine with this one 455 | 456 | Returns: 457 | A new ProposalsBatchModel with combined data 458 | """ 459 | # Combine proposals 460 | 461 | combined_proposals = self.proposals.copy() 462 | 463 | for pi2, p2 in enumerate(other.proposals): 464 | exists = False 465 | for pi1, p1 in enumerate(self.proposals): 466 | if p1.sign == p2.sign: 467 | combined_proposals[pi1].merge_terms(p2) 468 | exists = True 469 | if not exists: 470 | combined_proposals.append(p2) 471 | 472 | # Combine airports (merge dictionaries, other takes precedence for conflicts) 473 | combined_airports = {**self.airports, **other.airports} 474 | 475 | # Combine airlines (merge dictionaries, other takes precedence for conflicts) 476 | combined_airlines = {**self.airlines, **other.airlines} 477 | 478 | # Combine gates_info (merge dictionaries, other takes precedence for conflicts) 479 | combined_gates_info = {**self.gates_info, **other.gates_info} 480 | 481 | # Combine flight_info (merge dictionaries, other takes precedence for conflicts) 482 | combined_flight_info = {**self.flight_info, **other.flight_info} 483 | 484 | # Combine gates in meta 485 | combined_gates = self.meta.gates + other.meta.gates 486 | combined_meta = Meta( 487 | uuid=other.meta.uuid, # 
Use the latest UUID 488 | gates=combined_gates 489 | ) 490 | 491 | if self.segments != other.segments: 492 | raise ValueError("Segments in both batches must be identical to combine them.") 493 | 494 | return ProposalsBatchModel( 495 | proposals=combined_proposals, 496 | airports=combined_airports, 497 | search_id=other.search_id, # Use the latest search_id 498 | chunk_id=other.chunk_id, # Use the latest chunk_id 499 | meta=combined_meta, 500 | airlines=combined_airlines, 501 | gates_info=combined_gates_info, 502 | flight_info=combined_flight_info, 503 | segments=other.segments, # Use the latest segments 504 | market=other.market, # Use the latest market 505 | clean_marker=other.clean_marker, # Use the latest clean_marker 506 | open_jaw=other.open_jaw, # Use the latest open_jaw 507 | currency=other.currency, # Use the latest currency 508 | initiated_at=other.initiated_at # Use the latest initiated_at 509 | ) 510 | 511 | def get_proposal_by_id(self, proposal_id: str) -> Optional[Proposal]: 512 | for proposal in self.proposals: 513 | if proposal.sign == proposal_id: 514 | return proposal 515 | return None 516 | 517 | def apply_filters(self, filters: FiltersModel) -> 'ProposalsBatchModel': 518 | """ 519 | Apply filters to the proposals and return a new ProposalsBatchModel with filtered results. 
520 | 521 | Args: 522 | filters: FiltersModel containing all filter criteria 523 | 524 | Returns: 525 | New ProposalsBatchModel with filtered proposals 526 | """ 527 | filtered_proposals = [] 528 | 529 | for proposal in self.proposals: 530 | if self._proposal_matches_filters(proposal, filters): 531 | filtered_proposals.append(proposal) 532 | 533 | # Apply sorting 534 | if filters.sorting: 535 | filtered_proposals = self._sort_proposals(filtered_proposals, filters.sorting) 536 | 537 | # Create new instance with filtered proposals 538 | return ProposalsBatchModel( 539 | proposals=filtered_proposals, 540 | airports=self.airports, 541 | search_id=self.search_id, 542 | chunk_id=self.chunk_id, 543 | meta=self.meta, 544 | airlines=self.airlines, 545 | gates_info=self.gates_info, 546 | flight_info=self.flight_info, 547 | segments=self.segments, 548 | market=self.market, 549 | clean_marker=self.clean_marker, 550 | open_jaw=self.open_jaw, 551 | currency=self.currency, 552 | initiated_at=self.initiated_at 553 | ) 554 | 555 | def _proposal_matches_filters(self, proposal: Proposal, filters: FiltersModel) -> bool: 556 | """Check if a proposal matches all filter criteria""" 557 | 558 | # Duration filter 559 | if filters.max_total_duration is not None: 560 | if proposal.total_duration > filters.max_total_duration: 561 | return False 562 | 563 | # Price filter (compared in the user's currency) 564 | if filters.max_price is not None: 565 | price = proposal.get_cheapest_unified_price() 566 | if convert_unified_price_to_user(price, self.currency) > filters.max_price: 567 | return False 568 | 569 | # Airlines filter 570 | if filters.allowed_airlines is not None: 571 | # Keep the proposal if at least one of its carriers is in the allowed list 572 | allowed_set = set(filters.allowed_airlines) 573 | proposal_carriers = set(proposal.carriers) 574 | if not proposal_carriers.intersection(allowed_set): 575 | return False 576 | 577 | # Stops filter 578 | if filters.max_stops is not None: 579 | if 
proposal.max_stops > filters.max_stops: 580 | return False 581 | 582 | # Segment time filters 583 | if filters.segment_time_filters is not None: 584 | for seg_idx, segment in enumerate(proposal.segment): 585 | if seg_idx < len(filters.segment_time_filters): 586 | seg_filter = filters.segment_time_filters[seg_idx] 587 | if not self._segment_matches_time_filter(segment, seg_filter): 588 | return False 589 | 590 | return True 591 | 592 | def _segment_matches_time_filter(self, segment: Segment, time_filter: SegmentTimeFilter) -> bool: 593 | """Check if a segment matches time filter criteria""" 594 | 595 | for flight in segment.flight: 596 | # Check departure time 597 | if time_filter.departure_time_range: 598 | if not self._time_in_range(flight.departure_time, time_filter.departure_time_range): 599 | return False 600 | 601 | # Check arrival time 602 | if time_filter.arrival_time_range: 603 | if not self._time_in_range(flight.arrival_time, time_filter.arrival_time_range): 604 | return False 605 | 606 | return True 607 | 608 | def _time_in_range(self, flight_time: str, time_range: TimeRange) -> bool: 609 | """Check if flight time is within the specified range""" 610 | if not time_range.start_time and not time_range.end_time: 611 | return True 612 | 613 | try: 614 | # Parse flight time (assuming format like "14:30") 615 | flight_time_obj = datetime.strptime(flight_time, '%H:%M').time() 616 | 617 | if time_range.start_time: 618 | start_time_obj = datetime.strptime(time_range.start_time, '%H:%M').time() 619 | if flight_time_obj < start_time_obj: 620 | return False 621 | 622 | if time_range.end_time: 623 | end_time_obj = datetime.strptime(time_range.end_time, '%H:%M').time() 624 | if flight_time_obj > end_time_obj: 625 | return False 626 | 627 | return True 628 | except ValueError: 629 | # If time parsing fails, assume it matches (don't filter out due to data issues) 630 | return True 631 | 632 | def _sort_proposals(self, proposals: List[Proposal], sorting: SortingMethod) -> 
List[Proposal]: 633 | """Sort proposals based on the specified sorting method""" 634 | 635 | if sorting == SortingMethod.CHEAP_FIRST: 636 | return sorted(proposals, key=lambda p: p.get_cheapest_unified_price()) 637 | 638 | elif sorting == SortingMethod.EARLY_DEPARTURE_FIRST: 639 | return sorted(proposals, key=lambda p: p.get_earliest_departure_time()) 640 | 641 | elif sorting == SortingMethod.EARLY_ARRIVAL_FIRST: 642 | return sorted(proposals, key=lambda p: p.get_latest_arrival_time()) 643 | 644 | elif sorting == SortingMethod.MINIMAL_DURATION_FIRST: 645 | return sorted(proposals, key=lambda p: p.total_duration) 646 | 647 | else: 648 | return proposals 649 | 650 | 651 | 652 | def get_description(self) -> str: 653 | """ 654 | Generate a human-readable description of the flight search batch. 655 | 656 | Returns: 657 | A formatted string describing the search results 658 | """ 659 | if not self.proposals: 660 | return f"Search results (search_id = {self.search_id}) contain no options." 661 | 662 | description_parts = [] 663 | 664 | # Header with proposal count and segments 665 | segment_count = len(self.segments) 666 | 667 | description_parts.append(f"Search results (search_id = {self.search_id}) contain {len(self.proposals)} flight options with {segment_count} flight segment{'s' if segment_count != 1 else ''}:") 668 | 669 | # Analyze each segment 670 | for i, segment in enumerate(self.segments, 1): 671 | origin_airport = self.airports.get(segment.origin, None) 672 | dest_airport = self.airports.get(segment.destination, None) 673 | 674 | origin_name = f"{origin_airport.city} ({segment.origin})" if origin_airport else segment.origin 675 | dest_name = f"{dest_airport.city} ({segment.destination})" if dest_airport else segment.destination 676 | 677 | # Parse date to make it more readable; fall back to the raw string if it is not YYYY-MM-DD 678 | try: 679 | from datetime import datetime 680 | date_obj = datetime.strptime(segment.date, "%Y-%m-%d") 681 | formatted_date = date_obj.strftime("%d.%m.%Y") 682 | except (ValueError, TypeError): 683 | 
formatted_date = segment.date 684 | 685 | description_parts.append(f"{i}. from {origin_name} to {dest_name} on departure date {formatted_date}") 686 | 687 | # Stops count and pricing analysis 688 | stops_analysis = self._analyze_stops_and_pricing() 689 | if stops_analysis: 690 | description_parts.append(f"Stops count ranges:") 691 | for stops, min_price in stops_analysis.items(): 692 | stops_text = "direct flights" if stops == 0 else f"{stops} stop{'s' if stops != 1 else ''}" 693 | description_parts.append(f"- {stops_text} - min price {convert_unified_price_to_user(min_price, self.currency)} {self.currency.upper()}") 694 | 695 | # Price range 696 | unified_prices = [] 697 | for proposal in self.proposals: 698 | unified_price = proposal.get_cheapest_unified_price() 699 | unified_prices.append(unified_price) 700 | 701 | if unified_prices: 702 | min_price = min(unified_prices) 703 | max_price = max(unified_prices) 704 | description_parts.append(f"Total price ranges from {convert_unified_price_to_user(min_price, self.currency)} {self.currency.upper()} to {convert_unified_price_to_user(max_price, self.currency)} {self.currency.upper()}") 705 | 706 | # Duration range 707 | durations = [p.total_duration for p in self.proposals] 708 | if durations: 709 | min_duration = min(durations) 710 | max_duration = max(durations) 711 | 712 | description_parts.append(f"Total duration ranges from {format_duration(min_duration)} to {format_duration(max_duration)}") 713 | 714 | # Airlines 715 | all_carriers = set() 716 | for proposal in self.proposals: 717 | all_carriers.update(proposal.carriers) 718 | 719 | if all_carriers: 720 | airline_names = [] 721 | for carrier in sorted(all_carriers): 722 | airline = self.airlines.get(carrier) 723 | if airline: 724 | airline_names.append(f"{airline.name} ({carrier})") 725 | else: 726 | airline_names.append(carrier) 727 | 728 | description_parts.append(f"Airlines options: {', '.join(airline_names)}") 729 | 730 | return "\n".join(description_parts) 
731 | 732 | def _analyze_stops_and_pricing(self) -> Dict[int, int]: 733 | """Return the cheapest unified price for each stop count, ordered by stop count.""" 734 | stops_pricing = {} 735 | 736 | for proposal in self.proposals: 737 | stops = proposal.max_stops 738 | price = proposal.get_cheapest_unified_price() 739 | 740 | if stops not in stops_pricing or price < stops_pricing[stops]: 741 | stops_pricing[stops] = price 742 | 743 | return dict(sorted(stops_pricing.items())) 744 | 745 | def parse_proposals_batch(api_response: dict) -> ProposalsBatchModel: 746 | """ 747 | Parse API response dictionary into ProposalsBatchModel. 748 | 749 | Args: 750 | api_response: Dictionary containing the API response 751 | 752 | Returns: 753 | ProposalsBatchModel instance whose proposals each reference the batch via batch_ref 754 | """ 755 | batch = ProposalsBatchModel(**api_response) 756 | for proposal in batch.proposals: 757 | proposal.batch_ref = batch # back-reference used for currency, airport and airline lookups 758 | return batch 759 | 760 | 761 | # Example usage: 762 | """ 763 | # Parse batch 764 | batch = parse_proposals_batch(api_response) 765 | 766 | # Create filters 767 | filters = FiltersModel( 768 | max_total_duration=600, # 10 hours max 769 | min_price=100, 770 | max_price=1000, 771 | allowed_airlines=["LH", "BA", "AF"], # Lufthansa, British Airways, Air France 772 | segment_time_filters=[ 773 | SegmentTimeFilter( 774 | departure_time_range=TimeRange(start_time="06:00", end_time="22:00"), 775 | arrival_time_range=TimeRange(start_time="08:00", end_time="23:59") 776 | ) 777 | ], 778 | max_stops=1, 779 | sorting=SortingMethod.CHEAP_FIRST 780 | ) 781 | 782 | # Apply filters 783 | filtered_batch = batch.apply_filters(filters) 784 | 785 | print(f"Original proposals: {len(batch.proposals)}") 786 | print(f"Filtered proposals: {len(filtered_batch.proposals)}") 787 | """ ```
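The baggage docstrings in `proposal.py` list five value forms, including a piece count with a weight (`2PC23`) and a weight-only limit (`23`). The following is a minimal sketch, not part of the repository, of one parser that reports each documented form for both checked and carry-on allowances. The names `describe_allowance` and `_PIECES_RE` are hypothetical, and reading the number after `PC` as a per-piece limit in kilograms is an assumption based on the docstring example.

```python
import re
from typing import Union

# Hypothetical helper (not in proposal.py): "<pieces>PC" optionally followed by a weight.
_PIECES_RE = re.compile(r"^(\d+)PC(\d+)?$")


def describe_allowance(value: Union[bool, str, None], label: str = "baggage") -> str:
    """Turn a raw allowance value into a short human-readable sentence."""
    if value is False:
        return f"{label.capitalize()} is not included in the price"
    if value is None or value is True or value == "":
        # True and None are not documented forms, so treat them as "unknown".
        return f"No {label} information available"

    text = str(value).strip().upper()
    match = _PIECES_RE.match(text)
    if match:
        pieces, weight = int(match.group(1)), match.group(2)
        if pieces == 0:
            return f"{label.capitalize()} is not included in the price"
        if weight:
            # Assumption: the trailing number is a per-piece limit in kilograms.
            return f"{pieces} piece(s) of {label}, up to {int(weight)} kg each"
        return f"{pieces} piece(s) of {label}"
    if text.isdigit():
        # Weight-only form: the piece count does not matter, total weight is limited.
        return f"{label.capitalize()} limited to {int(text)} kg in total"
    return f"No {label} information available"


if __name__ == "__main__":
    for raw in ("2PC23", "1PC", "0PC", "23", "", False):
        print(repr(raw), "->", describe_allowance(raw))
```

A caller such as `get_full_description` could pass `label="carry-on"` for `flights_handbags` values so both allowance types share one code path. Whether the extra weight detail suits the LLM-facing output is a judgment call.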