This is page 7 of 33. Use http://codebase.md/pragmar/mcp_server_webcrawl?page={x} to view the full context.
# Directory Structure
```
├── .gitignore
├── CONTRIBUTING.md
├── docs
│ ├── _images
│ │ ├── interactive.document.webp
│ │ ├── interactive.search.webp
│ │ └── mcpswc.svg
│ ├── _modules
│ │ ├── index.html
│ │ ├── mcp_server_webcrawl
│ │ │ ├── crawlers
│ │ │ │ ├── archivebox
│ │ │ │ │ ├── adapter.html
│ │ │ │ │ ├── crawler.html
│ │ │ │ │ └── tests.html
│ │ │ │ ├── base
│ │ │ │ │ ├── adapter.html
│ │ │ │ │ ├── api.html
│ │ │ │ │ ├── crawler.html
│ │ │ │ │ ├── indexed.html
│ │ │ │ │ └── tests.html
│ │ │ │ ├── httrack
│ │ │ │ │ ├── adapter.html
│ │ │ │ │ ├── crawler.html
│ │ │ │ │ └── tests.html
│ │ │ │ ├── interrobot
│ │ │ │ │ ├── adapter.html
│ │ │ │ │ ├── crawler.html
│ │ │ │ │ └── tests.html
│ │ │ │ ├── katana
│ │ │ │ │ ├── adapter.html
│ │ │ │ │ ├── crawler.html
│ │ │ │ │ └── tests.html
│ │ │ │ ├── siteone
│ │ │ │ │ ├── adapter.html
│ │ │ │ │ ├── crawler.html
│ │ │ │ │ └── tests.html
│ │ │ │ ├── warc
│ │ │ │ │ ├── adapter.html
│ │ │ │ │ ├── crawler.html
│ │ │ │ │ └── tests.html
│ │ │ │ └── wget
│ │ │ │ ├── adapter.html
│ │ │ │ ├── crawler.html
│ │ │ │ └── tests.html
│ │ │ ├── crawlers.html
│ │ │ ├── extras
│ │ │ │ ├── markdown.html
│ │ │ │ ├── regex.html
│ │ │ │ ├── snippets.html
│ │ │ │ ├── thumbnails.html
│ │ │ │ └── xpath.html
│ │ │ ├── interactive
│ │ │ │ ├── highlights.html
│ │ │ │ ├── search.html
│ │ │ │ ├── session.html
│ │ │ │ └── ui.html
│ │ │ ├── main.html
│ │ │ ├── models
│ │ │ │ ├── resources.html
│ │ │ │ └── sites.html
│ │ │ ├── templates
│ │ │ │ └── tests.html
│ │ │ ├── utils
│ │ │ │ ├── blobs.html
│ │ │ │ ├── cli.html
│ │ │ │ ├── logger.html
│ │ │ │ ├── querycache.html
│ │ │ │ ├── server.html
│ │ │ │ └── tools.html
│ │ │ └── utils.html
│ │ └── re.html
│ ├── _sources
│ │ ├── guides
│ │ │ ├── archivebox.rst.txt
│ │ │ ├── httrack.rst.txt
│ │ │ ├── interrobot.rst.txt
│ │ │ ├── katana.rst.txt
│ │ │ ├── siteone.rst.txt
│ │ │ ├── warc.rst.txt
│ │ │ └── wget.rst.txt
│ │ ├── guides.rst.txt
│ │ ├── index.rst.txt
│ │ ├── installation.rst.txt
│ │ ├── interactive.rst.txt
│ │ ├── mcp_server_webcrawl.crawlers.archivebox.rst.txt
│ │ ├── mcp_server_webcrawl.crawlers.base.rst.txt
│ │ ├── mcp_server_webcrawl.crawlers.httrack.rst.txt
│ │ ├── mcp_server_webcrawl.crawlers.interrobot.rst.txt
│ │ ├── mcp_server_webcrawl.crawlers.katana.rst.txt
│ │ ├── mcp_server_webcrawl.crawlers.rst.txt
│ │ ├── mcp_server_webcrawl.crawlers.siteone.rst.txt
│ │ ├── mcp_server_webcrawl.crawlers.warc.rst.txt
│ │ ├── mcp_server_webcrawl.crawlers.wget.rst.txt
│ │ ├── mcp_server_webcrawl.extras.rst.txt
│ │ ├── mcp_server_webcrawl.interactive.rst.txt
│ │ ├── mcp_server_webcrawl.models.rst.txt
│ │ ├── mcp_server_webcrawl.rst.txt
│ │ ├── mcp_server_webcrawl.templates.rst.txt
│ │ ├── mcp_server_webcrawl.utils.rst.txt
│ │ ├── modules.rst.txt
│ │ ├── prompts.rst.txt
│ │ └── usage.rst.txt
│ ├── _static
│ │ ├── _sphinx_javascript_frameworks_compat.js
│ │ ├── basic.css
│ │ ├── css
│ │ │ ├── badge_only.css
│ │ │ ├── fonts
│ │ │ │ ├── fontawesome-webfont.eot
│ │ │ │ ├── fontawesome-webfont.svg
│ │ │ │ ├── fontawesome-webfont.ttf
│ │ │ │ ├── fontawesome-webfont.woff
│ │ │ │ ├── fontawesome-webfont.woff2
│ │ │ │ ├── lato-bold-italic.woff
│ │ │ │ ├── lato-bold-italic.woff2
│ │ │ │ ├── lato-bold.woff
│ │ │ │ ├── lato-bold.woff2
│ │ │ │ ├── lato-normal-italic.woff
│ │ │ │ ├── lato-normal-italic.woff2
│ │ │ │ ├── lato-normal.woff
│ │ │ │ ├── lato-normal.woff2
│ │ │ │ ├── Roboto-Slab-Bold.woff
│ │ │ │ ├── Roboto-Slab-Bold.woff2
│ │ │ │ ├── Roboto-Slab-Regular.woff
│ │ │ │ └── Roboto-Slab-Regular.woff2
│ │ │ └── theme.css
│ │ ├── doctools.js
│ │ ├── documentation_options.js
│ │ ├── file.png
│ │ ├── fonts
│ │ │ ├── Lato
│ │ │ │ ├── lato-bold.eot
│ │ │ │ ├── lato-bold.ttf
│ │ │ │ ├── lato-bold.woff
│ │ │ │ ├── lato-bold.woff2
│ │ │ │ ├── lato-bolditalic.eot
│ │ │ │ ├── lato-bolditalic.ttf
│ │ │ │ ├── lato-bolditalic.woff
│ │ │ │ ├── lato-bolditalic.woff2
│ │ │ │ ├── lato-italic.eot
│ │ │ │ ├── lato-italic.ttf
│ │ │ │ ├── lato-italic.woff
│ │ │ │ ├── lato-italic.woff2
│ │ │ │ ├── lato-regular.eot
│ │ │ │ ├── lato-regular.ttf
│ │ │ │ ├── lato-regular.woff
│ │ │ │ └── lato-regular.woff2
│ │ │ └── RobotoSlab
│ │ │ ├── roboto-slab-v7-bold.eot
│ │ │ ├── roboto-slab-v7-bold.ttf
│ │ │ ├── roboto-slab-v7-bold.woff
│ │ │ ├── roboto-slab-v7-bold.woff2
│ │ │ ├── roboto-slab-v7-regular.eot
│ │ │ ├── roboto-slab-v7-regular.ttf
│ │ │ ├── roboto-slab-v7-regular.woff
│ │ │ └── roboto-slab-v7-regular.woff2
│ │ ├── images
│ │ │ ├── interactive.document.png
│ │ │ ├── interactive.document.webp
│ │ │ ├── interactive.search.png
│ │ │ ├── interactive.search.webp
│ │ │ └── mcpswc.svg
│ │ ├── jquery.js
│ │ ├── js
│ │ │ ├── badge_only.js
│ │ │ ├── theme.js
│ │ │ └── versions.js
│ │ ├── language_data.js
│ │ ├── minus.png
│ │ ├── plus.png
│ │ ├── pygments.css
│ │ ├── searchtools.js
│ │ └── sphinx_highlight.js
│ ├── .buildinfo
│ ├── .nojekyll
│ ├── genindex.html
│ ├── guides
│ │ ├── archivebox.html
│ │ ├── httrack.html
│ │ ├── interrobot.html
│ │ ├── katana.html
│ │ ├── siteone.html
│ │ ├── warc.html
│ │ └── wget.html
│ ├── guides.html
│ ├── index.html
│ ├── installation.html
│ ├── interactive.html
│ ├── mcp_server_webcrawl.crawlers.archivebox.html
│ ├── mcp_server_webcrawl.crawlers.base.html
│ ├── mcp_server_webcrawl.crawlers.html
│ ├── mcp_server_webcrawl.crawlers.httrack.html
│ ├── mcp_server_webcrawl.crawlers.interrobot.html
│ ├── mcp_server_webcrawl.crawlers.katana.html
│ ├── mcp_server_webcrawl.crawlers.siteone.html
│ ├── mcp_server_webcrawl.crawlers.warc.html
│ ├── mcp_server_webcrawl.crawlers.wget.html
│ ├── mcp_server_webcrawl.extras.html
│ ├── mcp_server_webcrawl.html
│ ├── mcp_server_webcrawl.interactive.html
│ ├── mcp_server_webcrawl.models.html
│ ├── mcp_server_webcrawl.templates.html
│ ├── mcp_server_webcrawl.utils.html
│ ├── modules.html
│ ├── objects.inv
│ ├── prompts.html
│ ├── py-modindex.html
│ ├── search.html
│ ├── searchindex.js
│ └── usage.html
├── LICENSE
├── MANIFEST.in
├── prompts
│ ├── audit404.md
│ ├── auditfiles.md
│ ├── auditperf.md
│ ├── auditseo.md
│ ├── gopher.md
│ ├── README.md
│ └── testsearch.md
├── pyproject.toml
├── README.md
├── setup.py
├── sphinx
│ ├── _static
│ │ └── images
│ │ ├── interactive.document.png
│ │ ├── interactive.document.webp
│ │ ├── interactive.search.png
│ │ ├── interactive.search.webp
│ │ └── mcpswc.svg
│ ├── _templates
│ │ └── layout.html
│ ├── conf.py
│ ├── guides
│ │ ├── archivebox.rst
│ │ ├── httrack.rst
│ │ ├── interrobot.rst
│ │ ├── katana.rst
│ │ ├── siteone.rst
│ │ ├── warc.rst
│ │ └── wget.rst
│ ├── guides.rst
│ ├── index.rst
│ ├── installation.rst
│ ├── interactive.rst
│ ├── make.bat
│ ├── Makefile
│ ├── mcp_server_webcrawl.crawlers.archivebox.rst
│ ├── mcp_server_webcrawl.crawlers.base.rst
│ ├── mcp_server_webcrawl.crawlers.httrack.rst
│ ├── mcp_server_webcrawl.crawlers.interrobot.rst
│ ├── mcp_server_webcrawl.crawlers.katana.rst
│ ├── mcp_server_webcrawl.crawlers.rst
│ ├── mcp_server_webcrawl.crawlers.siteone.rst
│ ├── mcp_server_webcrawl.crawlers.warc.rst
│ ├── mcp_server_webcrawl.crawlers.wget.rst
│ ├── mcp_server_webcrawl.extras.rst
│ ├── mcp_server_webcrawl.interactive.rst
│ ├── mcp_server_webcrawl.models.rst
│ ├── mcp_server_webcrawl.rst
│ ├── mcp_server_webcrawl.templates.rst
│ ├── mcp_server_webcrawl.utils.rst
│ ├── modules.rst
│ ├── prompts.rst
│ ├── readme.txt
│ └── usage.rst
└── src
└── mcp_server_webcrawl
├── __init__.py
├── crawlers
│ ├── __init__.py
│ ├── archivebox
│ │ ├── __init__.py
│ │ ├── adapter.py
│ │ ├── crawler.py
│ │ └── tests.py
│ ├── base
│ │ ├── __init__.py
│ │ ├── adapter.py
│ │ ├── api.py
│ │ ├── crawler.py
│ │ ├── indexed.py
│ │ └── tests.py
│ ├── httrack
│ │ ├── __init__.py
│ │ ├── adapter.py
│ │ ├── crawler.py
│ │ └── tests.py
│ ├── interrobot
│ │ ├── __init__.py
│ │ ├── adapter.py
│ │ ├── crawler.py
│ │ └── tests.py
│ ├── katana
│ │ ├── __init__.py
│ │ ├── adapter.py
│ │ ├── crawler.py
│ │ └── tests.py
│ ├── siteone
│ │ ├── __init__.py
│ │ ├── adapter.py
│ │ ├── crawler.py
│ │ └── tests.py
│ ├── warc
│ │ ├── __init__.py
│ │ ├── adapter.py
│ │ ├── crawler.py
│ │ └── tests.py
│ └── wget
│ ├── __init__.py
│ ├── adapter.py
│ ├── crawler.py
│ └── tests.py
├── extras
│ ├── __init__.py
│ ├── markdown.py
│ ├── regex.py
│ ├── snippets.py
│ ├── thumbnails.py
│ └── xpath.py
├── interactive
│ ├── __init__.py
│ ├── highlights.py
│ ├── search.py
│ ├── session.py
│ ├── ui.py
│ └── views
│ ├── base.py
│ ├── document.py
│ ├── help.py
│ ├── requirements.py
│ ├── results.py
│ └── searchform.py
├── main.py
├── models
│ ├── __init__.py
│ ├── base.py
│ ├── resources.py
│ └── sites.py
├── settings.py
├── templates
│ ├── __init__.py
│ ├── markdown.xslt
│ ├── tests_core.html
│ └── tests.py
└── utils
├── __init__.py
├── cli.py
├── logger.py
├── parser.py
├── parsetab.py
├── search.py
├── server.py
├── tests.py
└── tools.py
```
# Files
--------------------------------------------------------------------------------
/src/mcp_server_webcrawl/interactive/views/searchform.py:
--------------------------------------------------------------------------------
```python
import curses
from typing import TYPE_CHECKING

# safe_addstr is imported once here; a duplicate standalone import was removed
from mcp_server_webcrawl.interactive.ui import (
    UiState, InputRadio, InputRadioGroup, InputText,
    ThemeDefinition, NavigationDirection, safe_addstr
)
from mcp_server_webcrawl.interactive.views.base import BaseCursesView
from mcp_server_webcrawl.models.sites import SiteResult

if TYPE_CHECKING:
    from mcp_server_webcrawl.interactive.session import InteractiveSession

# layout constants, all measured in terminal character cells
LAYOUT_QUERY_MAX_WIDTH = 50             # maximum width of the query input box
LAYOUT_QUERY_MARGIN = 11                # horizontal space reserved around the query box
LAYOUT_QUERY_OFFSET = 9                 # x offset of the query box from the view's left edge
LAYOUT_FILTER_COLUMN_PADDING = 8        # padding added to the filter group's content width
LAYOUT_SORT_COLUMN_PADDING = 6          # padding added to the sort group's content width
LAYOUT_FILTER_TO_SORT_SPACING = 8       # gap between the filter and sort columns
LAYOUT_SORT_TO_SITES_SPACING = 6        # gap between the sort column and the sites columns
LAYOUT_SITE_COLUMN_WIDTH = 22           # width of each sites column
LAYOUT_SITE_COLUMN_SPACING = 2          # gap between adjacent sites columns
LAYOUT_SITES_VERTICAL_OFFSET = 6        # vertical rows reserved above/below the sites list
LAYOUT_SITES_MIN_WIDTH_REQUIREMENT = 16 # minimum remaining width needed to render sites at all
LAYOUT_CONSTRAINED_SITES_PER_COLUMN = 3 # sites per column when results constrain the form height
LAYOUT_TRUNCATED_LABEL_MAX_LENGTH = 18  # maximum site label length before ellipsis truncation
LAYOUT_OVERFLOW_INDICATOR_MARGIN = 2    # right margin for the "+N more" overflow indicator
class SearchFormNavigationGrid:
    """
    Virtual 2D grid that maps the form's linear field indices onto
    (row, col) positions, so arrow-key presses can move selection between
    the query field, the filter/sort radio columns, and the multi-column
    sites list.
    """

    def __init__(self, ui_state: UiState, filter_group: InputRadioGroup, sort_group: InputRadioGroup,
            sites_group: InputRadioGroup, sites_per_column: int):
        """
        Create virtual grid for navigation:

        query(0)
        filter0 sort0 site0 site3 site6
        filter1 sort1 site1 site4 site7
                sort2 site2 site5 site8+

        Args:
            ui_state: current UI state; SEARCH_INIT switches down() to linear advance
            filter_group: radios laid out in column 0
            sort_group: radios laid out in column 1
            sites_group: radios laid out in columns 2 and up
            sites_per_column: number of site rows per visual column
        """
        # (row, col) -> linear field index
        self.__grid: dict[tuple[int, int], int] = {}
        # linear field index -> (row, col); query (index 0) maps back to (0, 0) only
        self.__reverse_grid: dict[int, tuple[int, int]] = {}
        # query spans columns 0-2, row 0
        for col in range(3):
            self.__grid[(0, col)] = 0
        self.__reverse_grid[0] = (0, 0)
        for i, _ in enumerate(filter_group.radios):
            row = 1 + i
            index = 1 + i # filter indices start at 1
            self.__grid[(row, 0)] = index
            self.__reverse_grid[index] = (row, 0)
        # sort indices follow immediately after the filter indices
        sort_start_index = 1 + len(filter_group.radios)
        for i, _ in enumerate(sort_group.radios):
            row = 1 + i
            index = sort_start_index + i
            self.__grid[(row, 1)] = index
            self.__reverse_grid[index] = (row, 1)
        # site indices follow immediately after the sort indices
        sites_start_index = 1 + len(filter_group.radios) + len(sort_group.radios)
        self.__ui_state = ui_state
        for i, _ in enumerate(sites_group.radios):
            # sites fill top-to-bottom within a column, then spill right
            row = 1 + (i % sites_per_column)
            col = 2 + (i // sites_per_column)
            index = sites_start_index + i
            self.__grid[(row, col)] = index
            self.__reverse_grid[index] = (row, col)

    def __rightmost_column(self, row: int) -> int:
        """
        Get the rightmost column that has content in the given row.

        Returns:
            int: the maximum populated column, or -1 if the row is empty
        """
        max_col = -1
        for (r, c) in self.__grid.keys():
            if r == row:
                max_col = max(max_col, c)
        return max_col

    def __leftmost_column(self, row: int) -> int:
        """
        Get the leftmost column that has content in the given row.

        Returns:
            int: the minimum populated column, or -1 if the row is empty
        """
        # float('inf') sentinel so any real column compares smaller
        min_col = float('inf')
        for (r, c) in self.__grid.keys():
            if r == row:
                min_col = min(min_col, c)
        return min_col if min_col != float('inf') else -1

    def left(self, current_index: int) -> int | None:
        """
        Navigate left from current index. Wraps to rightmost element if at left edge.

        Returns:
            int | None: destination field index, or None if no move is possible
        """
        if current_index not in self.__reverse_grid:
            return None
        row, col = self.__reverse_grid[current_index]
        # move normally if destination exists
        if col > 0:
            new_pos = (row, col - 1)
            if new_pos in self.__grid:
                return self.__grid[new_pos]
        # wrap on edge
        # NOTE(review): when (row, col - 1) is a gap (e.g. a short filter
        # column), this also wraps to the rightmost entry rather than
        # skipping over the gap — confirm that is the intended behavior
        rightmost_col = self.__rightmost_column(row)
        if rightmost_col >= 0 and rightmost_col != col:
            wrap_pos = (row, rightmost_col)
            return self.__grid.get(wrap_pos)
        return None

    def right(self, current_index: int) -> int | None:
        """
        Navigate right from current index. Wraps to leftmost element if at right edge.

        Returns:
            int | None: destination field index, or None if no move is possible
        """
        if current_index not in self.__reverse_grid:
            return None
        row, col = self.__reverse_grid[current_index]
        # move normally if destination exists
        new_pos = (row, col + 1)
        if new_pos in self.__grid:
            return self.__grid[new_pos]
        # wrap on edge
        leftmost_col = self.__leftmost_column(row)
        if leftmost_col >= 0 and leftmost_col != col:
            wrap_pos = (row, leftmost_col)
            return self.__grid.get(wrap_pos)
        return None

    def up(self, current_index: int) -> int | None:
        """
        Navigate up from current index. From any radio column goes to query(0).

        Returns:
            int | None: destination field index, or None if already at the top
        """
        if current_index not in self.__reverse_grid:
            return None
        row, col = self.__reverse_grid[current_index]
        if row == 0:
            # already on the query row; nowhere to go
            return None
        if row == 1:
            # first radio row always returns to the query field
            return 0
        # otherwise, move up normally
        if row > 1:
            new_pos = (row - 1, col)
            return self.__grid.get(new_pos)
        return None

    def down(self, current_index: int) -> int | None:
        """
        Navigate down from current index.

        Returns:
            int | None: destination field index, or None if no move is possible
        """
        if current_index not in self.__reverse_grid:
            return None
        # In SEARCH_INIT mode, advance by one
        if self.__ui_state == UiState.SEARCH_INIT:
            return current_index + 1 if current_index + 1 in self.__reverse_grid else None
        row, col = self.__reverse_grid[current_index]
        new_pos = (row + 1, col)
        return self.__grid.get(new_pos)
class SearchFormView(BaseCursesView):
    """
    Handles search form state and rendering.
    Takes over all the form_* properties and methods from session.
    """

    def __init__(self, session: 'InteractiveSession', sites: list[SiteResult]):
        """
        Initialize the search form view.

        Args:
            session: The interactive session instance
            sites: List of available sites for selection
        """
        super().__init__(session)
        # set True after the first explicit search attempt
        self.__search_attempted: bool = False
        self.__sites: list[SiteResult] = sites
        # current site selection; single-select, stored as a one-element list
        self.__sites_selected: list[SiteResult] = []
        self.__query_input = InputText(initial_value="", label="Query")
        # pagination state
        self.__limit = 10
        self.__offset = 0
        # default to the first site when any are available
        if sites:
            self.__sites_selected.append(self.__sites[0])
        self.__filter_group: InputRadioGroup = InputRadioGroup("filter")
        self.__sort_group: InputRadioGroup = InputRadioGroup("sort")
        self.__sites_group: InputRadioGroup = InputRadioGroup("site", sites=self.__sites)

    @property
    def filter(self) -> str:
        # currently selected filter radio value
        return self.__filter_group.value

    @property
    def limit(self) -> int:
        # page size for search results
        return self.__limit

    @property
    def offset(self) -> int:
        # current pagination offset into results
        return self.__offset

    @property
    def query(self) -> str:
        # current query text as typed by the user
        return self.__query_input.value

    @property
    def sort(self) -> str:
        # default to ascending-by-url when no sort radio is selected
        return self.__sort_group.value.lower() if self.__sort_group.value is not None else "+url"

    def clear_query(self) -> None:
        """
        Clear only the query, preserve selections (was session method).
        """
        self.__search_attempted = False
        self.__query_input.clear()
        self._selected_index = 0
        self.__offset = 0

    def focus(self):
        """
        Set focus on this view.
        """
        self._focused = True

    def get_selected_sites(self) -> list[SiteResult]:
        # return a copy so callers cannot mutate internal selection state
        return self.__sites_selected.copy()

    def handle_input(self, key: int) -> bool:
        """
        Handle keyboard input and trigger search when state changes.

        Args:
            key: The curses key code from user input

        Returns:
            bool: True if the input was handled, False otherwise
        """
        # key -> handler dispatch; space/enter toggle radios, arrows navigate
        handlers: dict[int, callable] = {
            curses.KEY_UP: lambda: self.__navigate_form_selection(NavigationDirection.UP),
            curses.KEY_DOWN: lambda: self.__navigate_form_selection(NavigationDirection.DOWN),
            curses.KEY_LEFT: lambda: self.__handle_horizontal_arrow(NavigationDirection.LEFT),
            curses.KEY_RIGHT: lambda: self.__handle_horizontal_arrow(NavigationDirection.RIGHT),
            ord(' '): self.__handle_spacebar,
            ord('\n'): self.__handle_enter,
            ord('\r'): self.__handle_enter,
        }
        handler = handlers.get(key)
        if handler:
            handler()
            return True
        # any other key goes to the query input when it holds selection
        if self._selected_index == 0:
            if self.__query_input.handle_input(key):
                # query text changed; schedule a search
                self.session.searchman.autosearch()
                return True
        return False

    def page_next(self, total_results: int) -> bool:
        """
        Navigate to next page.

        Args:
            total_results: Total number of search results available

        Returns:
            bool: True if page was changed, False otherwise
        """
        if self.__offset + self.__limit < total_results:
            self.__offset += self.__limit
            return True
        return False

    def page_previous(self) -> bool:
        """
        Navigate to previous page.

        Returns:
            bool: True if page was changed, False otherwise
        """
        if self.__offset >= self.__limit:
            self.__offset -= self.__limit
            return True
        return False

    def render(self, stdscr: curses.window) -> None:
        """
        Render the search form with multi-column sites layout.
        """
        xb: int = self.bounds.x
        yb: int = self.bounds.y
        y_current: int = yb + 2 # y start
        y_max: int = yb + self.bounds.height
        if not self._renderable(stdscr):
            return
        safe_addstr(stdscr, y_current, xb + 2, "Query:")
        # clamp query box to available width
        box_width = min(LAYOUT_QUERY_MAX_WIDTH, self.bounds.width - LAYOUT_QUERY_MARGIN)
        is_query_selected = (self._focused and self._selected_index == 0)
        self.__query_input.render(stdscr, y_current, xb + LAYOUT_QUERY_OFFSET, box_width,
                focused=is_query_selected, style=self._get_input_style())
        y_current += 2
        if y_current >= y_max:
            return
        # radio column layout - calculated positions based on content width
        filter_column_width = self.__filter_group.calculate_group_width() + LAYOUT_FILTER_COLUMN_PADDING
        sort_column_width = self.__sort_group.calculate_group_width() + LAYOUT_SORT_COLUMN_PADDING
        sort_start_x = filter_column_width + LAYOUT_FILTER_TO_SORT_SPACING
        sites_start_x = sort_start_x + sort_column_width + LAYOUT_SORT_TO_SITES_SPACING
        safe_addstr(stdscr, y_current, xb + 2, self.__filter_group.label)
        safe_addstr(stdscr, y_current, xb + sort_start_x, self.__sort_group.label)
        # sites column renders only when there is room for it
        if sites_start_x + LAYOUT_SITES_MIN_WIDTH_REQUIREMENT < self.bounds.width:
            safe_addstr(stdscr, y_current, xb + sites_start_x, self.__sites_group.label)
        if not self.__sites:
            error_style = self.session.get_theme_color_pair(ThemeDefinition.UI_ERROR)
            safe_addstr(stdscr, y_current + 1, xb + sites_start_x, "No sites available", error_style)
        y_current += 1
        available_width = self.bounds.width - sites_start_x - 4
        # NOTE(review): this duplicates __get_sites_per_column; keep in sync
        is_constrained = self.session.ui_state == UiState.SEARCH_RESULTS
        sites_per_column = (LAYOUT_CONSTRAINED_SITES_PER_COLUMN if is_constrained
                else min(self.bounds.height - LAYOUT_SITES_VERTICAL_OFFSET, len(self.__sites_group.radios)))
        max_columns = (max(1, available_width // (LAYOUT_SITE_COLUMN_WIDTH + LAYOUT_SITE_COLUMN_SPACING))
                if available_width > LAYOUT_SITE_COLUMN_WIDTH else 1)
        total_visible_sites = max_columns * sites_per_column
        # sites beyond the visible grid are summarized by the overflow indicator
        overflow_count = max(0, len(self.__sites_group.radios) - total_visible_sites)
        max_rows = max(len(self.__filter_group.radios), len(self.__sort_group.radios), sites_per_column)
        for i in range(max_rows):
            if y_current >= y_max:
                return
            # filter radios
            if i < len(self.__filter_group.radios):
                filter_radio: InputRadio = self.__filter_group.radios[i]
                field_index: int = 1 + i
                is_selected: bool = self._selected_index == field_index
                filter_radio.render(stdscr, y_current, xb + 2, field_index, 100, is_selected)
            # sorts radios
            if i < len(self.__sort_group.radios):
                sort_radio: InputRadio = self.__sort_group.radios[i]
                field_index: int = 1 + len(self.__filter_group.radios) + i
                is_selected: bool = self._selected_index == field_index
                sort_radio.render(stdscr, y_current, xb + sort_start_x, field_index, 100, is_selected)
            # sites radios - multiple columns
            if sites_start_x + LAYOUT_SITES_MIN_WIDTH_REQUIREMENT < self.bounds.width:
                for col in range(max_columns):
                    site_index = col * sites_per_column + i
                    if site_index < len(self.__sites_group.radios) and site_index < total_visible_sites:
                        site_radio: InputRadio = self.__sites_group.radios[site_index]
                        field_index: int = 1 + len(self.__sort_group.radios) + len(self.__filter_group.radios) + site_index
                        is_selected: bool = self._selected_index == field_index
                        col_x = sites_start_x + col * (LAYOUT_SITE_COLUMN_WIDTH + LAYOUT_SITE_COLUMN_SPACING)
                        # temporarily swap in a truncated label for rendering
                        original_label = site_radio.label
                        site_radio.label = self.__truncate_label(original_label)
                        site_radio.render(stdscr, y_current, xb + col_x, field_index, LAYOUT_TRUNCATED_LABEL_MAX_LENGTH, is_selected)
                        site_radio.label = original_label # restore original label
            # overflow indicator on last row, right-aligned
            if (overflow_count > 0 and i == sites_per_column - 1 and
                    sites_start_x + LAYOUT_SITES_MIN_WIDTH_REQUIREMENT < self.bounds.width):
                overflow_text: str = f"+{overflow_count} more"
                overflow_x: int = self.bounds.width - len(overflow_text) - LAYOUT_OVERFLOW_INDICATOR_MARGIN
                try:
                    safe_addstr(stdscr, y_current, overflow_x, overflow_text, curses.A_DIM)
                except curses.error:
                    pass
            y_current += 1

    def set_search_attempted(self) -> None:
        """
        Set attempted to True.
        """
        self.__search_attempted = True

    def unfocus(self):
        """
        Remove focus from this view.
        """
        self._focused = False

    def __get_sites_per_column(self) -> int:
        """
        Calculate how many site rows fit per visual column, constrained to a
        fixed small count when search results share the screen.
        """
        is_constrained = self.session.ui_state == UiState.SEARCH_RESULTS
        return (LAYOUT_CONSTRAINED_SITES_PER_COLUMN if is_constrained
                else min(self.bounds.height - LAYOUT_SITES_VERTICAL_OFFSET, len(self.__sites_group.radios)))

    def __handle_enter(self) -> None:
        """
        Handle ENTER key - only toggles radio buttons, doesn't affect query field.
        """
        if self._selected_index == 0: # query field
            self.session.searchman.autosearch()
        else: # radios
            self.__handle_radio_toggle()
            if self.session.ui_state != UiState.SEARCH_INIT:
                self.session.searchman.autosearch(immediate=True)

    def __handle_horizontal_arrow(self, direction: NavigationDirection) -> None:
        """
        Handle left/right arrow navigation using the directional grid.

        Args:
            direction: The navigation direction (LEFT or RIGHT)
        """
        if self.session.ui_state is None:
            return
        # query field handles cursor movement internally
        if self._selected_index == 0:
            if direction == NavigationDirection.LEFT:
                self.__query_input.move_cursor_left()
            else:
                self.__query_input.move_cursor_right()
            return
        # use grid navigation for all other fields
        grid = SearchFormNavigationGrid(self.session.ui_state, self.__filter_group, self.__sort_group,
                self.__sites_group, self.__get_sites_per_column())
        if direction == NavigationDirection.LEFT:
            new_index = grid.left(self._selected_index)
        else:
            new_index = grid.right(self._selected_index)
        if new_index is not None:
            self._selected_index = new_index

    def __handle_radio_toggle(self) -> None:
        """
        Handle radio button toggles for filters and sites.
        """
        # field index layout: query(0), filters, sorts, sites
        filter_index_start: int = 1
        sorts_index_start: int = filter_index_start + len(self.__filter_group.radios)
        sites_index_start: int = sorts_index_start + len(self.__sort_group.radios)
        if self._selected_index >= filter_index_start and self._selected_index < sorts_index_start:
            filter_index = self._selected_index - filter_index_start
            filter_input: InputRadio = self.__filter_group.radios[filter_index]
            filter_input.next_state()
        elif self._selected_index >= sorts_index_start and self._selected_index < sites_index_start:
            sort_index = self._selected_index - sorts_index_start
            sort_input: InputRadio = self.__sort_group.radios[sort_index]
            sort_input.next_state()
        elif self._selected_index >= sites_index_start:
            site_index = self._selected_index - sites_index_start
            if site_index < len(self.__sites) and site_index < len(self.__sites_group.radios):
                site_input: InputRadio = self.__sites_group.radios[site_index]
                site_input.next_state()
                # sites are single-select: replace, don't append
                self.__sites_selected = [self.__sites[site_index]]

    def __handle_spacebar(self) -> None:
        """
        Handle spacebar for toggles. Updated for new field order: Query, Filters, Sites.
        """
        if self._selected_index == 0: # query field
            self.__query_input.insert_char(" ")
            self.session.searchman.autosearch()
        else: # radios
            self.__handle_radio_toggle()
            if self.session.ui_state != UiState.SEARCH_INIT:
                self.session.searchman.autosearch()

    def __navigate_form_selection(self, direction: NavigationDirection) -> None:
        """
        Navigate between form fields. Updated for new field order: Query, Filters, Sites.

        Args:
            direction: The navigation direction (UP or DOWN)
        """
        # query(0), filters(1-2), sorts(3-5), sites(...)
        # NOTE(review): the 5 here hard-codes 2 filter + 3 sort radios; it will
        # drift if group sizes change — consider deriving from the group lengths
        last_field_index = 5 + len(self.__sites)
        if direction == NavigationDirection.UP:
            if self._selected_index == 0:
                # wrap from query to the last field
                self._selected_index = last_field_index
            else:
                self._selected_index -= 1
        elif direction == NavigationDirection.DOWN:
            if self._selected_index == last_field_index:
                # wrap from the last field back to query
                self._selected_index = 0
            else:
                self._selected_index += 1

    def __truncate_label(self, label: str, max_length: int = LAYOUT_TRUNCATED_LABEL_MAX_LENGTH) -> str:
        """
        Truncate label to max_length, replacing last char with ellipsis if needed.

        Args:
            label: The label text to truncate
            max_length: Maximum allowed length for the label

        Returns:
            str: The truncated label with ellipsis if needed
        """
        if len(label) <= max_length:
            return label
        return label[:max_length - 1] + "…"
```
--------------------------------------------------------------------------------
/docs/_modules/mcp_server_webcrawl/extras/regex.html:
--------------------------------------------------------------------------------
```html
<!DOCTYPE html>
<html class="writer-html5" lang="en" data-content_root="../../../">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>mcp_server_webcrawl.extras.regex — mcp-server-webcrawl documentation</title>
<link rel="stylesheet" type="text/css" href="../../../_static/pygments.css?v=80d5e7a1" />
<link rel="stylesheet" type="text/css" href="../../../_static/css/theme.css?v=e59714d7" />
<script src="../../../_static/jquery.js?v=5d32c60e"></script>
<script src="../../../_static/_sphinx_javascript_frameworks_compat.js?v=2cd50e6c"></script>
<script src="../../../_static/documentation_options.js?v=5929fcd5"></script>
<script src="../../../_static/doctools.js?v=888ff710"></script>
<script src="../../../_static/sphinx_highlight.js?v=dc90522c"></script>
<script src="../../../_static/js/theme.js"></script>
<link rel="index" title="Index" href="../../../genindex.html" />
<link rel="search" title="Search" href="../../../search.html" />
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../index.html" class="icon icon-home">
mcp-server-webcrawl
</a>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div><div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="Navigation menu">
<p class="caption" role="heading"><span class="caption-text">Contents:</span></p>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../installation.html">Installation</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../guides.html">Setup Guides</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../usage.html">Usage</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../prompts.html">Prompt Routines</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../modules.html">mcp_server_webcrawl</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"><nav class="wy-nav-top" aria-label="Mobile navigation menu" >
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../index.html">mcp-server-webcrawl</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="Page navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../index.html" class="icon icon-home" aria-label="Home"></a></li>
<li class="breadcrumb-item"><a href="../../index.html">Module code</a></li>
<li class="breadcrumb-item active">mcp_server_webcrawl.extras.regex</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for mcp_server_webcrawl.extras.regex</h1><div class="highlight"><pre>
<span></span><span class="kn">import</span> <span class="nn">re</span>
<span class="kn">from</span> <span class="nn">functools</span> <span class="kn">import</span> <span class="n">lru_cache</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Final</span>
<span class="kn">from</span> <span class="nn">logging</span> <span class="kn">import</span> <span class="n">Logger</span>
<span class="kn">from</span> <span class="nn">mcp_server_webcrawl.utils.logger</span> <span class="kn">import</span> <span class="n">get_logger</span>
<span class="n">__REGEX_PATTERNS_REGEX_HAZARDS</span><span class="p">:</span> <span class="n">Final</span><span class="p">[</span><span class="nb">list</span><span class="p">[</span><span class="nb">str</span><span class="p">]]</span> <span class="o">=</span> <span class="p">[</span>
<span class="sa">r</span><span class="s2">"\([^)]*\*[^)]*\+"</span><span class="p">,</span> <span class="c1"># (.*)*+, (.+)*+, etc.</span>
<span class="sa">r</span><span class="s2">"\([^)]*\+[^)]*\*"</span><span class="p">,</span> <span class="c1"># (.+)*., (.*)++, etc.</span>
<span class="sa">r</span><span class="s2">"\([^)]*\+[^)]*\+"</span><span class="p">,</span> <span class="c1"># (.+)+, (.++)+ etc.</span>
<span class="sa">r</span><span class="s2">"\([^)]*\*[^)]*\*"</span><span class="p">,</span> <span class="c1"># (.*)*, (.**) etc.</span>
<span class="sa">r</span><span class="s2">"\.\*.*\.\*"</span><span class="p">,</span> <span class="c1"># .*.* patterns</span>
<span class="sa">r</span><span class="s2">"\.\+.*\.\+"</span><span class="p">,</span> <span class="c1"># .+.+ patterns</span>
<span class="sa">r</span><span class="s2">"\([^)]*\?\)\*"</span><span class="p">,</span> <span class="c1"># (a?)* patterns</span>
<span class="sa">r</span><span class="s2">"\([^)]*\?\)\+"</span><span class="p">,</span> <span class="c1"># (a?)+ patterns</span>
<span class="sa">r</span><span class="s2">"\([^)]*[*+?][^)]*[*+?][^)]*\)[*+]"</span><span class="p">,</span> <span class="c1"># 2+ quantifiers inside, then quantifier outside</span>
<span class="p">]</span>
<span class="n">logger</span><span class="p">:</span> <span class="n">Logger</span> <span class="o">=</span> <span class="n">get_logger</span><span class="p">()</span>
<span class="nd">@lru_cache</span><span class="p">(</span><span class="n">maxsize</span><span class="o">=</span><span class="kc">None</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__get_compiled_hazard_patterns</span><span class="p">():</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Lazy load compiled patterns</span>
<span class="sd"> """</span>
<span class="n">compiled_patterns</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">hazard</span> <span class="ow">in</span> <span class="n">__REGEX_PATTERNS_REGEX_HAZARDS</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">compiled_patterns</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">re</span><span class="o">.</span><span class="n">compile</span><span class="p">(</span><span class="n">hazard</span><span class="p">))</span>
<span class="k">except</span> <span class="n">re</span><span class="o">.</span><span class="n">error</span> <span class="k">as</span> <span class="n">ex</span><span class="p">:</span>
<span class="n">logger</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Invalid hazard pattern </span><span class="si">{</span><span class="n">hazard</span><span class="si">}</span><span class="s2">: </span><span class="si">{</span><span class="n">ex</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
<span class="k">continue</span>
<span class="k">return</span> <span class="n">compiled_patterns</span>
<span class="k">def</span> <span class="nf">__regex_is_hazardous</span><span class="p">(</span><span class="n">pattern</span><span class="p">:</span> <span class="nb">str</span><span class="p">)</span> <span class="o">-></span> <span class="nb">bool</span><span class="p">:</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Check if a regex pattern might cause catastrophic backtracking</span>
<span class="sd"> or otherwise unacceptable performance over up to 100 HTML files</span>
<span class="sd"> """</span>
<span class="n">compiled_hazards</span> <span class="o">=</span> <span class="n">__get_compiled_hazard_patterns</span><span class="p">()</span>
<span class="k">for</span> <span class="n">hazard_pattern</span> <span class="ow">in</span> <span class="n">compiled_hazards</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">if</span> <span class="n">hazard_pattern</span><span class="o">.</span><span class="n">search</span><span class="p">(</span><span class="n">pattern</span><span class="p">):</span>
<span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="sa">f</span><span class="s2">"hazardous regex discarded </span><span class="si">{</span><span class="n">pattern</span><span class="si">}</span><span class="s2"> matched </span><span class="si">{</span><span class="n">hazard_pattern</span><span class="o">.</span><span class="n">pattern</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">except</span> <span class="n">re</span><span class="o">.</span><span class="n">error</span> <span class="k">as</span> <span class="n">ex</span><span class="p">:</span>
<span class="n">logger</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Error checking hazard pattern </span><span class="si">{</span><span class="n">hazard_pattern</span><span class="o">.</span><span class="n">pattern</span><span class="si">}</span><span class="s2">: </span><span class="si">{</span><span class="n">ex</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
<span class="k">continue</span>
<span class="k">return</span> <span class="kc">False</span>
<div class="viewcode-block" id="get_regex">
<a class="viewcode-back" href="../../../mcp_server_webcrawl.extras.html#mcp_server_webcrawl.extras.regex.get_regex">[docs]</a>
<span class="k">def</span> <span class="nf">get_regex</span><span class="p">(</span><span class="n">headers</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">content</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">patterns</span><span class="p">:</span> <span class="nb">list</span><span class="p">[</span><span class="nb">str</span><span class="p">])</span> <span class="o">-></span> <span class="nb">list</span><span class="p">[</span><span class="nb">dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span> <span class="o">|</span> <span class="nb">int</span><span class="p">]]:</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Takes headers and content and gets regex matches</span>
<span class="sd"> Arguments:</span>
<span class="sd"> headers: The headers to search</span>
<span class="sd"> content: The content to search</span>
<span class="sd"> patterns: The regex patterns</span>
<span class="sd"> Returns:</span>
<span class="sd"> A list of dicts, with selector, value, groups, position info, and source</span>
<span class="sd"> """</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">content</span> <span class="o">=</span> <span class="s2">""</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">headers</span><span class="p">,</span> <span class="nb">str</span><span class="p">):</span>
<span class="n">headers</span> <span class="o">=</span> <span class="s2">""</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">patterns</span><span class="p">,</span> <span class="nb">list</span><span class="p">)</span> <span class="ow">or</span> <span class="ow">not</span> <span class="nb">all</span><span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">item</span><span class="p">,</span> <span class="nb">str</span><span class="p">)</span> <span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="n">patterns</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"patterns must be a list of strings"</span><span class="p">)</span>
<span class="n">results</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">if</span> <span class="n">content</span> <span class="o">==</span> <span class="s2">""</span> <span class="ow">and</span> <span class="n">headers</span> <span class="o">==</span> <span class="s2">""</span><span class="p">:</span>
<span class="k">return</span> <span class="n">results</span>
<span class="n">re_patterns</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">pattern</span> <span class="ow">in</span> <span class="n">patterns</span><span class="p">:</span>
<span class="k">if</span> <span class="n">__regex_is_hazardous</span><span class="p">(</span><span class="n">pattern</span><span class="p">):</span>
<span class="n">logger</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Hazardous regex pattern '</span><span class="si">{</span><span class="n">pattern</span><span class="si">}</span><span class="s2">'"</span><span class="p">)</span>
<span class="k">continue</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">re_pattern</span> <span class="o">=</span> <span class="n">re</span><span class="o">.</span><span class="n">compile</span><span class="p">(</span><span class="n">pattern</span><span class="p">)</span>
<span class="n">re_patterns</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">re_pattern</span><span class="p">)</span>
<span class="k">except</span> <span class="n">re</span><span class="o">.</span><span class="n">error</span> <span class="k">as</span> <span class="n">ex</span><span class="p">:</span>
<span class="n">logger</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Invalid regex pattern '</span><span class="si">{</span><span class="n">pattern</span><span class="si">}</span><span class="s2">': </span><span class="si">{</span><span class="n">ex</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
<span class="k">continue</span>
<span class="c1"># search headers and content</span>
<span class="n">search_targets</span> <span class="o">=</span> <span class="p">[(</span><span class="s2">"headers"</span><span class="p">,</span> <span class="n">headers</span><span class="p">),</span> <span class="p">(</span><span class="s2">"content"</span><span class="p">,</span> <span class="n">content</span><span class="p">)]</span>
<span class="k">for</span> <span class="n">re_pattern</span> <span class="ow">in</span> <span class="n">re_patterns</span><span class="p">:</span>
<span class="k">for</span> <span class="n">source_name</span><span class="p">,</span> <span class="n">search_text</span> <span class="ow">in</span> <span class="n">search_targets</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">search_text</span><span class="p">:</span>
<span class="k">continue</span>
<span class="k">for</span> <span class="n">match</span> <span class="ow">in</span> <span class="n">re_pattern</span><span class="o">.</span><span class="n">finditer</span><span class="p">(</span><span class="n">search_text</span><span class="p">):</span>
<span class="n">regex_hit</span><span class="p">:</span> <span class="nb">dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span> <span class="o">|</span> <span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="p">{</span>
<span class="s2">"selector"</span><span class="p">:</span> <span class="n">re_pattern</span><span class="o">.</span><span class="n">pattern</span><span class="p">,</span>
<span class="s2">"value"</span><span class="p">:</span> <span class="n">match</span><span class="o">.</span><span class="n">group</span><span class="p">(</span><span class="mi">0</span><span class="p">),</span>
<span class="s2">"source"</span><span class="p">:</span> <span class="n">source_name</span> <span class="c1"># headers or content</span>
<span class="p">}</span>
<span class="k">if</span> <span class="n">match</span><span class="o">.</span><span class="n">groups</span><span class="p">():</span>
<span class="k">for</span> <span class="n">i</span><span class="p">,</span> <span class="n">group</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">match</span><span class="o">.</span><span class="n">groups</span><span class="p">(),</span> <span class="mi">1</span><span class="p">):</span>
<span class="k">if</span> <span class="n">group</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">regex_hit</span><span class="p">[</span><span class="sa">f</span><span class="s2">"group_</span><span class="si">{</span><span class="n">i</span><span class="si">}</span><span class="s2">"</span><span class="p">]</span> <span class="o">=</span> <span class="n">group</span>
<span class="n">regex_hit</span><span class="p">[</span><span class="s2">"start"</span><span class="p">]</span> <span class="o">=</span> <span class="n">match</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
<span class="n">regex_hit</span><span class="p">[</span><span class="s2">"end"</span><span class="p">]</span> <span class="o">=</span> <span class="n">match</span><span class="o">.</span><span class="n">end</span><span class="p">()</span>
<span class="n">results</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">regex_hit</span><span class="p">)</span>
<span class="k">return</span> <span class="n">results</span></div>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>© Copyright 2025, pragmar.</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a
<a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a>
provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>
```
--------------------------------------------------------------------------------
/src/mcp_server_webcrawl/crawlers/archivebox/adapter.py:
--------------------------------------------------------------------------------
```python
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from mcp_server_webcrawl.crawlers.base.adapter import (
BaseManager,
IndexState,
IndexStatus,
SitesGroup,
INDEXED_BATCH_SIZE,
INDEXED_TYPE_MAPPING,
INDEXED_IGNORE_DIRECTORIES,
)
from mcp_server_webcrawl.crawlers.base.indexed import IndexedManager
from mcp_server_webcrawl.models.resources import (
ResourceResult,
ResourceResultType,
RESOURCES_LIMIT_DEFAULT,
)
from mcp_server_webcrawl.models.sites import (
SiteResult,
SiteType,
SITES_FIELDS_BASE,
SITES_FIELDS_DEFAULT,
)
from mcp_server_webcrawl.utils.logger import get_logger
# entry subdirectories holding derivative/metadata captures rather than site
# assets; skipped when walking an entry for domain files (see _get_page_domain_assets)
ARCHIVEBOX_SKIP_DIRECTORIES: set[str] = {"media", "mercury"}
# filename suffixes collapsed to a trailing "/" (directory index) during asset URL construction
ARCHIVEBOX_COLLAPSE_FILENAMES: list[str] = ["/index.html", "/index.htm"]
logger = get_logger()
class ArchiveBoxManager(IndexedManager):
    """
    Manages ArchiveBox in-memory SQLite databases for session-level reuse.

    Walks ArchiveBox instance directories (each containing an "archive"
    subdirectory of timestamped page captures), converting captured pages and
    their domain asset files into ResourceResult rows for batch insertion.
    """

    def __init__(self) -> None:
        """
        Initialize the ArchiveBox manager with empty cache and statistics.
        """
        super().__init__()

    def _load_site_data(self, connection: sqlite3.Connection, site_directory: Path,
            site_id: int, index_state: IndexState = None) -> None:
        """
        Load ArchiveBox site data into the database.

        Args:
            connection: SQLite connection
            site_directory: path to the ArchiveBox site directory (e.g., "example" or "pragmar")
            site_id: ID for the site
            index_state: IndexState object for tracking progress
        """
        # site_directory is an instance root (e.g. "example"); captured pages
        # live under its "archive" subdirectory
        archive_directory: Path = site_directory / "archive"
        if not archive_directory.exists() or not archive_directory.is_dir():
            logger.error(f"Archive directory not found in site: {archive_directory}")
            return

        if index_state is not None:
            index_state.set_status(IndexStatus.INDEXING)

        # page directories are timestamped (e.g. example/archive/1756357684.13023)
        # these contain page data/media
        page_directories = self._get_page_directories(archive_directory)
        if not page_directories:
            logger.warning(f"No timestamped entries found in archive: {archive_directory}")
            return

        all_resources: list[ResourceResult] = []
        # process each timestamped entry; a per-entry failure is logged and skipped
        for page_directory in page_directories:
            if index_state is not None and index_state.is_timeout():
                index_state.set_status(IndexStatus.PARTIAL)
                break
            try:
                metadata = self._get_page_metadata(page_directory)
                main_url: str = metadata["url"] if "url" in metadata else \
                        f"archivebox://unknown/{page_directory.name}"
                # primary resource (the captured page itself)
                main_resource = self._create_page_resource(page_directory, site_id, main_url, metadata)
                if main_resource:
                    all_resources.append(main_resource)
                    if index_state is not None:
                        index_state.increment_processed()
                # collect assets (external js/css/fonts/whatever)
                domain_assets = self._get_page_domain_assets(page_directory, main_url)
                for file_path, asset_url in domain_assets:
                    asset_resource = self._create_asset_resource(file_path, site_id, asset_url, page_directory)
                    if asset_resource:
                        all_resources.append(asset_resource)
                        if index_state is not None:
                            index_state.increment_processed()
            except Exception as ex:
                logger.error(f"Error processing entry {page_directory}: {ex}")

        deduplicated_resources = self._dedupe_resources(all_resources)
        with closing(connection.cursor()) as cursor:
            for i in range(0, len(deduplicated_resources), INDEXED_BATCH_SIZE):
                batch = deduplicated_resources[i:i+INDEXED_BATCH_SIZE]
                self._execute_batch_insert(connection, cursor, batch)

        # only promote to COMPLETE if a timeout did not already mark PARTIAL
        if index_state is not None and index_state.status == IndexStatus.INDEXING:
            index_state.set_status(IndexStatus.COMPLETE)

    def _create_page_resource(self, resource_directory: Path, site_id: int,
            url: str, metadata: dict) -> ResourceResult | None:
        """
        Create a ResourceResult for the main captured page.

        Args:
            resource_directory: timestamped entry directory for the capture
            site_id: ID for the site
            url: primary URL of the capture
            metadata: merged index.json/headers.json metadata for the entry

        Returns:
            ResourceResult for the page, or None on failure
        """
        try:
            # created/modified is directory stat
            resource_stat: os.stat_result = resource_directory.stat()
            created: datetime = datetime.fromtimestamp(resource_stat.st_ctime, tz=timezone.utc)
            modified: datetime = datetime.fromtimestamp(resource_stat.st_mtime, tz=timezone.utc)

            # select best content, with appropriate fallbacks
            html_file: Path | None = None
            if "canonical" in metadata:
                # dom first, wget second, ignore singlefile (datauris generate too much storage)
                canonical: dict[str, str] = metadata["canonical"]
                prioritized_paths = ["dom_path", "wget_path"]
                for path_key in prioritized_paths:
                    if path_key in canonical and canonical[path_key] is not None:
                        candidate_file = resource_directory / canonical[path_key]
                        # containment check guards against paths escaping the entry directory
                        if candidate_file.resolve().is_relative_to(resource_directory.resolve()) and candidate_file.exists():
                            html_file = candidate_file
                            break

            # fallback to ArchiveBox index file (metadata file - barely useful, but dependable)
            if html_file is None:
                html_file = resource_directory / "index.html"

            # read content
            content: str | None = None
            file_size: int = 0
            if html_file.exists():
                try:
                    with open(html_file, "r", encoding="utf-8", errors="replace") as f:
                        content = f.read()
                    file_size = html_file.stat().st_size
                except Exception as ex:
                    logger.warning(f"Could not read HTML from {html_file}: {ex}")

            # assemble metadata
            status_code: int = 200
            headers_reconstructed: str = ""
            if "http_headers" in metadata:
                http_headers = metadata["http_headers"]
                if "status" in http_headers:
                    try:
                        # value may be e.g. "200 OK"; use the leading numeric token
                        status_code = int(str(http_headers["status"]).split()[0])
                    except (ValueError, IndexError):
                        pass
                headers_reconstructed = self._get_http_headers_string(http_headers)

            if not headers_reconstructed:
                headers_reconstructed = BaseManager.get_basic_headers(
                    file_size, ResourceResultType.PAGE)

            return ResourceResult(
                id=BaseManager.string_to_id(url),
                site=site_id,
                created=created,
                modified=modified,
                url=url,
                type=ResourceResultType.PAGE,
                status=status_code,
                headers=headers_reconstructed,
                content=content,
                size=file_size,
                time=0
            )
        except Exception as ex:
            logger.error(f"Error creating main resource for {resource_directory}: {ex}")
            return None

    def _create_asset_resource(self, file_path: Path, site_id: int, url: str, entry_dir: Path) -> ResourceResult | None:
        """
        Create a ResourceResult for a domain asset file.

        Args:
            file_path: on-disk path of the asset within the entry
            site_id: ID for the site
            url: reconstructed URL for the asset
            entry_dir: timestamped entry directory containing the asset

        Returns:
            ResourceResult for the asset, or None if missing or on failure
        """
        try:
            # get file info
            if not file_path.exists():
                return None

            file_stat = file_path.stat()
            created: datetime = datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc)
            modified: datetime = datetime.fromtimestamp(file_stat.st_mtime, tz=timezone.utc)
            file_size: int = file_stat.st_size
            extension: str = file_path.suffix.lower()

            # ArchiveBox will stuff URL args into @... in the filename
            # sometimes it's the filename, sometimes the extension
            # both need cleaning
            clean_url: str = url.split("@")[0]
            clean_extension: str = extension.split("@")[0]
            resource_type: ResourceResultType = INDEXED_TYPE_MAPPING.get(clean_extension, ResourceResultType.OTHER)

            # read content for text files
            content: str | None = BaseManager.read_file_contents(file_path, resource_type)

            return ResourceResult(
                id=BaseManager.string_to_id(clean_url),
                site=site_id,
                created=created,
                modified=modified,
                url=clean_url,
                type=resource_type,
                status=200,  # assume assets successful
                headers=BaseManager.get_basic_headers(file_size, resource_type, file_path),
                content=content,
                size=file_size,
                time=0
            )
        except Exception as ex:
            logger.error(f"Error creating asset resource for {file_path}: {ex}")
            return None

    def _get_page_directories(self, archive_directory: Path) -> list[Path]:
        """
        Get webpage directories within ArchiveBox archive.

        Args:
            archive_directory: path to the ArchiveBox archive directory

        Returns:
            Sorted list of timestamped entry directory paths that contain at
            least one recognized data file (index.json, headers.json, index.html)
        """
        # page_directories are the timestamped directories,
        # e.g. archive/1756342555.086082
        page_directories = []
        if not archive_directory.is_dir():
            return page_directories

        for item in archive_directory.iterdir():
            # 1756342555.086082.replace(".", "") is numeric
            if (item.is_dir() and item.name.replace(".", "").isdigit()):
                data_files: list[Path] = [
                    (item / "index.json"),
                    (item / "headers.json"),
                    (item / "index.html"),
                ]
                for data_file in data_files:
                    if data_file.exists():
                        page_directories.append(item)
                        break
        return sorted(page_directories)

    def _get_page_metadata(self, entry_directory: Path) -> dict:
        """
        Extract metadata from ArchiveBox entry files.

        Args:
            entry_directory: path to the timestamped entry directory

        Returns:
            Dictionary of index.json fields, with parsed headers.json (if any)
            nested under the "http_headers" key
        """
        page_metadata: dict = {}

        # read index.json for primary URL and metadata
        index_json_path: Path = entry_directory / "index.json"
        if index_json_path.exists():
            try:
                with open(index_json_path, "r", encoding="utf-8", errors="replace") as f:
                    index_data = json.load(f)
                page_metadata.update(index_data)
            except (json.JSONDecodeError, UnicodeDecodeError) as ex:
                logger.warning(f"Could not parse index.json from {entry_directory}: {ex}")
            except Exception as ex:
                logger.error(f"Error reading index.json from {entry_directory}: {ex}")

        # read headers.json for HTTP headers
        headers_json_path = entry_directory / "headers.json"
        if headers_json_path.exists():
            try:
                with open(headers_json_path, "r", encoding="utf-8", errors="replace") as f:
                    http_headers = json.load(f)
                page_metadata["http_headers"] = http_headers
            except (json.JSONDecodeError, UnicodeDecodeError) as ex:
                logger.warning(f"Could not parse headers.json from {entry_directory}: {ex}")
            except Exception as ex:
                logger.error(f"Error reading headers.json from {entry_directory}: {ex}")

        return page_metadata

    def _get_page_domain_assets(self, entry_dir: Path, main_url: str) -> list[tuple[Path, str]]:
        """
        Collect all domain asset files within an entry.

        Args:
            entry_dir: path to the timestamped entry
            main_url: the main captured URL (currently unused; retained for interface stability)

        Returns:
            List of (file_path, reconstructed_url) tuples
        """
        assets: list[tuple] = []
        for item in entry_dir.iterdir():
            if item.is_dir() and item.name not in ARCHIVEBOX_SKIP_DIRECTORIES:
                # this is an archivebox domain directory
                domain_name: str = item.name
                # walk domain directories for assets
                # (e.g. example/archive/1756357684.13023/example.com)
                for root, _, files in os.walk(item):
                    for filename in files:
                        # *orig$ are dupes, not reliably in fileext form
                        if filename.endswith("orig"):
                            continue
                        file_path = Path(root) / filename
                        # clean up ArchiveBox's @timestamp suffixes for URL construction
                        clean_filename: str = filename.split("@")[0]
                        clean_file_path: Path = Path(root) / clean_filename
                        relative_path = clean_file_path.relative_to(item)
                        url = f"https://{domain_name}/{str(relative_path).replace(os.sep, '/')}"
                        for collapse_filename in ARCHIVEBOX_COLLAPSE_FILENAMES:
                            # turn ./index.html and variants into ./ (dir index) to help the indexer
                            if url.endswith(collapse_filename):
                                url = url[:-(len(collapse_filename))] + "/"
                                break
                        # use original file_path for reading, clean url for storage
                        assets.append((file_path, url))
        return assets

    def _dedupe_resources(self, resources: list[ResourceResult]) -> list[ResourceResult]:
        """
        Deduplicate resources by URL, preferring the more recently modified
        resource when both duplicates carry a modified timestamp; otherwise
        the first-seen resource wins.

        Args:
            resources: list of ResourceResult objects

        Returns:
            Deduplicated list of ResourceResult objects
        """
        seen_urls: dict[str, ResourceResult] = {}
        deduplicated: list[ResourceResult] = []
        resource: ResourceResult
        for resource in resources:
            if resource.url in seen_urls:
                # url collision: replace only when the newcomer is strictly newer
                existing = seen_urls[resource.url]
                if resource.modified and existing.modified:
                    if resource.modified > existing.modified:
                        deduplicated = [r for r in deduplicated if r.url != resource.url]
                        deduplicated.append(resource)
                        seen_urls[resource.url] = resource
                # else: timestamps incomparable, keep existing. Fix: previously
                # this branch overwrote seen_urls with the discarded resource,
                # corrupting the comparison baseline for later duplicates.
            else:
                seen_urls[resource.url] = resource
                deduplicated.append(resource)
        return deduplicated

    def _get_http_headers_string(self, http_headers: dict) -> str:
        """
        Format a headers dictionary as a raw HTTP response header string.

        Args:
            http_headers: header name/value mapping, optionally with a
                "Status-Code" entry (defaults to 200)

        Returns:
            CRLF-joined header block ending with a blank line, or "" if empty
        """
        if not http_headers:
            return ""

        headers_lines: list[str] = []
        status: int = http_headers.get("Status-Code", 200)
        headers_lines.append(f"HTTP/1.0 {status}")
        for key, value in http_headers.items():
            # Status-Code is synthesized into the status line, not emitted as a header
            if key.lower() not in ["status-code"]:
                headers_lines.append(f"{key}: {value}")
        return "\r\n".join(headers_lines) + "\r\n\r\n"
# module-level singleton, shared across calls for session-level reuse (see class docstring)
manager: ArchiveBoxManager = ArchiveBoxManager()
def get_sites(
    datasrc: Path,
    ids: list[int] | None = None,
    fields: list[str] | None = None
) -> list[SiteResult]:
    """
    List ArchiveBox instances as separate sites.

    Each subdirectory of datasrc that contains an "archive" folder is treated
    as a separate ArchiveBox instance.

    Args:
        datasrc: path to the directory containing ArchiveBox instance directories
        ids: optional list of site IDs to filter by
        fields: optional list of fields to include in the response

    Returns:
        List of SiteResult objects, one for each ArchiveBox instance
    """
    assert datasrc is not None, f"datasrc not provided ({datasrc})"
    if not datasrc.exists():
        logger.error(f"Directory not found ({datasrc})")
        return []

    # determine which fields to include; requested fields are validated
    # against the known default field set
    selected_fields: set[str] = set(SITES_FIELDS_BASE)
    if fields:
        valid_fields: set[str] = set(SITES_FIELDS_DEFAULT)
        selected_fields.update(f for f in fields if f in valid_fields)
    else:
        selected_fields.update(SITES_FIELDS_DEFAULT)

    results: list[SiteResult] = []

    # collect instance directories: visible, not ignored, containing an "archive" subdirectory
    site_directories: list[Path] = []
    for datasrc_item in datasrc.iterdir():
        if (
            datasrc_item.is_dir() and
            not datasrc_item.name.startswith(".") and
            datasrc_item.name not in INDEXED_IGNORE_DIRECTORIES and
            (datasrc_item / "archive").is_dir()
        ):
            site_directories.append(datasrc_item)

    # map directory-name-derived IDs to paths for filtering
    site_directories_map: dict[int, Path] = {BaseManager.string_to_id(d.name): d for d in site_directories}
    if ids:
        site_directories_map = {id_val: path for id_val, path in site_directories_map.items() if id_val in ids}

    # process each ArchiveBox instance directory, in stable id order
    for site_id, site_directory in sorted(site_directories_map.items()):
        site_directory_stat = site_directory.stat()
        # NOTE(review): naive local timestamps here, while the adapter's resource
        # records use timezone.utc — confirm whether site timestamps should match
        created_time: datetime = datetime.fromtimestamp(site_directory_stat.st_ctime)
        modified_time: datetime = datetime.fromtimestamp(site_directory_stat.st_mtime)
        site = SiteResult(
            path=site_directory,
            id=site_id,
            name=site_directory.name,
            type=SiteType.CRAWLED_LIST,
            urls=[f"archivebox://{site_directory.name}/"],
            created=created_time if "created" in selected_fields else None,
            modified=modified_time if "modified" in selected_fields else None,
        )
        results.append(site)
    return results
def get_resources(
    datasrc: Path,
    sites: list[int] | None = None,
    query: str = "",
    fields: list[str] | None = None,
    sort: str | None = None,
    limit: int = RESOURCES_LIMIT_DEFAULT,
    offset: int = 0,
) -> tuple[list[ResourceResult], int, IndexState]:
    """
    Get resources from ArchiveBox instances using in-memory SQLite.

    Args:
        datasrc: path to the directory containing ArchiveBox instance directories
        sites: optional list of site IDs to filter by
        query: search query string
        fields: optional list of fields to include in response
        sort: sort order for results
        limit: maximum number of results to return
        offset: number of results to skip for pagination

    Returns:
        Tuple of (list of ResourceResult objects, total count, IndexState)
    """
    matched_sites: list[SiteResult] = get_sites(datasrc=datasrc, ids=sites)
    assert matched_sites, "At least one site is required to search"
    # fall back to every matched site's id when no explicit filter was given
    site_ids: list[int] = sites if sites else [site.id for site in matched_sites]
    # the site directories themselves (e.g., "example", "pragmar") serve as paths
    group = SitesGroup(datasrc, site_ids, [site.path for site in matched_sites])
    return manager.get_resources_for_sites_group(group, query, fields, sort, limit, offset)
```
--------------------------------------------------------------------------------
/docs/py-modindex.html:
--------------------------------------------------------------------------------
```html
<!DOCTYPE html>
<html class="writer-html5" lang="en" data-content_root="./">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Python Module Index — mcp-server-webcrawl documentation</title>
<link rel="stylesheet" type="text/css" href="_static/pygments.css?v=80d5e7a1" />
<link rel="stylesheet" type="text/css" href="_static/css/theme.css?v=e59714d7" />
<script src="_static/jquery.js?v=5d32c60e"></script>
<script src="_static/_sphinx_javascript_frameworks_compat.js?v=2cd50e6c"></script>
<script src="_static/documentation_options.js?v=5929fcd5"></script>
<script src="_static/doctools.js?v=888ff710"></script>
<script src="_static/sphinx_highlight.js?v=dc90522c"></script>
<script src="_static/js/theme.js"></script>
<link rel="index" title="Index" href="genindex.html" />
<link rel="search" title="Search" href="search.html" />
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="index.html" class="icon icon-home">
mcp-server-webcrawl
</a>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="search.html" method="get">
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div><div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="Navigation menu">
<p class="caption" role="heading"><span class="caption-text">Contents:</span></p>
<ul>
<li class="toctree-l1"><a class="reference internal" href="installation.html">Installation</a></li>
<li class="toctree-l1"><a class="reference internal" href="guides.html">Setup Guides</a></li>
<li class="toctree-l1"><a class="reference internal" href="usage.html">Usage</a></li>
<li class="toctree-l1"><a class="reference internal" href="prompts.html">Prompt Routines</a></li>
<li class="toctree-l1"><a class="reference internal" href="interactive.html">Interactive Mode</a></li>
<li class="toctree-l1"><a class="reference internal" href="modules.html">mcp_server_webcrawl</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"><nav class="wy-nav-top" aria-label="Mobile navigation menu" >
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="index.html">mcp-server-webcrawl</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="Page navigation">
<ul class="wy-breadcrumbs">
<li><a href="index.html" class="icon icon-home" aria-label="Home"></a></li>
<li class="breadcrumb-item active">Python Module Index</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Python Module Index</h1>
<div class="modindex-jumpbox">
<a href="#cap-m"><strong>m</strong></a>
</div>
<table class="indextable modindextable">
<tr class="pcap"><td></td><td> </td><td></td></tr>
<tr class="cap" id="cap-m"><td></td><td>
<strong>m</strong></td><td></td></tr>
<tr>
<td><img src="_static/minus.png" class="toggler"
id="toggle-1" style="display: none" alt="-" /></td>
<td>
<a href="mcp_server_webcrawl.html#module-mcp_server_webcrawl"><code class="xref">mcp_server_webcrawl</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.html#module-mcp_server_webcrawl.crawlers"><code class="xref">mcp_server_webcrawl.crawlers</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.archivebox.html#module-mcp_server_webcrawl.crawlers.archivebox"><code class="xref">mcp_server_webcrawl.crawlers.archivebox</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.archivebox.html#module-mcp_server_webcrawl.crawlers.archivebox.adapter"><code class="xref">mcp_server_webcrawl.crawlers.archivebox.adapter</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.archivebox.html#module-mcp_server_webcrawl.crawlers.archivebox.crawler"><code class="xref">mcp_server_webcrawl.crawlers.archivebox.crawler</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.archivebox.html#module-mcp_server_webcrawl.crawlers.archivebox.tests"><code class="xref">mcp_server_webcrawl.crawlers.archivebox.tests</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.base.html#module-mcp_server_webcrawl.crawlers.base"><code class="xref">mcp_server_webcrawl.crawlers.base</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.base.html#module-mcp_server_webcrawl.crawlers.base.adapter"><code class="xref">mcp_server_webcrawl.crawlers.base.adapter</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.base.html#module-mcp_server_webcrawl.crawlers.base.api"><code class="xref">mcp_server_webcrawl.crawlers.base.api</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.base.html#module-mcp_server_webcrawl.crawlers.base.crawler"><code class="xref">mcp_server_webcrawl.crawlers.base.crawler</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.base.html#module-mcp_server_webcrawl.crawlers.base.indexed"><code class="xref">mcp_server_webcrawl.crawlers.base.indexed</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.base.html#module-mcp_server_webcrawl.crawlers.base.tests"><code class="xref">mcp_server_webcrawl.crawlers.base.tests</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.httrack.html#module-mcp_server_webcrawl.crawlers.httrack"><code class="xref">mcp_server_webcrawl.crawlers.httrack</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.httrack.html#module-mcp_server_webcrawl.crawlers.httrack.adapter"><code class="xref">mcp_server_webcrawl.crawlers.httrack.adapter</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.httrack.html#module-mcp_server_webcrawl.crawlers.httrack.crawler"><code class="xref">mcp_server_webcrawl.crawlers.httrack.crawler</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.httrack.html#module-mcp_server_webcrawl.crawlers.httrack.tests"><code class="xref">mcp_server_webcrawl.crawlers.httrack.tests</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.interrobot.html#module-mcp_server_webcrawl.crawlers.interrobot"><code class="xref">mcp_server_webcrawl.crawlers.interrobot</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.interrobot.html#module-mcp_server_webcrawl.crawlers.interrobot.adapter"><code class="xref">mcp_server_webcrawl.crawlers.interrobot.adapter</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.interrobot.html#module-mcp_server_webcrawl.crawlers.interrobot.crawler"><code class="xref">mcp_server_webcrawl.crawlers.interrobot.crawler</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.interrobot.html#module-mcp_server_webcrawl.crawlers.interrobot.tests"><code class="xref">mcp_server_webcrawl.crawlers.interrobot.tests</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.katana.html#module-mcp_server_webcrawl.crawlers.katana"><code class="xref">mcp_server_webcrawl.crawlers.katana</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.katana.html#module-mcp_server_webcrawl.crawlers.katana.adapter"><code class="xref">mcp_server_webcrawl.crawlers.katana.adapter</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.katana.html#module-mcp_server_webcrawl.crawlers.katana.crawler"><code class="xref">mcp_server_webcrawl.crawlers.katana.crawler</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.katana.html#module-mcp_server_webcrawl.crawlers.katana.tests"><code class="xref">mcp_server_webcrawl.crawlers.katana.tests</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.siteone.html#module-mcp_server_webcrawl.crawlers.siteone"><code class="xref">mcp_server_webcrawl.crawlers.siteone</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.siteone.html#module-mcp_server_webcrawl.crawlers.siteone.adapter"><code class="xref">mcp_server_webcrawl.crawlers.siteone.adapter</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.siteone.html#module-mcp_server_webcrawl.crawlers.siteone.crawler"><code class="xref">mcp_server_webcrawl.crawlers.siteone.crawler</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.siteone.html#module-mcp_server_webcrawl.crawlers.siteone.tests"><code class="xref">mcp_server_webcrawl.crawlers.siteone.tests</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.warc.html#module-mcp_server_webcrawl.crawlers.warc"><code class="xref">mcp_server_webcrawl.crawlers.warc</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.warc.html#module-mcp_server_webcrawl.crawlers.warc.adapter"><code class="xref">mcp_server_webcrawl.crawlers.warc.adapter</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.warc.html#module-mcp_server_webcrawl.crawlers.warc.crawler"><code class="xref">mcp_server_webcrawl.crawlers.warc.crawler</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.warc.html#module-mcp_server_webcrawl.crawlers.warc.tests"><code class="xref">mcp_server_webcrawl.crawlers.warc.tests</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.wget.html#module-mcp_server_webcrawl.crawlers.wget"><code class="xref">mcp_server_webcrawl.crawlers.wget</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.wget.html#module-mcp_server_webcrawl.crawlers.wget.adapter"><code class="xref">mcp_server_webcrawl.crawlers.wget.adapter</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.wget.html#module-mcp_server_webcrawl.crawlers.wget.crawler"><code class="xref">mcp_server_webcrawl.crawlers.wget.crawler</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.crawlers.wget.html#module-mcp_server_webcrawl.crawlers.wget.tests"><code class="xref">mcp_server_webcrawl.crawlers.wget.tests</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.extras.html#module-mcp_server_webcrawl.extras"><code class="xref">mcp_server_webcrawl.extras</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.extras.html#module-mcp_server_webcrawl.extras.markdown"><code class="xref">mcp_server_webcrawl.extras.markdown</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.extras.html#module-mcp_server_webcrawl.extras.regex"><code class="xref">mcp_server_webcrawl.extras.regex</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.extras.html#module-mcp_server_webcrawl.extras.snippets"><code class="xref">mcp_server_webcrawl.extras.snippets</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.extras.html#module-mcp_server_webcrawl.extras.thumbnails"><code class="xref">mcp_server_webcrawl.extras.thumbnails</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.extras.html#module-mcp_server_webcrawl.extras.xpath"><code class="xref">mcp_server_webcrawl.extras.xpath</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.interactive.html#module-mcp_server_webcrawl.interactive"><code class="xref">mcp_server_webcrawl.interactive</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.interactive.html#module-mcp_server_webcrawl.interactive.highlights"><code class="xref">mcp_server_webcrawl.interactive.highlights</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.interactive.html#module-mcp_server_webcrawl.interactive.search"><code class="xref">mcp_server_webcrawl.interactive.search</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.interactive.html#module-mcp_server_webcrawl.interactive.session"><code class="xref">mcp_server_webcrawl.interactive.session</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.interactive.html#module-mcp_server_webcrawl.interactive.ui"><code class="xref">mcp_server_webcrawl.interactive.ui</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.models.html#module-mcp_server_webcrawl.models"><code class="xref">mcp_server_webcrawl.models</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.models.html#module-mcp_server_webcrawl.models.resources"><code class="xref">mcp_server_webcrawl.models.resources</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.models.html#module-mcp_server_webcrawl.models.sites"><code class="xref">mcp_server_webcrawl.models.sites</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.html#module-mcp_server_webcrawl.settings"><code class="xref">mcp_server_webcrawl.settings</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.html#module-mcp_server_webcrawl.settings_local"><code class="xref">mcp_server_webcrawl.settings_local</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.templates.html#module-mcp_server_webcrawl.templates"><code class="xref">mcp_server_webcrawl.templates</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.templates.html#module-mcp_server_webcrawl.templates.tests"><code class="xref">mcp_server_webcrawl.templates.tests</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.utils.html#module-mcp_server_webcrawl.utils"><code class="xref">mcp_server_webcrawl.utils</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.utils.html#module-mcp_server_webcrawl.utils.cli"><code class="xref">mcp_server_webcrawl.utils.cli</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.utils.html#module-mcp_server_webcrawl.utils.logger"><code class="xref">mcp_server_webcrawl.utils.logger</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.utils.html#module-mcp_server_webcrawl.utils.server"><code class="xref">mcp_server_webcrawl.utils.server</code></a></td><td>
<em></em></td></tr>
<tr class="cg-1">
<td></td>
<td>   
<a href="mcp_server_webcrawl.utils.html#module-mcp_server_webcrawl.utils.tools"><code class="xref">mcp_server_webcrawl.utils.tools</code></a></td><td>
<em></em></td></tr>
</table>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>© Copyright 2025, pragmar.</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a
<a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a>
provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>
```
--------------------------------------------------------------------------------
/src/mcp_server_webcrawl/interactive/ui.py:
--------------------------------------------------------------------------------
```python
import curses
from enum import Enum, auto
from typing import NamedTuple, Optional, Tuple
from mcp_server_webcrawl.crawlers import VALID_CRAWLER_CHOICES
# layout and input tuning constants for the curses UI
SITE_COLUMN_WIDTH = 18  # characters reserved per site column in the sites grid
LAYOUT_GRID_COLUMN_SPACING = 2  # blank columns between site grid columns
LAYOUT_CONSTRAINED_SITES_PER_COLUMN = 3  # rows per column when vertical space is constrained
LAYOUT_SITES_GRID_OFFSET = 6  # rows subtracted from available height for the sites grid
DEFAULT_GROUP_WIDTH = 12  # fallback width for an empty radio group
INPUT_BOX_BRACKET_WIDTH = 2  # accounts for the "[" and "]" around a text input
CURSOR_SCROLL_THRESHOLD = 5  # NOTE(review): not referenced in this chunk — confirm usage elsewhere
class DocumentMode(Enum):
    """Rendering mode for the document view: markdown, raw source, or headers."""
    MARKDOWN = auto()
    RAW = auto()
    HEADERS = auto()
class NavigationDirection(Enum):
    """Four-way directional navigation between/within form groups."""
    UP = auto()
    DOWN = auto()
    LEFT = auto()
    RIGHT = auto()
class ScrollDirection(Enum):
    """Vertical scroll direction for scrollable views."""
    UP = auto()
    DOWN = auto()
class SearchFilterType(Enum):
    """Result-type filter; explicit ints (not auto()) — presumably compared/stored numerically, confirm."""
    ANY = 0
    PAGES = 1
class ThemeDefinition(Enum):
    """
    Color theme entries as (pair_id, foreground, background) triples.
    Plain integers are xterm 256-color codes.
    """
    # https://www.ditig.com/256-colors-cheat-sheet
    DOCUMENT_MODE = (1, curses.COLOR_BLUE, 51)
    HEADER_ACTIVE = (2, curses.COLOR_WHITE, 17)
    HEADER_INACTIVE = (3, curses.COLOR_WHITE, 233)
    HEADER_OUTER = (4, curses.COLOR_WHITE, 235)
    HELP_LINK = (5, curses.COLOR_WHITE, 27)
    HTTP_ERROR = (6, curses.COLOR_WHITE, 88)
    HTTP_WARN = (7, curses.COLOR_WHITE, 130)
    INACTIVE_QUERY = (8, 245, 237)
    SNIPPET_DEFAULT = (9, 243, curses.A_DIM)  # NOTE(review): third slot is curses.A_DIM (an attribute, not a color) — confirm intended
    SNIPPET_HIGHLIGHT = (10, 232, 51)
    UI_ERROR = (11, curses.COLOR_WHITE, 88)
class UiFocusable(Enum):
    """Which UI region currently holds keyboard focus."""
    UNDEFINED = auto()
    SEARCH_FORM = auto()
    SEARCH_RESULTS = auto()
class UiState(Enum):
    """Top-level screen/state the interactive UI is currently showing."""
    UNDEFINED = auto()
    REQUIREMENTS = auto()
    SEARCH_INIT = auto()
    SEARCH_RESULTS = auto()
    DOCUMENT = auto()
    HELP = auto()
def safe_addstr(stdscr: curses.window, y: int, x: int, text: str, style: int = curses.A_NORMAL) -> None:
    """
    Safe addstr that handles screen edge errors.

    curses raises curses.error when writing past the window boundary (e.g.
    the bottom-right cell); this wrapper swallows that so callers can draw
    without per-call bounds checks.
    """
    try:
        stdscr.addstr(y, x, text, style)
    except curses.error:
        pass
class InputRadio:
    """
    A single radio option that cycles through a small set of states
    (e.g. on/off, or blank/ascending/descending for sort groups).
    """

    def __init__(self, group, name: str, label: str, index: int, states: list = None):
        """
        Radio input with 2-3 possible states (e.g., on/off or state1/state2/off)

        Args:
            group: the InputRadioGroup this radio belongs to
            name: the form radio group name
            label: the form radio label
            index: the current state index
            states: list of InputRadioState objects defining each possible state
        """
        if states is None:
            states = []
        assert states, "states must be provided and non-empty"
        assert 0 <= index < len(states), f"index {index} out of range for {len(states)} states"
        self.name = name
        self.label = label
        self.index = index
        self._states = states
        self._group = group

    @property
    def current_state(self):
        """The state object at the current index."""
        return self._states[self.index]

    @property
    def display_label(self) -> str:
        """Symbol shown for the current state (e.g. " ", "●", "+", "-")."""
        return self._states[self.index].label

    @property
    def value(self) -> str:
        """The form value of the current state."""
        return self._states[self.index].value

    def next_state(self) -> None:
        """
        Cycle to the next state, applying group-specific selection rules.
        """
        group_name = self._group.name
        if group_name in ("filter", "site", "crawler"):
            # single-selection groups: deselect every sibling before advancing
            self._group.clear()
        if group_name != "sort":
            # plain wraparound cycling
            self.index = (self.index + 1) % len(self._states)
            return
        # sort radios: inactive -> ascending, then toggle ascending/descending
        if self.index == 0:
            self._group.clear()
            self.index = 1
        elif self.index == 1:
            self.index = 2
        elif self.index == 2:
            self.index = 1

    def render(self, stdscr: curses.window, y: int, x: int, field_index: int, max_width: int = None, focused: bool = False) -> None:
        """
        Render a single radio option as "(symbol) label", truncating the
        label with an ellipsis when it exceeds max_width.
        """
        text = self.label
        if max_width and len(text) > max_width:
            text = text[:max_width - 1] + "…"
        attr = curses.A_REVERSE if focused else curses.A_NORMAL
        try:
            safe_addstr(stdscr, y, x, f"({self.display_label}) {text}", attr)
        except curses.error:
            pass  # screen edge

    def set_state(self, index: int) -> None:
        """
        Set the current state by index; raises IndexError when out of range.
        """
        if not (0 <= index < len(self._states)):
            raise IndexError(f"State index {index} out of range")
        self.index = index

    def set_states(self, states: list) -> None:
        """
        Replace the list of possible states (index is left unchanged).
        """
        self._states = states

    def __str__(self) -> str:
        return f"{self.label}: {self.display_label} ({self.value})"
class InputRadioGroup:
    """
    Radio group with navigation and layout management capabilities.

    Owns a list of InputRadio options and, for the "site" group only,
    lays them out as a multi-column grid with row/column navigation.
    """
    def __init__(self, name: str, sites: list = None):
        """
        Radio input group with layout and navigation support.

        Args:
            name: The form radio group name ("filter", "sort", "site", or "crawler")
            sites: List of SiteResult objects, required only for "site" group type
        """
        sites = sites if sites is not None else []
        self.name: str = name
        self.label: str = name
        # NOTE(review): __selected_index is written here but never read in this class — confirm it isn't vestigial
        self.__selected_index: int = 0
        # layout configuration
        self.__available_width: int = 0
        self.__available_height: int = 0
        self.__is_constrained: bool = False
        self.__sites_per_column: int = 0
        self.__max_columns: int = 0
        self.radios: list[InputRadio] = []
        # each group maps to a loader that populates self.radios plus a display label
        group_config = {
            "filter": (self.__load_filters, "Filter:"),
            "site": (lambda: self.__load_sites(sites), "Sites:"),
            "sort": (self.__load_sorts, "Sorts:"),
            "crawler": (self.__load_crawlers, "Crawlers:"),
        }
        if self.name in group_config:
            data_loader, label = group_config[self.name]
            self.label = label
            data_loader()
        else:
            raise Exception(f"Unsupported radio option: {self.name}")
        # preselect the first option so the group always starts with a value
        if self.radios:
            self.radios[0].next_state()
    @property
    def value(self) -> str:
        """
        Return the group's effective form value based on the selected radio:
        "filter" -> "html" or "", "sort" -> "+label"/"-label", "site"/"crawler" -> label.
        """
        for radio in self.radios:
            if radio.value == "on" or radio.display_label in ["+", "-"]: # selected state
                if self.name == "filter":
                    return "html" if radio.label == "HTML" else ""
                elif self.name == "sort":
                    if radio.display_label == "+":
                        return f"+{radio.label}"
                    elif radio.display_label == "-":
                        return f"-{radio.label}"
                    return ""
                elif self.name == "site":
                    return radio.label # or site ID/URL however you want to identify it
                elif self.name == "crawler":
                    return radio.label
        return ""
    def calculate_group_width(self) -> int:
        """
        Calculate the display width needed for a radio group.
        """
        if not self.radios:
            return DEFAULT_GROUP_WIDTH
        return max(len(radio.label) for radio in self.radios)
    def clear(self) -> None:
        """Reset every radio in the group to its inactive (index 0) state."""
        for r in self.radios:
            r.index = 0
    def set_layout_constraints(self, available_width: int, available_height: int, is_constrained: bool = False) -> None:
        """
        Set layout constraints for grid-based groups (like sites).

        Args:
            available_width: Available horizontal space
            available_height: Available vertical space
            is_constrained: Whether layout is constrained (affects sites per column)
        """
        self.__available_width = available_width
        self.__available_height = available_height
        self.__is_constrained = is_constrained
        # only the site group uses grid layout
        if self.name == "site":
            self.__calculate_grid_layout()
    def get_grid_position(self, radio_index: int) -> Tuple[int, int]:
        """
        Convert linear radio index to grid position.
        Only applies to site groups; other groups return (radio_index, 0).

        Args:
            radio_index: Linear index in radios list

        Returns:
            tuple: (row, column) position in grid layout
        """
        if self.name != "site" or self.__sites_per_column == 0:
            return (radio_index, 0)
        # column-major: indexes fill a column top-to-bottom before moving right
        row = radio_index % self.__sites_per_column
        col = radio_index // self.__sites_per_column
        return (row, col)
    def get_index_from_grid(self, row: int, col: int) -> Optional[int]:
        """
        Convert grid position to linear radio index.
        Only works for site groups; returns None for other group types.

        Args:
            row: Row in grid (0-based)
            col: Column in grid (0-based)

        Returns:
            Linear index if position exists within grid bounds, None otherwise
        """
        if self.name != "site":
            return row if 0 <= row < len(self.radios) else None
        # grid not yet laid out (set_layout_constraints not called)
        if self.__sites_per_column == 0:
            return None
        # column-major inverse of get_grid_position
        radio_index = col * self.__sites_per_column + row
        if (0 <= radio_index < len(self.radios) and
            radio_index < self.__sites_per_column * self.__max_columns):
            return radio_index
        return None
    def navigate_left(self, current_radio_index: int) -> Optional[int]:
        """
        Navigate left within this group's layout.

        Args:
            current_radio_index: Current position in radios list

        Returns:
            New radio index if navigation successful, None if should exit group
        """
        if self.name != "site":
            # don't support horizontal navigation
            return None
        current_row, current_col = self.get_grid_position(current_radio_index)
        if current_col > 0:
            # to previous column, same row
            return self.get_index_from_grid(current_row, current_col - 1)
        else:
            # at leftmost column, signal exit to parent
            return None
    def navigate_right(self, current_radio_index: int) -> Optional[int]:
        """
        Navigate right within this group's layout.

        Args:
            current_radio_index: Current position in radios list

        Returns:
            New radio index if navigation successful, None if should exit group
        """
        if self.name != "site":
            # don't support horizontal navigation
            return None
        current_row, current_col = self.get_grid_position(current_radio_index)
        new_index = self.get_index_from_grid(current_row, current_col + 1)
        return new_index # if invalid/out of bounds
    def navigate_to_row(self, target_row: int, from_column: int = 0) -> Optional[int]:
        """
        Navigate to a specific row from an external column position.
        """
        if self.name != "site":
            return target_row if 0 <= target_row < len(self.radios) else None
        if self.__sites_per_column == 0:
            return target_row if 0 <= target_row < len(self.radios) else None
        return self.get_index_from_grid(target_row, from_column)
    def get_row_from_index(self, radio_index: int) -> int:
        """
        Get the row number for navigation between groups.

        Args:
            radio_index: Linear index in radios list

        Returns:
            Row number for inter-group navigation
        """
        if self.name != "site":
            return radio_index
        row, _ = self.get_grid_position(radio_index)
        return row
    def __calculate_grid_layout(self) -> None:
        """
        Calculate grid layout parameters for sites group.
        """
        if self.name != "site":
            return
        self.__sites_per_column = (LAYOUT_CONSTRAINED_SITES_PER_COLUMN if self.__is_constrained
            else min(self.__available_height - LAYOUT_SITES_GRID_OFFSET, len(self.radios)))
        if self.__available_width > SITE_COLUMN_WIDTH:
            self.__max_columns = max(1, self.__available_width // (SITE_COLUMN_WIDTH + LAYOUT_GRID_COLUMN_SPACING))
        else:
            self.__max_columns = 1
    def __display_url(self, url: str) -> str:
        """Strip the scheme and trailing slash from a URL for compact display."""
        return url.split("://")[-1].rstrip("/")
    def __get_on_off_state(self) -> list:
        """Return a fresh two-state (off/on) list for an on/off radio."""
        return [
            InputRadioState(" ", ""),
            InputRadioState("●", "on")
        ]
    def __load_crawlers(self) -> None:
        """Populate radios with one on/off option per supported crawler."""
        # "archivebox", "httrack", "interrobot", "katana", "siteone", "warc", "wget"
        self.radios = [
            InputRadio(self, "crawler", label, 0, self.__get_on_off_state())
            for label in VALID_CRAWLER_CHOICES
        ]
    def __load_filters(self) -> None:
        """Populate radios with the HTML/any result-type filters."""
        self.radios = [
            InputRadio(self, "filter", "HTML", 0, self.__get_on_off_state()),
            InputRadio(self, "filter", "any", 0, self.__get_on_off_state())
        ]
    def __load_sites(self, sites: list) -> None:
        """Populate radios with one on/off option per site, labeled by first URL."""
        site_labels = [self.__display_url(s.urls[0]) if s.urls else "unknown" for s in sites]
        self.radios = [
            InputRadio(self, "site", label, 0, self.__get_on_off_state())
            for label in site_labels
        ]
    def __load_sorts(self) -> None:
        """Populate radios with three-state (inactive/+/-) sort options."""
        sort_states = [
            InputRadioState(" ", ""),
            InputRadioState("+", "+"),
            InputRadioState("-", "-")
        ]
        sort_labels = ["URL", "status", "size"]
        self.radios = [
            InputRadio(self, "sort", label, 0, sort_states.copy())
            for label in sort_labels
        ]
class InputRadioState(NamedTuple):
    """Immutable (label, value) pair describing one possible radio state."""
    label: str # display symbol: "●", " ", "-", "+"
    value: str # form value: "", "+url", "-sort"
class InputText:
"""
A reusable text input field with cursor management, rendering, and input handling.
Consolidates the common text input functionality used across the application.
"""
def __init__(self, initial_value: str = "", max_length: int = None, label: str = ""):
"""
Initialize the text input field.
Args:
initial_value: Starting text value
max_length: Maximum allowed text length (None for unlimited)
label: Display label for the field
"""
self.value: str = initial_value
self.cursor_pos: int = len(initial_value)
self.max_length: int = max_length
self.label: str = label
self._last_display_cache: Optional[tuple] = None
self._last_value_hash: int = 0
def backspace(self) -> None:
"""
Remove the character before the cursor.
"""
if self.cursor_pos > 0:
self.value = self.value[:self.cursor_pos - 1] + self.value[self.cursor_pos:]
self.cursor_pos -= 1
def clear(self) -> None:
"""
Clear all text and reset cursor.
"""
self.value = ""
self.cursor_pos = 0
def delete(self) -> None:
"""
Remove the character at the cursor position.
"""
if self.cursor_pos < len(self.value):
self.value = self.value[:self.cursor_pos] + self.value[self.cursor_pos + 1:]
def end(self) -> None:
"""
Move cursor to the end of the text.
"""
self.cursor_pos = len(self.value)
def handle_input(self, key: int) -> bool:
"""
Handle keyboard input for the text field.
Args:
key: The curses key code
Returns:
bool: True if the input was handled, False otherwise
"""
handlers: dict[int, callable] = {
curses.KEY_LEFT: self.move_cursor_left,
curses.KEY_RIGHT: self.move_cursor_right,
curses.KEY_HOME: self.home,
curses.KEY_END: self.end,
curses.KEY_BACKSPACE: self.backspace,
127: self.backspace, # alternative backspace
8: self.backspace, # alternative backspace
curses.KEY_DC: self.delete,
}
handler = handlers.get(key)
if handler:
handler()
return True
if 32 <= key <= 126: # printable characters
char: str = chr(key)
self.insert_char(char)
return True
return False
def home(self) -> None:
"""
Move cursor to the beginning of the text.
"""
self.cursor_pos = 0
def insert_char(self, char: str) -> None:
"""
Insert a character at the current cursor position.
"""
sanitized = self.__sanitize_input(char)
if sanitized is None:
return
if self.max_length is not None and len(self.value) >= self.max_length:
return
self.value = self.value[:self.cursor_pos] + char + self.value[self.cursor_pos:]
self.cursor_pos += 1
def is_empty(self) -> bool:
    """
    True when the field contains nothing but whitespace (or nothing at all).
    """
    return not self.value.strip()
def move_cursor_left(self) -> None:
    """
    Step the cursor one position left, stopping at the start of the text.
    """
    self.cursor_pos = max(0, self.cursor_pos - 1)
def move_cursor_right(self) -> None:
    """
    Step the cursor one position right, stopping at the end of the text.
    """
    self.cursor_pos = min(len(self.value), self.cursor_pos + 1)
def render(self, stdscr: curses.window, y: int, x: int, width: int,
        focused: bool = False, style: int = None) -> None:
    """
    Draw the bracketed input box, its (possibly scrolled) text, and, when
    focused, the cursor.
    Args:
        stdscr: The curses window
        y: Y position to render at
        x: X position to render at
        width: Total width of the input box
        focused: Whether this field has focus (shows cursor)
        style: Curses style attributes to apply
    """
    if style is None:
        style = curses.A_REVERSE if focused else curses.A_NORMAL
    # reserve room for the surrounding [ ] brackets
    inner_width = max(1, width - INPUT_BOX_BRACKET_WIDTH)
    display_text, display_cursor_pos = self.__calculate_display_text_and_cursor(inner_width)
    padded_text = display_text.ljust(inner_width)
    safe_addstr(stdscr, y, x, f"[{padded_text}]", style)
    if focused:
        self.__render_cursor(stdscr, y, x, display_text, display_cursor_pos, inner_width)
def set_value(self, new_value: str) -> None:
    """
    Replace the field text, clamping the cursor to the new text length.
    """
    self.value = new_value
    if self.cursor_pos > len(new_value):
        self.cursor_pos = len(new_value)
def __sanitize_input(self, char: str) -> Optional[str]:
    """
    Validate a single input character.
    Returns:
        Optional[str]: the character unchanged if acceptable, or None when
        it is a control character (codes < 32, or DEL / 127).
    """
    code = ord(char)
    if code < 32 or code == 127:
        return None
    # extend with further validation rules as needed
    return char
def __render_cursor(self, stdscr: curses.window, y: int, x: int,
        display_text: str, display_cursor_pos: int, inner_width: int) -> None:
    """
    Render the cursor at the appropriate position.
    When the cursor sits on a character, that character is drawn highlighted;
    when it sits one past the end of the text, an underscore placeholder is
    drawn instead.
    Args:
        stdscr: The curses window
        y: Y position of the input box
        x: X position of the input box
        display_text: The currently displayed text
        display_cursor_pos: Where the cursor appears in the displayed text
        inner_width: Available width inside the box
    """
    try:
        if display_cursor_pos < len(display_text) and display_cursor_pos < inner_width:
            # x + 1 skips the opening "[" bracket of the box
            cursor_x = x + 1 + display_cursor_pos
            # highlight the character under cursor instead of just reversing
            char_under_cursor = display_text[display_cursor_pos]
            safe_addstr(stdscr, y, cursor_x, char_under_cursor, curses.A_REVERSE | curses.A_BOLD)
        elif display_cursor_pos >= 0 and x + 1 + display_cursor_pos < x + 1 + inner_width:
            # cursor at end - underscore
            cursor_x = x + 1 + display_cursor_pos
            safe_addstr(stdscr, y, cursor_x, '_', curses.A_REVERSE | curses.A_BOLD)
    except curses.error:
        # curses raises when writing to the last cell of some windows; a
        # missing cursor glyph is harmless, so swallow it
        pass
def __calculate_display_text_and_cursor(self, inner_width: int) -> tuple[str, int]:
    """
    Calculate what portion of text to display and where the cursor should appear.
    Handles horizontal scrolling for long text. Results are memoized against
    (value, cursor_pos, inner_width) so repeated renders are cheap.
    Args:
        inner_width: Available width inside the input box
    Returns:
        tuple: (display_text, display_cursor_position)
    """
    current_hash = hash((self.value, self.cursor_pos, inner_width))
    if current_hash == self._last_value_hash and self._last_display_cache:
        return self._last_display_cache
    if len(self.value) <= inner_width:
        # text fits entirely
        result = (self.value, self.cursor_pos)
    elif self.cursor_pos >= inner_width - CURSOR_SCROLL_THRESHOLD:
        # cursor near/past the right edge: show the tail of the value
        start_pos = max(0, len(self.value) - inner_width)
        display_text = self.value[start_pos:]
        # bug fix: clamp — the cursor can sit left of the scrolled window,
        # which previously produced a negative display position
        display_cursor_pos = max(0, min(self.cursor_pos - start_pos, inner_width))
        result = (display_text, display_cursor_pos)
    else:
        # cursor near the start: show the head of the value
        result = (self.value[:inner_width], min(self.cursor_pos, inner_width))
    # bug fix: the cache was checked above but never written, so it could
    # never hit; store the computed result for the next call
    self._last_value_hash = current_hash
    self._last_display_cache = result
    return result
class ViewBounds:
    """
    Simple rectangle describing a view's drawing area: origin (x, y) plus
    width and height, all defaulting to zero.
    """

    def __init__(self, x: int = 0, y: int = 0, width: int = 0, height: int = 0):
        # origin of the rectangle
        self.x, self.y = x, y
        # dimensions of the rectangle
        self.width, self.height = width, height
```
--------------------------------------------------------------------------------
/src/mcp_server_webcrawl/interactive/views/results.py:
--------------------------------------------------------------------------------
```python
import curses
import textwrap
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional
from mcp_server_webcrawl.interactive.ui import ThemeDefinition, UiState, ViewBounds
from mcp_server_webcrawl.interactive.views.base import BaseCursesView
from mcp_server_webcrawl.interactive.highlights import HighlightProcessor, HighlightSpan
from mcp_server_webcrawl.models.resources import ResourceResult
from mcp_server_webcrawl.interactive.ui import safe_addstr
if TYPE_CHECKING:
from mcp_server_webcrawl.interactive.session import InteractiveSession
# snippet layout: left indent margin and maximum rendered lines per result
SEARCH_RESULT_SNIPPET_MARGIN: int = 6
SEARCH_RESULT_SNIPPET_MAX_LINES: int = 6
# result numbers below this value are zero-padded (01-09); 10+ print naturally
LAYOUT_ZERO_PAD_THRESHOLD: int = 10
# spacing between metadata fields on a result row
LAYOUT_RESULT_METADATA_SPACING: int = 2
# left margin for each result line
LAYOUT_RESULT_LINE_MARGIN: int = 2
# right-side buffer subtracted from available row width
LAYOUT_RESULT_WIDTH_BUFFER: int = 4
# one-character margin on each side of header/footer text
LAYOUT_FOOTER_MARGIN: int = 2
# minimum gap between left and right footer text
LAYOUT_FOOTER_TEXT_SPACING: int = 3
# rows consumed by the inner header plus the inner footer
LAYOUT_HEADER_FOOTER_HEIGHT: int = 2
# x offset for status messages ("Searching…", etc.)
LAYOUT_STATUS_MESSAGE_X_OFFSET: int = 2
# HTTP status coloring thresholds: >= 500 error style, >= 400 warn style
HTTP_ERROR_THRESHOLD: int = 500
HTTP_WARN_THRESHOLD: int = 400
# fixed right-aligned column widths for the type and size metadata fields
TYPE_FIELD_WIDTH: int = 7
SIZE_FIELD_WIDTH: int = 7
# slack reserved when truncating a URL to fit beside metadata
URL_PADDING_BUFFER: int = 3
@dataclass
class SnippetData:
    """
    Processed snippet payload: the de-marked text, its highlight spans, and
    the text pre-wrapped into display lines.
    """
    # snippet text with highlight markers stripped
    clean_text: str
    # character-offset highlight spans into clean_text
    highlights: list[HighlightSpan]
    # clean_text wrapped to the view width, one entry per display line
    wrapped_lines: list[str]

    def get_capped_line_count(self) -> int:
        """
        Number of wrapped lines, limited to SEARCH_RESULT_SNIPPET_MAX_LINES.
        Returns:
            int: The smaller of the wrapped line count and the cap
        """
        line_count = len(self.wrapped_lines)
        if line_count < SEARCH_RESULT_SNIPPET_MAX_LINES:
            return line_count
        return SEARCH_RESULT_SNIPPET_MAX_LINES
class SearchResultsView(BaseCursesView):
"""
A renderable curses view, but takes cues from searchform, which will handle
all input on this screen.
"""
def __init__(self, session: 'InteractiveSession'):
"""
Initialize the search results view.
Args:
session: The interactive session instance
"""
super().__init__(session)
self.__results: list[ResourceResult] = []
self.__results_total: int = 0
self.__results_indexer_status: str = ""
self.__results_indexer_processed: int = 0
self.__results_indexer_duration: float = 0
self.__scroll_offset: int = 0
self._focused: bool = False
self.__displayed_results: int = 0
@property
def indexing_time(self) -> float:
return self.__results_indexer_duration
@property
def results(self) -> list[ResourceResult]:
return self.__results
@property
def results_total(self) -> int:
return self.__results_total
def clear(self) -> None:
"""
Clear all results and reset state.
"""
self.__results = []
self.__results_total = 0
self._selected_index = 0
self.__scroll_offset = 0
def draw_inner_footer(self, stdscr: curses.window, bounds: ViewBounds, text: str) -> None:
"""
Draw footer with pagination on left and indexing info on right.
Args:
stdscr: The curses window to draw on
bounds: The view bounds defining the drawing area
text: The footer text to display
"""
footer_y: int = bounds.y + bounds.height - 1
safe_addstr(stdscr, footer_y, bounds.x, self._get_bounded_line(), self._get_inner_header_style())
if not self.__results:
left_text: str = ""
else:
searchform_offset: int = self.session.searchform.offset
index_start: int = searchform_offset + 1 # 1-based indexing for display
index_end: int = searchform_offset + len(self.__results)
left_text = f"Displaying {index_start:,}-{index_end:,} of {self.__results_total:,}"
if self.__results_indexer_processed > 0:
duration_seconds: float = self.__results_indexer_duration
right_text: str = f"{self.__results_indexer_processed:,} Indexed ({duration_seconds:.2f}s)"
else:
right_text = ""
max_width: int = bounds.width - LAYOUT_FOOTER_MARGIN
if left_text:
if len(left_text) > max_width // 2:
left_text = f"{left_text[:max_width // 2 - 1]}…"
safe_addstr(stdscr, footer_y, bounds.x + 1, left_text, self._get_inner_header_style())
if right_text:
right_text_len: int = len(right_text)
if right_text_len <= max_width:
# right text doesn't overlap with left text
min_right_x: int = bounds.x + 1 + len(left_text) + LAYOUT_FOOTER_TEXT_SPACING if left_text else bounds.x + 1
right_x: int = max(min_right_x, bounds.x + bounds.width - right_text_len - 1)
# draw if enough space
if right_x + right_text_len < bounds.x + bounds.width:
safe_addstr(stdscr, footer_y, right_x, right_text, self._get_inner_header_style())
def draw_inner_header(self, stdscr: curses.window, bounds: ViewBounds, text: str) -> None:
"""
Draw the application header with results count on left and search time on right.
Args:
stdscr: The curses window to draw on
bounds: The view bounds defining the drawing area
text: The header text to display
"""
header_y: int = bounds.y
# write out a line, then update it
safe_addstr(stdscr, header_y, bounds.x, self._get_bounded_line(), self._get_inner_header_style())
# results count
if self.__results and not (self.session.searchman.is_searching()):
left_text: str = f"Results ({self.__results_total:,} Found)"
else:
left_text = "Results:"
# 1 char margin on each side
max_width: int = bounds.width - LAYOUT_FOOTER_MARGIN
if left_text:
if len(left_text) > max_width // 2: # no more than half
left_text = f"{left_text[:max_width // 2 - 1]}…"
safe_addstr(stdscr, header_y, bounds.x + 1, left_text, self._get_inner_header_style())
def get_selected_result(self) -> Optional[ResourceResult]:
"""
Get the currently selected search result.
Returns:
Optional[ResourceResult]: The selected result or None if no valid selection
"""
if 0 <= self._selected_index < len(self.__results):
return self.__results[self._selected_index]
return None
def handle_input(self, key: int) -> bool:
"""
Handle keyboard input for results navigation and selection.
Args:
key: The curses key code from user input
Returns:
bool: True if the input was handled, False otherwise
"""
if not self._focused or not self.__results:
return False
def handle_page_previous() -> None:
if self.session.searchform.page_previous():
self.session.searchman.autosearch()
def handle_page_next() -> None:
if self.session.searchform.page_next(self.__results_total):
self.session.searchman.autosearch()
handlers: dict[int, callable] = {
curses.KEY_LEFT: handle_page_previous,
curses.KEY_RIGHT: handle_page_next,
curses.KEY_UP: self.__select_previous,
curses.KEY_DOWN: self.__select_next,
ord('\n'): self.__handle_document_selection,
ord('\r'): self.__handle_document_selection,
}
handler: Optional[callable] = handlers.get(key)
if handler:
handler()
return True
return False
def render(self, stdscr: curses.window) -> None:
"""
Render only the results content - headers/footers handled by session.
Args:
stdscr: The curses window to draw on
"""
if not self._renderable(stdscr):
return
xb: int = self.bounds.x
yb: int = self.bounds.y
y_current: int = yb + 1
# create content area excluding header/footer rows
# header takes row 0, footer takes row height-1, content gets the middle
if self.bounds.height <= LAYOUT_HEADER_FOOTER_HEIGHT:
return
# check if search is in progress
is_searching: bool = self.session.searchman.is_searching()
message: str = ""
if is_searching:
message = "Searching…"
elif not self.__results:
if self.__results_indexer_status in ("idle", "indexing", ""):
message = "Indexing…"
else:
message = "No results found."
if message != "":
safe_addstr(stdscr, y_current, LAYOUT_STATUS_MESSAGE_X_OFFSET, message, curses.A_DIM)
else:
self.__render_results_list(stdscr, y_current, 0)
def update(self, results: list[ResourceResult], total: int, indexer_status: str, indexer_processed: int, indexer_duration: float) -> None:
"""
Update the search results view with new data and reset selection.
Args:
results: List of search result resources for current page
total: Total number of results across all pages
indexer_processed: Number of resources processed during indexing
indexer_duration: Time taken for indexing in seconds
"""
self.__results = results
self.__results_total = total
self.__results_indexer_status = indexer_status
self.__results_indexer_processed = indexer_processed
self.__results_indexer_duration = indexer_duration
self._selected_index = 0
self.__scroll_offset = 0
def __ensure_visible(self) -> None:
"""
Ensure selected item is completely visible in viewport with line-by-line scrolling.
"""
if not self.__results or self._selected_index >= len(self.__results):
return
result_line_positions: list[int] = []
result_line_counts: list[int] = []
current_line: int = 0
for result in self.__results:
result_line_positions.append(current_line)
lines_for_this_result: int = 1
current_line += 1
snippet: Optional[str] = result.get_extra("snippets")
if snippet and snippet.strip():
snippet_data: SnippetData = self.__process_snippet(snippet)
snippet_lines: int = min(len(snippet_data.wrapped_lines), SEARCH_RESULT_SNIPPET_MAX_LINES)
lines_for_this_result += snippet_lines
current_line += snippet_lines
result_line_counts.append(lines_for_this_result)
selected_start_line: int = result_line_positions[self._selected_index]
selected_total_lines: int = result_line_counts[self._selected_index]
selected_end_line: int = selected_start_line + selected_total_lines - 1
visible_height: int = self.bounds.height - LAYOUT_HEADER_FOOTER_HEIGHT # account for header/footer
if selected_start_line < self.__scroll_offset:
self.__scroll_offset = selected_start_line
elif selected_end_line >= self.__scroll_offset + visible_height:
self.__scroll_offset = max(0, selected_end_line - visible_height + 1)
if self._selected_index + 1 < len(result_line_positions):
next_result_line: int = result_line_positions[self._selected_index + 1]
if next_result_line < self.__scroll_offset + visible_height:
self.__scroll_offset = min(self.__scroll_offset, next_result_line - visible_height + 1)
def __handle_document_selection(self) -> None:
"""
Handle document viewing when ENTER is pressed on a result.
"""
selected_result: Optional[ResourceResult] = self.get_selected_result()
if not selected_result or not selected_result.id:
return
selected_sites = self.session.searchform.get_selected_sites()
site_ids: list[int] = [site.id for site in selected_sites] if selected_sites else []
try:
query: str = f"id: {selected_result.id}"
query_api = self.session.crawler.get_resources_api(
sites=site_ids if site_ids else None,
query=query,
offset=0,
limit=1,
fields=["headers", "content", "status", "size"],
extras=["markdown"]
)
document_results: list[ResourceResult] = query_api.get_results()
if document_results:
self.session.document.update(document_results[0])
self.session.set_ui_state(UiState.DOCUMENT)
except Exception:
pass
def __process_snippet(self, snippet_text: str) -> SnippetData:
"""
Process raw snippet text using shared highlight utility.
Args:
snippet_text: Raw snippet text with highlight markers
Returns:
SnippetData: Processed data with clean text, highlight positions, and wrapped lines
"""
clean_text, highlights = HighlightProcessor.extract_snippet_highlights(snippet_text)
snippet_width: int = self.bounds.width - (SEARCH_RESULT_SNIPPET_MARGIN * 2)
wrapped_text: str = textwrap.fill(
clean_text,
width=snippet_width,
expand_tabs=True,
replace_whitespace=True,
break_long_words=True,
break_on_hyphens=True,
)
wrapped_lines: list[str] = wrapped_text.split("\n")
return SnippetData(
clean_text=clean_text,
highlights=highlights,
wrapped_lines=wrapped_lines
)
def __render_results_list(self, stdscr: curses.window, start_y: int, margin_x: int) -> None:
"""
Render results with metadata and snippets, respecting scroll offset.
Args:
stdscr: The curses window to draw on
start_y: Starting Y position for rendering
margin_x: Left margin for content
"""
xb: int = self.bounds.x
yb: int = self.bounds.y
y_current: int = start_y
y_max: int = yb + self.bounds.height
y_available: int = y_max - start_y
searchform_offset: int = self.session.searchform.offset
displayed_results: int = 0
# for scrolling
current_line: int = 0
for result_index in range(len(self.__results)):
if y_current >= start_y + y_available:
break
result: ResourceResult = self.__results[result_index]
is_selected: bool = self._focused and result_index == self._selected_index
global_result_num: int = searchform_offset + result_index + 1
# check if skip due to scrolling
if current_line < self.__scroll_offset:
current_line += 1
snippet: Optional[str] = result.get_extra("snippets")
if snippet and snippet.strip():
snippet_data: SnippetData = self.__process_snippet(snippet)
current_line += snippet_data.get_capped_line_count()
continue
# leading zero for 01-09, natural 10+
result_num: str
if global_result_num < LAYOUT_ZERO_PAD_THRESHOLD:
result_num = f"{global_result_num:02d}. "
else:
result_num = f"{global_result_num}. "
url: str = result.url or "No URL"
metadata_parts: list[tuple[str, int]] = []
# resource type
if result.type.value:
type_str: str = f"[{result.type.value}]"
type_str = f"{type_str:>{TYPE_FIELD_WIDTH}}"
metadata_parts.append((type_str, curses.A_NORMAL))
# file size
humanized_bytes: str = BaseCursesView.humanized_bytes(result)
if humanized_bytes and humanized_bytes != "0B":
metadata_parts.append((f"{humanized_bytes:>{SIZE_FIELD_WIDTH}}", curses.A_NORMAL))
# HTTP status
status_style = curses.A_NORMAL
if result.status >= HTTP_ERROR_THRESHOLD:
status_style = self.session.get_theme_color_pair(ThemeDefinition.HTTP_ERROR)
elif result.status >= HTTP_WARN_THRESHOLD:
status_style = self.session.get_theme_color_pair(ThemeDefinition.HTTP_WARN)
metadata_parts.append((str(result.status), status_style))
metadata_text: str = " ".join(part[0] for part in metadata_parts)
line_x: int = margin_x + LAYOUT_RESULT_LINE_MARGIN
available_width: int = min(self.bounds.width - LAYOUT_RESULT_WIDTH_BUFFER, self.bounds.width - line_x)
selected_style: int = curses.A_REVERSE if is_selected else curses.A_NORMAL
if metadata_parts:
url_space: int = available_width - len(result_num) - len(metadata_text) - URL_PADDING_BUFFER
if len(url) > url_space:
url = url[:max(0, url_space - 1)] + "…"
padding: int = available_width - len(result_num) - len(url) - len(metadata_text)
result_url_part: str = f"{result_num}{url}"
safe_addstr(stdscr, y_current, line_x, result_url_part, selected_style)
metadata_start_x: int = line_x + len(result_url_part)
if padding > 0 and metadata_start_x < line_x + available_width:
safe_addstr(stdscr, y_current, metadata_start_x, " " * padding, curses.A_NORMAL)
metadata_start_x += padding
for part_text, part_style in metadata_parts:
if metadata_start_x < line_x + available_width:
safe_addstr(stdscr, y_current, metadata_start_x, part_text, part_style)
metadata_start_x += len(part_text) + LAYOUT_RESULT_METADATA_SPACING
else:
url_space = available_width - len(result_num)
if len(url) > url_space:
url = url[:max(0, url_space - 1)] + "…"
result_line: str = f"{result_num}{url}"
safe_addstr(stdscr, y_current, line_x, result_line[:available_width], selected_style)
y_current += 1
current_line += 1
displayed_results += 1
snippet = result.get_extra("snippets")
if snippet and snippet.strip():
if y_current < start_y + y_available and y_current < y_max:
snippet_data = self.__process_snippet(snippet)
snippet_lines: int = min(len(snippet_data.wrapped_lines), SEARCH_RESULT_SNIPPET_MAX_LINES)
lines_to_skip: int = max(0, self.__scroll_offset - current_line)
if lines_to_skip < snippet_lines:
lines_rendered: int = self.__render_snippet_with_highlights(stdscr, snippet_data, y_current)
y_current += lines_rendered
current_line += snippet_lines
else:
current_line += snippet_lines
self.__displayed_results = displayed_results
def __render_snippet_with_highlights(self, stdscr: curses.window, snippet_data: SnippetData, y: int) -> int:
"""
Render a snippet using the processed snippet data with proper highlighting.
Args:
stdscr: The curses window to draw on
snippet_data: Processed snippet data with highlights
y: Starting Y position for rendering
Returns:
int: The number of lines actually rendered
"""
lines_to_render: int = min(len(snippet_data.wrapped_lines), SEARCH_RESULT_SNIPPET_MAX_LINES)
lines_rendered: int = 0
snippet_default_pair: int = self.session.get_theme_color_pair(ThemeDefinition.SNIPPET_DEFAULT)
snippet_highlight_pair: int = self.session.get_theme_color_pair(ThemeDefinition.SNIPPET_HIGHLIGHT)
# track character position in the original clean text
# this allows replacing ** highlights with natural text wrapping
char_position: int = 0
for i in range(lines_to_render):
if i >= len(snippet_data.wrapped_lines):
break
line_text: str = snippet_data.wrapped_lines[i]
if not line_text.strip():
char_position += len(line_text) + 1 # +1 for newline
continue
current_y: int = y + i
current_x: int = SEARCH_RESULT_SNIPPET_MARGIN
line_highlights: list[dict[str, int]] = []
line_end_pos: int = char_position + len(line_text)
for highlight in snippet_data.highlights:
if (highlight.start < line_end_pos and highlight.end > char_position):
# highlight intersects with current line
highlight_start_in_line: int = max(0, highlight.start - char_position)
highlight_end_in_line: int = min(len(line_text), highlight.end - char_position)
line_highlights.append({
"start": highlight_start_in_line,
"end": highlight_end_in_line
})
line_highlights.sort(key=lambda x: x["start"])
pos: int = 0
max_width: int = self.bounds.width - current_x - LAYOUT_RESULT_WIDTH_BUFFER
for highlight in line_highlights:
# text before highlight
if highlight["start"] > pos:
text_before: str = line_text[pos:highlight["start"]]
if current_x - SEARCH_RESULT_SNIPPET_MARGIN + len(text_before) <= max_width:
safe_addstr(stdscr, current_y, current_x, text_before, snippet_default_pair)
current_x += len(text_before)
pos = highlight["start"]
# highlighted text
highlighted_text: str = line_text[highlight["start"]:highlight["end"]]
if current_x - SEARCH_RESULT_SNIPPET_MARGIN + len(highlighted_text) <= max_width:
safe_addstr(stdscr, current_y, current_x, highlighted_text, snippet_highlight_pair)
current_x += len(highlighted_text)
pos = highlight["end"]
# remaining
if pos < len(line_text):
remaining_text: str = line_text[pos:]
remaining_width: int = max_width - (current_x - SEARCH_RESULT_SNIPPET_MARGIN)
if remaining_width > 0:
safe_addstr(stdscr, current_y, current_x, remaining_text[:remaining_width], snippet_default_pair)
# advance by the actual line length
char_position += len(line_text)
# add space if there's actually a space in the original text (otherwise hyphen off by one)
if (char_position < len(snippet_data.clean_text) and
snippet_data.clean_text[char_position].isspace()):
char_position += 1
lines_rendered += 1
return lines_rendered
def __select_next(self) -> None:
"""
Move selection to the next result.
"""
if self._selected_index < len(self.__results) - 1:
self._selected_index += 1
self.__ensure_visible()
def __select_previous(self) -> None:
    """
    Move the selection cursor back to the preceding result and keep it on screen.
    No-op when the cursor is already on the first result.
    """
    if self._selected_index <= 0:
        return
    self._selected_index -= 1
    # keep the newly selected row within the visible viewport
    self.__ensure_visible()
```
--------------------------------------------------------------------------------
/docs/_modules/mcp_server_webcrawl/crawlers/siteone/tests.html:
--------------------------------------------------------------------------------
```html
<!DOCTYPE html>
<html class="writer-html5" lang="en" data-content_root="../../../../">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>mcp_server_webcrawl.crawlers.siteone.tests — mcp-server-webcrawl documentation</title>
<link rel="stylesheet" type="text/css" href="../../../../_static/pygments.css?v=80d5e7a1" />
<link rel="stylesheet" type="text/css" href="../../../../_static/css/theme.css?v=e59714d7" />
<script src="../../../../_static/jquery.js?v=5d32c60e"></script>
<script src="../../../../_static/_sphinx_javascript_frameworks_compat.js?v=2cd50e6c"></script>
<script src="../../../../_static/documentation_options.js?v=5929fcd5"></script>
<script src="../../../../_static/doctools.js?v=888ff710"></script>
<script src="../../../../_static/sphinx_highlight.js?v=dc90522c"></script>
<script src="../../../../_static/js/theme.js"></script>
<link rel="index" title="Index" href="../../../../genindex.html" />
<link rel="search" title="Search" href="../../../../search.html" />
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../../index.html" class="icon icon-home">
mcp-server-webcrawl
</a>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div><div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="Navigation menu">
<p class="caption" role="heading"><span class="caption-text">Contents:</span></p>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../installation.html">Installation</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../guides.html">Setup Guides</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../usage.html">Usage</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../prompts.html">Prompt Routines</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../modules.html">mcp_server_webcrawl</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"><nav class="wy-nav-top" aria-label="Mobile navigation menu" >
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../../index.html">mcp-server-webcrawl</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="Page navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../../index.html" class="icon icon-home" aria-label="Home"></a></li>
<li class="breadcrumb-item"><a href="../../../index.html">Module code</a></li>
<li class="breadcrumb-item"><a href="../../crawlers.html">mcp_server_webcrawl.crawlers</a></li>
<li class="breadcrumb-item active">mcp_server_webcrawl.crawlers.siteone.tests</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for mcp_server_webcrawl.crawlers.siteone.tests</h1><div class="highlight"><pre>
<span></span><span class="kn">from</span> <span class="nn">mcp_server_webcrawl.crawlers.siteone.crawler</span> <span class="kn">import</span> <span class="n">SiteOneCrawler</span>
<span class="kn">from</span> <span class="nn">mcp_server_webcrawl.crawlers.base.tests</span> <span class="kn">import</span> <span class="n">BaseCrawlerTests</span>
<span class="kn">from</span> <span class="nn">mcp_server_webcrawl.crawlers</span> <span class="kn">import</span> <span class="n">get_fixture_directory</span>
<span class="kn">from</span> <span class="nn">mcp_server_webcrawl.crawlers.siteone.adapter</span> <span class="kn">import</span> <span class="n">SiteOneManager</span>
<span class="kn">from</span> <span class="nn">mcp_server_webcrawl.utils.logger</span> <span class="kn">import</span> <span class="n">get_logger</span>
<span class="n">logger</span> <span class="o">=</span> <span class="n">get_logger</span><span class="p">()</span>
<span class="c1"># calculate using same hash function as adapter</span>
<span class="n">EXAMPLE_SITE_ID</span> <span class="o">=</span> <span class="n">SiteOneManager</span><span class="o">.</span><span class="n">string_to_id</span><span class="p">(</span><span class="s2">"example.com"</span><span class="p">)</span>
<span class="n">PRAGMAR_SITE_ID</span> <span class="o">=</span> <span class="n">SiteOneManager</span><span class="o">.</span><span class="n">string_to_id</span><span class="p">(</span><span class="s2">"pragmar.com"</span><span class="p">)</span>
<div class="viewcode-block" id="SiteOneTests">
<a class="viewcode-back" href="../../../../mcp_server_webcrawl.crawlers.siteone.html#mcp_server_webcrawl.crawlers.siteone.tests.SiteOneTests">[docs]</a>
<span class="k">class</span> <span class="nc">SiteOneTests</span><span class="p">(</span><span class="n">BaseCrawlerTests</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Test suite for the SiteOne crawler implementation.</span>
<span class="sd"> Uses all wrapped test methods from BaseCrawlerTests plus SiteOne-specific features.</span>
<span class="sd"> """</span>
<div class="viewcode-block" id="SiteOneTests.setUp">
<a class="viewcode-back" href="../../../../mcp_server_webcrawl.crawlers.siteone.html#mcp_server_webcrawl.crawlers.siteone.tests.SiteOneTests.setUp">[docs]</a>
<span class="k">def</span> <span class="nf">setUp</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Set up the test environment with fixture data.</span>
<span class="sd"> """</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="n">setUp</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_datasrc</span> <span class="o">=</span> <span class="n">get_fixture_directory</span><span class="p">()</span> <span class="o">/</span> <span class="s2">"siteone"</span></div>
<div class="viewcode-block" id="SiteOneTests.test_siteone_pulse">
<a class="viewcode-back" href="../../../../mcp_server_webcrawl.crawlers.siteone.html#mcp_server_webcrawl.crawlers.siteone.tests.SiteOneTests.test_siteone_pulse">[docs]</a>
<span class="k">def</span> <span class="nf">test_siteone_pulse</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Test basic crawler initialization.</span>
<span class="sd"> """</span>
<span class="n">crawler</span> <span class="o">=</span> <span class="n">SiteOneCrawler</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_datasrc</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertIsNotNone</span><span class="p">(</span><span class="n">crawler</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertTrue</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_datasrc</span><span class="o">.</span><span class="n">is_dir</span><span class="p">())</span></div>
<div class="viewcode-block" id="SiteOneTests.test_siteone_sites">
<a class="viewcode-back" href="../../../../mcp_server_webcrawl.crawlers.siteone.html#mcp_server_webcrawl.crawlers.siteone.tests.SiteOneTests.test_siteone_sites">[docs]</a>
<span class="k">def</span> <span class="nf">test_siteone_sites</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Test site retrieval API functionality.</span>
<span class="sd"> """</span>
<span class="n">crawler</span> <span class="o">=</span> <span class="n">SiteOneCrawler</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_datasrc</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">run_pragmar_site_tests</span><span class="p">(</span><span class="n">crawler</span><span class="p">,</span> <span class="n">PRAGMAR_SITE_ID</span><span class="p">)</span></div>
<div class="viewcode-block" id="SiteOneTests.test_siteone_search">
<a class="viewcode-back" href="../../../../mcp_server_webcrawl.crawlers.siteone.html#mcp_server_webcrawl.crawlers.siteone.tests.SiteOneTests.test_siteone_search">[docs]</a>
<span class="k">def</span> <span class="nf">test_siteone_search</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Test boolean search functionality</span>
<span class="sd"> """</span>
<span class="n">crawler</span> <span class="o">=</span> <span class="n">SiteOneCrawler</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_datasrc</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">run_pragmar_search_tests</span><span class="p">(</span><span class="n">crawler</span><span class="p">,</span> <span class="n">PRAGMAR_SITE_ID</span><span class="p">)</span></div>
<div class="viewcode-block" id="SiteOneTests.test_siteone_resources">
<a class="viewcode-back" href="../../../../mcp_server_webcrawl.crawlers.siteone.html#mcp_server_webcrawl.crawlers.siteone.tests.SiteOneTests.test_siteone_resources">[docs]</a>
<span class="k">def</span> <span class="nf">test_siteone_resources</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Test resource retrieval API functionality with various parameters.</span>
<span class="sd"> """</span>
<span class="n">crawler</span> <span class="o">=</span> <span class="n">SiteOneCrawler</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_datasrc</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">run_sites_resources_tests</span><span class="p">(</span><span class="n">crawler</span><span class="p">,</span> <span class="n">PRAGMAR_SITE_ID</span><span class="p">,</span> <span class="n">EXAMPLE_SITE_ID</span><span class="p">)</span></div>
<div class="viewcode-block" id="SiteOneTests.test_interrobot_images">
<a class="viewcode-back" href="../../../../mcp_server_webcrawl.crawlers.siteone.html#mcp_server_webcrawl.crawlers.siteone.tests.SiteOneTests.test_interrobot_images">[docs]</a>
<span class="k">def</span> <span class="nf">test_interrobot_images</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Test SiteOne-specific image handling and thumbnails.</span>
<span class="sd"> """</span>
<span class="n">crawler</span> <span class="o">=</span> <span class="n">SiteOneCrawler</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_datasrc</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">run_pragmar_image_tests</span><span class="p">(</span><span class="n">crawler</span><span class="p">,</span> <span class="n">PRAGMAR_SITE_ID</span><span class="p">)</span></div>
<div class="viewcode-block" id="SiteOneTests.test_siteone_sorts">
<a class="viewcode-back" href="../../../../mcp_server_webcrawl.crawlers.siteone.html#mcp_server_webcrawl.crawlers.siteone.tests.SiteOneTests.test_siteone_sorts">[docs]</a>
<span class="k">def</span> <span class="nf">test_siteone_sorts</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Test random sort functionality using the '?' sort parameter.</span>
<span class="sd"> """</span>
<span class="n">crawler</span> <span class="o">=</span> <span class="n">SiteOneCrawler</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_datasrc</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">run_pragmar_sort_tests</span><span class="p">(</span><span class="n">crawler</span><span class="p">,</span> <span class="n">PRAGMAR_SITE_ID</span><span class="p">)</span></div>
<div class="viewcode-block" id="SiteOneTests.test_siteone_content_parsing">
<a class="viewcode-back" href="../../../../mcp_server_webcrawl.crawlers.siteone.html#mcp_server_webcrawl.crawlers.siteone.tests.SiteOneTests.test_siteone_content_parsing">[docs]</a>
<span class="k">def</span> <span class="nf">test_siteone_content_parsing</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Test content type detection and parsing.</span>
<span class="sd"> """</span>
<span class="n">crawler</span> <span class="o">=</span> <span class="n">SiteOneCrawler</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_datasrc</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">run_pragmar_content_tests</span><span class="p">(</span><span class="n">crawler</span><span class="p">,</span> <span class="n">PRAGMAR_SITE_ID</span><span class="p">,</span> <span class="kc">False</span><span class="p">)</span></div>
<div class="viewcode-block" id="SiteOneTests.test_siteone_advanced_features">
<a class="viewcode-back" href="../../../../mcp_server_webcrawl.crawlers.siteone.html#mcp_server_webcrawl.crawlers.siteone.tests.SiteOneTests.test_siteone_advanced_features">[docs]</a>
<span class="k">def</span> <span class="nf">test_siteone_advanced_features</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Test SiteOne-specific advanced features not covered by base tests.</span>
<span class="sd"> """</span>
<span class="n">crawler</span> <span class="o">=</span> <span class="n">SiteOneCrawler</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_datasrc</span><span class="p">)</span>
<span class="c1"># numeric status operators (SiteOne-specific feature)</span>
<span class="n">status_resources_gt</span> <span class="o">=</span> <span class="n">crawler</span><span class="o">.</span><span class="n">get_resources_api</span><span class="p">(</span>
<span class="n">sites</span><span class="o">=</span><span class="p">[</span><span class="n">PRAGMAR_SITE_ID</span><span class="p">],</span>
<span class="n">query</span><span class="o">=</span><span class="s2">"status: >400"</span><span class="p">,</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertTrue</span><span class="p">(</span><span class="n">status_resources_gt</span><span class="o">.</span><span class="n">total</span> <span class="o">></span> <span class="mi">0</span><span class="p">,</span> <span class="s2">"Numeric status operator should return results"</span><span class="p">)</span>
<span class="k">for</span> <span class="n">resource</span> <span class="ow">in</span> <span class="n">status_resources_gt</span><span class="o">.</span><span class="n">_results</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertGreater</span><span class="p">(</span><span class="n">resource</span><span class="o">.</span><span class="n">status</span><span class="p">,</span> <span class="mi">400</span><span class="p">)</span>
<span class="c1"># redirect status codes</span>
<span class="n">status_resources_redirect</span> <span class="o">=</span> <span class="n">crawler</span><span class="o">.</span><span class="n">get_resources_api</span><span class="p">(</span>
<span class="n">sites</span><span class="o">=</span><span class="p">[</span><span class="n">PRAGMAR_SITE_ID</span><span class="p">],</span>
<span class="n">query</span><span class="o">=</span><span class="s2">"status: 301"</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertTrue</span><span class="p">(</span><span class="n">status_resources_redirect</span><span class="o">.</span><span class="n">total</span> <span class="o">></span> <span class="mi">0</span><span class="p">,</span> <span class="s2">"301 status filtering should return results"</span><span class="p">)</span>
<span class="k">for</span> <span class="n">resource</span> <span class="ow">in</span> <span class="n">status_resources_redirect</span><span class="o">.</span><span class="n">_results</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertEqual</span><span class="p">(</span><span class="n">resource</span><span class="o">.</span><span class="n">status</span><span class="p">,</span> <span class="mi">301</span><span class="p">)</span>
<span class="c1"># 404 with size validation</span>
<span class="n">status_resources_not_found</span> <span class="o">=</span> <span class="n">crawler</span><span class="o">.</span><span class="n">get_resources_api</span><span class="p">(</span>
<span class="n">sites</span><span class="o">=</span><span class="p">[</span><span class="n">PRAGMAR_SITE_ID</span><span class="p">],</span>
<span class="n">query</span><span class="o">=</span><span class="s2">"status: 404"</span><span class="p">,</span>
<span class="n">fields</span><span class="o">=</span><span class="p">[</span><span class="s2">"size"</span><span class="p">]</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertTrue</span><span class="p">(</span><span class="n">status_resources_not_found</span><span class="o">.</span><span class="n">total</span> <span class="o">></span> <span class="mi">0</span><span class="p">,</span> <span class="s2">"404 status filtering should return results"</span><span class="p">)</span>
<span class="k">for</span> <span class="n">resource</span> <span class="ow">in</span> <span class="n">status_resources_not_found</span><span class="o">.</span><span class="n">_results</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertEqual</span><span class="p">(</span><span class="n">resource</span><span class="o">.</span><span class="n">status</span><span class="p">,</span> <span class="mi">404</span><span class="p">)</span>
<span class="n">not_found_result</span> <span class="o">=</span> <span class="n">status_resources_not_found</span><span class="o">.</span><span class="n">_results</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">to_dict</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertIn</span><span class="p">(</span><span class="s2">"size"</span><span class="p">,</span> <span class="n">not_found_result</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertGreater</span><span class="p">(</span><span class="n">not_found_result</span><span class="p">[</span><span class="s2">"size"</span><span class="p">],</span> <span class="mi">0</span><span class="p">,</span> <span class="s2">"404 responses should still have size > 0"</span><span class="p">)</span>
<span class="n">custom_fields</span> <span class="o">=</span> <span class="p">[</span><span class="s2">"content"</span><span class="p">,</span> <span class="s2">"headers"</span><span class="p">,</span> <span class="s2">"time"</span><span class="p">]</span>
<span class="n">field_resources</span> <span class="o">=</span> <span class="n">crawler</span><span class="o">.</span><span class="n">get_resources_api</span><span class="p">(</span>
<span class="n">sites</span><span class="o">=</span><span class="p">[</span><span class="n">PRAGMAR_SITE_ID</span><span class="p">],</span>
<span class="n">fields</span><span class="o">=</span><span class="n">custom_fields</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertTrue</span><span class="p">(</span><span class="n">field_resources</span><span class="o">.</span><span class="n">total</span> <span class="o">></span> <span class="mi">0</span><span class="p">)</span>
<span class="c1"># Test the SiteOne-specific forcefield dict method</span>
<span class="n">resource_dict</span> <span class="o">=</span> <span class="n">field_resources</span><span class="o">.</span><span class="n">_results</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">to_forcefield_dict</span><span class="p">(</span><span class="n">custom_fields</span><span class="p">)</span>
<span class="k">for</span> <span class="n">field</span> <span class="ow">in</span> <span class="n">custom_fields</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">assertIn</span><span class="p">(</span><span class="n">field</span><span class="p">,</span> <span class="n">resource_dict</span><span class="p">,</span> <span class="sa">f</span><span class="s2">"Field '</span><span class="si">{</span><span class="n">field</span><span class="si">}</span><span class="s2">' should be in forcefield response"</span><span class="p">)</span></div>
<div class="viewcode-block" id="SiteOneTests.test_report">
<a class="viewcode-back" href="../../../../mcp_server_webcrawl.crawlers.siteone.html#mcp_server_webcrawl.crawlers.siteone.tests.SiteOneTests.test_report">[docs]</a>
<span class="k">def</span> <span class="nf">test_report</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="w"> </span><span class="sd">"""</span>
<span class="sd"> Run test report, save to data directory.</span>
<span class="sd"> """</span>
<span class="n">crawler</span> <span class="o">=</span> <span class="n">SiteOneCrawler</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_datasrc</span><span class="p">)</span>
<span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">run_pragmar_report</span><span class="p">(</span><span class="n">crawler</span><span class="p">,</span> <span class="n">PRAGMAR_SITE_ID</span><span class="p">,</span> <span class="s2">"SiteOne"</span><span class="p">))</span></div>
</div>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>© Copyright 2025, pragmar.</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a
<a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a>
provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>
```