This is page 2 of 7. Use http://codebase.md/datalab-to/marker?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── ISSUE_TEMPLATE │ │ ├── breaking-bug-report.md │ │ ├── feature_request.md │ │ └── output-bug-report.md │ └── workflows │ ├── benchmarks.yml │ ├── ci.yml │ ├── cla.yml │ ├── publish.yml │ └── scripts.yml ├── .gitignore ├── .pre-commit-config.yaml ├── benchmarks │ ├── __init__.py │ ├── overall │ │ ├── __init__.py │ │ ├── display │ │ │ ├── __init__.py │ │ │ ├── dataset.py │ │ │ └── table.py │ │ ├── download │ │ │ ├── __init__.py │ │ │ ├── base.py │ │ │ ├── llamaparse.py │ │ │ ├── main.py │ │ │ ├── mathpix.py │ │ │ └── mistral.py │ │ ├── elo.py │ │ ├── methods │ │ │ ├── __init__.py │ │ │ ├── docling.py │ │ │ ├── gt.py │ │ │ ├── llamaparse.py │ │ │ ├── marker.py │ │ │ ├── mathpix.py │ │ │ ├── mistral.py │ │ │ ├── olmocr.py │ │ │ └── schema.py │ │ ├── overall.py │ │ ├── registry.py │ │ ├── schema.py │ │ └── scorers │ │ ├── __init__.py │ │ ├── clean.py │ │ ├── heuristic.py │ │ ├── llm.py │ │ └── schema.py │ ├── table │ │ ├── __init__.py │ │ ├── gemini.py │ │ ├── inference.py │ │ ├── scoring.py │ │ └── table.py │ ├── throughput │ │ ├── __init__.py │ │ └── main.py │ └── verify_scores.py ├── chunk_convert.py ├── CLA.md ├── convert_single.py ├── convert.py ├── data │ ├── .gitignore │ ├── examples │ │ ├── json │ │ │ ├── multicolcnn.json │ │ │ ├── switch_trans.json │ │ │ └── thinkpython.json │ │ └── markdown │ │ ├── multicolcnn │ │ │ ├── _page_1_Figure_0.jpeg │ │ │ ├── _page_2_Picture_0.jpeg │ │ │ ├── _page_6_Figure_0.jpeg │ │ │ ├── _page_7_Figure_0.jpeg │ │ │ ├── multicolcnn_meta.json │ │ │ └── multicolcnn.md │ │ ├── switch_transformers │ │ │ ├── _page_11_Figure_4.jpeg │ │ │ ├── _page_12_Figure_4.jpeg │ │ │ ├── _page_13_Figure_2.jpeg │ │ │ ├── _page_18_Figure_1.jpeg │ │ │ ├── _page_18_Figure_3.jpeg │ │ │ ├── _page_2_Figure_3.jpeg │ │ │ ├── _page_20_Figure_1.jpeg │ │ │ ├── _page_20_Figure_4.jpeg │ │ │ ├── _page_27_Figure_1.jpeg │ │ │ ├── _page_29_Figure_1.jpeg │ │ │ ├── _page_30_Figure_1.jpeg │ │ │ ├── _page_31_Figure_3.jpeg │ │ │ ├── _page_4_Figure_1.jpeg │ │ │ ├── _page_5_Figure_3.jpeg │ │ │ ├── switch_trans_meta.json │ │ │ └── switch_trans.md │ │ └── thinkpython │ │ ├── _page_109_Figure_1.jpeg │ │ ├── _page_115_Figure_1.jpeg │ │ ├── _page_116_Figure_3.jpeg │ │ ├── _page_127_Figure_1.jpeg │ │ ├── _page_128_Figure_1.jpeg │ │ ├── _page_167_Figure_1.jpeg │ │ ├── _page_169_Figure_1.jpeg │ │ ├── _page_173_Figure_1.jpeg │ │ ├── _page_190_Figure_1.jpeg │ │ ├── _page_195_Figure_1.jpeg │ │ ├── _page_205_Figure_1.jpeg │ │ ├── _page_23_Figure_1.jpeg │ │ ├── _page_23_Figure_3.jpeg │ │ ├── _page_230_Figure_1.jpeg │ │ ├── _page_233_Figure_1.jpeg │ │ ├── _page_233_Figure_3.jpeg │ │ ├── _page_234_Figure_1.jpeg │ │ ├── _page_235_Figure_1.jpeg │ │ ├── _page_236_Figure_1.jpeg │ │ ├── _page_236_Figure_3.jpeg │ │ ├── _page_237_Figure_1.jpeg │ │ ├── _page_238_Figure_1.jpeg │ │ ├── _page_46_Figure_1.jpeg │ │ ├── _page_60_Figure_1.jpeg │ │ ├── _page_60_Figure_3.jpeg │ │ ├── _page_67_Figure_1.jpeg │ │ ├── _page_71_Figure_1.jpeg │ │ ├── _page_78_Figure_1.jpeg │ │ ├── _page_85_Figure_1.jpeg │ │ ├── _page_94_Figure_1.jpeg │ │ ├── _page_99_Figure_17.jpeg │ │ ├── _page_99_Figure_178.jpeg │ │ ├── thinkpython_meta.json │ │ └── thinkpython.md │ ├── images │ │ ├── overall.png │ │ ├── per_doc.png │ │ └── table.png │ └── latex_to_md.sh ├── examples │ ├── marker_modal_deployment.py │ └── README.md ├── extraction_app.py ├── LICENSE ├── marker │ ├── builders │ │ ├── __init__.py │ │ ├── 
document.py │ │ ├── layout.py │ │ ├── line.py │ │ ├── ocr.py │ │ └── structure.py │ ├── config │ │ ├── __init__.py │ │ ├── crawler.py │ │ ├── parser.py │ │ └── printer.py │ ├── converters │ │ ├── __init__.py │ │ ├── extraction.py │ │ ├── ocr.py │ │ ├── pdf.py │ │ └── table.py │ ├── extractors │ │ ├── __init__.py │ │ ├── document.py │ │ └── page.py │ ├── logger.py │ ├── models.py │ ├── output.py │ ├── processors │ │ ├── __init__.py │ │ ├── blank_page.py │ │ ├── block_relabel.py │ │ ├── blockquote.py │ │ ├── code.py │ │ ├── debug.py │ │ ├── document_toc.py │ │ ├── equation.py │ │ ├── footnote.py │ │ ├── ignoretext.py │ │ ├── line_merge.py │ │ ├── line_numbers.py │ │ ├── list.py │ │ ├── llm │ │ │ ├── __init__.py │ │ │ ├── llm_complex.py │ │ │ ├── llm_equation.py │ │ │ ├── llm_form.py │ │ │ ├── llm_handwriting.py │ │ │ ├── llm_image_description.py │ │ │ ├── llm_mathblock.py │ │ │ ├── llm_meta.py │ │ │ ├── llm_page_correction.py │ │ │ ├── llm_sectionheader.py │ │ │ ├── llm_table_merge.py │ │ │ └── llm_table.py │ │ ├── order.py │ │ ├── page_header.py │ │ ├── reference.py │ │ ├── sectionheader.py │ │ ├── table.py │ │ ├── text.py │ │ └── util.py │ ├── providers │ │ ├── __init__.py │ │ ├── document.py │ │ ├── epub.py │ │ ├── html.py │ │ ├── image.py │ │ ├── pdf.py │ │ ├── powerpoint.py │ │ ├── registry.py │ │ ├── spreadsheet.py │ │ └── utils.py │ ├── renderers │ │ ├── __init__.py │ │ ├── chunk.py │ │ ├── extraction.py │ │ ├── html.py │ │ ├── json.py │ │ ├── markdown.py │ │ └── ocr_json.py │ ├── schema │ │ ├── __init__.py │ │ ├── blocks │ │ │ ├── __init__.py │ │ │ ├── base.py │ │ │ ├── basetable.py │ │ │ ├── caption.py │ │ │ ├── code.py │ │ │ ├── complexregion.py │ │ │ ├── equation.py │ │ │ ├── figure.py │ │ │ ├── footnote.py │ │ │ ├── form.py │ │ │ ├── handwriting.py │ │ │ ├── inlinemath.py │ │ │ ├── listitem.py │ │ │ ├── pagefooter.py │ │ │ ├── pageheader.py │ │ │ ├── picture.py │ │ │ ├── reference.py │ │ │ ├── sectionheader.py │ │ │ ├── table.py │ │ │ ├── tablecell.py │ │ │ ├── text.py │ │ │ └── toc.py │ │ ├── document.py │ │ ├── groups │ │ │ ├── __init__.py │ │ │ ├── base.py │ │ │ ├── figure.py │ │ │ ├── list.py │ │ │ ├── page.py │ │ │ ├── picture.py │ │ │ └── table.py │ │ ├── polygon.py │ │ ├── registry.py │ │ └── text │ │ ├── __init__.py │ │ ├── char.py │ │ ├── line.py │ │ └── span.py │ ├── scripts │ │ ├── __init__.py │ │ ├── chunk_convert.py │ │ ├── chunk_convert.sh │ │ ├── common.py │ │ ├── convert_single.py │ │ ├── convert.py │ │ ├── extraction_app.py │ │ ├── file_to_s3.py │ │ ├── run_streamlit_app.py │ │ ├── server.py │ │ └── streamlit_app.py │ ├── services │ │ ├── __init__.py │ │ ├── azure_openai.py │ │ ├── claude.py │ │ ├── gemini.py │ │ ├── ollama.py │ │ ├── openai.py │ │ └── vertex.py │ ├── settings.py │ ├── util.py │ └── utils │ ├── __init__.py │ ├── batch.py │ ├── gpu.py │ └── image.py ├── marker_app.py ├── marker_server.py ├── poetry.lock ├── pyproject.toml ├── pytest.ini ├── README.md ├── signatures │ └── version1 │ └── cla.json ├── static │ └── fonts │ └── .gitignore └── tests ├── builders │ ├── test_blank_page.py │ ├── test_document_builder.py │ ├── test_garbled_pdf.py │ ├── test_layout_replace.py │ ├── test_ocr_builder.py │ ├── test_ocr_pipeline.py │ ├── test_overriding.py │ ├── test_pdf_links.py │ ├── test_rotated_bboxes.py │ ├── test_strip_existing_ocr.py │ └── test_structure.py ├── config │ └── test_config.py ├── conftest.py ├── converters │ ├── test_extraction_converter.py │ ├── test_ocr_converter.py │ ├── test_pdf_converter.py │ └── test_table_converter.py ├── processors │ 
├── test_document_toc_processor.py │ ├── test_equation_processor.py │ ├── test_footnote_processor.py │ ├── test_ignoretext.py │ ├── test_llm_processors.py │ ├── test_table_merge.py │ └── test_table_processor.py ├── providers │ ├── test_document_providers.py │ ├── test_image_provider.py │ └── test_pdf_provider.py ├── renderers │ ├── test_chunk_renderer.py │ ├── test_extract_images.py │ ├── test_html_renderer.py │ ├── test_json_renderer.py │ └── test_markdown_renderer.py ├── schema │ └── groups │ └── test_list_grouping.py ├── services │ └── test_service_init.py └── utils.py
```

# Files

--------------------------------------------------------------------------------
/marker/providers/epub.py:
--------------------------------------------------------------------------------

```python
import base64
import os
import tempfile

from bs4 import BeautifulSoup

from marker.providers.pdf import PdfProvider

css = '''
@page {
    size: A4;
    margin: 2cm;
}

img {
    max-width: 100%;
    max-height: 25cm;
    object-fit: contain;
    margin: 12pt auto;
}

div, p {
    max-width: 100%;
    word-break: break-word;
    font-size: 10pt;
}

table {
    width: 100%;
    border-collapse: collapse;
    break-inside: auto;
    font-size: 10pt;
}

tr {
    break-inside: avoid;
    page-break-inside: avoid;
}

td {
    border: 0.75pt solid #000;
    padding: 6pt;
}
'''


class EpubProvider(PdfProvider):
    def __init__(self, filepath: str, config=None):
        temp_pdf = tempfile.NamedTemporaryFile(delete=False, suffix=f".pdf")
        self.temp_pdf_path = temp_pdf.name
        temp_pdf.close()

        # Convert Epub to PDF
        try:
            self.convert_epub_to_pdf(filepath)
        except Exception as e:
            raise RuntimeError(f"Failed to convert {filepath} to PDF: {e}")

        # Initialize the PDF provider with the temp pdf path
        super().__init__(self.temp_pdf_path, config)

    def __del__(self):
        if os.path.exists(self.temp_pdf_path):
            os.remove(self.temp_pdf_path)

    def convert_epub_to_pdf(self, filepath):
        from weasyprint import CSS, HTML
        from ebooklib import epub
        import ebooklib

        ebook = epub.read_epub(filepath)

        styles = []
        html_content = ""
        img_tags = {}

        for item in ebook.get_items():
            if item.get_type() == ebooklib.ITEM_IMAGE:
                img_data = base64.b64encode(item.get_content()).decode("utf-8")
                img_tags[item.file_name] = f'data:{item.media_type};base64,{img_data}'
            elif item.get_type() == ebooklib.ITEM_STYLE:
                styles.append(item.get_content().decode('utf-8'))

        for item in ebook.get_items():
            if item.get_type() == ebooklib.ITEM_DOCUMENT:
                html_content += item.get_content().decode("utf-8")

        soup = BeautifulSoup(html_content, 'html.parser')

        for img in soup.find_all('img'):
            src = img.get('src')
            if src:
                normalized_src = src.replace('../', '')
                if normalized_src in img_tags:
                    img['src'] = img_tags[normalized_src]

        for image in soup.find_all('image'):
            src = image.get('xlink:href')
            if src:
                normalized_src = src.replace('../', '')
                if normalized_src in img_tags:
                    image['xlink:href'] = img_tags[normalized_src]

        html_content = str(soup)
        full_style = ''.join([css])  # + styles)

        # Render the assembled HTML to a PDF at the temp path
        HTML(string=html_content, base_url=filepath).write_pdf(
            self.temp_pdf_path,
            stylesheets=[CSS(string=full_style), self.get_font_css()]
        )
```

--------------------------------------------------------------------------------
/tests/processors/test_table_processor.py:
--------------------------------------------------------------------------------

```python
from typing import List

import pytest

from marker.renderers.markdown import MarkdownRenderer
from marker.schema import BlockTypes
from marker.processors.table import TableProcessor
from marker.schema.blocks import TableCell
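
# The fixtures used below (pdf_document, recognition_model, table_rec_model,
# detection_model) come from tests/conftest.py. A rough standalone sketch of the
# same flow, assuming marker.models.create_model_dict() exposes models under these
# key names (an assumption, not verified against the current API), would be:
#
#     from marker.models import create_model_dict
#
#     models = create_model_dict()
#     processor = TableProcessor(
#         models["recognition_model"], models["table_rec_model"], models["detection_model"]
#     )
#     processor(document)  # runs table recognition and attaches TableCell children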
@pytest.mark.config({"page_range": [5]}) def test_table_processor( pdf_document, recognition_model, table_rec_model, detection_model ): processor = TableProcessor(recognition_model, table_rec_model, detection_model) processor(pdf_document) for block in pdf_document.pages[0].children: if block.block_type == BlockTypes.Table: children = block.contained_blocks(pdf_document, (BlockTypes.TableCell,)) assert children assert len(children) > 0 assert isinstance(children[0], TableCell) assert len(pdf_document.contained_blocks((BlockTypes.Table,))) == 2 renderer = MarkdownRenderer() table_output = renderer(pdf_document) assert "Schedule" in table_output.markdown @pytest.mark.filename("table_ex.pdf") @pytest.mark.config({"page_range": [0], "force_ocr": True}) def test_avoid_double_ocr( pdf_document, recognition_model, table_rec_model, detection_model ): tables = pdf_document.contained_blocks((BlockTypes.Table,)) lines = tables[0].contained_blocks(pdf_document, (BlockTypes.Line,)) assert len(lines) == 0 processor = TableProcessor( recognition_model, table_rec_model, detection_model, config={"force_ocr": True} ) processor(pdf_document) renderer = MarkdownRenderer() table_output = renderer(pdf_document) assert "Participants" in table_output.markdown @pytest.mark.filename("multicol-blocks.pdf") @pytest.mark.config({"page_range": [3]}) def test_overlap_blocks( pdf_document, detection_model, recognition_model, table_rec_model ): page = pdf_document.pages[0] assert "Cascading, and the Auxiliary Problem Principle" in page.raw_text( pdf_document ) processor = TableProcessor(recognition_model, table_rec_model, detection_model) processor(pdf_document) assert "Cascading, and the Auxiliary Problem Principle" in page.raw_text( pdf_document ) @pytest.mark.filename("pres.pdf") @pytest.mark.config({"page_range": [4]}) def test_ocr_table(pdf_document, recognition_model, table_rec_model, detection_model): processor = TableProcessor(recognition_model, table_rec_model, detection_model) processor(pdf_document) renderer = MarkdownRenderer() table_output = renderer(pdf_document) assert "1.2E-38" in table_output.markdown @pytest.mark.config({"page_range": [11]}) def test_split_rows(pdf_document, recognition_model, table_rec_model, detection_model): processor = TableProcessor(recognition_model, table_rec_model, detection_model) processor(pdf_document) table = pdf_document.contained_blocks((BlockTypes.Table,))[-1] cells: List[TableCell] = table.contained_blocks( pdf_document, (BlockTypes.TableCell,) ) unique_rows = len(set([cell.row_id for cell in cells])) assert unique_rows == 6 ``` -------------------------------------------------------------------------------- /marker/renderers/json.py: -------------------------------------------------------------------------------- ```python from typing import Annotated, Dict, List, Tuple from pydantic import BaseModel from marker.renderers import BaseRenderer from marker.schema import BlockTypes from marker.schema.blocks import Block, BlockOutput from marker.schema.document import Document from marker.schema.registry import get_block_class class JSONBlockOutput(BaseModel): id: str block_type: str html: str polygon: List[List[float]] bbox: List[float] children: List["JSONBlockOutput"] | None = None section_hierarchy: Dict[int, str] | None = None images: dict | None = None class JSONOutput(BaseModel): children: List[JSONBlockOutput] block_type: str = str(BlockTypes.Document) metadata: dict def reformat_section_hierarchy(section_hierarchy): new_section_hierarchy = {} for key, value in 
section_hierarchy.items(): new_section_hierarchy[key] = str(value) return new_section_hierarchy class JSONRenderer(BaseRenderer): """ A renderer for JSON output. """ image_blocks: Annotated[ Tuple[BlockTypes], "The list of block types to consider as images.", ] = (BlockTypes.Picture, BlockTypes.Figure) page_blocks: Annotated[ Tuple[BlockTypes], "The list of block types to consider as pages.", ] = (BlockTypes.Page,) def extract_json(self, document: Document, block_output: BlockOutput): cls = get_block_class(block_output.id.block_type) if cls.__base__ == Block: html, images = self.extract_block_html(document, block_output) return JSONBlockOutput( html=html, polygon=block_output.polygon.polygon, bbox=block_output.polygon.bbox, id=str(block_output.id), block_type=str(block_output.id.block_type), images=images, section_hierarchy=reformat_section_hierarchy( block_output.section_hierarchy ), ) else: children = [] for child in block_output.children: child_output = self.extract_json(document, child) children.append(child_output) return JSONBlockOutput( html=block_output.html, polygon=block_output.polygon.polygon, bbox=block_output.polygon.bbox, id=str(block_output.id), block_type=str(block_output.id.block_type), children=children, section_hierarchy=reformat_section_hierarchy( block_output.section_hierarchy ), ) def __call__(self, document: Document) -> JSONOutput: document_output = document.render(self.block_config) json_output = [] for page_output in document_output.children: json_output.append(self.extract_json(document, page_output)) return JSONOutput( children=json_output, metadata=self.generate_document_metadata(document, document_output), ) ``` -------------------------------------------------------------------------------- /marker/processors/llm/llm_image_description.py: -------------------------------------------------------------------------------- ```python from pydantic import BaseModel from marker.processors.llm import PromptData, BaseLLMSimpleBlockProcessor, BlockData from marker.schema import BlockTypes from marker.schema.document import Document from typing import Annotated, List class LLMImageDescriptionProcessor(BaseLLMSimpleBlockProcessor): block_types = ( BlockTypes.Picture, BlockTypes.Figure, ) extract_images: Annotated[bool, "Extract images from the document."] = True image_description_prompt: Annotated[ str, "The prompt to use for generating image descriptions.", "Default is a string containing the Gemini prompt.", ] = """You are a document analysis expert who specializes in creating text descriptions for images. You will receive an image of a picture or figure. Your job will be to create a short description of the image. **Instructions:** 1. Carefully examine the provided image. 2. Analyze any text that was extracted from within the image. 3. Output a faithful description of the image. Make sure there is enough specific detail to accurately reconstruct the image. If the image is a figure or contains numeric data, include the numeric data in the output. **Example:** Input: ```text "Fruit Preference Survey" 20, 15, 10 Apples, Bananas, Oranges ``` Output: In this figure, a bar chart titled "Fruit Preference Survey" is showing the number of people who prefer different types of fruits. The x-axis shows the types of fruits, and the y-axis shows the number of people. The bar chart shows that most people prefer apples, followed by bananas and oranges. 20 people prefer apples, 15 people prefer bananas, and 10 people prefer oranges. 
**Input:** ```text {raw_text} ``` """ def inference_blocks(self, document: Document) -> List[BlockData]: blocks = super().inference_blocks(document) if self.extract_images: return [] return blocks def block_prompts(self, document: Document) -> List[PromptData]: prompt_data = [] for block_data in self.inference_blocks(document): block = block_data["block"] prompt = self.image_description_prompt.replace( "{raw_text}", block.raw_text(document) ) image = self.extract_image(document, block) prompt_data.append( { "prompt": prompt, "image": image, "block": block, "schema": ImageSchema, "page": block_data["page"], } ) return prompt_data def rewrite_block( self, response: dict, prompt_data: PromptData, document: Document ): block = prompt_data["block"] if not response or "image_description" not in response: block.update_metadata(llm_error_count=1) return image_description = response["image_description"] if len(image_description) < 10: block.update_metadata(llm_error_count=1) return block.description = image_description class ImageSchema(BaseModel): image_description: str ``` -------------------------------------------------------------------------------- /benchmarks/overall/display/table.py: -------------------------------------------------------------------------------- ```python from pathlib import Path from typing import Dict, List import tabulate from benchmarks.overall.schema import FullResult def write_table(title: str, rows: list, headers: list, out_path: Path, filename: str): table = tabulate.tabulate(rows, headers=headers, tablefmt="github") with open(out_path / filename, "w", encoding="utf-8") as f: f.write(f"# {title}\n") f.write(table) print(title) print(table) def print_scores(result: FullResult, out_path: Path, methods: List[str], score_types: List[str], default_score_type="heuristic", default_method="marker"): document_types = list(result["averages_by_type"][default_method][default_score_type].keys()) headers = ["Document Type"] for method in methods: for score_type in score_types: headers.append(f"{method} {score_type}") document_rows = [[k] for k in document_types] for i, doc_type in enumerate(document_types): for method in methods: for score_type in score_types: avg_score = sum(result["averages_by_type"][method][score_type][doc_type]) / max(1, len(result["averages_by_type"][method][score_type][doc_type])) document_rows[i].append(avg_score) write_table("Document Types", document_rows, headers, out_path, "document_types.md") headers = ["Block Type"] block_types = list(result["averages_by_block_type"][default_method][default_score_type].keys()) # all possible blocks block_score_types = list(result["averages_by_block_type"][default_method].keys()) for method in methods: for score_type in block_score_types: headers.append(f"{method} {score_type}") block_rows = [[k] for k in block_types] for i, block_type in enumerate(block_types): for method in methods: for score_type in block_score_types: avg_score = sum(result["averages_by_block_type"][method][score_type][block_type]) / max(1, len(result["averages_by_block_type"][method][score_type][block_type])) block_rows[i].append(avg_score) write_table("Block types", block_rows, headers, out_path, "block_types.md") headers = ["Method", "Avg Time"] + score_types inference_rows = [[k] for k in methods] all_raw_scores = [result["scores"][i] for i in result["scores"]] for i, method in enumerate(methods): avg_time = sum(result["average_times"][method]) / max(1, len(result["average_times"][method])) inference_rows[i].append(avg_time) for score_type in 
score_types: scores_lst = [] for ar in all_raw_scores: try: # Sometimes a few llm scores are missing scores_lst.append(ar[method][score_type]["score"]) except KeyError: continue avg_score = sum(scores_lst) / max(1, len(scores_lst)) inference_rows[i].append(avg_score) write_table("Overall Results", inference_rows, headers, out_path, "overall.md") print("Scores computed by aligning ground truth markdown blocks with predicted markdown for each method. The scores are 0-100 based on edit distance.") ``` -------------------------------------------------------------------------------- /marker/renderers/chunk.py: -------------------------------------------------------------------------------- ```python import html from typing import List, Dict from bs4 import BeautifulSoup from pydantic import BaseModel from marker.renderers.json import JSONRenderer, JSONBlockOutput from marker.schema.document import Document class FlatBlockOutput(BaseModel): id: str block_type: str html: str page: int polygon: List[List[float]] bbox: List[float] section_hierarchy: Dict[int, str] | None = None images: dict | None = None class ChunkOutput(BaseModel): blocks: List[FlatBlockOutput] page_info: Dict[int, dict] metadata: dict def collect_images(block: JSONBlockOutput) -> dict[str, str]: if not getattr(block, "children", None): return block.images or {} else: images = block.images or {} for child_block in block.children: images.update(collect_images(child_block)) return images def assemble_html_with_images(block: JSONBlockOutput, image_blocks: set[str]) -> str: if not getattr(block, "children", None): if block.block_type in image_blocks: return f"<p>{block.html}<img src='{block.id}'></p>" else: return block.html child_html = [assemble_html_with_images(child, image_blocks) for child in block.children] child_ids = [child.id for child in block.children] soup = BeautifulSoup(block.html, "html.parser") content_refs = soup.find_all("content-ref") for ref in content_refs: src_id = ref.attrs["src"] if src_id in child_ids: ref.replace_with(child_html[child_ids.index(src_id)]) return html.unescape(str(soup)) def json_to_chunks( block: JSONBlockOutput, image_blocks: set[str], page_id: int=0) -> FlatBlockOutput | List[FlatBlockOutput]: if block.block_type == "Page": children = block.children page_id = int(block.id.split("/")[-1]) return [json_to_chunks(child, image_blocks, page_id=page_id) for child in children] else: return FlatBlockOutput( id=block.id, block_type=block.block_type, html=assemble_html_with_images(block, image_blocks), page=page_id, polygon=block.polygon, bbox=block.bbox, section_hierarchy=block.section_hierarchy, images=collect_images(block), ) class ChunkRenderer(JSONRenderer): def __call__(self, document: Document) -> ChunkOutput: document_output = document.render(self.block_config) json_output = [] for page_output in document_output.children: json_output.append(self.extract_json(document, page_output)) # This will get the top-level blocks from every page chunk_output = [] for item in json_output: chunks = json_to_chunks(item, set([str(block) for block in self.image_blocks])) chunk_output.extend(chunks) page_info = { page.page_id: {"bbox": page.polygon.bbox, "polygon": page.polygon.polygon} for page in document.pages } return ChunkOutput( blocks=chunk_output, page_info=page_info, metadata=self.generate_document_metadata(document, document_output), ) ``` -------------------------------------------------------------------------------- /marker/schema/document.py: 
-------------------------------------------------------------------------------- ```python from __future__ import annotations from typing import List, Sequence, Optional from pydantic import BaseModel from marker.schema import BlockTypes from marker.schema.blocks import Block, BlockId, BlockOutput from marker.schema.groups.page import PageGroup class DocumentOutput(BaseModel): children: List[BlockOutput] html: str block_type: BlockTypes = BlockTypes.Document class TocItem(BaseModel): title: str heading_level: int page_id: int polygon: List[List[float]] class Document(BaseModel): filepath: str pages: List[PageGroup] block_type: BlockTypes = BlockTypes.Document table_of_contents: List[TocItem] | None = None debug_data_path: str | None = None # Path that debug data was saved to def get_block(self, block_id: BlockId): page = self.get_page(block_id.page_id) block = page.get_block(block_id) if block: return block return None def get_page(self, page_id): for page in self.pages: if page.page_id == page_id: return page return None def get_next_block( self, block: Block, ignored_block_types: List[BlockTypes] = None ): if ignored_block_types is None: ignored_block_types = [] next_block = None # Try to find the next block in the current page page = self.get_page(block.page_id) next_block = page.get_next_block(block, ignored_block_types) if next_block: return next_block # If no block found, search subsequent pages for page in self.pages[self.pages.index(page) + 1 :]: next_block = page.get_next_block(None, ignored_block_types) if next_block: return next_block return None def get_next_page(self, page: PageGroup): page_idx = self.pages.index(page) if page_idx + 1 < len(self.pages): return self.pages[page_idx + 1] return None def get_prev_block(self, block: Block): page = self.get_page(block.page_id) prev_block = page.get_prev_block(block) if prev_block: return prev_block prev_page = self.get_prev_page(page) if not prev_page: return None return prev_page.get_block(prev_page.structure[-1]) def get_prev_page(self, page: PageGroup): page_idx = self.pages.index(page) if page_idx > 0: return self.pages[page_idx - 1] return None def assemble_html( self, child_blocks: List[Block], block_config: Optional[dict] = None ): template = "" for c in child_blocks: template += f"<content-ref src='{c.id}'></content-ref>" return template def render(self, block_config: Optional[dict] = None): child_content = [] section_hierarchy = None for page in self.pages: rendered = page.render(self, None, section_hierarchy, block_config) section_hierarchy = rendered.section_hierarchy.copy() child_content.append(rendered) return DocumentOutput( children=child_content, html=self.assemble_html(child_content, block_config), ) def contained_blocks(self, block_types: Sequence[BlockTypes] = None) -> List[Block]: blocks = [] for page in self.pages: blocks += page.contained_blocks(self, block_types) return blocks ``` -------------------------------------------------------------------------------- /benchmarks/overall/scorers/heuristic.py: -------------------------------------------------------------------------------- ```python from typing import List from rapidfuzz import fuzz from benchmarks.overall.scorers.clean import MarkdownCleaner from benchmarks.overall.scorers.schema import BlockScores from benchmarks.overall.scorers import BaseScorer class HeuristicScorer(BaseScorer): def __call__(self, sample, gt_markdown: List[str], method_markdown: str) -> BlockScores: if not method_markdown: return { "score": 0, "specific_scores": { "order": 0, 
"by_block": [0] * len(gt_markdown) } } # Standardize inputs gt_markdown = [self.clean_input(block) for block in gt_markdown] method_markdown = self.clean_input(method_markdown) alignments = self.find_fuzzy_alignments(method_markdown, gt_markdown) scores = [alignment["score"] for alignment in alignments] # Find order score orders = [alignment["start"] for alignment in alignments] correct_order = list(range(len(gt_markdown))) actual_order = sorted(range(len(gt_markdown)), key=lambda x: orders[x]) order_score = self.kendall_tau(correct_order, actual_order) # Weight score by sequence length gt_weights = [len(g) for g in gt_markdown] weighted_scores = [score * weight for score, weight in zip(scores, gt_weights)] # Weight the score by sequence length overall_score = sum(weighted_scores) / max(1, sum(gt_weights)) overall_score = overall_score * 0.8 + order_score * 0.2 return { "score": overall_score, "specific_scores": { "order": order_score, "by_block": scores }, } @staticmethod def kendall_tau(correct_order: List[int], actual_order: List[int]) -> float: n = len(correct_order) concordant = 0 discordant = 0 if n <= 1: return 100 for i in range(n): for j in range(i + 1, n): correct_sign = correct_order[i] - correct_order[j] actual_sign = actual_order[i] - actual_order[j] if (correct_sign > 0 and actual_sign > 0) or (correct_sign < 0 and actual_sign < 0): concordant += 1 elif (correct_sign < 0 and actual_sign > 0) or (correct_sign > 0 and actual_sign < 0): discordant += 1 total_pairs = (n * (n - 1)) // 2 tau = (concordant - discordant) / total_pairs tau = (tau + 1) / 2 # 0-1 scale return tau * 100 # 0-100 scale @staticmethod def find_fuzzy_alignments( main_string: str, substrings: List[str], threshold: int = 70 ) -> List[dict]: alignments = [] for idx, substr in enumerate(substrings): result = fuzz.partial_ratio_alignment(substr, main_string, score_cutoff=threshold) score = 0 dest_start = 0 dest_end = 0 if result: score = result.score dest_start = result.dest_start dest_end = result.dest_end alignments.append({ "string": substr, "start": dest_start, "end": dest_end, "score": score, "idx": idx }) return alignments @staticmethod def clean_input(md: str): cleaner = MarkdownCleaner() return cleaner(md) ``` -------------------------------------------------------------------------------- /marker/output.py: -------------------------------------------------------------------------------- ```python import json import os from bs4 import BeautifulSoup, Tag from pydantic import BaseModel from PIL import Image from marker.renderers.extraction import ExtractionOutput from marker.renderers.html import HTMLOutput from marker.renderers.json import JSONOutput, JSONBlockOutput from marker.renderers.markdown import MarkdownOutput from marker.renderers.ocr_json import OCRJSONOutput from marker.schema.blocks import BlockOutput from marker.settings import settings def unwrap_outer_tag(html: str): soup = BeautifulSoup(html, "html.parser") contents = list(soup.contents) if len(contents) == 1 and isinstance(contents[0], Tag) and contents[0].name == "p": # Unwrap the p tag soup.p.unwrap() return str(soup) def json_to_html(block: JSONBlockOutput | BlockOutput): # Utility function to take in json block output and give html for the block. 
    if not getattr(block, "children", None):
        return block.html
    else:
        child_html = [json_to_html(child) for child in block.children]
        child_ids = [child.id for child in block.children]

        soup = BeautifulSoup(block.html, "html.parser")
        content_refs = soup.find_all("content-ref")
        for ref in content_refs:
            src_id = ref.attrs["src"]
            if src_id in child_ids:
                child_soup = BeautifulSoup(
                    child_html[child_ids.index(src_id)], "html.parser"
                )
                ref.replace_with(child_soup)
        return str(soup)


def output_exists(output_dir: str, fname_base: str):
    exts = ["md", "html", "json"]
    for ext in exts:
        if os.path.exists(os.path.join(output_dir, f"{fname_base}.{ext}")):
            return True
    return False


def text_from_rendered(rendered: BaseModel):
    from marker.renderers.chunk import ChunkOutput  # Has an import from this file

    if isinstance(rendered, MarkdownOutput):
        return rendered.markdown, "md", rendered.images
    elif isinstance(rendered, HTMLOutput):
        return rendered.html, "html", rendered.images
    elif isinstance(rendered, JSONOutput):
        return rendered.model_dump_json(exclude=["metadata"], indent=2), "json", {}
    elif isinstance(rendered, ChunkOutput):
        return rendered.model_dump_json(exclude=["metadata"], indent=2), "json", {}
    elif isinstance(rendered, OCRJSONOutput):
        return rendered.model_dump_json(exclude=["metadata"], indent=2), "json", {}
    elif isinstance(rendered, ExtractionOutput):
        return rendered.document_json, "json", {}
    else:
        raise ValueError("Invalid output type")


def convert_if_not_rgb(image: Image.Image) -> Image.Image:
    if image.mode != "RGB":
        image = image.convert("RGB")
    return image


def save_output(rendered: BaseModel, output_dir: str, fname_base: str):
    text, ext, images = text_from_rendered(rendered)
    text = text.encode(settings.OUTPUT_ENCODING, errors="replace").decode(
        settings.OUTPUT_ENCODING
    )

    with open(
        os.path.join(output_dir, f"{fname_base}.{ext}"),
        "w+",
        encoding=settings.OUTPUT_ENCODING,
    ) as f:
        f.write(text)

    with open(
        os.path.join(output_dir, f"{fname_base}_meta.json"),
        "w+",
        encoding=settings.OUTPUT_ENCODING,
    ) as f:
        f.write(json.dumps(rendered.metadata, indent=2))

    for img_name, img in images.items():
        img = convert_if_not_rgb(img)  # RGBA images can't save as JPG
        img.save(os.path.join(output_dir, img_name), settings.OUTPUT_IMAGE_FORMAT)
```

--------------------------------------------------------------------------------
/marker/processors/block_relabel.py:
--------------------------------------------------------------------------------

```python
from copy import deepcopy
from typing import Annotated

from marker.processors import BaseProcessor
from marker.schema import BlockTypes
from marker.schema.blocks import BlockId
from marker.schema.document import Document
from marker.schema.registry import get_block_class
from marker.logger import get_logger

logger = get_logger()


class BlockRelabelProcessor(BaseProcessor):
    """
    A processor to heuristically relabel blocks based on a confidence threshold.

    Each rule in the relabel string maps an original block label to a new one
    if the confidence exceeds a given threshold.
    """

    block_relabel_str: Annotated[
        str,
        "Comma-separated relabeling rules in the format '<original_label>:<new_label>:<confidence_threshold>'.",
        "Each rule defines how blocks of a certain type should be relabeled when the confidence exceeds the threshold.",
        "Example: 'Table:Picture:0.85,Form:Picture:0.9'"
    ] = ""

    def __init__(self, config=None):
        super().__init__(config)
        self.block_relabel_map = {}

        if not self.block_relabel_str:
            return

        for i, block_config_str in enumerate(self.block_relabel_str.split(',')):
            block_config_str = block_config_str.strip()
            if not block_config_str:
                continue  # Skip empty segments

            try:
                parts = block_config_str.split(':')
                if len(parts) != 3:
                    raise ValueError(f"Expected 3 parts, got {len(parts)}")

                block_label, block_relabel, confidence_str = parts
                confidence_thresh = float(confidence_str)
                block_type = BlockTypes[block_label]
                relabel_block_type = BlockTypes[block_relabel]
                self.block_relabel_map[block_type] = (
                    confidence_thresh,
                    relabel_block_type
                )
            except Exception as e:
                logger.warning(f"Failed to parse relabel rule '{block_config_str}' at index {i}: {e}. Expected format is <original_label>:<new_label>:<confidence_threshold>")

    def __call__(self, document: Document):
        if len(self.block_relabel_map) == 0:
            return

        for page in document.pages:
            for block in page.structure_blocks(document):
                if block.block_type not in self.block_relabel_map:
                    continue

                block_id = BlockId(page_id=page.page_id, block_id=block.block_id, block_type=block.block_type)
                confidence_thresh, relabel_block_type = self.block_relabel_map[block.block_type]
                confidence = block.top_k.get(block.block_type)
                if confidence > confidence_thresh:
                    logger.debug(f"Skipping relabel for {block_id}; Confidence: {confidence} > Confidence Threshold {confidence_thresh} for re-labelling")
                    continue

                new_block_cls = get_block_class(relabel_block_type)
                new_block = new_block_cls(
                    polygon=deepcopy(block.polygon),
                    page_id=block.page_id,
                    structure=deepcopy(block.structure),
                    text_extraction_method=block.text_extraction_method,
                    source="heuristics",
                    top_k=block.top_k,
                    metadata=block.metadata
                )
                page.replace_block(block, new_block)
                logger.debug(f"Relabelled {block_id} to {relabel_block_type}")
```

--------------------------------------------------------------------------------
/marker/providers/spreadsheet.py:
--------------------------------------------------------------------------------

```python
import os
import tempfile

from marker.providers.pdf import PdfProvider

css = '''
@page {
    size: A4 landscape;
    margin: 1.5cm;
}

table {
    width: 100%;
    border-collapse: collapse;
    break-inside: auto;
    font-size: 10pt;
}

tr {
    break-inside: avoid;
    page-break-inside: avoid;
}

td {
    border: 0.75pt solid #000;
    padding: 6pt;
}
'''


class SpreadSheetProvider(PdfProvider):
    def __init__(self, filepath: str, config=None):
        temp_pdf = tempfile.NamedTemporaryFile(delete=False, suffix=f".pdf")
        self.temp_pdf_path = temp_pdf.name
        temp_pdf.close()

        # Convert XLSX to PDF
        try:
            self.convert_xlsx_to_pdf(filepath)
        except Exception as e:
            raise RuntimeError(f"Failed to convert {filepath} to PDF: {e}")

        # Initialize the PDF provider with the temp pdf path
        super().__init__(self.temp_pdf_path, config)

    def __del__(self):
        if os.path.exists(self.temp_pdf_path):
            os.remove(self.temp_pdf_path)

    def convert_xlsx_to_pdf(self, filepath: str):
        from weasyprint import CSS, HTML
        from openpyxl import load_workbook

        html = ""
        workbook = load_workbook(filepath)
        if workbook is not None:
            for sheet_name in workbook.sheetnames:
                sheet = workbook[sheet_name]
                html += f'<div><h1>{sheet_name}</h1>' + self._excel_to_html_table(sheet) + '</div>'
        else:
            raise ValueError("Invalid XLSX file")

        # We convert the HTML into a PDF
        HTML(string=html).write_pdf(
            self.temp_pdf_path,
            stylesheets=[CSS(string=css), self.get_font_css()]
        )

    @staticmethod
    def _get_merged_cell_ranges(sheet):
        merged_info = {}
        for merged_range in sheet.merged_cells.ranges:
            min_col, min_row, max_col, max_row = merged_range.bounds
            merged_info[(min_row, min_col)] = {
                'rowspan': max_row - min_row + 1,
                'colspan': max_col - min_col + 1,
                'range': merged_range
            }
        return merged_info

    def _excel_to_html_table(self, sheet):
        merged_cells = self._get_merged_cell_ranges(sheet)

        html = f'<table>'

        # Track cells we should skip due to being part of a merge range
        skip_cells = set()

        for row_idx, row in enumerate(sheet.rows, 1):
            html += '<tr>'
            for col_idx, cell in enumerate(row, 1):
                if (row_idx, col_idx) in skip_cells:
                    continue

                # Check if this cell is the start of a merged range
                merge_info = merged_cells.get((row_idx, col_idx))
                if merge_info:
                    # Add cells to skip
                    for r in range(row_idx, row_idx + merge_info['rowspan']):
                        for c in range(col_idx, col_idx + merge_info['colspan']):
                            if (r, c) != (row_idx, col_idx):
                                skip_cells.add((r, c))

                    # Add merged cell with rowspan/colspan
                    value = cell.value if cell.value is not None else ''
                    html += f'<td rowspan="{merge_info["rowspan"]}" colspan="{merge_info["colspan"]}">{value}'
                else:
                    # Regular cell
                    value = cell.value if cell.value is not None else ''
                    html += f'<td>{value}'
                html += '</td>'
            html += '</tr>'
        html += '</table>'
        return html
```

--------------------------------------------------------------------------------
/marker/processors/ignoretext.py:
--------------------------------------------------------------------------------

```python
import re
from collections import Counter
from itertools import groupby
from typing import Annotated, List

from rapidfuzz import fuzz

from marker.processors import BaseProcessor
from marker.schema import BlockTypes
from marker.schema.blocks import Block
from marker.schema.document import Document


class IgnoreTextProcessor(BaseProcessor):
    """
    A processor for identifying and ignoring common text blocks in a document.
    These blocks often represent repetitive or non-essential elements, such as
    headers, footers, or page numbers.
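
    For example, with the default settings a repeated footer line must either
    appear on at least common_element_threshold (20%) of the sampled first/last
    blocks or run for max_streak consecutive pages, and in either case occur more
    than common_element_min_blocks times, before blocks that fuzzy-match it above
    text_match_threshold are marked ignore_for_output.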
""" block_types = ( BlockTypes.Text, BlockTypes.SectionHeader, BlockTypes.TextInlineMath ) common_element_threshold: Annotated[ float, "The minimum ratio of pages a text block must appear on to be considered a common element.", "Blocks that meet or exceed this threshold are marked as common elements.", ] = 0.2 common_element_min_blocks: Annotated[ int, "The minimum number of occurrences of a text block within a document to consider it a common element.", "This ensures that rare blocks are not mistakenly flagged.", ] = 3 max_streak: Annotated[ int, "The maximum number of consecutive occurrences of a text block allowed before it is classified as a common element.", "Helps to identify patterns like repeated headers or footers.", ] = 3 text_match_threshold: Annotated[ int, "The minimum fuzzy match score (0-100) required to classify a text block as similar to a common element.", "Higher values enforce stricter matching.", ] = 90 def __call__(self, document: Document): first_blocks = [] last_blocks = [] for page in document.pages: initial_block = None last_block = None for block in page.contained_blocks(document, self.block_types): if block.structure is not None: if initial_block is None: initial_block = block last_block = block if initial_block is not None: first_blocks.append(initial_block) if last_block is not None: last_blocks.append(last_block) self.filter_common_elements(document, first_blocks) self.filter_common_elements(document, last_blocks) @staticmethod def clean_text(text): text = text.replace("\n", "").strip() text = re.sub(r"^\d+\s*", "", text) # remove numbers at the start of the line text = re.sub(r"\s*\d+$", "", text) # remove numbers at the end of the line return text def filter_common_elements(self, document, blocks: List[Block]): # We can't filter if we don't have enough pages to find common elements if len(blocks) < self.common_element_min_blocks: return text = [self.clean_text(b.raw_text(document)) for b in blocks] streaks = {} for key, group in groupby(text): streaks[key] = max(streaks.get(key, 0), len(list(group))) counter = Counter(text) common = [ k for k, v in counter.items() if (v >= len(blocks) * self.common_element_threshold or streaks[k] >= self.max_streak) and v > self.common_element_min_blocks ] if len(common) == 0: return for t, b in zip(text, blocks): # Check against all common elements if any(fuzz.ratio(t, common_element) > self.text_match_threshold for common_element in common): b.ignore_for_output = True ``` -------------------------------------------------------------------------------- /marker/processors/llm/llm_complex.py: -------------------------------------------------------------------------------- ```python from typing import List import markdown2 from pydantic import BaseModel from marker.processors.llm import PromptData, BaseLLMSimpleBlockProcessor from marker.schema import BlockTypes from marker.schema.document import Document class LLMComplexRegionProcessor(BaseLLMSimpleBlockProcessor): block_types = (BlockTypes.ComplexRegion,) complex_region_prompt = """You are a text correction expert specializing in accurately reproducing text from images. You will receive an image of a text block and the text that can be extracted from the image. Your task is to generate markdown to properly represent the content of the image. Do not omit any text present in the image - make sure everything is included in the markdown representation. The markdown representation should be as faithful to the original image as possible. 
Formatting should be in markdown, with the following rules:
- * for italics, ** for bold, and ` for inline code.
- Use <sup>...</sup> for superscripts.
- Headers should be formatted with #, with one # for the largest header, and up to 6 for the smallest.
- Lists should be formatted with either - or 1. for unordered and ordered lists, respectively.
- Links should be formatted with [text](url).
- Use ``` for code blocks.
- Inline math should be formatted with <math>math expression</math>.
- Display math should be formatted with <math display="block">math expression</math>.
- Values and labels should be extracted from forms, and put into markdown tables, with the labels on the left side, and values on the right. The headers should be "Labels" and "Values". Other text in the form can appear between the tables.
- Tables should be formatted with markdown tables, with the headers bolded.

**Instructions:**
1. Carefully examine the provided block image.
2. Analyze the existing text representation.
3. Generate the markdown representation of the content in the image.

**Example:**
Input:
```text
Table 1: Car Sales
```
Output:
```markdown
## Table 1: Car Sales

| Car | Sales |
| --- | --- |
| Honda | 100 |
| Toyota | 200 |
```

**Input:**
```text
{extracted_text}
```
"""

    def block_prompts(self, document: Document) -> List[PromptData]:
        prompt_data = []
        for block in self.inference_blocks(document):
            text = block["block"].raw_text(document)
            prompt = self.complex_region_prompt.replace("{extracted_text}", text)
            image = self.extract_image(document, block["block"])
            prompt_data.append({
                "prompt": prompt,
                "image": image,
                "block": block["block"],
                "schema": ComplexSchema,
                "page": block["page"]
            })
        return prompt_data

    def rewrite_block(self, response: dict, prompt_data: PromptData, document: Document):
        block = prompt_data["block"]
        text = block.raw_text(document)

        if not response or "corrected_markdown" not in response:
            block.update_metadata(llm_error_count=1)
            return

        corrected_markdown = response["corrected_markdown"]

        # The original table is okay
        if "no corrections" in corrected_markdown.lower():
            return

        # Potentially a partial response
        if len(corrected_markdown) < len(text) * .5:
            block.update_metadata(llm_error_count=1)
            return

        # Convert LLM markdown to html
        corrected_markdown = corrected_markdown.strip().lstrip("```markdown").rstrip("```").strip()
        block.html = markdown2.markdown(corrected_markdown, extras=["tables"])


class ComplexSchema(BaseModel):
    corrected_markdown: str
```

--------------------------------------------------------------------------------
/tests/converters/test_pdf_converter.py:
--------------------------------------------------------------------------------

```python
import io

import pytest

from marker.converters.pdf import PdfConverter
from marker.renderers.markdown import MarkdownOutput


@pytest.mark.output_format("markdown")
@pytest.mark.config({"page_range": [0, 1, 2, 3, 7], "disable_ocr": True})
def test_pdf_converter(pdf_converter: PdfConverter, temp_doc):
    markdown_output: MarkdownOutput = pdf_converter(temp_doc.name)
    markdown = markdown_output.markdown

    # Basic assertions
    assert len(markdown) > 0
    assert "# Subspace Adversarial Training" in markdown

    # Some assertions for line joining across pages
    assert (
        "AT solutions. However, these methods highly rely on specifically" in markdown
    )  # pgs: 1-2
    assert (
        "(with adversarial perturbations), which harms natural accuracy, " in markdown
    )  # pgs: 3-4

    # Some assertions for line joining across columns
    assert "remain similar across a wide range of choices." in markdown  # pg: 2
    assert "a new scheme for designing more robust and efficient" in markdown  # pg: 8


@pytest.mark.filename("manual.epub")
@pytest.mark.config({"page_range": [0]})
def test_epub_converter(pdf_converter: PdfConverter, temp_doc):
    markdown_output: MarkdownOutput = pdf_converter(temp_doc.name)
    markdown = markdown_output.markdown

    # Basic assertions
    assert "Simple Sabotage Field Manual" in markdown


@pytest.mark.filename("single_sheet.xlsx")
@pytest.mark.config({"page_range": [0]})
def test_xlsx_converter(pdf_converter: PdfConverter, temp_doc):
    markdown_output: MarkdownOutput = pdf_converter(temp_doc.name)
    markdown = markdown_output.markdown

    # Basic assertions
    assert "four" in markdown


@pytest.mark.filename("china.html")
@pytest.mark.config({"page_range": [10]})
def test_html_converter(pdf_converter: PdfConverter, temp_doc):
    markdown_output: MarkdownOutput = pdf_converter(temp_doc.name)
    markdown = markdown_output.markdown

    # Basic assertions
    assert "Republic of China" in markdown


@pytest.mark.filename("gatsby.docx")
@pytest.mark.config({"page_range": [0]})
def test_docx_converter(pdf_converter: PdfConverter, temp_doc):
    markdown_output: MarkdownOutput = pdf_converter(temp_doc.name)
    markdown = markdown_output.markdown

    # Basic assertions
    assert "The Decline of the American Dream in the 1920s" in markdown


@pytest.mark.filename("lambda.pptx")
@pytest.mark.config({"page_range": [0]})
def test_pptx_converter(pdf_converter: PdfConverter, temp_doc):
    markdown_output: MarkdownOutput = pdf_converter(temp_doc.name)
    markdown = markdown_output.markdown

    # Basic assertions
    assert "Adam Doupé" in markdown


@pytest.mark.output_format("markdown")
@pytest.mark.config({"page_range": [0, 1, 2, 3, 7], "disable_ocr": True})
def test_pdf_converter_bytes(pdf_converter: PdfConverter, temp_doc):
    with open(temp_doc.name, "rb") as f:
        data = f.read()

    input_bytes = io.BytesIO(data)
    markdown_output: MarkdownOutput = pdf_converter(input_bytes)
    markdown = markdown_output.markdown

    # Basic assertions
    assert len(markdown) > 0
    assert "# Subspace Adversarial Training" in markdown

    # Some assertions for line joining across pages
    assert (
        "AT solutions. However, these methods highly rely on specifically" in markdown
    )  # pgs: 1-2
    assert (
        "(with adversarial perturbations), which harms natural accuracy, " in markdown
    )  # pgs: 3-4

    # Some assertions for line joining across columns
    assert "remain similar across a wide range of choices." in markdown  # pg: 2
    assert "a new scheme for designing more robust and efficient" in markdown  # pg: 8
```

--------------------------------------------------------------------------------
/benchmarks/table/table.py:
--------------------------------------------------------------------------------

```python
import os

os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"  # Transformers uses .isin for an op, which is not supported on MPS

from pathlib import Path
from itertools import repeat
from typing import List
import time
import datasets
from tqdm import tqdm
import click
from tabulate import tabulate
import json
from concurrent.futures import ProcessPoolExecutor

from marker.settings import settings
from benchmarks.table.inference import inference_tables

from scoring import wrap_table_html, similarity_eval_html


def update_teds_score(result, prefix: str = "marker"):
    prediction, ground_truth = result[f'{prefix}_table'], result['gt_table']
    prediction, ground_truth = wrap_table_html(prediction), wrap_table_html(ground_truth)
    score = similarity_eval_html(prediction, ground_truth)
    result.update({f'{prefix}_score': score})
    return result


@click.command(help="Benchmark Table to HTML Conversion")
@click.option("--result_path", type=str, default=os.path.join(settings.OUTPUT_DIR, "benchmark", "table"), help="Output path for results.")
@click.option("--dataset", type=str, default="datalab-to/fintabnet_bench_marker", help="Dataset to use")
@click.option("--max_rows", type=int, default=None, help="Maximum number of PDFs to process")
@click.option("--max_workers", type=int, default=16, help="Maximum number of workers to use")
@click.option("--use_llm", is_flag=True, help="Use LLM for improving table recognition.")
@click.option("--table_rec_batch_size", type=int, default=None, help="Batch size for table recognition.")
@click.option("--use_gemini", is_flag=True, help="Evaluate Gemini for table recognition.")
def main(
    result_path: str,
    dataset: str,
    max_rows: int,
    max_workers: int,
    use_llm: bool,
    table_rec_batch_size: int | None,
    use_gemini: bool = False
):
    start = time.time()

    dataset = datasets.load_dataset(dataset, split='train')
    dataset = dataset.shuffle(seed=0)

    results, total_unaligned = inference_tables(dataset, use_llm, table_rec_batch_size, max_rows, use_gemini)

    print(f"Total time: {time.time() - start}.")
    print(f"Could not align {total_unaligned} tables from fintabnet.")

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        marker_results = list(
            tqdm(
                executor.map(update_teds_score, results), desc='Computing alignment scores', total=len(results)
            )
        )

    avg_score = sum([r["marker_score"] for r in marker_results]) / len(marker_results)
    headers = ["Avg score", "Total tables"]
    data = [f"{avg_score:.3f}", len(marker_results)]

    gemini_results = None
    if use_gemini:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            gemini_results = list(
                tqdm(
                    executor.map(update_teds_score, results, repeat("gemini")), desc='Computing Gemini scores', total=len(results)
                )
            )
        avg_gemini_score = sum([r["gemini_score"] for r in gemini_results]) / len(gemini_results)
        headers.append("Avg Gemini score")
        data.append(f"{avg_gemini_score:.3f}")

    table = tabulate([data], headers=headers, tablefmt="github")
    print(table)
    print("Avg score computed by comparing marker predicted HTML with original HTML")

    results = {
        "marker": marker_results,
        "gemini": gemini_results
    }
    out_path = Path(result_path)
    out_path.mkdir(parents=True, exist_ok=True)
    with open(out_path / "table.json", "w+") as f:
        json.dump(results, f, indent=2)
    print(f"Results saved to {out_path}.")


if __name__ == '__main__':
    main()
```

--------------------------------------------------------------------------------
/benchmarks/table/scoring.py:
--------------------------------------------------------------------------------

```python
"""
TEDS Code Adapted from https://github.com/ibm-aur-nlp/EDD
"""

import distance
from apted import APTED, Config
from apted.helpers import Tree
from lxml import html
from collections import deque


def wrap_table_html(table_html: str) -> str:
    return f'<html><body>{table_html}</body></html>'


class TableTree(Tree):
    def __init__(self, tag, colspan=None, rowspan=None, content=None, *children):
        self.tag = tag
        self.colspan = colspan
        self.rowspan = rowspan
        self.content = content

        # Sets self.name and self.children
        super().__init__(tag, *children)

    def bracket(self):
        """Show tree using brackets notation"""
        if self.tag == 'td':
            result = '"tag": %s, "colspan": %d, "rowspan": %d, "text": %s' % \
                     (self.tag, self.colspan, self.rowspan, self.content)
        else:
            result = '"tag": %s' % self.tag
        for child in self.children:
            result += child.bracket()
        return "{{{}}}".format(result)


class CustomConfig(Config):
    @staticmethod
    def maximum(*sequences):
        return max(map(len, sequences))

    def normalized_distance(self, *sequences):
        return float(distance.levenshtein(*sequences)) / self.maximum(*sequences)

    def rename(self, node1, node2):
        if (node1.tag != node2.tag) or (node1.colspan != node2.colspan) or (node1.rowspan != node2.rowspan):
            return 1.
        if node1.tag == 'td':
            if node1.content or node2.content:
                return self.normalized_distance(node1.content, node2.content)
        return 0.


def tokenize(node):
    """
    Tokenizes table cells
    """
    global __tokens__
    __tokens__.append('<%s>' % node.tag)
    if node.text is not None:
        __tokens__ += list(node.text)
    for n in node.getchildren():
        tokenize(n)
    if node.tag != 'unk':
        __tokens__.append('</%s>' % node.tag)
    if node.tag != 'td' and node.tail is not None:
        __tokens__ += list(node.tail)


def tree_convert_html(node, convert_cell=False, parent=None):
    """
    Converts HTML tree to the format required by apted
    """
    global __tokens__
    if node.tag == 'td':
        if convert_cell:
            __tokens__ = []
            tokenize(node)
            cell = __tokens__[1:-1].copy()
        else:
            cell = []
        new_node = TableTree(node.tag, int(node.attrib.get('colspan', '1')), int(node.attrib.get('rowspan', '1')), cell, *deque())
    else:
        new_node = TableTree(node.tag, None, None, None, *deque())
    if parent is not None:
        parent.children.append(new_node)
    if node.tag != 'td':
        for n in node.getchildren():
            tree_convert_html(n, convert_cell, new_node)
    if parent is None:
        return new_node


def similarity_eval_html(pred, true, structure_only=False):
    """
    Computes TEDS score between the prediction and the ground truth of a given samples
    """
    pred, true = html.fromstring(pred), html.fromstring(true)
    if pred.xpath('body/table') and true.xpath('body/table'):
        pred = pred.xpath('body/table')[0]
        true = true.xpath('body/table')[0]
        n_nodes_pred = len(pred.xpath(".//*"))
        n_nodes_true = len(true.xpath(".//*"))
        tree_pred = tree_convert_html(pred, convert_cell=not structure_only)
        tree_true = tree_convert_html(true, convert_cell=not structure_only)

        n_nodes = max(n_nodes_pred, n_nodes_true)
        distance = APTED(tree_pred, tree_true, CustomConfig()).compute_edit_distance()
        return 1.0 - (float(distance) / n_nodes)
    else:
        return 0.0
```

--------------------------------------------------------------------------------
/marker/schema/text/span.py:
--------------------------------------------------------------------------------

```python
import html
import re from typing import List, Literal, Optional from marker.schema import BlockTypes from marker.schema.blocks import Block from marker.util import unwrap_math def cleanup_text(full_text): full_text = re.sub(r"(\n\s){3,}", "\n\n", full_text) full_text = full_text.replace("\xa0", " ") # Replace non-breaking spaces return full_text class Span(Block): block_type: BlockTypes = BlockTypes.Span block_description: str = "A span of text inside a line." text: str font: str font_weight: float font_size: float minimum_position: int maximum_position: int formats: List[ Literal[ "plain", "math", "chemical", "bold", "italic", "highlight", "subscript", "superscript", "small", "code", "underline", ] ] has_superscript: bool = False has_subscript: bool = False url: Optional[str] = None html: Optional[str] = None @property def bold(self): return "bold" in self.formats @property def italic(self): return "italic" in self.formats @property def math(self): return "math" in self.formats @property def highlight(self): return "highlight" in self.formats @property def superscript(self): return "superscript" in self.formats @property def subscript(self): return "subscript" in self.formats @property def small(self): return "small" in self.formats @property def code(self): return "code" in self.formats @property def underline(self): return "underline" in self.formats def assemble_html(self, document, child_blocks, parent_structure, block_config): if self.ignore_for_output: return "" if self.html: return self.html text = self.text # Remove trailing newlines replaced_newline = False while len(text) > 0 and text[-1] in ["\n", "\r"]: text = text[:-1] replaced_newline = True # Remove leading newlines while len(text) > 0 and text[0] in ["\n", "\r"]: text = text[1:] if replaced_newline and not text.endswith("-"): text += " " text = text.replace( "-\n", "" ) # Remove hyphenated line breaks from the middle of the span text = html.escape(text) text = cleanup_text(text) if self.has_superscript: text = re.sub(r"^([0-9\W]+)(.*)", r"<sup>\1</sup>\2", text) # Handle full block superscript if "<sup>" not in text: text = f"<sup>{text}</sup>" if self.url: text = f"<a href='{self.url}'>{text}</a>" # TODO Support multiple formats if self.italic: text = f"<i>{text}</i>" elif self.bold: text = f"<b>{text}</b>" elif self.math: block_envs = ["split", "align", "gather", "multline"] if any(f"\\begin{{{env}}}" in text for env in block_envs): display_mode = "block" else: display_mode = "inline" text = f"<math display='{display_mode}'>{text}</math>" elif self.highlight: text = f"<mark>{text}</mark>" elif self.subscript: text = f"<sub>{text}</sub>" elif self.superscript: text = f"<sup>{text}</sup>" elif self.underline: text = f"<u>{text}</u>" elif self.small: text = f"<small>{text}</small>" elif self.code: text = f"<code>{text}</code>" text = unwrap_math(text) return text ``` -------------------------------------------------------------------------------- /marker/processors/llm/llm_handwriting.py: -------------------------------------------------------------------------------- ```python import markdown2 from pydantic import BaseModel from marker.processors.llm import PromptData, BaseLLMSimpleBlockProcessor, BlockData from marker.schema import BlockTypes from marker.schema.document import Document from typing import Annotated, List class LLMHandwritingProcessor(BaseLLMSimpleBlockProcessor): block_types = (BlockTypes.Handwriting, BlockTypes.Text) handwriting_generation_prompt: Annotated[ str, "The prompt to use for OCRing handwriting.", "Default 
is a string containing the Gemini prompt." ] = """You are an expert editor specializing in accurately reproducing text from images. You will receive an image of a text block. Your task is to generate markdown to properly represent the content of the image. Do not omit any text present in the image - make sure everything is included in the markdown representation. The markdown representation should be as faithful to the original image as possible. Formatting should be in markdown, with the following rules: - * for italics, ** for bold, and ` for inline code. - Headers should be formatted with #, with one # for the largest header, and up to 6 for the smallest. - Lists should be formatted with either - or 1. for unordered and ordered lists, respectively. - Links should be formatted with [text](url). - Use ``` for code blocks. - Inline math should be formatted with <math>math expression</math>. - Display math should be formatted with <math display="block">math expression</math>. - Values and labels should be extracted from forms, and put into markdown tables, with the labels on the left side, and values on the right. The headers should be "Labels" and "Values". Other text in the form can appear between the tables. - Tables should be formatted with markdown tables, with the headers bolded. **Instructions:** 1. Carefully examine the provided block image. 2. Output the markdown representing the content of the image. """ def inference_blocks(self, document: Document) -> List[BlockData]: blocks = super().inference_blocks(document) out_blocks = [] for block_data in blocks: raw_text = block_data["block"].raw_text(document) block = block_data["block"] # Don't process text blocks that contain lines already if block.block_type == BlockTypes.Text: lines = block.contained_blocks(document, (BlockTypes.Line,)) if len(lines) > 0 or len(raw_text.strip()) > 0: continue out_blocks.append(block_data) return out_blocks def block_prompts(self, document: Document) -> List[PromptData]: prompt_data = [] for block_data in self.inference_blocks(document): block = block_data["block"] prompt = self.handwriting_generation_prompt image = self.extract_image(document, block) prompt_data.append({ "prompt": prompt, "image": image, "block": block, "schema": HandwritingSchema, "page": block_data["page"] }) return prompt_data def rewrite_block(self, response: dict, prompt_data: PromptData, document: Document): block = prompt_data["block"] raw_text = block.raw_text(document) if not response or "markdown" not in response: block.update_metadata(llm_error_count=1) return markdown = response["markdown"] if len(markdown) < len(raw_text) * .5: block.update_metadata(llm_error_count=1) return markdown = markdown.strip().lstrip("```markdown").rstrip("```").strip() block.html = markdown2.markdown(markdown, extras=["tables"]) class HandwritingSchema(BaseModel): markdown: str ``` -------------------------------------------------------------------------------- /marker/services/azure_openai.py: -------------------------------------------------------------------------------- ```python import json import time from typing import Annotated, List import PIL from marker.logger import get_logger from openai import AzureOpenAI, APITimeoutError, RateLimitError from PIL import Image from pydantic import BaseModel from marker.schema.blocks import Block from marker.services import BaseService logger = get_logger() class AzureOpenAIService(BaseService): azure_endpoint: Annotated[ str, "The Azure OpenAI endpoint URL. No trailing slash." 
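        # Illustrative value (assumption): "https://<your-resource-name>.openai.azure.com"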
] = None azure_api_key: Annotated[ str, "The API key to use for the Azure OpenAI service." ] = None azure_api_version: Annotated[str, "The Azure OpenAI API version to use."] = None deployment_name: Annotated[ str, "The deployment name for the Azure OpenAI model." ] = None def process_images(self, images: List[PIL.Image.Image]) -> list: if isinstance(images, Image.Image): images = [images] return [ { "type": "image_url", "image_url": { "url": "data:image/webp;base64,{}".format(self.img_to_base64(img)), }, } for img in images ] def __call__( self, prompt: str, image: PIL.Image.Image | List[PIL.Image.Image] | None, block: Block | None, response_schema: type[BaseModel], max_retries: int | None = None, timeout: int | None = None, ): if max_retries is None: max_retries = self.max_retries if timeout is None: timeout = self.timeout client = self.get_client() image_data = self.format_image_for_llm(image) messages = [ { "role": "user", "content": [ *image_data, {"type": "text", "text": prompt}, ], } ] total_tries = max_retries + 1 for tries in range(1, total_tries + 1): try: response = client.beta.chat.completions.parse( extra_headers={ "X-Title": "Marker", "HTTP-Referer": "https://github.com/datalab-to/marker", }, model=self.deployment_name, messages=messages, timeout=timeout, response_format=response_schema, ) response_text = response.choices[0].message.content total_tokens = response.usage.total_tokens if block: block.update_metadata( llm_tokens_used=total_tokens, llm_request_count=1 ) return json.loads(response_text) except (APITimeoutError, RateLimitError) as e: # Rate limit exceeded if tries == total_tries: # Last attempt failed. Give up logger.error( f"Rate limit error: {e}. Max retries reached. Giving up. (Attempt {tries}/{total_tries})" ) break else: wait_time = tries * self.retry_wait_time logger.warning( f"Rate limit error: {e}. Retrying in {wait_time} seconds... 
(Attempt {tries}/{total_tries})" ) time.sleep(wait_time) except Exception as e: logger.error(f"Azure OpenAI inference failed: {e}") break return {} def get_client(self) -> AzureOpenAI: return AzureOpenAI( api_version=self.azure_api_version, azure_endpoint=self.azure_endpoint, api_key=self.azure_api_key, ) ``` -------------------------------------------------------------------------------- /benchmarks/overall/scorers/clean.py: -------------------------------------------------------------------------------- ```python import re import subprocess import tempfile from pathlib import Path import latex2mathml.converter class MarkdownCleaner: def __init__(self): pass def __call__(self, markdown): markdown = self.normalize_markdown(markdown) # Use pandoc to normalize # Replace math expressions with latexml pattern = r'(?<!\\)\$(?:\$([^$]+)\$\$|\s*([^$\n]+?)\s*\$)' markdown = re.sub(pattern, self.standardize_math, markdown) # Replace image urls with a generic tag pattern = r'!\[(.*?)\]\((https?://[^\s\)]+)\)' markdown = re.sub(pattern, r'![link]', markdown) # Clean up stray html tags markdown = markdown.replace("<br>", "\n") markdown = re.sub(r"<sub>(.*?)</sub>", r"\1", markdown) markdown = re.sub(r"<sup>(.*?)</sup>", r"\1", markdown) markdown = re.sub(r"<span.*?>(.*?)</span>", r"\1", markdown) # Remove span tags and keep content # Clean up markdown formatting markdown = re.sub(r"\s+", " ", markdown) markdown = re.sub(r"\n+", "\n", markdown) markdown = re.sub("\\.+", ".", markdown) # Replace repeated periods with a single period, like in table of contents markdown = re.sub("#+", "#", markdown) # Replace repeated headers with a single header markdown = markdown.encode().decode('unicode-escape', errors="ignore") # Decode unicode characters properly return markdown.strip().lower() @staticmethod def normalize_markdown(md_text: str) -> str: with tempfile.TemporaryDirectory() as tmp_dir: dirpath = Path(tmp_dir) input_file = dirpath / 'input.md' input_file.write_text(md_text, encoding='utf-8') # Markdown to HTML html_file = dirpath / 'temp.html' subprocess.run( [ 'pandoc', str(input_file), '-f', 'markdown+tex_math_dollars', '-t', 'html', '-o', str(html_file), '--quiet' ], check=True ) # HTML to Markdown output_file = dirpath / 'output.md' subprocess.run( [ 'pandoc', str(html_file), '-f', 'html', '-t', 'markdown+tex_math_dollars', '-o', str(output_file), '--quiet' ], check=True ) # Read back the normalized Markdown normalized_md = output_file.read_text(encoding='utf-8') return normalized_md def standardize_math(self, match): try: delim = "$$" if match.group(0).startswith('$$') else "$" math_content = match.group(1) or match.group(2) if delim == "$$": math_content = latex2mathml.converter.convert(math_content) else: math_content = self.clean_latex(math_content) return f'{delim}{math_content}{delim}' except Exception as e: print(f"Failed to standardize math expression: {match.group(0)} with error: {e}") return match.group(0) @staticmethod def clean_latex(latex_str): latex_str = re.sub(r'\s+', ' ', latex_str.strip()) for tag in [r'\\text', r'\\mathrm', r'\\mathbf', r'\\textbf']: latex_str = re.sub(tag + r'\{([^}]+)\}', r'\1', latex_str) replacements = { '\\times': '*', '\\cdot': '*', '\\div': '/', '\\le': '<=', '\\ge': '>=', '\\neq': '!=', '\\to': '\\rightarrow', } for old, new in replacements.items(): latex_str = latex_str.replace(old, new) return latex_str ``` -------------------------------------------------------------------------------- /marker/utils/gpu.py: 
-------------------------------------------------------------------------------- ```python import os import subprocess import torch from marker.logger import get_logger from marker.settings import settings logger = get_logger() class GPUManager: default_gpu_vram: int = 8 def __init__(self, device_idx: int): self.device_idx = device_idx self.original_compute_mode = None self.mps_server_process = None def __enter__(self): if self.using_cuda(): self.start_mps_server() return self def __exit__(self, exc_type, exc_val, exc_tb): if self.using_cuda(): self.cleanup() @staticmethod def using_cuda(): return "cuda" in settings.TORCH_DEVICE_MODEL def check_cuda_available(self) -> bool: if not torch.cuda.is_available(): return False try: subprocess.run(["nvidia-smi", "--version"], capture_output=True, check=True) return True except (subprocess.CalledProcessError, FileNotFoundError): return False def get_gpu_vram(self): if not self.using_cuda(): return self.default_gpu_vram try: result = subprocess.run( [ "nvidia-smi", "--query-gpu=memory.total", "--format=csv,noheader,nounits", "-i", str(self.device_idx), ], capture_output=True, text=True, check=True, ) vram_mb = int(result.stdout.strip()) vram_gb = int(vram_mb / 1024) return vram_gb except (subprocess.CalledProcessError, ValueError, FileNotFoundError): return self.default_gpu_vram def start_mps_server(self) -> bool: if not self.check_cuda_available(): return False try: # Set MPS environment with chunk-specific directories env = os.environ.copy() pipe_dir = f"/tmp/nvidia-mps-{self.device_idx}" log_dir = f"/tmp/nvidia-log-{self.device_idx}" env["CUDA_MPS_PIPE_DIRECTORY"] = pipe_dir env["CUDA_MPS_LOG_DIRECTORY"] = log_dir # Create directories os.makedirs(pipe_dir, exist_ok=True) os.makedirs(log_dir, exist_ok=True) # Start MPS control daemon self.mps_server_process = subprocess.Popen( ["nvidia-cuda-mps-control", "-d"], env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) logger.info(f"Started NVIDIA MPS server for chunk {self.device_idx}") return True except (subprocess.CalledProcessError, FileNotFoundError) as e: logger.warning( f"Failed to start MPS server for chunk {self.device_idx}: {e}" ) return False def stop_mps_server(self) -> None: try: # Stop MPS server env = os.environ.copy() env["CUDA_MPS_PIPE_DIRECTORY"] = f"/tmp/nvidia-mps-{self.device_idx}" env["CUDA_MPS_LOG_DIRECTORY"] = f"/tmp/nvidia-log-{self.device_idx}" subprocess.run( ["nvidia-cuda-mps-control"], input="quit\n", text=True, env=env, timeout=10, ) if self.mps_server_process: self.mps_server_process.terminate() try: self.mps_server_process.wait(timeout=5) except subprocess.TimeoutExpired: self.mps_server_process.kill() self.mps_server_process = None logger.info(f"Stopped NVIDIA MPS server for chunk {self.device_idx}") except Exception as e: logger.warning( f"Failed to stop MPS server for chunk {self.device_idx}: {e}" ) def cleanup(self) -> None: self.stop_mps_server() ``` -------------------------------------------------------------------------------- /marker/processors/sectionheader.py: -------------------------------------------------------------------------------- ```python import warnings from typing import Annotated, Dict, List import numpy as np from sklearn.cluster import KMeans from sklearn.exceptions import ConvergenceWarning from marker.processors import BaseProcessor from marker.schema import BlockTypes from marker.schema.document import Document # Ignore sklearn warning about not converging warnings.filterwarnings("ignore", category=ConvergenceWarning) class 
SectionHeaderProcessor(BaseProcessor): """ A processor for recognizing section headers in the document. """ block_types = (BlockTypes.SectionHeader, ) level_count: Annotated[ int, "The number of levels to use for headings.", ] = 4 merge_threshold: Annotated[ float, "The minimum gap between headings to consider them part of the same group.", ] = 0.25 default_level: Annotated[ int, "The default heading level to use if no heading level is detected.", ] = 2 height_tolerance: Annotated[ float, "The minimum height of a heading to consider it a heading.", ] = 0.99 def __call__(self, document: Document): line_heights: Dict[int, float] = {} for page in document.pages: # Iterate children to grab all section headers for block in page.children: if block.block_type not in self.block_types: continue if block.structure is not None: line_heights[block.id] = block.line_height(document) else: line_heights[block.id] = 0 block.ignore_for_output = True # Don't output an empty section header flat_line_heights = list(line_heights.values()) heading_ranges = self.bucket_headings(flat_line_heights) for page in document.pages: # Iterate children to grab all section headers for block in page.children: if block.block_type not in self.block_types: continue block_height = line_heights.get(block.id, 0) if block_height > 0: for idx, (min_height, max_height) in enumerate(heading_ranges): if block_height >= min_height * self.height_tolerance: block.heading_level = idx + 1 break if block.heading_level is None: block.heading_level = self.default_level def bucket_headings(self, line_heights: List[float], num_levels=4): if len(line_heights) <= self.level_count: return [] data = np.asarray(line_heights).reshape(-1, 1) labels = KMeans(n_clusters=num_levels, random_state=0, n_init="auto").fit_predict(data) data_labels = np.concatenate([data, labels.reshape(-1, 1)], axis=1) data_labels = np.sort(data_labels, axis=0) cluster_means = {int(label): float(np.mean(data_labels[data_labels[:, 1] == label, 0])) for label in np.unique(labels)} label_max = None label_min = None heading_ranges = [] prev_cluster = None for row in data_labels: value, label = row value = float(value) label = int(label) if prev_cluster is not None and label != prev_cluster: prev_cluster_mean = cluster_means[prev_cluster] cluster_mean = cluster_means[label] if cluster_mean * self.merge_threshold < prev_cluster_mean: heading_ranges.append((label_min, label_max)) label_min = None label_max = None label_min = value if label_min is None else min(label_min, value) label_max = value if label_max is None else max(label_max, value) prev_cluster = label if label_min is not None: heading_ranges.append((label_min, label_max)) heading_ranges = sorted(heading_ranges, reverse=True) return heading_ranges ``` -------------------------------------------------------------------------------- /marker/config/printer.py: -------------------------------------------------------------------------------- ```python from typing import Optional import click from marker.config.crawler import crawler class CustomClickPrinter(click.Command): def parse_args(self, ctx, args): display_help = "config" in args and "--help" in args if display_help: click.echo( "Here is a list of all the Builders, Processors, Converters, Providers and Renderers in Marker along with their attributes:" ) # Keep track of shared attributes and their types shared_attrs = {} # First pass: identify shared attributes and verify compatibility for base_type, base_type_dict in crawler.class_config_map.items(): for class_name, 
class_map in base_type_dict.items(): for attr, (attr_type, formatted_type, default, metadata) in class_map[ "config" ].items(): if attr not in shared_attrs: shared_attrs[attr] = { "classes": [], "type": attr_type, "is_flag": attr_type in [bool, Optional[bool]] and not default, "metadata": metadata, "default": default, } shared_attrs[attr]["classes"].append(class_name) # These are the types of attrs that can be set from the command line attr_types = [ str, int, float, bool, Optional[int], Optional[float], Optional[str], ] # Add shared attribute options first for attr, info in shared_attrs.items(): if info["type"] in attr_types: ctx.command.params.append( click.Option( ["--" + attr], type=info["type"], help=" ".join(info["metadata"]) + f" (Applies to: {', '.join(info['classes'])})", default=None, # This is important, or it sets all the default keys again in config is_flag=info["is_flag"], flag_value=True if info["is_flag"] else None, ) ) # Second pass: create class-specific options for base_type, base_type_dict in crawler.class_config_map.items(): if display_help: click.echo(f"{base_type}s:") for class_name, class_map in base_type_dict.items(): if display_help and class_map["config"]: click.echo( f"\n {class_name}: {class_map['class_type'].__doc__ or ''}" ) click.echo(" " * 4 + "Attributes:") for attr, (attr_type, formatted_type, default, metadata) in class_map[ "config" ].items(): class_name_attr = class_name + "_" + attr if display_help: click.echo(" " * 8 + f"{attr} ({formatted_type}):") click.echo( "\n".join([f"{' ' * 12}" + desc for desc in metadata]) ) if attr_type in attr_types: is_flag = attr_type in [bool, Optional[bool]] and not default # Only add class-specific options ctx.command.params.append( click.Option( ["--" + class_name_attr, class_name_attr], type=attr_type, help=" ".join(metadata), is_flag=is_flag, default=None, # This is important, or it sets all the default keys again in config ) ) if display_help: ctx.exit() super().parse_args(ctx, args) ``` -------------------------------------------------------------------------------- /marker/processors/text.py: -------------------------------------------------------------------------------- ```python import math from typing import Annotated, List import regex from marker.processors import BaseProcessor from marker.schema import BlockTypes from marker.schema.document import Document from marker.schema.text.line import Line class TextProcessor(BaseProcessor): """ A processor for merging text across pages and columns. """ block_types = (BlockTypes.Text, BlockTypes.TextInlineMath) ignored_block_types = (BlockTypes.PageHeader, BlockTypes.PageFooter) column_gap_ratio: Annotated[ float, "The minimum ratio of the page width to the column gap to consider a column break.", ] = 0.02 def __init__(self, config): super().__init__(config) def __call__(self, document: Document): for page in document.pages: for block in page.contained_blocks(document, self.block_types): if block.structure is None: continue if not len(block.structure) >= 2: # Skip single lines continue next_block = document.get_next_block(block, self.ignored_block_types) if next_block is None: # we've reached the end of the document continue if next_block.block_type not in self.block_types: continue # we found a non-text block if next_block.structure is None: continue # This is odd though, why do we have text blocks with no structure? 
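                # The rest of this loop marks block.has_continuation when the last line of
                # this block is full width or ends in a hyphen, the next block is not
                # indented, and the next block either starts a new column on the same page
                # or falls in the top-left quadrant of the following page.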
if next_block.ignore_for_output: continue # skip ignored blocks column_gap = block.polygon.width * self.column_gap_ratio column_break, page_break = False, False next_block_starts_indented = True next_block_in_first_quadrant = False last_line_is_full_width = False last_line_is_hyphentated = False if next_block.page_id == block.page_id: # block on the same page # we check for a column break column_break = math.floor(next_block.polygon.y_start) <= math.ceil( block.polygon.y_start ) and next_block.polygon.x_start > ( block.polygon.x_end + column_gap ) else: page_break = True next_page = document.get_page(next_block.page_id) next_block_in_first_quadrant = ( next_block.polygon.x_start < next_page.polygon.width // 2 ) and (next_block.polygon.y_start < next_page.polygon.height // 2) if not (column_break or page_break): continue new_block_lines = next_block.structure_blocks(document) # we check for next_block indentation if len(new_block_lines): min_x = math.ceil( min([line.polygon.x_start for line in new_block_lines]) ) next_block_starts_indented = ( new_block_lines[0].polygon.x_start > min_x ) lines: List[Line] = [ line for line in block.structure_blocks(document) if line.polygon.width > 1 ] if len(lines): max_x = math.floor(max([line.polygon.x_end for line in lines])) last_line_is_full_width = lines[-1].polygon.x_end >= max_x last_line_is_hyphentated = regex.compile( r".*[\p{Ll}|\d][-—¬]\s?$", regex.DOTALL ).match(lines[-1].raw_text(document).strip()) if ( (last_line_is_full_width or last_line_is_hyphentated) and not next_block_starts_indented and ((next_block_in_first_quadrant and page_break) or column_break) ): block.has_continuation = True ``` -------------------------------------------------------------------------------- /benchmarks/overall/methods/__init__.py: -------------------------------------------------------------------------------- ```python import io import random import re from typing import Tuple import markdown2 from PIL import Image from playwright.sync_api import sync_playwright from benchmarks.overall.methods.schema import BenchmarkResult from marker.renderers.markdown import MarkdownRenderer class BaseMethod: def __init__(self, **kwargs): for kwarg in kwargs: if hasattr(self, kwarg): setattr(self, kwarg, kwargs[kwarg]) @staticmethod def convert_to_md(html: str): md = MarkdownRenderer() markdown = md.md_cls.convert(html) return markdown def __call__(self, sample) -> BenchmarkResult: raise NotImplementedError() def render(self, markdown: str): return self.html_to_image(self.convert_to_html(markdown)) @staticmethod def convert_to_html(md: str): block_placeholders = [] inline_placeholders = [] # Add placeholders for the math def block_sub(match): content = match.group(1) placeholder = f"1BLOCKMATH{len(block_placeholders)}1" block_placeholders.append((placeholder, f"$${content}$$")) return placeholder def inline_sub(match): content = match.group(1) placeholder = f"1INLINEMATH{len(inline_placeholders)}1" inline_placeholders.append((placeholder, f"${content}$")) return placeholder md = re.sub(r'\${2}(.*?)\${2}', block_sub, md, flags=re.DOTALL) md = re.sub(r'\$(.*?)\$', inline_sub, md) html = markdown2.markdown(md, extras=['tables']) # Replace placeholders for placeholder, math_str in block_placeholders: html = html.replace(placeholder, math_str) for placeholder, math_str in inline_placeholders: html = html.replace(placeholder, math_str) return html def html_to_image(self, html: str) -> Image.Image: with sync_playwright() as p: browser = p.chromium.launch() page = 
browser.new_page() html_str = f""" <!DOCTYPE html> <html> <head> <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/dist/katex.min.css" integrity="sha384-zh0CIslj+VczCZtlzBcjt5ppRcsAmDnRem7ESsYwWwg3m/OaJ2l4x7YBZl9Kxxib" crossorigin="anonymous"> <!-- The loading of KaTeX is deferred to speed up page rendering --> <script defer src="https://cdn.jsdelivr.net/npm/[email protected]/dist/katex.min.js" integrity="sha384-Rma6DA2IPUwhNxmrB/7S3Tno0YY7sFu9WSYMCuulLhIqYSGZ2gKCJWIqhBWqMQfh" crossorigin="anonymous"></script> <!-- To automatically render math in text elements, include the auto-render extension: --> <script defer src="https://cdn.jsdelivr.net/npm/[email protected]/dist/contrib/auto-render.min.js" integrity="sha384-hCXGrW6PitJEwbkoStFjeJxv+fSOOQKOPbJxSfM6G5sWZjAyWhXiTIIAmQqnlLlh" crossorigin="anonymous"></script> </head> <body> {html} <script> document.addEventListener("DOMContentLoaded", function() {{ renderMathInElement(document.body, {{ delimiters: [ {{left: '$$', right: '$$', display: true}}, {{left: '$', right: '$', display: false}} ], throwOnError : false }}); }}); </script> </body> </html> """.strip() page.set_viewport_size({"width": 1200, "height": 800}) page.set_content(html_str) page.wait_for_load_state("domcontentloaded") page.wait_for_timeout(500) # Wait for KaTeX to render screenshot_bytes = page.screenshot(full_page=True) browser.close() return Image.open(io.BytesIO(screenshot_bytes)) ``` -------------------------------------------------------------------------------- /marker/processors/llm/llm_form.py: -------------------------------------------------------------------------------- ```python from typing import List from pydantic import BaseModel from marker.output import json_to_html from marker.processors.llm import PromptData, BaseLLMSimpleBlockProcessor, BlockData from marker.schema import BlockTypes from marker.schema.document import Document class LLMFormProcessor(BaseLLMSimpleBlockProcessor): block_types = (BlockTypes.Form,) form_rewriting_prompt = """You are a text correction expert specializing in accurately reproducing text from images. You will receive an image of a text block and an html representation of the form in the image. Your task is to correct any errors in the html representation, and format it properly. Values and labels should appear in html tables, with the labels on the left side, and values on the right. Other text in the form can appear between the tables. Only use the tags `table, p, span, i, b, th, td, tr, and div`. Do not omit any text from the form - make sure everything is included in the html representation. It should be as faithful to the original form as possible. **Instructions:** 1. Carefully examine the provided form block image. 2. Analyze the html representation of the form. 3. Compare the html representation to the image. 4. If the html representation is correct, or you cannot read the image properly, then write "No corrections needed." 5. If the html representation contains errors, generate the corrected html representation. 6. Output only either the corrected html representation or "No corrections needed." **Example:** Input: ```html <table> <tr> <td>Label 1</td> <td>Label 2</td> <td>Label 3</td> </tr> <tr> <td>Value 1</td> <td>Value 2</td> <td>Value 3</td> </tr> </table> ``` Output: Comparison: The html representation has the labels in the first row and the values in the second row. It should be corrected to have the labels on the left side and the values on the right side. 
```html <table> <tr> <td>Label 1</td> <td>Value 1</td> </tr> <tr> <td>Label 2</td> <td>Value 2</td> </tr> <tr> <td>Label 3</td> <td>Value 3</td> </tr> </table> ``` **Input:** ```html {block_html} ``` """ def inference_blocks(self, document: Document) -> List[BlockData]: blocks = super().inference_blocks(document) out_blocks = [] for block_data in blocks: block = block_data["block"] children = block.contained_blocks(document, (BlockTypes.TableCell,)) if not children: continue out_blocks.append(block_data) return out_blocks def block_prompts(self, document: Document) -> List[PromptData]: prompt_data = [] for block_data in self.inference_blocks(document): block = block_data["block"] block_html = json_to_html(block.render(document)) prompt = self.form_rewriting_prompt.replace("{block_html}", block_html) image = self.extract_image(document, block) prompt_data.append({ "prompt": prompt, "image": image, "block": block, "schema": FormSchema, "page": block_data["page"] }) return prompt_data def rewrite_block(self, response: dict, prompt_data: PromptData, document: Document): block = prompt_data["block"] block_html = json_to_html(block.render(document)) if not response or "corrected_html" not in response: block.update_metadata(llm_error_count=1) return corrected_html = response["corrected_html"] # The original table is okay if "no corrections needed" in corrected_html.lower(): return # Potentially a partial response if len(corrected_html) < len(block_html) * .33: block.update_metadata(llm_error_count=1) return corrected_html = corrected_html.strip().lstrip("```html").rstrip("```").strip() block.html = corrected_html class FormSchema(BaseModel): comparison: str corrected_html: str ``` -------------------------------------------------------------------------------- /marker/renderers/ocr_json.py: -------------------------------------------------------------------------------- ```python from typing import Annotated, List, Tuple from pydantic import BaseModel from marker.renderers import BaseRenderer from marker.schema import BlockTypes from marker.schema.document import Document class OCRJSONCharOutput(BaseModel): id: str block_type: str text: str polygon: List[List[float]] bbox: List[float] class OCRJSONLineOutput(BaseModel): id: str block_type: str html: str polygon: List[List[float]] bbox: List[float] children: List["OCRJSONCharOutput"] | None = None class OCRJSONPageOutput(BaseModel): id: str block_type: str polygon: List[List[float]] bbox: List[float] children: List[OCRJSONLineOutput] | None = None class OCRJSONOutput(BaseModel): children: List[OCRJSONPageOutput] block_type: str = str(BlockTypes.Document) metadata: dict | None = None class OCRJSONRenderer(BaseRenderer): """ A renderer for OCR JSON output. 
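    Emits a page -> line -> character hierarchy where each node carries its block type,
    polygon, and bbox; a line's HTML comes from formatted_text() (or the pre-rendered
    HTML for equation lines).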
""" image_blocks: Annotated[ Tuple[BlockTypes], "The list of block types to consider as images.", ] = (BlockTypes.Picture, BlockTypes.Figure) page_blocks: Annotated[ Tuple[BlockTypes], "The list of block types to consider as pages.", ] = (BlockTypes.Page,) def extract_json(self, document: Document) -> List[OCRJSONPageOutput]: pages = [] for page in document.pages: page_equations = [ b for b in page.children if b.block_type == BlockTypes.Equation and not b.removed ] equation_lines = [] for equation in page_equations: if not equation.structure: continue equation_lines += [ line for line in equation.structure if line.block_type == BlockTypes.Line ] page_lines = [ block for block in page.children if block.block_type == BlockTypes.Line and block.id not in equation_lines and not block.removed ] lines = [] for line in page_lines + page_equations: line_obj = OCRJSONLineOutput( id=str(line.id), block_type=str(line.block_type), html="", polygon=line.polygon.polygon, bbox=line.polygon.bbox, ) if line in page_equations: line_obj.html = line.html else: line_obj.html = line.formatted_text(document) spans = ( [document.get_block(span_id) for span_id in line.structure] if line.structure else [] ) children = [] for span in spans: if not span.structure: continue span_chars = [ document.get_block(char_id) for char_id in span.structure ] children.extend( [ OCRJSONCharOutput( id=str(char.id), block_type=str(char.block_type), text=char.text, polygon=char.polygon.polygon, bbox=char.polygon.bbox, ) for char in span_chars ] ) line_obj.children = children lines.append(line_obj) page = OCRJSONPageOutput( id=str(page.id), block_type=str(page.block_type), polygon=page.polygon.polygon, bbox=page.polygon.bbox, children=lines, ) pages.append(page) return pages def __call__(self, document: Document) -> OCRJSONOutput: return OCRJSONOutput(children=self.extract_json(document), metadata=None) ``` -------------------------------------------------------------------------------- /marker/services/openai.py: -------------------------------------------------------------------------------- ```python import json import time from typing import Annotated, List import openai import PIL from marker.logger import get_logger from openai import APITimeoutError, RateLimitError from PIL import Image from pydantic import BaseModel from marker.schema.blocks import Block from marker.services import BaseService logger = get_logger() class OpenAIService(BaseService): openai_base_url: Annotated[ str, "The base url to use for OpenAI-like models. No trailing slash." ] = "https://api.openai.com/v1" openai_model: Annotated[str, "The model name to use for OpenAI-like model."] = ( "gpt-4o-mini" ) openai_api_key: Annotated[ str, "The API key to use for the OpenAI-like service." ] = None openai_image_format: Annotated[ str, "The image format to use for the OpenAI-like service. Use 'png' for better compatability", ] = "webp" def process_images(self, images: List[Image.Image]) -> List[dict]: """ Generate the base-64 encoded message to send to an openAI-compatabile multimodal model. Args: images: Image or list of PIL images to include format: Format to use for the image; use "png" for better compatability. Returns: A list of OpenAI-compatbile multimodal messages containing the base64-encoded images. 
""" if isinstance(images, Image.Image): images = [images] img_fmt = self.openai_image_format return [ { "type": "image_url", "image_url": { "url": "data:image/{};base64,{}".format( img_fmt, self.img_to_base64(img, format=img_fmt) ), }, } for img in images ] def __call__( self, prompt: str, image: PIL.Image.Image | List[PIL.Image.Image] | None, block: Block | None, response_schema: type[BaseModel], max_retries: int | None = None, timeout: int | None = None, ): if max_retries is None: max_retries = self.max_retries if timeout is None: timeout = self.timeout client = self.get_client() image_data = self.format_image_for_llm(image) messages = [ { "role": "user", "content": [ *image_data, {"type": "text", "text": prompt}, ], } ] total_tries = max_retries + 1 for tries in range(1, total_tries + 1): try: response = client.beta.chat.completions.parse( extra_headers={ "X-Title": "Marker", "HTTP-Referer": "https://github.com/datalab-to/marker", }, model=self.openai_model, messages=messages, timeout=timeout, response_format=response_schema, ) response_text = response.choices[0].message.content total_tokens = response.usage.total_tokens if block: block.update_metadata( llm_tokens_used=total_tokens, llm_request_count=1 ) return json.loads(response_text) except (APITimeoutError, RateLimitError) as e: # Rate limit exceeded if tries == total_tries: # Last attempt failed. Give up logger.error( f"Rate limit error: {e}. Max retries reached. Giving up. (Attempt {tries}/{total_tries})", ) break else: wait_time = tries * self.retry_wait_time logger.warning( f"Rate limit error: {e}. Retrying in {wait_time} seconds... (Attempt {tries}/{total_tries})", ) time.sleep(wait_time) except Exception as e: logger.error(f"OpenAI inference failed: {e}") break return {} def get_client(self) -> openai.OpenAI: return openai.OpenAI(api_key=self.openai_api_key, base_url=self.openai_base_url) ``` -------------------------------------------------------------------------------- /marker/services/claude.py: -------------------------------------------------------------------------------- ```python import json import time from typing import List, Annotated, T import PIL from PIL import Image import anthropic from anthropic import RateLimitError, APITimeoutError from marker.logger import get_logger from pydantic import BaseModel from marker.schema.blocks import Block from marker.services import BaseService logger = get_logger() class ClaudeService(BaseService): claude_model_name: Annotated[ str, "The name of the Google model to use for the service." ] = "claude-3-7-sonnet-20250219" claude_api_key: Annotated[str, "The Claude API key to use for the service."] = None max_claude_tokens: Annotated[ int, "The maximum number of tokens to use for a single Claude request." 
] = 8192 def process_images(self, images: List[Image.Image]) -> List[dict]: return [ { "type": "image", "source": { "type": "base64", "media_type": "image/webp", "data": self.img_to_base64(img), }, } for img in images ] def validate_response(self, response_text: str, schema: type[T]) -> T: response_text = response_text.strip() if response_text.startswith("```json"): response_text = response_text[7:] if response_text.endswith("```"): response_text = response_text[:-3] try: # Try to parse as JSON first out_schema = schema.model_validate_json(response_text) out_json = out_schema.model_dump() return out_json except Exception: try: # Re-parse with fixed escapes escaped_str = response_text.replace("\\", "\\\\") out_schema = schema.model_validate_json(escaped_str) return out_schema.model_dump() except Exception: return def get_client(self): return anthropic.Anthropic( api_key=self.claude_api_key, ) def __call__( self, prompt: str, image: PIL.Image.Image | List[PIL.Image.Image] | None, block: Block | None, response_schema: type[BaseModel], max_retries: int | None = None, timeout: int | None = None, ): if max_retries is None: max_retries = self.max_retries if timeout is None: timeout = self.timeout schema_example = response_schema.model_json_schema() system_prompt = f""" Follow the instructions given by the user prompt. You must provide your response in JSON format matching this schema: {json.dumps(schema_example, indent=2)} Respond only with the JSON schema, nothing else. Do not include ```json, ```, or any other formatting. """.strip() client = self.get_client() image_data = self.format_image_for_llm(image) messages = [ { "role": "user", "content": [ *image_data, {"type": "text", "text": prompt}, ], } ] total_tries = max_retries + 1 for tries in range(1, total_tries + 1): try: response = client.messages.create( system=system_prompt, model=self.claude_model_name, max_tokens=self.max_claude_tokens, messages=messages, timeout=timeout, ) # Extract and validate response response_text = response.content[0].text return self.validate_response(response_text, response_schema) except (RateLimitError, APITimeoutError) as e: # Rate limit exceeded if tries == total_tries: # Last attempt failed. Give up logger.error( f"Rate limit error: {e}. Max retries reached. Giving up. (Attempt {tries}/{total_tries})", ) break else: wait_time = tries * self.retry_wait_time logger.warning( f"Rate limit error: {e}. Retrying in {wait_time} seconds... (Attempt {tries}/{total_tries})", ) time.sleep(wait_time) except Exception as e: logger.error(f"Error during Claude API call: {e}") break return {} ``` -------------------------------------------------------------------------------- /CLA.md: -------------------------------------------------------------------------------- ```markdown Marker Contributor Agreement This Marker Contributor Agreement ("MCA") applies to any contribution that you make to any product or project managed by us (the "project"), and sets out the intellectual property rights you grant to us in the contributed materials. The term "us" shall mean Endless Labs, Inc. The term "you" shall mean the person or entity identified below. If you agree to be bound by these terms, sign by writing "I have read the CLA document and I hereby sign the CLA" in response to the CLA bot Github comment. Read this agreement carefully before signing. These terms and conditions constitute a binding legal agreement. 1. 
The term 'contribution' or 'contributed materials' means any source code, object code, patch, tool, sample, graphic, specification, manual, documentation, or any other material posted or submitted by you to the project. 2. With respect to any worldwide copyrights, or copyright applications and registrations, in your contribution: - you hereby assign to us joint ownership, and to the extent that such assignment is or becomes invalid, ineffective or unenforceable, you hereby grant to us a perpetual, irrevocable, non-exclusive, worldwide, no-charge, royalty free, unrestricted license to exercise all rights under those copyrights. This includes, at our option, the right to sublicense these same rights to third parties through multiple levels of sublicensees or other licensing arrangements, including dual-license structures for commercial customers; - you agree that each of us can do all things in relation to your contribution as if each of us were the sole owners, and if one of us makes a derivative work of your contribution, the one who makes the derivative work (or has it made) will be the sole owner of that derivative work; - you agree that you will not assert any moral rights in your contribution against us, our licensees or transferees; - you agree that we may register a copyright in your contribution and exercise all ownership rights associated with it; and - you agree that neither of us has any duty to consult with, obtain the consent of, pay or render an accounting to the other for any use or distribution of your contribution. 3. With respect to any patents you own, or that you can license without payment to any third party, you hereby grant to us a perpetual, irrevocable, non-exclusive, worldwide, no-charge, royalty-free license to: - make, have made, use, sell, offer to sell, import, and otherwise transfer your contribution in whole or in part, alone or in combination with or included in any product, work or materials arising out of the project to which your contribution was submitted, and - at our option, to sublicense these same rights to third parties through multiple levels of sublicensees or other licensing arrangements. If you or your affiliates institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the contribution or any project it was submitted to constitutes direct or contributory patent infringement, then any patent licenses granted to you under this agreement for that contribution shall terminate as of the date such litigation is filed. 4. Except as set out above, you keep all right, title, and interest in your contribution. The rights that you grant to us under these terms are effective on the date you first submitted a contribution to us, even if your submission took place before the date you sign these terms. Any contribution we make available under any license will also be made available under a suitable FSF (Free Software Foundation) or OSI (Open Source Initiative) approved license. 5. You covenant, represent, warrant and agree that: - each contribution that you submit is and shall be an original work of authorship and you can legally grant the rights set out in this MCA; - to the best of your knowledge, each contribution will not violate any third party's copyrights, trademarks, patents, or other intellectual property rights; and - each contribution shall be in compliance with U.S. export control laws and other applicable export and import laws.
You agree to notify us if you become aware of any circumstance which would make any of the foregoing representations inaccurate in any respect. Endless Labs, Inc. may publicly disclose your participation in the project, including the fact that you have signed the MCA. 6. This MCA is governed by the laws of the State of California and applicable U.S. Federal law. Any choice of law rules will not apply. ``` -------------------------------------------------------------------------------- /marker/schema/text/line.py: -------------------------------------------------------------------------------- ```python import html import re from typing import Literal, List import regex from marker.schema import BlockTypes from marker.schema.blocks import Block, BlockOutput HYPHENS = r"-—¬" def remove_tags(text): return re.sub(r"<[^>]+>", "", text) def replace_last(string, old, new): matches = list(re.finditer(old, string)) if not matches: return string last_match = matches[-1] return string[: last_match.start()] + new + string[last_match.end() :] def strip_trailing_hyphens(line_text, next_line_text, line_html) -> str: lowercase_letters = r"\p{Ll}" hyphen_regex = regex.compile(rf".*[{HYPHENS}]\s?$", regex.DOTALL) next_line_starts_lowercase = regex.match( rf"^\s?[{lowercase_letters}]", next_line_text ) if hyphen_regex.match(line_text) and next_line_starts_lowercase: line_html = replace_last(line_html, rf"[{HYPHENS}]", "") return line_html class Line(Block): block_type: BlockTypes = BlockTypes.Line block_description: str = "A line of text." formats: List[Literal["math"]] | None = ( None # Sometimes we want to set math format at the line level, not span ) def ocr_input_text(self, document): text = "" for block in self.contained_blocks(document, (BlockTypes.Span,)): # We don't include superscripts/subscripts and math since they can be unreliable at this stage block_text = block.text if block.italic: text += f"<i>{block_text}</i>" elif block.bold: text += f"<b>{block_text}</b>" else: text += block_text return text.strip() def formatted_text(self, document, skip_urls=False): text = "" for block in self.contained_blocks(document, (BlockTypes.Span,)): block_text = html.escape(block.text) if block.has_superscript: block_text = re.sub(r"^([0-9\W]+)(.*)", r"<sup>\1</sup>\2", block_text) if "<sup>" not in block_text: block_text = f"<sup>{block_text}</sup>" if block.url and not skip_urls: block_text = f"<a href='{block.url}'>{block_text}</a>" if block.italic: text += f"<i>{block_text}</i>" elif block.bold: text += f"<b>{block_text}</b>" elif block.math: text += f"<math display='inline'>{block_text}</math>" else: text += block_text return text def assemble_html(self, document, child_blocks, parent_structure, block_config): template = "" for c in child_blocks: template += c.html raw_text = remove_tags(template).strip() structure_idx = parent_structure.index(self.id) if structure_idx < len(parent_structure) - 1: next_block_id = parent_structure[structure_idx + 1] next_line = document.get_block(next_block_id) next_line_raw_text = next_line.raw_text(document) template = strip_trailing_hyphens(raw_text, next_line_raw_text, template) else: template = template.strip( " " ) # strip any trailing whitespace from the last line return template def render( self, document, parent_structure, section_hierarchy=None, block_config=None ): child_content = [] if self.structure is not None and len(self.structure) > 0: for block_id in self.structure: block = document.get_block(block_id) child_content.append( block.render( document, 
parent_structure, section_hierarchy, block_config ) ) return BlockOutput( html=self.assemble_html( document, child_content, parent_structure, block_config ), polygon=self.polygon, id=self.id, children=[], section_hierarchy=section_hierarchy, ) def merge(self, other: "Line"): self.polygon = self.polygon.merge([other.polygon]) # Handle merging structure with Nones if self.structure is None: self.structure = other.structure elif other.structure is not None: self.structure = self.structure + other.structure # Merge formats with Nones if self.formats is None: self.formats = other.formats elif other.formats is not None: self.formats = list(set(self.formats + other.formats)) ``` -------------------------------------------------------------------------------- /marker/processors/list.py: -------------------------------------------------------------------------------- ```python from typing import Annotated, List, Tuple from marker.processors import BaseProcessor from marker.schema import BlockTypes from marker.schema.blocks import ListItem from marker.schema.document import Document class ListProcessor(BaseProcessor): """ A processor for merging lists across pages and columns """ block_types = (BlockTypes.ListGroup,) ignored_block_types: Annotated[ Tuple[BlockTypes], "The list of block types to ignore when merging lists.", ] = (BlockTypes.PageHeader, BlockTypes.PageFooter) min_x_indent: Annotated[ float, "The minimum horizontal indentation required to consider a block as a nested list item.", "This is expressed as a percentage of the page width and is used to determine hierarchical relationships within a list.", ] = 0.01 def __init__(self, config): super().__init__(config) def __call__(self, document: Document): self.list_group_continuation(document) self.list_group_indentation(document) def list_group_continuation(self, document: Document): for page in document.pages: for block in page.contained_blocks(document, self.block_types): next_block = document.get_next_block(block, self.ignored_block_types) if next_block is None: continue if next_block.block_type not in self.block_types: continue if next_block.structure is None: continue if next_block.ignore_for_output: continue column_break, page_break = False, False next_block_in_first_quadrant = False if next_block.page_id == block.page_id: # block on the same page # we check for a column break column_break = next_block.polygon.y_start <= block.polygon.y_end else: page_break = True next_page = document.get_page(next_block.page_id) next_block_in_first_quadrant = (next_block.polygon.x_start < next_page.polygon.width // 2) and \ (next_block.polygon.y_start < next_page.polygon.height // 2) block.has_continuation = column_break or (page_break and next_block_in_first_quadrant) def list_group_indentation(self, document: Document): for page in document.pages: for block in page.contained_blocks(document, self.block_types): if block.structure is None: continue if block.ignore_for_output: continue stack: List[ListItem] = [block.get_next_block(page, None)] for list_item_id in block.structure: list_item_block: ListItem = page.get_block(list_item_id) # This can be a line sometimes if list_item_block.block_type != BlockTypes.ListItem: continue while stack and list_item_block.polygon.x_start <= stack[-1].polygon.x_start + (self.min_x_indent * page.polygon.width): stack.pop() if stack and list_item_block.polygon.y_start > stack[-1].polygon.y_start: list_item_block.list_indent_level = stack[-1].list_indent_level if list_item_block.polygon.x_start > 
stack[-1].polygon.x_start + (self.min_x_indent * page.polygon.width): list_item_block.list_indent_level += 1 next_list_item_block = block.get_next_block(page, list_item_block) if next_list_item_block is not None and next_list_item_block.polygon.x_start > list_item_block.polygon.x_end: stack = [next_list_item_block] # reset stack on column breaks else: stack.append(list_item_block) stack: List[ListItem] = [block.get_next_block(page, None)] for list_item_id in block.structure.copy(): list_item_block: ListItem = page.get_block(list_item_id) while stack and list_item_block.list_indent_level <= stack[-1].list_indent_level: stack.pop() if stack: current_parent = stack[-1] current_parent.add_structure(list_item_block) current_parent.polygon = current_parent.polygon.merge([list_item_block.polygon]) block.remove_structure_items([list_item_id]) stack.append(list_item_block) ``` -------------------------------------------------------------------------------- /marker/config/crawler.py: -------------------------------------------------------------------------------- ```python import importlib import inspect import pkgutil from functools import cached_property from typing import Annotated, Dict, Set, Type, get_args, get_origin from marker.builders import BaseBuilder from marker.converters import BaseConverter from marker.extractors import BaseExtractor from marker.processors import BaseProcessor from marker.providers import BaseProvider from marker.renderers import BaseRenderer from marker.services import BaseService class ConfigCrawler: def __init__( self, base_classes=( BaseBuilder, BaseProcessor, BaseConverter, BaseProvider, BaseRenderer, BaseService, BaseExtractor, ), ): self.base_classes = base_classes self.class_config_map: Dict[str, dict] = {} self._crawl_config() def _crawl_config(self): for base in self.base_classes: base_class_type = base.__name__.removeprefix("Base") self.class_config_map.setdefault(base_class_type, {}) for class_name, class_type in self._find_subclasses(base).items(): if class_name.startswith("Base"): continue self.class_config_map[base_class_type].setdefault( class_name, {"class_type": class_type, "config": {}} ) for attr, attr_type in self._gather_super_annotations( class_type ).items(): default = getattr(class_type, attr) metadata = (f"Default is {default}.",) if get_origin(attr_type) is Annotated: if any("Default" in desc for desc in attr_type.__metadata__): metadata = attr_type.__metadata__ else: metadata = attr_type.__metadata__ + metadata attr_type = get_args(attr_type)[0] formatted_type = self._format_type(attr_type) self.class_config_map[base_class_type][class_name]["config"][ attr ] = (attr_type, formatted_type, default, metadata) @staticmethod def _gather_super_annotations(cls: Type) -> Dict[str, Type]: """ Collect all annotated attributes from `cls` and its superclasses, bottom-up. Subclass attributes overwrite superclass attributes with the same name. """ # We'll walk the MRO from base -> derived so subclass attributes overwrite # the same attribute name from superclasses. 
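        # reversed(cls.__mro__) yields the most generic bases first and cls last, so a
        # subclass annotation for a given name replaces the superclass entry in the dict.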
annotations = {} for base in reversed(cls.__mro__): if base is object: continue if hasattr(base, "__annotations__"): for name, annotation in base.__annotations__.items(): annotations[name] = annotation return annotations @cached_property def attr_counts(self) -> Dict[str, int]: counts: Dict[str, int] = {} for base_type_dict in self.class_config_map.values(): for class_map in base_type_dict.values(): for attr in class_map["config"].keys(): counts[attr] = counts.get(attr, 0) + 1 return counts @cached_property def attr_set(self) -> Set[str]: attr_set: Set[str] = set() for base_type_dict in self.class_config_map.values(): for class_name, class_map in base_type_dict.items(): for attr in class_map["config"].keys(): attr_set.add(attr) attr_set.add(f"{class_name}_{attr}") return attr_set def _find_subclasses(self, base_class): subclasses = {} module_name = base_class.__module__ package = importlib.import_module(module_name) if hasattr(package, "__path__"): for _, module_name, _ in pkgutil.walk_packages( package.__path__, module_name + "." ): try: module = importlib.import_module(module_name) for name, obj in inspect.getmembers(module, inspect.isclass): if issubclass(obj, base_class) and obj is not base_class: subclasses[name] = obj except ImportError: pass return subclasses def _format_type(self, t: Type) -> str: """Format a typing type like Optional[int] into a readable string.""" if get_origin(t): # Handle Optional and types with origins separately return f"{t}".removeprefix("typing.") else: # Regular types like int, str return t.__name__ crawler = ConfigCrawler() ``` -------------------------------------------------------------------------------- /marker/processors/line_merge.py: -------------------------------------------------------------------------------- ```python from typing import Annotated, List from marker.processors import BaseProcessor from marker.schema import BlockTypes from marker.schema.blocks import Block from marker.schema.document import Document from marker.schema.text import Line from marker.util import matrix_intersection_area class LineMergeProcessor(BaseProcessor): """ A processor for merging inline math lines. """ block_types = (BlockTypes.Text, BlockTypes.TextInlineMath, BlockTypes.Caption, BlockTypes.Footnote, BlockTypes.SectionHeader) min_merge_pct: Annotated[ float, "The minimum percentage of intersection area to consider merging." ] = .015 block_expand_threshold: Annotated[ float, "The percentage of the block width to expand the bounding box." ] = .05 min_merge_ydist: Annotated[ float, "The minimum y distance between lines to consider merging." ] = 5 intersection_pct_threshold: Annotated[ float, "The total amount of intersection area concentrated in the max intersection block." ] = .5 vertical_overlap_pct_threshold: Annotated[ float, "The minimum percentage of vertical overlap to consider merging." ] = .8 use_llm: Annotated[ bool, "Whether to use LLMs to improve accuracy." 
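        # Inline-math line merging only matters for the LLM pipeline, so __call__ exits
        # early when use_llm is False.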
] = False def __init__(self, config): super().__init__(config) def merge_lines(self, lines: List[Line], block: Block): lines = [l for l in lines if l.polygon.width * 5 > l.polygon.height] # Skip vertical lines line_bboxes = [l.polygon.expand(self.block_expand_threshold, 0).bbox for l in lines] # Expand horizontally intersections = matrix_intersection_area(line_bboxes, line_bboxes) merges = [] merge = [] for i in range(len(line_bboxes)): intersection_row = intersections[i] intersection_row[i] = 0 # Zero out the current idx if i < len(line_bboxes) - 1: intersection_row[i+1] = 0 # Zero out the next idx, so we only evaluate merge from the left if len(merge) == 0: merge.append(i) continue # Zero out previous merge segments merge_intersection = sum([intersection_row[m] for m in merge]) line_area = lines[i].polygon.area intersection_pct = merge_intersection / max(1, line_area) total_intersection = max(1, sum(intersection_row)) line_start = lines[merge[0]].polygon.y_start line_end = lines[merge[0]].polygon.y_end vertical_overlap_start = max(line_start, lines[i].polygon.y_start) vertical_overlap_end = min(line_end, lines[i].polygon.y_end) vertical_overlap = max(0, vertical_overlap_end - vertical_overlap_start) vertical_overlap_pct = vertical_overlap / max(1, lines[i].polygon.height) if all([ # Overlaps enough intersection_pct >= self.min_merge_pct, # Within same line vertical_overlap_pct > self.vertical_overlap_pct_threshold, # doesn't overlap with anything else merge_intersection / total_intersection > self.intersection_pct_threshold ]): merge.append(i) else: merges.append(merge) merge = [] if merge: merges.append(merge) merges = [m for m in merges if len(m) > 1] merged = set() for merge in merges: merge = [m for m in merge if m not in merged] if len(merge) < 2: continue line: Line = lines[merge[0]] merged.add(merge[0]) for idx in merge[1:]: other_line: Line = lines[idx] line.merge(other_line) block.structure.remove(other_line.id) other_line.removed = True # Mark line as removed merged.add(idx) # It is probably math if we are merging provider lines like this if not line.formats: line.formats = ["math"] elif "math" not in line.formats: line.formats.append("math") def __call__(self, document: Document): # Merging lines only needed for inline math if not self.use_llm: return for page in document.pages: for block in page.contained_blocks(document, self.block_types): if block.structure is None: continue if not len(block.structure) >= 2: # Skip single lines continue lines = block.contained_blocks(document, (BlockTypes.Line,)) self.merge_lines(lines, block) ``` -------------------------------------------------------------------------------- /tests/conftest.py: -------------------------------------------------------------------------------- ```python import tempfile from typing import Dict, Type from PIL import Image, ImageDraw import datasets import pytest from marker.builders.document import DocumentBuilder from marker.builders.layout import LayoutBuilder from marker.builders.line import LineBuilder from marker.builders.ocr import OcrBuilder from marker.builders.structure import StructureBuilder from marker.converters.pdf import PdfConverter from marker.models import create_model_dict from marker.providers.registry import provider_from_filepath from marker.renderers.chunk import ChunkRenderer from marker.renderers.html import HTMLRenderer from marker.schema import BlockTypes from marker.schema.blocks import Block from marker.renderers.markdown import MarkdownRenderer from marker.renderers.json import 
JSONRenderer from marker.schema.registry import register_block_class from marker.util import classes_to_strings, strings_to_classes @pytest.fixture(scope="session") def model_dict(): model_dict = create_model_dict() yield model_dict del model_dict @pytest.fixture(scope="session") def layout_model(model_dict): yield model_dict["layout_model"] @pytest.fixture(scope="session") def detection_model(model_dict): yield model_dict["detection_model"] @pytest.fixture(scope="session") def recognition_model(model_dict): yield model_dict["recognition_model"] @pytest.fixture(scope="session") def table_rec_model(model_dict): yield model_dict["table_rec_model"] @pytest.fixture(scope="session") def ocr_error_model(model_dict): yield model_dict["ocr_error_model"] @pytest.fixture(scope="function") def config(request): config_mark = request.node.get_closest_marker("config") config = config_mark.args[0] if config_mark else {} override_map: Dict[BlockTypes, Type[Block]] = config.get("override_map", {}) for block_type, override_block_type in override_map.items(): register_block_class(block_type, override_block_type) return config @pytest.fixture(scope="session") def pdf_dataset(): return datasets.load_dataset("datalab-to/pdfs", split="train") @pytest.fixture(scope="function") def temp_doc(request, pdf_dataset): filename_mark = request.node.get_closest_marker("filename") filename = filename_mark.args[0] if filename_mark else "adversarial.pdf" idx = pdf_dataset["filename"].index(filename) suffix = filename.split(".")[-1] temp_pdf = tempfile.NamedTemporaryFile(suffix=f".{suffix}") temp_pdf.write(pdf_dataset["pdf"][idx]) temp_pdf.flush() yield temp_pdf @pytest.fixture(scope="function") def doc_provider(request, config, temp_doc): provider_cls = provider_from_filepath(temp_doc.name) yield provider_cls(temp_doc.name, config) @pytest.fixture(scope="function") def pdf_document( request, config, doc_provider, layout_model, ocr_error_model, recognition_model, detection_model, ): layout_builder = LayoutBuilder(layout_model, config) line_builder = LineBuilder(detection_model, ocr_error_model, config) ocr_builder = OcrBuilder(recognition_model, config) builder = DocumentBuilder(config) structure_builder = StructureBuilder(config) document = builder(doc_provider, layout_builder, line_builder, ocr_builder) structure_builder(document) yield document @pytest.fixture(scope="function") def pdf_converter(request, config, model_dict, renderer, llm_service): if llm_service: llm_service = classes_to_strings([llm_service])[0] yield PdfConverter( artifact_dict=model_dict, processor_list=None, renderer=classes_to_strings([renderer])[0], config=config, llm_service=llm_service, ) @pytest.fixture(scope="function") def renderer(request, config): if request.node.get_closest_marker("output_format"): output_format = request.node.get_closest_marker("output_format").args[0] if output_format == "markdown": return MarkdownRenderer elif output_format == "json": return JSONRenderer elif output_format == "html": return HTMLRenderer elif output_format == "chunks": return ChunkRenderer else: raise ValueError(f"Unknown output format: {output_format}") else: return MarkdownRenderer @pytest.fixture(scope="function") def llm_service(request, config): llm_service = config.get("llm_service") if not llm_service: yield None else: yield strings_to_classes([llm_service])[0] @pytest.fixture(scope="function") def temp_image(): img = Image.new("RGB", (512, 512), color="white") draw = ImageDraw.Draw(img) draw.text((200, 200), "Hello, World!", fill="black", 
font_size=36) with tempfile.NamedTemporaryFile(suffix=".png") as f: img.save(f.name) f.flush() yield f ``` -------------------------------------------------------------------------------- /marker/extractors/document.py: -------------------------------------------------------------------------------- ```python import json from pydantic import BaseModel from typing import Annotated, Optional, List from marker.extractors import BaseExtractor from marker.extractors.page import PageExtractionSchema from marker.logger import get_logger logger = get_logger() class DocumentExtractionSchema(BaseModel): analysis: str document_json: str class DocumentExtractor(BaseExtractor): """ An extractor that combines data from across all pages. """ page_schema: Annotated[ str, "The JSON schema to be extracted from the page.", ] = "" page_extraction_prompt = """You are an expert document analyst who reads documents and pulls data out in JSON format. You will receive your detailed notes from all the pages of a document, and a JSON schema that we want to extract from the document. Your task is to extract all the information properly into the JSON schema. Some notes: - The schema may contain a single object to extract from the entire document, or an array of objects. - The schema may contain nested objects, arrays, and other complex structures. Some guidelines: - Some entities will span multiple pages, so make sure to consult your notes thoroughly. - In the case of potential conflicting values, pull out the values you have the most confidence in, from your notes. - If you cannot find a value for a field, leave it blank in the JSON. **Instructions:** 1. Analyze your provided notes. 2. Analyze the JSON schema. 3. Write a detailed analysis of the notes, and the associated values in the schema. Make sure to reference which page each piece of information comes from. 4. Write the output in the JSON schema format, ensuring all required fields are filled out. Output only the json data, without any additional text or formatting. **Example:** Input: Detailed Notes Page 0 On this page, I see a table with car makes and sales. The makes are Honda and Toyota, with sales of 100 and 200 respectively. The color is not present in the table, so I will leave it blank in the JSON. That information may be present on another page. Some JSON snippets I may find useful later are: ```json { "make": "Honda", "sales": 100, } ``` ```json { "make": "Toyota", "sales": 200, } ``` Honda is the first row in the table, and Toyota is the second row. Make is the first column, and sales is the second. Page 1 I see a table that contains 2 rows, and has a color header. The first row has the color red, and the second row has the color blue. Here are some useful snippets: Schema ```json {'$defs': {'Cars': {'properties': {'make': {'title': 'Make', 'type': 'string'}, 'sales': {'title': 'Sales', 'type': 'integer'}, 'color': {'title': 'Color', 'type': 'string'}}, 'required': ['make', 'sales', 'color'], 'title': 'Cars', 'type': 'object'}}, 'properties': {'cars': {'items': {'$ref': '#/$defs/Cars'}, 'title': 'Cars', 'type': 'array'}}, 'required': ['cars'], 'title': 'CarsList', 'type': 'object'} ``` Output: Analysis: From the notes, it looks like the information I need is in a table that spans 2 pages. The first page has the makes and sales, while the second page has the colors. I will combine this information into the JSON schema. 
JSON { "cars": [ { "make": "Honda", "sales": 100, "color": "red" }, { "make": "Toyota", "sales": 200, "color": "blue" } ] } **Input:** Detailed Notes {{document_notes}} Schema ```json {{schema}} ``` """ def assemble_document_notes(self, page_notes: List[PageExtractionSchema]) -> str: notes = "" for i, page_schema in enumerate(page_notes): if not page_notes: continue notes += f"Page {i + 1}\n{page_schema.detailed_notes}\n\n" return notes.strip() def __call__( self, page_notes: List[PageExtractionSchema], **kwargs, ) -> Optional[DocumentExtractionSchema]: if not self.page_schema: raise ValueError( "Page schema must be defined for structured extraction to work." ) prompt = self.page_extraction_prompt.replace( "{{document_notes}}", self.assemble_document_notes(page_notes) ).replace("{{schema}}", json.dumps(self.page_schema)) response = self.llm_service(prompt, None, None, DocumentExtractionSchema) logger.debug(f"Document extraction response: {response}") if not response or any( [ key not in response for key in [ "analysis", "document_json", ] ] ): return None json_data = response["document_json"].strip().lstrip("```json").rstrip("```") return DocumentExtractionSchema( analysis=response["analysis"], document_json=json_data ) ``` -------------------------------------------------------------------------------- /marker/scripts/server.py: -------------------------------------------------------------------------------- ```python import traceback import click import os from pydantic import BaseModel, Field from starlette.responses import HTMLResponse from marker.config.parser import ConfigParser from marker.output import text_from_rendered import base64 from contextlib import asynccontextmanager from typing import Optional, Annotated import io from fastapi import FastAPI, Form, File, UploadFile from marker.converters.pdf import PdfConverter from marker.models import create_model_dict from marker.settings import settings app_data = {} UPLOAD_DIRECTORY = "./uploads" os.makedirs(UPLOAD_DIRECTORY, exist_ok=True) @asynccontextmanager async def lifespan(app: FastAPI): app_data["models"] = create_model_dict() yield if "models" in app_data: del app_data["models"] app = FastAPI(lifespan=lifespan) @app.get("/") async def root(): return HTMLResponse( """ <h1>Marker API</h1> <ul> <li><a href="/docs">API Documentation</a></li> <li><a href="/marker">Run marker (post request only)</a></li> </ul> """ ) class CommonParams(BaseModel): filepath: Annotated[ Optional[str], Field(description="The path to the PDF file to convert.") ] page_range: Annotated[ Optional[str], Field( description="Page range to convert, specify comma separated page numbers or ranges. Example: 0,5-10,20", example=None, ), ] = None force_ocr: Annotated[ bool, Field( description="Force OCR on all pages of the PDF. Defaults to False. This can lead to worse results if you have good text in your PDFs (which is true in most cases)." ), ] = False paginate_output: Annotated[ bool, Field( description="Whether to paginate the output. Defaults to False. If set to True, each page of the output will be separated by a horizontal rule that contains the page number (2 newlines, {PAGE_NUMBER}, 48 - characters, 2 newlines)." ), ] = False output_format: Annotated[ str, Field( description="The format to output the text in. Can be 'markdown', 'json', or 'html'. Defaults to 'markdown'." 
), ] = "markdown" async def _convert_pdf(params: CommonParams): assert params.output_format in ["markdown", "json", "html", "chunks"], ( "Invalid output format" ) try: options = params.model_dump() config_parser = ConfigParser(options) config_dict = config_parser.generate_config_dict() config_dict["pdftext_workers"] = 1 converter_cls = PdfConverter converter = converter_cls( config=config_dict, artifact_dict=app_data["models"], processor_list=config_parser.get_processors(), renderer=config_parser.get_renderer(), llm_service=config_parser.get_llm_service(), ) rendered = converter(params.filepath) text, _, images = text_from_rendered(rendered) metadata = rendered.metadata except Exception as e: traceback.print_exc() return { "success": False, "error": str(e), } encoded = {} for k, v in images.items(): byte_stream = io.BytesIO() v.save(byte_stream, format=settings.OUTPUT_IMAGE_FORMAT) encoded[k] = base64.b64encode(byte_stream.getvalue()).decode( settings.OUTPUT_ENCODING ) return { "format": params.output_format, "output": text, "images": encoded, "metadata": metadata, "success": True, } @app.post("/marker") async def convert_pdf(params: CommonParams): return await _convert_pdf(params) @app.post("/marker/upload") async def convert_pdf_upload( page_range: Optional[str] = Form(default=None), force_ocr: Optional[bool] = Form(default=False), paginate_output: Optional[bool] = Form(default=False), output_format: Optional[str] = Form(default="markdown"), file: UploadFile = File( ..., description="The PDF file to convert.", media_type="application/pdf" ), ): upload_path = os.path.join(UPLOAD_DIRECTORY, file.filename) with open(upload_path, "wb+") as upload_file: file_contents = await file.read() upload_file.write(file_contents) params = CommonParams( filepath=upload_path, page_range=page_range, force_ocr=force_ocr, paginate_output=paginate_output, output_format=output_format, ) results = await _convert_pdf(params) os.remove(upload_path) return results @click.command() @click.option("--port", type=int, default=8000, help="Port to run the server on") @click.option("--host", type=str, default="127.0.0.1", help="Host to run the server on") def server_cli(port: int, host: str): import uvicorn # Run the server uvicorn.run( app, host=host, port=port, ) ``` -------------------------------------------------------------------------------- /marker/processors/equation.py: -------------------------------------------------------------------------------- ```python from typing import Annotated, List, Tuple from PIL import Image import re from bs4 import BeautifulSoup from ftfy import fix_text, TextFixerConfig from surya.recognition import RecognitionPredictor, OCRResult from marker.processors import BaseProcessor from marker.schema import BlockTypes from marker.schema.document import Document from marker.settings import settings MATH_TAG_PATTERN = re.compile(r"<math[^>]*>(.*?)</math>") class EquationProcessor(BaseProcessor): """ A processor for recognizing equations in the document. 
""" block_types: Annotated[ Tuple[BlockTypes], "The block types to process.", ] = (BlockTypes.Equation,) model_max_length: Annotated[ int, "The maximum number of tokens to allow for the Recognition model.", ] = 1024 equation_batch_size: Annotated[ int, "The batch size to use for the recognition model while processing equations.", "Default is None, which will use the default batch size for the model.", ] = None disable_tqdm: Annotated[ bool, "Whether to disable the tqdm progress bar.", ] = False drop_repeated_text: Annotated[bool, "Drop repeated text in OCR results."] = False def __init__(self, recognition_model: RecognitionPredictor, config=None): super().__init__(config) self.recognition_model = recognition_model def get_batch_size(self): # Set to 1/4th of OCR batch size due to sequence length with tiling if self.equation_batch_size is not None: return self.equation_batch_size elif settings.TORCH_DEVICE_MODEL == "cuda": return 32 elif settings.TORCH_DEVICE_MODEL == "mps": return 6 return 6 def __call__(self, document: Document): images = [] equation_boxes = [] equation_block_ids = [] total_equation_blocks = 0 for page in document.pages: page_image = page.get_image(highres=True) page_size = page.polygon.width, page.polygon.height image_size = page_image.size page_equation_boxes = [] page_equation_block_ids = [] equation_blocks = page.contained_blocks(document, self.block_types) for block in equation_blocks: page_equation_boxes.append( block.polygon.rescale(page_size, image_size).bbox ) page_equation_block_ids.append(block.id) total_equation_blocks += 1 images.append(page_image) equation_boxes.append(page_equation_boxes) equation_block_ids.append(page_equation_block_ids) if total_equation_blocks == 0: return predictions = self.get_latex_batched(images, equation_boxes) for page_predictions, page_equation_block_ids in zip( predictions, equation_block_ids ): assert len(page_predictions) == len(page_equation_block_ids), ( "Every equation block should have a corresponding prediction" ) for block_prediction, block_id in zip( page_predictions, page_equation_block_ids ): block = document.get_block(block_id) block.html = self.fix_latex(block_prediction) def fix_latex(self, math_html: str): math_html = math_html.strip() soup = BeautifulSoup(math_html, "html.parser") opening_math_tag = soup.find("math") # No math block found if not opening_math_tag: return "" # Force block format opening_math_tag.attrs["display"] = "block" fixed_math_html = str(soup) # Sometimes model outputs newlines at the beginning/end of tags fixed_math_html = re.sub( r"^<math display=\"block\">\\n(?![a-zA-Z])", '<math display="block">', fixed_math_html, ) fixed_math_html = re.sub(r"\\n</math>$", "</math>", fixed_math_html) fixed_math_html = re.sub(r"<br>", "", fixed_math_html) fixed_math_html = fix_text( fixed_math_html, config=TextFixerConfig(unescape_html=True) ) return fixed_math_html def get_latex_batched( self, page_images: List[Image.Image], bboxes: List[List[List[float]]], ): self.recognition_model.disable_tqdm = self.disable_tqdm predictions: List[OCRResult] = self.recognition_model( images=page_images, bboxes=bboxes, task_names=["ocr_with_boxes"] * len(page_images), recognition_batch_size=self.get_batch_size(), sort_lines=False, drop_repeated_text=self.drop_repeated_text, max_tokens=2048, max_sliding_window=2148, ) equation_predictions = [ [line.text.strip() for line in page_prediction.text_lines] for page_prediction in predictions ] return equation_predictions ``` 
-------------------------------------------------------------------------------- /marker/processors/llm/llm_equation.py: -------------------------------------------------------------------------------- ```python from pydantic import BaseModel from marker.processors.llm import BaseLLMSimpleBlockProcessor, PromptData, BlockData from marker.schema import BlockTypes from marker.schema.document import Document from typing import Annotated, List class LLMEquationProcessor(BaseLLMSimpleBlockProcessor): block_types = (BlockTypes.Equation,) min_equation_height: Annotated[ float, "The minimum ratio between equation height and page height to consider for processing.", ] = 0.06 image_expansion_ratio: Annotated[ float, "The ratio to expand the image by when cropping.", ] = 0.05 # Equations sometimes get bboxes that are too tight redo_inline_math: Annotated[ bool, "Whether to redo inline math blocks.", ] = False equation_latex_prompt: Annotated[ str, "The prompt to use for generating LaTeX from equations.", "Default is a string containing the Gemini prompt." ] = r"""You're an expert mathematician who is good at writing LaTeX code and html for equations. You'll receive an image of a math block, along with the text extracted from the block. It may contain one or more equations. Your job is to write html that represents the content of the image, with the equations in LaTeX format. Some guidelines: - Output valid html, where all the equations can render properly. - Use <math display="block"> as a block equation delimiter and <math> for inline equations. Do not use $ or $$ as delimiters. - Keep the LaTeX code inside the math tags simple, concise, and KaTeX compatible. - Enclose all equations in the correct math tags. Use multiple math tags inside the html to represent multiple equations. - Only use the html tags math, i, b, p, and br. - Make sure to include all the equations in the image in the html output. - Make sure to include other text in the image in the correct positions along with the equations. **Instructions:** 1. Carefully examine the provided image. 2. Analyze the existing html, which may include LaTeX code. 3. Write a short analysis of how the html should be corrected to represent the image. 4. If the html and LaTeX are correct, write "No corrections needed." 5. If the html and LaTeX are incorrect, generate the corrected html. 6. Output only the analysis, then the corrected html or "No corrections needed." **Example:** Input: ```html The following equation illustrates the Pythagorean theorem: x2 + y2 = z2 And this equation is a bit more complex: (ab * x5 + x2 + 2 * x + 123)/t ``` Output: analysis: The equations are not formatted as LaTeX, or enclosed in math tags. 
```html <p>The following equation illustrates the Pythagorean theorem:</p> <math display="block">x^{2} + y^{2} = z^{2}</math> <p>And this equation is a bit more complex, and contains <math>ab \cdot x^{5}</math>:</p> <math display="block">\frac{ab \cdot x^{5} + x^{2} + 2 \cdot x + 123}{t}</math> ``` **Input:** ```html {equation} ``` """ def inference_blocks(self, document: Document) -> List[BlockData]: blocks = super().inference_blocks(document) out_blocks = [] for block_data in blocks: block = block_data["block"] page = block_data["page"] # If we redo inline math, we redo all equations if all([ block.polygon.height / page.polygon.height < self.min_equation_height, not self.redo_inline_math ]): continue out_blocks.append(block_data) return out_blocks def block_prompts(self, document: Document) -> List[PromptData]: prompt_data = [] for block_data in self.inference_blocks(document): block = block_data["block"] text = block.html if block.html else block.raw_text(document) prompt = self.equation_latex_prompt.replace("{equation}", text) image = self.extract_image(document, block) prompt_data.append({ "prompt": prompt, "image": image, "block": block, "schema": EquationSchema, "page": block_data["page"] }) return prompt_data def rewrite_block(self, response: dict, prompt_data: PromptData, document: Document): block = prompt_data["block"] text = block.html if block.html else block.raw_text(document) if not response or "corrected_equation" not in response: block.update_metadata(llm_error_count=1) return html_equation = response["corrected_equation"] if "no corrections needed" in html_equation.lower(): return balanced_tags = html_equation.count("<math") == html_equation.count("</math>") if not all([ html_equation, balanced_tags, len(html_equation) > len(text) * .3, ]): block.update_metadata(llm_error_count=1) return block.html = html_equation class EquationSchema(BaseModel): analysis: str corrected_equation: str ``` -------------------------------------------------------------------------------- /marker/builders/structure.py: -------------------------------------------------------------------------------- ```python from typing import Annotated from marker.builders import BaseBuilder from marker.schema import BlockTypes from marker.schema.blocks import Text from marker.schema.document import Document from marker.schema.groups import ListGroup from marker.schema.groups.page import PageGroup from marker.schema.registry import get_block_class class StructureBuilder(BaseBuilder): """ A builder for grouping blocks together based on their structure. 
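Groups captions and footnotes with neighboring tables, figures, and pictures, merges runs of adjacent list items into list groups, and converts any list items that were never grouped back into plain text blocks.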
""" gap_threshold: Annotated[ float, "The minimum gap between blocks to consider them part of the same group.", ] = 0.05 list_gap_threshold: Annotated[ float, "The minimum gap between list items to consider them part of the same group.", ] = 0.1 def __init__(self, config=None): super().__init__(config) def __call__(self, document: Document): for page in document.pages: self.group_caption_blocks(page) self.group_lists(page) self.unmark_lists(page) def group_caption_blocks(self, page: PageGroup): gap_threshold_px = self.gap_threshold * page.polygon.height static_page_structure = page.structure.copy() remove_ids = list() for i, block_id in enumerate(static_page_structure): block = page.get_block(block_id) if block.block_type not in [BlockTypes.Table, BlockTypes.Figure, BlockTypes.Picture]: continue if block.id in remove_ids: continue block_structure = [block_id] selected_polygons = [block.polygon] caption_types = [BlockTypes.Caption, BlockTypes.Footnote] prev_block = page.get_prev_block(block) next_block = page.get_next_block(block) if prev_block and \ prev_block.block_type in caption_types and \ prev_block.polygon.minimum_gap(block.polygon) < gap_threshold_px and \ prev_block.id not in remove_ids: block_structure.insert(0, prev_block.id) selected_polygons.append(prev_block.polygon) if next_block and \ next_block.block_type in caption_types and \ next_block.polygon.minimum_gap(block.polygon) < gap_threshold_px: block_structure.append(next_block.id) selected_polygons.append(next_block.polygon) if len(block_structure) > 1: # Create a merged block new_block_cls = get_block_class(BlockTypes[block.block_type.name + "Group"]) new_polygon = block.polygon.merge(selected_polygons) group_block = page.add_block(new_block_cls, new_polygon) group_block.structure = block_structure # Update the structure of the page to reflect the new block page.update_structure_item(block_id, group_block.id) remove_ids.extend(block_structure) page.remove_structure_items(remove_ids) def group_lists(self, page: PageGroup): gap_threshold_px = self.list_gap_threshold * page.polygon.height static_page_structure = page.structure.copy() remove_ids = list() for i, block_id in enumerate(static_page_structure): block = page.get_block(block_id) if block.block_type not in [BlockTypes.ListItem]: continue if block.id in remove_ids: continue block_structure = [block_id] selected_polygons = [block.polygon] for j, next_block_id in enumerate(page.structure[i + 1:]): next_block = page.get_block(next_block_id) if all([ next_block.block_type == BlockTypes.ListItem, next_block.polygon.minimum_gap(selected_polygons[-1]) < gap_threshold_px ]): block_structure.append(next_block_id) selected_polygons.append(next_block.polygon) else: break if len(block_structure) > 1: new_polygon = block.polygon.merge(selected_polygons) group_block = page.add_block(ListGroup, new_polygon) group_block.structure = block_structure # Update the structure of the page to reflect the new block page.update_structure_item(block_id, group_block.id) remove_ids.extend(block_structure) page.remove_structure_items(remove_ids) def unmark_lists(self, page: PageGroup): # If lists aren't grouped, unmark them as list items for block_id in page.structure: block = page.get_block(block_id) if block.block_type == BlockTypes.ListItem: generated_block = Text( polygon=block.polygon, page_id=block.page_id, structure=block.structure, ) page.replace_block(block, generated_block) ``` -------------------------------------------------------------------------------- /marker/services/gemini.py: 
-------------------------------------------------------------------------------- ```python import json import time import traceback from io import BytesIO from typing import List, Annotated import PIL from google import genai from google.genai import types from google.genai.errors import APIError from marker.logger import get_logger from pydantic import BaseModel from marker.schema.blocks import Block from marker.services import BaseService logger = get_logger() class BaseGeminiService(BaseService): gemini_model_name: Annotated[ str, "The name of the Google model to use for the service." ] = "gemini-2.0-flash" thinking_budget: Annotated[ int, "The thinking token budget to use for the service." ] = None def img_to_bytes(self, img: PIL.Image.Image): image_bytes = BytesIO() img.save(image_bytes, format="WEBP") return image_bytes.getvalue() def get_google_client(self, timeout: int): raise NotImplementedError def process_images(self, images): image_parts = [ types.Part.from_bytes(data=self.img_to_bytes(img), mime_type="image/webp") for img in images ] return image_parts def __call__( self, prompt: str, image: PIL.Image.Image | List[PIL.Image.Image] | None, block: Block | None, response_schema: type[BaseModel], max_retries: int | None = None, timeout: int | None = None, ): if max_retries is None: max_retries = self.max_retries if timeout is None: timeout = self.timeout client = self.get_google_client(timeout=timeout) image_parts = self.format_image_for_llm(image) total_tries = max_retries + 1 temperature = 0 for tries in range(1, total_tries + 1): config = { "temperature": temperature, "response_schema": response_schema, "response_mime_type": "application/json", } if self.max_output_tokens: config["max_output_tokens"] = self.max_output_tokens if self.thinking_budget is not None: # For gemini models, we can optionally set a thinking budget in the config config["thinking_config"] = types.ThinkingConfig( thinking_budget=self.thinking_budget ) try: responses = client.models.generate_content( model=self.gemini_model_name, contents=image_parts + [ prompt ], # According to gemini docs, it performs better if the image is the first element config=config, ) output = responses.candidates[0].content.parts[0].text total_tokens = responses.usage_metadata.total_token_count if block: block.update_metadata( llm_tokens_used=total_tokens, llm_request_count=1 ) return json.loads(output) except APIError as e: if e.code in [429, 443, 503]: # Rate limit exceeded if tries == total_tries: # Last attempt failed. Give up logger.error( f"APIError: {e}. Max retries reached. Giving up. (Attempt {tries}/{total_tries})", ) break else: wait_time = tries * self.retry_wait_time logger.warning( f"APIError: {e}. Retrying in {wait_time} seconds... (Attempt {tries}/{total_tries})", ) time.sleep(wait_time) else: logger.error(f"APIError: {e}") break except json.JSONDecodeError as e: temperature = 0.2 # Increase temperature slightly to try and get a different response # The response was not valid JSON if tries == total_tries: # Last attempt failed. Give up logger.error( f"JSONDecodeError: {e}. Max retries reached. Giving up. (Attempt {tries}/{total_tries})", ) break else: logger.warning( f"JSONDecodeError: {e}. Retrying...
(Attempt {tries}/{total_tries})", ) except Exception as e: logger.error(f"Exception: {e}") traceback.print_exc() break return {} class GoogleGeminiService(BaseGeminiService): gemini_api_key: Annotated[str, "The Google API key to use for the service."] = None def get_google_client(self, timeout: int): return genai.Client( api_key=self.gemini_api_key, http_options={"timeout": timeout * 1000}, # Convert to milliseconds ) ``` -------------------------------------------------------------------------------- /marker/processors/line_numbers.py: -------------------------------------------------------------------------------- ```python from typing import Annotated from marker.processors import BaseProcessor from marker.schema import BlockTypes from marker.schema.document import Document class LineNumbersProcessor(BaseProcessor): """ A processor for ignoring line numbers. """ block_types = (BlockTypes.Text, BlockTypes.TextInlineMath) strip_numbers_threshold: Annotated[ float, "The fraction of lines or tokens in a block that must be numeric to consider them as line numbers.", ] = 0.6 min_lines_in_block: Annotated[ int, "The minimum number of lines required in a block for it to be considered during processing.", "Ensures that small blocks are ignored as they are unlikely to contain meaningful line numbers.", ] = 4 min_line_length: Annotated[ int, "The minimum length of a line (in characters) to consider it significant when checking for", "numeric prefixes or suffixes. Prevents false positives for short lines.", ] = 10 min_line_number_span_ratio: Annotated[ float, "The minimum ratio of detected line number spans to total lines required to treat them as line numbers.", ] = .6 def __init__(self, config): super().__init__(config) def __call__(self, document: Document): self.ignore_line_number_spans(document) self.ignore_line_starts_ends(document) self.ignore_line_number_blocks(document) def ignore_line_number_spans(self, document: Document): for page in document.pages: line_count = 0 line_number_spans = [] for block in page.contained_blocks(document, (BlockTypes.Line,)): if block.structure is None: continue line_count += 1 leftmost_span = None for span in block.contained_blocks(document, (BlockTypes.Span,)): if leftmost_span is None or span.polygon.x_start < leftmost_span.polygon.x_start: leftmost_span = span if leftmost_span is not None and leftmost_span.text.strip().isnumeric(): line_number_spans.append(leftmost_span) if line_count > 0 and len(line_number_spans) / line_count > self.min_line_number_span_ratio: for span in line_number_spans: span.ignore_for_output = True def ignore_line_number_blocks(self, document: Document): for page in document.pages: for block in page.contained_blocks(document, self.block_types): raw_text = block.raw_text(document) tokens = raw_text.strip().split() if len(tokens) < 4: continue tokens_are_numbers = [token.isdigit() for token in tokens] if all([ sum(tokens_are_numbers) / len(tokens) > self.strip_numbers_threshold, block.polygon.height > block.polygon.width # Ensure block is taller than it is wide, like vertical page numbers ]): block.ignore_for_output = True def ignore_line_starts_ends(self, document: Document): for page in document.pages: for block in page.contained_blocks(document, self.block_types): if block.structure is None: continue all_lines = block.structure_blocks(document) if len(all_lines) < self.min_lines_in_block: continue starts_with_number = [] ends_with_number = [] for line in all_lines: spans = line.structure_blocks(document) if len(spans) < 2: 
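# Lines with fewer than two spans cannot hold a numeric line-number span plus body text, so treat them as unnumbered.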
starts_with_number.append(False) ends_with_number.append(False) continue raw_text = line.raw_text(document) starts = all([ spans[0].text.strip().isdigit(), len(raw_text) - len(spans[0].text.strip()) > self.min_line_length ]) ends = all([ spans[-1].text.strip().isdigit(), len(raw_text) - len(spans[-1].text.strip()) > self.min_line_length ]) starts_with_number.append(starts) ends_with_number.append(ends) if sum(starts_with_number) / len(starts_with_number) > self.strip_numbers_threshold: for starts, line in zip(starts_with_number, all_lines): if starts: span = page.get_block(line.structure[0]) span.ignore_for_output = True if sum(ends_with_number) / len(ends_with_number) > self.strip_numbers_threshold: for ends, line in zip(ends_with_number, all_lines): if ends: span = page.get_block(line.structure[-1]) span.ignore_for_output = True ``` -------------------------------------------------------------------------------- /marker/scripts/streamlit_app.py: -------------------------------------------------------------------------------- ```python import os from marker.scripts.common import ( load_models, parse_args, img_to_html, get_page_image, page_count, ) os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1" os.environ["IN_STREAMLIT"] = "true" from marker.settings import settings from streamlit.runtime.uploaded_file_manager import UploadedFile import re import tempfile from typing import Any, Dict import streamlit as st from PIL import Image from marker.converters.pdf import PdfConverter from marker.config.parser import ConfigParser from marker.output import text_from_rendered def convert_pdf(fname: str, config_parser: ConfigParser) -> (str, Dict[str, Any], dict): config_dict = config_parser.generate_config_dict() config_dict["pdftext_workers"] = 1 converter_cls = PdfConverter converter = converter_cls( config=config_dict, artifact_dict=model_dict, processor_list=config_parser.get_processors(), renderer=config_parser.get_renderer(), llm_service=config_parser.get_llm_service(), ) return converter(fname) def markdown_insert_images(markdown, images): image_tags = re.findall( r'(!\[(?P<image_title>[^\]]*)\]\((?P<image_path>[^\)"\s]+)\s*([^\)]*)\))', markdown, ) for image in image_tags: image_markdown = image[0] image_alt = image[1] image_path = image[2] if image_path in images: markdown = markdown.replace( image_markdown, img_to_html(images[image_path], image_alt) ) return markdown st.set_page_config(layout="wide") col1, col2 = st.columns([0.5, 0.5]) model_dict = load_models() cli_options = parse_args() st.markdown(""" # Marker Demo This app will let you try marker, a PDF or image -> Markdown, HTML, JSON converter. It works with any language, and extracts images, tables, equations, etc. Find the project [here](https://github.com/VikParuchuri/marker). 
""") in_file: UploadedFile = st.sidebar.file_uploader( "PDF, document, or image file:", type=["pdf", "png", "jpg", "jpeg", "gif", "pptx", "docx", "xlsx", "html", "epub"], ) if in_file is None: st.stop() filetype = in_file.type with col1: page_count = page_count(in_file) page_number = st.number_input( f"Page number out of {page_count}:", min_value=0, value=0, max_value=page_count ) pil_image = get_page_image(in_file, page_number) st.image(pil_image, use_container_width=True) page_range = st.sidebar.text_input( "Page range to parse, comma separated like 0,5-10,20", value=f"{page_number}-{page_number}", ) output_format = st.sidebar.selectbox( "Output format", ["markdown", "json", "html", "chunks"], index=0 ) run_marker = st.sidebar.button("Run Marker") use_llm = st.sidebar.checkbox( "Use LLM", help="Use LLM for higher quality processing", value=False ) force_ocr = st.sidebar.checkbox("Force OCR", help="Force OCR on all pages", value=False) strip_existing_ocr = st.sidebar.checkbox( "Strip existing OCR", help="Strip existing OCR text from the PDF and re-OCR.", value=False, ) debug = st.sidebar.checkbox("Debug", help="Show debug information", value=False) disable_ocr_math = st.sidebar.checkbox( "Disable math", help="Disable math in OCR output - no inline math", value=False, ) if not run_marker: st.stop() # Run Marker with tempfile.TemporaryDirectory() as tmp_dir: temp_pdf = os.path.join(tmp_dir, "temp.pdf") with open(temp_pdf, "wb") as f: f.write(in_file.getvalue()) cli_options.update( { "output_format": output_format, "page_range": page_range, "force_ocr": force_ocr, "debug": debug, "output_dir": settings.DEBUG_DATA_FOLDER if debug else None, "use_llm": use_llm, "strip_existing_ocr": strip_existing_ocr, "disable_ocr_math": disable_ocr_math, } ) config_parser = ConfigParser(cli_options) rendered = convert_pdf(temp_pdf, config_parser) page_range = config_parser.generate_config_dict()["page_range"] first_page = page_range[0] if page_range else 0 text, ext, images = text_from_rendered(rendered) with col2: if output_format == "markdown": text = markdown_insert_images(text, images) st.markdown(text, unsafe_allow_html=True) elif output_format == "json": st.json(text) elif output_format == "html": st.html(text) elif output_format == "chunks": st.json(text) if debug: with col1: debug_data_path = rendered.metadata.get("debug_data_path") if debug_data_path: pdf_image_path = os.path.join(debug_data_path, f"pdf_page_{first_page}.png") img = Image.open(pdf_image_path) st.image(img, caption="PDF debug image", use_container_width=True) layout_image_path = os.path.join( debug_data_path, f"layout_page_{first_page}.png" ) img = Image.open(layout_image_path) st.image(img, caption="Layout debug image", use_container_width=True) st.write("Raw output:") st.code(text, language=output_format) ``` -------------------------------------------------------------------------------- /benchmarks/throughput/main.py: -------------------------------------------------------------------------------- ```python import os import tempfile import time from multiprocessing import get_context from concurrent.futures import ProcessPoolExecutor import torch import click import pypdfium2 as pdfium from tqdm import tqdm import datasets def get_next_pdf(ds: datasets.Dataset, i: int): while True: pdf = ds[i]["pdf"] filename = ds[i]["filename"] if pdf and filename.endswith(".pdf"): return pdf, filename, i + 1 i += 1 if i >= len(ds): i = 0 def single_batch( batch_size: int, num_threads: int, force_ocr: bool, quantize: bool, compile: bool, 
worker_id: int, chunksize: int = 100, ): if quantize: os.environ["RECOGNITION_MODEL_QUANTIZE"] = "true" if compile: os.environ["COMPILE_ALL"] = "true" for item in [ "DETECTOR_POSTPROCESSING_CPU_WORKERS", "OPENBLAS_NUM_THREADS", "PDFTEXT_CPU_WORKERS", "OMP_NUM_THREADS", ]: os.environ[item] = f"{num_threads}" torch.set_num_threads(num_threads) from marker.converters.pdf import PdfConverter from marker.models import create_model_dict from marker.output import text_from_rendered ds = datasets.load_dataset("datalab-to/pdfs", split="train") model_dict = create_model_dict() torch.cuda.reset_peak_memory_stats() times = [] i = 0 pages = 0 chars = 0 min_time = time.time() for _ in range(batch_size): pdf, fname, i = get_next_pdf(ds, i) print(f"Inferencing {fname} on worker {worker_id}...") pdf_doc = pdfium.PdfDocument(pdf) page_count = len(pdf_doc) pdf_doc.close() pages += page_count with tempfile.NamedTemporaryFile(suffix=".pdf") as f: f.write(pdf) f.flush() page_range_chunks = list(range(0, page_count, chunksize)) for chunk_start in page_range_chunks: chunk_end = min(chunk_start + chunksize, page_count) page_range = list(range(chunk_start, chunk_end)) block_converter = PdfConverter( artifact_dict=model_dict, config={ "disable_tqdm": worker_id > 0, "page_range": page_range, "force_ocr": force_ocr, }, ) start = time.time() rendered = block_converter(f.name) markdown, _, _ = text_from_rendered(rendered) chars += len(markdown) total = time.time() - start times.append(total) max_gpu_vram = torch.cuda.max_memory_reserved() / 1024**3 max_time = time.time() return sum(times), min_time, max_time, max_gpu_vram, pages, chars @click.command(help="Benchmark PDF to MD conversion throughput.") @click.option("--workers", default=1, help="Number of workers to use.") @click.option("--batch_size", default=1, help="Batch size for inference.") @click.option("--force_ocr", is_flag=True, help="Force OCR on all pages.") @click.option("--quantize", is_flag=True, help="Use quantized model.") @click.option("--compile", is_flag=True, help="Use compiled model.") def main( workers: int, batch_size: int, force_ocr: bool, quantize: bool, compile: bool, ): total_cpus = os.cpu_count() start = time.time() current_gpu_vram = torch.cuda.memory_reserved() / 1024**3 with ProcessPoolExecutor( max_workers=workers, mp_context=get_context("spawn") ) as executor: cpus_per_worker = min(8, max(2, total_cpus // workers)) futures = [ executor.submit( single_batch, batch_size, cpus_per_worker, force_ocr, quantize, compile, i, ) for i in range(workers) ] all_times = [] min_time = None max_time = time.time() vrams = [] page_count = 0 char_count = 0 for future in tqdm(futures, desc="Running marker workers..."): times, min_time_worker, max_time_worker, max_vram, pages, chars = ( future.result() ) vrams.append(max_vram - current_gpu_vram) all_times.append(times) page_count += pages char_count += chars min_time = ( min(min_time_worker, min_time) if min_time is not None else min_time_worker ) max_time = max(max_time, max_time_worker) end = time.time() - start all_worker_time = max_time - min_time print(f"Average time per worker: {sum(all_times) / len(all_times)}") print(f"Max time per worker: {max(all_times)}") print(f"End to end time (counting model loading), all processes: {end}") print(f"End to end time (no model loading), all processes: {all_worker_time}") print(f"Total pages: {page_count}") print(f"Total characters: {char_count}") print(f"Time per page: {all_worker_time / page_count:.2f}") print(f"Characters per second: {char_count / 
all_worker_time:.2f}") print(f"Max GPU VRAM: {max(vrams):.2f} GB") print(f"Average GPU VRAM: {sum(vrams) / len(vrams):.2f} GB") if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /marker/renderers/__init__.py: -------------------------------------------------------------------------------- ```python import base64 import io import re from collections import Counter from typing import Annotated, Optional, Tuple, Literal from bs4 import BeautifulSoup from pydantic import BaseModel from marker.schema import BlockTypes from marker.schema.blocks.base import BlockId, BlockOutput from marker.schema.document import Document from marker.settings import settings from marker.util import assign_config class BaseRenderer: image_blocks: Annotated[ Tuple[BlockTypes, ...], "The block types to consider as images." ] = (BlockTypes.Picture, BlockTypes.Figure) extract_images: Annotated[bool, "Extract images from the document."] = True image_extraction_mode: Annotated[ Literal["lowres", "highres"], "The mode to use for extracting images.", ] = "highres" keep_pageheader_in_output: Annotated[ bool, "Keep the page header in the output HTML." ] = False keep_pagefooter_in_output: Annotated[ bool, "Keep the page footer in the output HTML." ] = False add_block_ids: Annotated[bool, "Whether to add block IDs to the output HTML."] = ( False ) def __init__(self, config: Optional[BaseModel | dict] = None): assign_config(self, config) self.block_config = { "keep_pageheader_in_output": self.keep_pageheader_in_output, "keep_pagefooter_in_output": self.keep_pagefooter_in_output, "add_block_ids": self.add_block_ids, } def __call__(self, document): # Children are in reading order raise NotImplementedError def extract_image(self, document: Document, image_id, to_base64=False): image_block = document.get_block(image_id) cropped = image_block.get_image( document, highres=self.image_extraction_mode == "highres" ) if to_base64: image_buffer = io.BytesIO() # RGBA to RGB if not cropped.mode == "RGB": cropped = cropped.convert("RGB") cropped.save(image_buffer, format=settings.OUTPUT_IMAGE_FORMAT) cropped = base64.b64encode(image_buffer.getvalue()).decode( settings.OUTPUT_ENCODING ) return cropped @staticmethod def merge_consecutive_math(html, tag="math"): if not html: return html pattern = rf"-</{tag}>(\s*)<{tag}>" html = re.sub(pattern, " ", html) pattern = rf'-</{tag}>(\s*)<{tag} display="inline">' html = re.sub(pattern, " ", html) return html @staticmethod def merge_consecutive_tags(html, tag): if not html: return html def replace_whitespace(match): whitespace = match.group(1) if len(whitespace) == 0: return "" else: return " " pattern = rf"</{tag}>(\s*)<{tag}>" while True: new_merged = re.sub(pattern, replace_whitespace, html) if new_merged == html: break html = new_merged return html def generate_page_stats(self, document: Document, document_output): page_stats = [] for page in document.pages: block_counts = Counter( [str(block.block_type) for block in page.children] ).most_common() block_metadata = page.aggregate_block_metadata() page_stats.append( { "page_id": page.page_id, "text_extraction_method": page.text_extraction_method, "block_counts": block_counts, "block_metadata": block_metadata.model_dump(), } ) return page_stats def generate_document_metadata(self, document: Document, document_output): metadata = { "table_of_contents": document.table_of_contents, "page_stats": self.generate_page_stats(document, document_output), } if document.debug_data_path is not 
None: metadata["debug_data_path"] = document.debug_data_path return metadata def extract_block_html(self, document: Document, block_output: BlockOutput): soup = BeautifulSoup(block_output.html, "html.parser") content_refs = soup.find_all("content-ref") ref_block_id = None images = {} for ref in content_refs: src = ref.get("src") sub_images = {} for item in block_output.children: if item.id == src: content, sub_images_ = self.extract_block_html(document, item) sub_images.update(sub_images_) ref_block_id: BlockId = item.id break if ref_block_id.block_type in self.image_blocks and self.extract_images: images[ref_block_id] = self.extract_image( document, ref_block_id, to_base64=True ) else: images.update(sub_images) ref.replace_with(BeautifulSoup(content, "html.parser")) if block_output.id.block_type in self.image_blocks and self.extract_images: images[block_output.id] = self.extract_image( document, block_output.id, to_base64=True ) return str(soup), images ``` -------------------------------------------------------------------------------- /marker/processors/llm/llm_sectionheader.py: -------------------------------------------------------------------------------- ```python import json from typing import List, Tuple from tqdm import tqdm from marker.logger import get_logger from marker.processors.llm import BaseLLMComplexBlockProcessor from marker.schema import BlockTypes from marker.schema.blocks import Block from marker.schema.document import Document from marker.schema.groups import PageGroup from pydantic import BaseModel logger = get_logger() class LLMSectionHeaderProcessor(BaseLLMComplexBlockProcessor): page_prompt = """You're a text correction expert specializing in accurately analyzing complex PDF documents. You will be given a list of all of the section headers from a document, along with their page number and approximate dimensions. The headers will be formatted like below, and will be presented in order. ```json [ { "bbox": [x1, y1, x2, y2], "width": x2 - x1, "height": y2 - y1, "page": 0, "id": "/page/0/SectionHeader/1", "html": "<h1>Introduction</h1>", }, ... ] ``` Bboxes have been normalized to 0-1000. Your goal is to make sure that the section headers have the correct levels (h1, h2, h3, h4, h5, or h6). If a section header does not have the right level, edit the html to fix it. Guidelines: - Edit the blocks to ensure that the section headers have the correct levels. - Only edit the h1, h2, h3, h4, h5, and h6 tags. Do not change any other tags or content in the headers. - Only output the headers that changed (if nothing changed, output nothing). - Every header you output needs to have one and only one level tag (h1, h2, h3, h4, h5, or h6). **Instructions:** 1. Carefully examine the provided section headers and JSON. 2. Identify any changes you'll need to make, and write a short analysis. 3. Output "no_corrections", or "corrections_needed", depending on whether you need to make changes. 4. If corrections are needed, output any blocks that need updates. Only output the block ids and html, like this: ```json [ { "id": "/page/0/SectionHeader/1", "html": "<h2>Introduction</h2>" }, ... ] ``` **Example:** Input: Section Headers ```json [ { "bbox": [x1, y1, x2, y2], "id": "/page/0/SectionHeader/1", "page": 0, "html": "1 Vector Operations", }, { "bbox": [x1, y1, x2, y2], "id": "/page/0/SectionHeader/2", "page": 0, "html": "1.1 Vector Addition", }, ] ``` Output: Analysis: The first section header is missing the h1 tag, and the second section header is missing the h2 tag. 
```json [ { "id": "/page/0/SectionHeader/1", "html": "<h1>1 Vector Operations</h1>" }, { "id": "/page/0/SectionHeader/2", "html": "<h2>1.1 Vector Addition</h2>" } ] ``` **Input:** Section Headers ```json {{section_header_json}} ``` """ def get_selected_blocks( self, document: Document, page: PageGroup, ) -> List[dict]: selected_blocks = page.structure_blocks(document) json_blocks = [ self.normalize_block_json(block, document, page, i) for i, block in enumerate(selected_blocks) ] return json_blocks def process_rewriting( self, document: Document, section_headers: List[Tuple[Block, dict]] ): section_header_json = [sh[1] for sh in section_headers] for item in section_header_json: _, _, page_id, block_type, block_id = item["id"].split("/") item["page"] = page_id item["width"] = item["bbox"][2] - item["bbox"][0] item["height"] = item["bbox"][3] - item["bbox"][1] del item["block_type"] # Not needed, since they're all section headers prompt = self.page_prompt.replace( "{{section_header_json}}", json.dumps(section_header_json) ) response = self.llm_service( prompt, None, document.pages[0], SectionHeaderSchema ) logger.debug(f"Got section header reponse from LLM: {response}") if not response or "correction_type" not in response: logger.warning("LLM did not return a valid response") return correction_type = response["correction_type"] if correction_type == "no_corrections": return self.load_blocks(response) self.handle_rewrites(response["blocks"], document) def load_blocks(self, response): if isinstance(response["blocks"], str): response["blocks"] = json.loads(response["blocks"]) def rewrite_blocks(self, document: Document): # Don't show progress if there are no blocks to process section_headers = [ (block, self.normalize_block_json(block, document, page)) for page in document.pages for block in page.structure_blocks(document) if block.block_type == BlockTypes.SectionHeader ] if len(section_headers) == 0: return pbar = tqdm( total=1, desc=f"Running {self.__class__.__name__}", disable=self.disable_tqdm, ) self.process_rewriting(document, section_headers) pbar.update(1) pbar.close() class BlockSchema(BaseModel): id: str html: str class SectionHeaderSchema(BaseModel): analysis: str correction_type: str blocks: List[BlockSchema] ``` -------------------------------------------------------------------------------- /marker/renderers/html.py: -------------------------------------------------------------------------------- ```python import textwrap from PIL import Image from typing import Annotated, Tuple from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning from pydantic import BaseModel from marker.renderers import BaseRenderer from marker.schema import BlockTypes from marker.schema.blocks import BlockId from marker.settings import settings # Ignore beautifulsoup warnings import warnings warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning) # Suppress DecompressionBombError Image.MAX_IMAGE_PIXELS = None class HTMLOutput(BaseModel): html: str images: dict metadata: dict class HTMLRenderer(BaseRenderer): """ A renderer for HTML output. 
""" page_blocks: Annotated[ Tuple[BlockTypes], "The block types to consider as pages.", ] = (BlockTypes.Page,) paginate_output: Annotated[ bool, "Whether to paginate the output.", ] = False def extract_image(self, document, image_id): image_block = document.get_block(image_id) cropped = image_block.get_image( document, highres=self.image_extraction_mode == "highres" ) return cropped def insert_block_id(self, soup, block_id: BlockId): """ Insert a block ID into the soup as a data attribute. """ if block_id.block_type in [BlockTypes.Line, BlockTypes.Span]: return soup if self.add_block_ids: # Find the outermost tag (first tag that isn't a NavigableString) outermost_tag = None for element in soup.contents: if hasattr(element, "name") and element.name: outermost_tag = element break # If we found an outermost tag, add the data-block-id attribute if outermost_tag: outermost_tag["data-block-id"] = str(block_id) # If soup only contains text or no tags, wrap in a span elif soup.contents: wrapper = soup.new_tag("span") wrapper["data-block-id"] = str(block_id) contents = list(soup.contents) for content in contents: content.extract() wrapper.append(content) soup.append(wrapper) return soup def extract_html(self, document, document_output, level=0): soup = BeautifulSoup(document_output.html, "html.parser") content_refs = soup.find_all("content-ref") ref_block_id = None images = {} for ref in content_refs: src = ref.get("src") sub_images = {} content = "" for item in document_output.children: if item.id == src: content, sub_images_ = self.extract_html(document, item, level + 1) sub_images.update(sub_images_) ref_block_id: BlockId = item.id break if ref_block_id.block_type in self.image_blocks: if self.extract_images: image = self.extract_image(document, ref_block_id) image_name = f"{ref_block_id.to_path()}.{settings.OUTPUT_IMAGE_FORMAT.lower()}" images[image_name] = image element = BeautifulSoup( f"<p>{content}<img src='{image_name}'></p>", "html.parser" ) ref.replace_with(self.insert_block_id(element, ref_block_id)) else: # This will be the image description if using llm mode, or empty if not element = BeautifulSoup(f"{content}", "html.parser") ref.replace_with(self.insert_block_id(element, ref_block_id)) elif ref_block_id.block_type in self.page_blocks: images.update(sub_images) if self.paginate_output: content = f"<div class='page' data-page-id='{ref_block_id.page_id}'>{content}</div>" element = BeautifulSoup(f"{content}", "html.parser") ref.replace_with(self.insert_block_id(element, ref_block_id)) else: images.update(sub_images) element = BeautifulSoup(f"{content}", "html.parser") ref.replace_with(self.insert_block_id(element, ref_block_id)) output = str(soup) if level == 0: output = self.merge_consecutive_tags(output, "b") output = self.merge_consecutive_tags(output, "i") output = self.merge_consecutive_math( output ) # Merge consecutive inline math tags output = textwrap.dedent(f""" <!DOCTYPE html> <html> <head> <meta charset="utf-8" /> </head> <body> {output} </body> </html> """) return output, images def __call__(self, document) -> HTMLOutput: document_output = document.render(self.block_config) full_html, images = self.extract_html(document, document_output) soup = BeautifulSoup(full_html, "html.parser") full_html = soup.prettify() # Add indentation to the HTML return HTMLOutput( html=full_html, images=images, metadata=self.generate_document_metadata(document, document_output), ) ``` -------------------------------------------------------------------------------- /marker/extractors/page.py: 
--------------------------------------------------------------------------------

```python
import json
from concurrent.futures import ThreadPoolExecutor

from pydantic import BaseModel
from typing import Annotated, Optional, List

from tqdm import tqdm

from marker.extractors import BaseExtractor
from marker.logger import get_logger

logger = get_logger()


class PageExtractionSchema(BaseModel):
    description: str
    detailed_notes: str


class PageExtractor(BaseExtractor):
    """
    An extractor that pulls data from a single page.
    """

    extraction_page_chunk_size: Annotated[
        int, "The number of pages to chunk together for extraction."
    ] = 3
    page_schema: Annotated[
        str,
        "The JSON schema to be extracted from the page.",
    ] = ""

    page_extraction_prompt = """You are an expert document analyst who reads documents and pulls data out in JSON format. You will receive the markdown representation of a document page, and a JSON schema that we want to extract from the document. Your task is to write detailed notes on this page, so that when you look at all your notes from across the document, you can fill in the schema.

Some notes:
- The schema may contain a single object to extract from the entire document, or an array of objects.
- The schema may contain nested objects, arrays, and other complex structures.

Some guidelines:
- Write very thorough notes, and include specific JSON snippets that can be extracted from the page.
- You may need information from prior or subsequent pages to fully fill in the schema, so make sure to write detailed notes that will let you join entities across pages later on.
- Estimate your confidence in the values you extract, so you can reconstruct the JSON later when you only have your notes.
- Some tables and other data structures may continue on a subsequent page, so make sure to store the positions that data comes from where appropriate.

**Instructions:**
1. Analyze the provided markdown representation of the page.
2. Analyze the JSON schema.
3. Write a short description of the fields in the schema, and the associated values in the markdown.
4. Write detailed notes on the page, including any values that can be extracted from the markdown. Include snippets of JSON that can be extracted from the page where possible.

**Example:**
Input:

Markdown
```markdown
| Make   | Sales |
|--------|-------|
| Honda  | 100   |
| Toyota | 200   |
```

Schema
```json
{'$defs': {'Cars': {'properties': {'make': {'title': 'Make', 'type': 'string'}, 'sales': {'title': 'Sales', 'type': 'integer'}, 'color': {'title': 'Color', 'type': 'string'}}, 'required': ['make', 'sales', 'color'], 'title': 'Cars', 'type': 'object'}}, 'properties': {'cars': {'items': {'$ref': '#/$defs/Cars'}, 'title': 'Cars', 'type': 'array'}}, 'required': ['cars'], 'title': 'CarsList', 'type': 'object'}
```

Output:

Description: The schema has a list of cars, each with a make, sales, and color. The image and markdown contain a table with 2 cars: Honda with 100 sales and Toyota with 200 sales. The color is not present in the table.

Detailed Notes: On this page, I see a table with car makes and sales. The makes are Honda and Toyota, with sales of 100 and 200 respectively. The color is not present in the table, so I will leave it blank in the JSON. That information may be present on another page.

Some JSON snippets I may find useful later are:
```json
{
    "make": "Honda",
    "sales": 100
}
```
```json
{
    "make": "Toyota",
    "sales": 200
}
```

Honda is the first row in the table, and Toyota is the second row. Make is the first column, and sales is the second.
**Input:**

Markdown
```markdown
{{page_md}}
```

Schema
```json
{{schema}}
```
"""

    def chunk_page_markdown(self, page_markdown: List[str]) -> List[str]:
        """
        Chunk the page markdown into smaller pieces for processing.
        """
        chunks = []
        for i in range(0, len(page_markdown), self.extraction_page_chunk_size):
            chunk = page_markdown[i : i + self.extraction_page_chunk_size]
            chunks.append("\n\n".join(chunk))

        return chunks

    def inference_single_chunk(
        self, page_markdown: str
    ) -> Optional[PageExtractionSchema]:
        prompt = self.page_extraction_prompt.replace(
            "{{page_md}}", page_markdown
        ).replace("{{schema}}", json.dumps(self.page_schema))
        response = self.llm_service(prompt, None, None, PageExtractionSchema)
        logger.debug(f"Page extraction response: {response}")

        if not response or any(
            [
                key not in response
                for key in [
                    "description",
                    "detailed_notes",
                ]
            ]
        ):
            return None

        return PageExtractionSchema(
            description=response["description"],
            detailed_notes=response["detailed_notes"],
        )

    def __call__(
        self,
        page_markdown: List[str],
        **kwargs,
    ) -> List[PageExtractionSchema]:
        if not self.page_schema:
            raise ValueError(
                "Page schema must be defined for structured extraction to work."
            )

        chunks = self.chunk_page_markdown(page_markdown)
        results = []
        pbar = tqdm(
            desc="Running page extraction",
            disable=self.disable_tqdm,
            total=len(chunks),
        )
        with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
            for future in [
                executor.submit(self.inference_single_chunk, chunk) for chunk in chunks
            ]:
                results.append(future.result())  # Raise exceptions if any occurred
                pbar.update(1)

        pbar.close()
        return results
```

--------------------------------------------------------------------------------
/marker/scripts/common.py:
--------------------------------------------------------------------------------

```python
import ast
import base64
import io
import re
import sys
from typing import Optional

from PIL import Image
import click
import pypdfium2
import streamlit as st
from pydantic import BaseModel
from streamlit.runtime.uploaded_file_manager import UploadedFile

from marker.config.parser import ConfigParser
from marker.config.printer import CustomClickPrinter
from marker.models import create_model_dict
from marker.settings import settings


@st.cache_data()
def parse_args():
    # Use to grab common cli options
    @ConfigParser.common_options
    def options_func():
        pass

    def extract_click_params(decorated_function):
        if hasattr(decorated_function, "__click_params__"):
            return decorated_function.__click_params__
        return []

    cmd = CustomClickPrinter("Marker app.")
    extracted_params = extract_click_params(options_func)
    cmd.params.extend(extracted_params)
    ctx = click.Context(cmd)
    try:
        cmd_args = sys.argv[1:]
        cmd.parse_args(ctx, cmd_args)
        return ctx.params
    except click.exceptions.ClickException as e:
        return {"error": str(e)}


@st.cache_resource()
def load_models():
    return create_model_dict()


def open_pdf(pdf_file):
    stream = io.BytesIO(pdf_file.getvalue())
    return pypdfium2.PdfDocument(stream)


def img_to_html(img, img_alt):
    img_bytes = io.BytesIO()
    img.save(img_bytes, format=settings.OUTPUT_IMAGE_FORMAT)
    img_bytes = img_bytes.getvalue()
    encoded = base64.b64encode(img_bytes).decode()
    img_html = f'<img src="data:image/{settings.OUTPUT_IMAGE_FORMAT.lower()};base64,{encoded}" alt="{img_alt}" style="max-width: 100%;">'
    return img_html


@st.cache_data()
def get_page_image(pdf_file, page_num, dpi=96):
    if "pdf" in pdf_file.type:
        doc = open_pdf(pdf_file)
        page = doc[page_num]
        png_image = (
            page.render(
                scale=dpi / 72,
            )
            .to_pil()
            .convert("RGB")
        )
    else:
        png_image = Image.open(pdf_file).convert("RGB")
    return png_image


@st.cache_data()
def page_count(pdf_file: UploadedFile):
    if "pdf" in pdf_file.type:
        doc = open_pdf(pdf_file)
        return len(doc) - 1
    else:
        return 1


def pillow_image_to_base64_string(img: Image.Image) -> str:
    buffered = io.BytesIO()
    img.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def extract_root_pydantic_class(schema_code: str) -> Optional[str]:
    try:
        # Parse the code into an AST
        tree = ast.parse(schema_code)

        # Find all class definitions that inherit from BaseModel
        class_names = set()
        class_info = {}  # Store information about each class

        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                # Check if this class inherits from BaseModel
                is_pydantic = False
                for base in node.bases:
                    if isinstance(base, ast.Name) and base.id == "BaseModel":
                        is_pydantic = True
                        break

                if is_pydantic:
                    class_names.add(node.name)
                    class_info[node.name] = {
                        "references": set(),  # Classes this class references
                        "fields": [],  # Field names in this class
                    }

                    # Extract field information
                    for item in node.body:
                        if isinstance(item, ast.AnnAssign) and isinstance(
                            item.target, ast.Name
                        ):
                            field_name = item.target.id
                            class_info[node.name]["fields"].append(field_name)

                            # Check if this field references another class
                            annotation_str = ast.unparse(item.annotation)

                            # Look for List[ClassName], Optional[ClassName], Dict[Any, ClassName], etc.
                            for other_class in class_names:
                                pattern = rf"(?:List|Dict|Set|Tuple|Optional|Union)?\[.*{other_class}.*\]|{other_class}"
                                if re.search(pattern, annotation_str):
                                    class_info[node.name]["references"].add(other_class)

        if len(class_names) == 1:
            return list(class_names)[0]

        referenced_classes = set()
        for class_name, info in class_info.items():
            referenced_classes.update(info["references"])

        # Find classes that reference others but aren't referenced themselves (potential roots)
        root_candidates = set()
        for class_name, info in class_info.items():
            if info["references"] and class_name not in referenced_classes:
                root_candidates.add(class_name)

        # If we found exactly one root candidate, return it
        if len(root_candidates) == 1:
            return list(root_candidates)[0]

        return None
    except Exception as e:
        print(f"Error parsing schema: {e}")
        return None


def get_root_class(schema_code: str) -> Optional[BaseModel]:
    root_class_name = extract_root_pydantic_class(schema_code)

    if not root_class_name:
        return None

    if "from pydantic" not in schema_code:
        schema_code = "from pydantic import BaseModel\n" + schema_code
    if "from typing" not in schema_code:
        schema_code = (
            "from typing import List, Dict, Optional, Set, Tuple, Union, Any\n\n"
            + schema_code
        )

    # Execute the code in a new namespace
    namespace = {}
    exec(schema_code, namespace)

    # Return the root class object
    return namespace.get(root_class_name)
```
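
The two helpers above, `extract_root_pydantic_class` and `get_root_class`, turn user-supplied schema source code into an importable Pydantic class: the first picks the root model (the one that references others but is referenced by none), and the second `exec`s the code and returns that class. The sketch below is not part of the repository; it assumes `marker` and its dependencies are importable, and the `Car`/`CarsList` schema text is purely illustrative.

```python
# Minimal usage sketch (assumption: marker is installed; Car/CarsList are made-up examples).
from marker.scripts.common import get_root_class

schema_code = """
class Car(BaseModel):
    make: str
    sales: int
    color: str

class CarsList(BaseModel):
    cars: List[Car]
"""

# CarsList references Car and nothing references CarsList, so it is chosen as the root.
root_class = get_root_class(schema_code)
if root_class is not None:
    # The JSON schema of the root class is the kind of value the page_schema
    # string in PageExtractor (marker/extractors/page.py above) is meant to hold.
    print(root_class.__name__)
    print(root_class.model_json_schema())
```
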