# .\marker\marker\models.py
from marker.cleaners.equations import load_texify_model
from marker.ordering import load_ordering_model
from marker.postprocessors.editor import load_editing_model
from marker.segmentation import load_layout_model
def load_all_models():
    """Load every model marker uses and return them in the order downstream code expects.

    Returns:
        list: [texify, layout, order, edit] — the equation, layout, ordering,
        and (optionally None) editing models.
    """
    edit = load_editing_model()
    order = load_ordering_model()
    layout = load_layout_model()
    texify = load_texify_model()
    return [texify, layout, order, edit]
# .\marker\marker\ocr\page.py
import io
# Fix: two import statements were fused onto one line (syntax error).
from typing import List, Optional

import fitz as pymupdf
import ocrmypdf
# Fix: two import statements were fused onto one line (syntax error).
from spellchecker import SpellChecker

from marker.ocr.utils import detect_bad_ocr
from marker.schema import Block
# Fix: an import and a statement were fused onto one line (syntax error).
from marker.settings import settings

# Silence ocrmypdf's console output during per-page OCR.
ocrmypdf.configure_logging(verbosity=ocrmypdf.Verbosity.quiet)
def ocr_entire_page(page, lang: str, spellchecker: Optional[SpellChecker] = None) -> List[Block]:
    """Dispatch full-page OCR to the engine configured in settings.

    Raises:
        ValueError: if settings.OCR_ENGINE names an unknown engine.
    """
    engine = settings.OCR_ENGINE
    if engine == "tesseract":
        return ocr_entire_page_tess(page, lang, spellchecker)
    if engine == "ocrmypdf":
        return ocr_entire_page_ocrmp(page, lang, spellchecker)
    raise ValueError(f"Unknown OCR engine {engine}")
def ocr_entire_page_tess(page, lang: str, spellchecker: Optional[SpellChecker] = None) -> List[Block]:
    """OCR a page via PyMuPDF's built-in Tesseract integration.

    Returns the extracted "dict"-format text blocks, or [] when OCR fails,
    produces nothing, or produces text that looks garbled.
    """
    try:
        ocr_textpage = page.get_textpage_ocr(flags=settings.TEXT_FLAGS, dpi=settings.OCR_DPI, full=True, language=lang)
        blocks = page.get_text("dict", sort=True, flags=settings.TEXT_FLAGS, textpage=ocr_textpage)["blocks"]
        full_text = page.get_text("text", sort=True, flags=settings.TEXT_FLAGS, textpage=ocr_textpage)

        # Reject empty or garbled OCR output outright.
        if not full_text:
            return []
        if detect_bad_ocr(full_text, spellchecker):
            return []
    except RuntimeError:
        # get_textpage_ocr can raise RuntimeError (presumably when Tesseract
        # itself fails on the page) — treat that as "no text found".
        return []
    return blocks
def ocr_entire_page_ocrmp(page, lang: str, spellchecker: Optional[SpellChecker] = None) -> List[Block]:
    """OCR a page by round-tripping it through ocrmypdf.

    The page is copied into a fresh single-page PDF, OCRed in memory, and the
    resulting text layer is read back with PyMuPDF. Returns [] for empty or
    garbled results.
    """
    # Extract just this page into an in-memory single-page PDF.
    src_doc = page.parent
    single_page_doc = pymupdf.open()
    single_page_doc.insert_pdf(src_doc, from_page=page.number, to_page=page.number, annots=False, links=False)
    in_buffer = io.BytesIO(single_page_doc.tobytes())
    out_buffer = io.BytesIO()

    ocrmypdf.ocr(
        in_buffer,
        out_buffer,
        language=lang,
        output_type="pdf",
        # Either redo existing OCR everywhere, or force OCR on every page.
        redo_ocr=None if settings.OCR_ALL_PAGES else True,
        force_ocr=True if settings.OCR_ALL_PAGES else None,
        progress_bar=False,
        optimize=False,
        fast_web_view=1e6,
        skip_big=15,
        tesseract_timeout=settings.TESSERACT_TIMEOUT,
        tesseract_non_ocr_timeout=settings.TESSERACT_TIMEOUT,
    )

    ocr_doc = pymupdf.open("pdf", out_buffer.getvalue())
    ocr_page = ocr_doc[0]
    blocks = ocr_page.get_text("dict", sort=True, flags=settings.TEXT_FLAGS)["blocks"]
    full_text = ocr_page.get_text("text", sort=True, flags=settings.TEXT_FLAGS)

    # The OCRed copy must keep the original page geometry.
    assert page.bound() == ocr_page.bound()
    if not full_text:
        return []
    if detect_bad_ocr(full_text, spellchecker):
        return []
    return blocks
# .\marker\marker\ocr\utils.py
from typing import Optional
from nltk import wordpunct_tokenize
from spellchecker import SpellChecker
from marker.settings import settings
import re
def detect_bad_ocr(text, spellchecker: Optional[SpellChecker], misspell_threshold=.7, space_threshold=.6, newline_threshold=.5, alphanum_threshold=.4):
    """Heuristically decide whether OCR output for a page is unusable.

    Returns True when the text is empty, mostly misspelled, dominated by
    whitespace or newlines, low on alphanumeric content, or littered with
    replacement characters.
    """
    if len(text) == 0:
        # An empty extraction can never be trusted.
        return True

    tokens = [t for t in wordpunct_tokenize(text) if t.strip()]
    alnum_tokens = [t for t in tokens if t.isalnum()]

    if spellchecker:
        unknown_words = spellchecker.unknown(alnum_tokens)
        if len(unknown_words) > len(alnum_tokens) * misspell_threshold:
            return True

    # Ratio of whitespace runs to remaining characters.
    space_runs = len(re.findall(r'\s+', text))
    nonspace_chars = len(re.sub(r'\s+', '', text))
    if space_runs / (nonspace_chars + space_runs) > space_threshold:
        return True

    # Ratio of newline runs to remaining characters.
    newline_runs = len(re.findall(r'\n+', text))
    remaining_chars = len(re.sub(r'\n+', '', text))
    if newline_runs / (newline_runs + remaining_chars) > newline_threshold:
        return True

    if alphanum_ratio(text) < alphanum_threshold:
        return True

    # Too many replacement/invalid characters (absolute floor of 3, or 2%).
    bad_char_count = sum(1 for ch in text if ch in settings.INVALID_CHARS)
    if bad_char_count > max(3.0, len(text) * .02):
        return True

    return False
def font_flags_decomposer(flags):
    """Turn a PyMuPDF span-flag bitmask into a readable underscore-joined name.

    Bit meanings (low to high): superscript, italic, serifed, monospaced, bold.
    """
    parts = []
    if flags & 0b1:
        parts.append("superscript")
    if flags & 0b10:
        parts.append("italic")
    parts.append("serifed" if flags & 0b100 else "sans")
    parts.append("monospaced" if flags & 0b1000 else "proportional")
    if flags & 0b10000:
        parts.append("bold")
    return "_".join(parts)
def alphanum_ratio(text):
    """Fraction of non-whitespace characters that are alphanumeric.

    Spaces and newlines are ignored; an effectively empty string counts as 1
    (perfectly clean) so it never trips the caller's garbage threshold.
    """
    condensed = text.replace(" ", "").replace("\n", "")
    if len(condensed) == 0:
        return 1
    alnum_count = sum(1 for ch in condensed if ch.isalnum())
    return alnum_count / len(condensed)
# .\marker\marker\ordering.py
from copy import deepcopy
from typing import List
import torch
import sys, os
from marker.extract_text import convert_single_page
from transformers import LayoutLMv3ForSequenceClassification, LayoutLMv3Processor
from PIL import Image
import io
from marker.schema import Page
from marker.settings import settings
# Shared LayoutLMv3 processor for the column-ordering model, loaded once at
# import time so every batch_inference call reuses the same instance.
processor = LayoutLMv3Processor.from_pretrained(settings.ORDERER_MODEL_NAME)
def load_ordering_model():
    """Load the column-count classifier in eval mode on the configured device."""
    ordering_model = LayoutLMv3ForSequenceClassification.from_pretrained(
        settings.ORDERER_MODEL_NAME,
        torch_dtype=settings.MODEL_DTYPE,
    ).to(settings.TORCH_DEVICE_MODEL)
    ordering_model.eval()
    return ordering_model
def get_inference_data(page, page_blocks: Page):
    """Render a page image and normalize its block bboxes for the orderer.

    Returns (rgb_image, bboxes, words) where bboxes are clamped to the page
    and scaled into LayoutLM's 0-1000 coordinate space, and words are dummy
    "." tokens (the model only consumes positions).
    """
    bboxes = deepcopy([block.bbox for block in page_blocks.blocks])
    words = ["."] * len(bboxes)

    pix = page.get_pixmap(dpi=settings.LAYOUT_DPI, annots=False, clip=page_blocks.bbox)
    png_bytes = pix.pil_tobytes(format="PNG")
    rgb_image = Image.open(io.BytesIO(png_bytes)).convert("RGB")

    page_box = page_blocks.bbox
    pwidth = page_blocks.width
    pheight = page_blocks.height
    for box in bboxes:
        # Clamp to the page bounding box, then rescale to 0-1000.
        # NOTE(review): scaling divides by page width/height without first
        # subtracting the page origin — fine when page_box starts at (0, 0);
        # confirm that assumption holds for cropped pages.
        box[0] = max(box[0], page_box[0])
        box[1] = max(box[1], page_box[1])
        box[2] = min(box[2], page_box[2])
        box[3] = min(box[3], page_box[3])
        box[0] = int(box[0] / pwidth * 1000)
        box[1] = int(box[1] / pheight * 1000)
        box[2] = int(box[2] / pwidth * 1000)
        box[3] = int(box[3] / pheight * 1000)

    return rgb_image, bboxes, words
def batch_inference(rgb_images, bboxes, words, model):
    """Predict a column-count label for each page image in one forward pass."""
    encoding = processor(
        rgb_images,
        text=words,
        boxes=bboxes,
        return_tensors="pt",
        truncation=True,
        padding="max_length",
        max_length=128
    )
    # Pixel values must match the model dtype (e.g. bfloat16 on cuda).
    encoding["pixel_values"] = encoding["pixel_values"].to(model.dtype)
    with torch.inference_mode():
        for key in ["bbox", "input_ids", "pixel_values", "attention_mask"]:
            encoding[key] = encoding[key].to(model.device)
        logits = model(**encoding).logits
        predicted_ids = logits.argmax(-1).squeeze().tolist()
    if isinstance(predicted_ids, int):
        # A batch of one squeezes down to a bare int.
        predicted_ids = [predicted_ids]
    return [model.config.id2label[p] for p in predicted_ids]
def add_column_counts(doc, doc_blocks, model, batch_size):
    """Annotate every Page in doc_blocks with its predicted column count."""
    for start in range(0, len(doc_blocks), batch_size):
        page_numbers = range(start, min(start + batch_size, len(doc_blocks)))
        images, batch_bboxes, batch_words = [], [], []
        for pnum in page_numbers:
            image, page_bboxes, page_words = get_inference_data(doc[pnum], doc_blocks[pnum])
            images.append(image)
            batch_bboxes.append(page_bboxes)
            batch_words.append(page_words)
        predictions = batch_inference(images, batch_bboxes, batch_words, model)
        for pnum, prediction in zip(page_numbers, predictions):
            doc_blocks[pnum].column_count = prediction
def order_blocks(doc, doc_blocks: List[Page], model, batch_size=settings.ORDERER_BATCH_SIZE):
    """Reorder blocks on multi-column pages so the left column reads first."""
    add_column_counts(doc, doc_blocks, model, batch_size)
    for page_blocks in doc_blocks:
        if page_blocks.column_count > 1:
            # Partition at the horizontal midpoint; both halves keep their
            # original relative order.
            split_pos = page_blocks.x_start + page_blocks.width / 2
            left_blocks = [b for b in page_blocks.blocks if b.x_start <= split_pos]
            right_blocks = [b for b in page_blocks.blocks if b.x_start > split_pos]
            page_blocks.blocks = left_blocks + right_blocks
    return doc_blocks
# .\marker\marker\postprocessors\editor.py
from collections import defaultdict, Counter
from itertools import chain
from typing import Optional
from transformers import AutoTokenizer
from marker.settings import settings
import torch
import torch.nn.functional as F
from marker.postprocessors.t5 import T5ForTokenClassification, byt5_tokenize
def load_editing_model():
    """Load the T5 post-editing model, or return None when the editor is disabled."""
    if not settings.ENABLE_EDITOR_MODEL:
        return None

    editor = T5ForTokenClassification.from_pretrained(
        settings.EDITOR_MODEL_NAME,
        torch_dtype=settings.MODEL_DTYPE,
    ).to(settings.TORCH_DEVICE_MODEL)
    editor.eval()

    # Per-token edit labels the checkpoint was trained with.
    editor.config.label2id = {
        "equal": 0,
        "delete": 1,
        "newline-1": 2,
        "space-1": 3,
    }
    editor.config.id2label = {v: k for k, v in editor.config.label2id.items()}
    return editor
def edit_full_text(text: str, model: Optional[T5ForTokenClassification], batch_size: int = settings.EDITOR_BATCH_SIZE):
    """Apply the T5 editor's per-character edit labels to `text`.

    Args:
        text: the document text to post-process.
        model: editor model, or None/falsy to skip editing entirely.
        batch_size: sequences per forward pass.

    Returns:
        (edited_text, edit_stats) where edit_stats counts applied labels.
    """
    if not model:
        return text, {}

    tokenized = byt5_tokenize(text, settings.EDITOR_MAX_LENGTH)
    input_ids = tokenized["input_ids"]
    char_token_lengths = tokenized["char_token_lengths"]

    # Predict an edit label for every token, batch by batch.
    token_masks = []
    for i in range(0, len(input_ids), batch_size):
        batch_input_ids = tokenized["input_ids"][i: i + batch_size]
        batch_input_ids = torch.tensor(batch_input_ids, device=model.device)
        batch_attention_mask = tokenized["attention_mask"][i: i + batch_size]
        batch_attention_mask = torch.tensor(batch_attention_mask, device=model.device)
        with torch.inference_mode():
            predictions = model(batch_input_ids, attention_mask=batch_attention_mask)

        logits = predictions.logits.cpu()
        # Low-confidence predictions fall back to "equal" (no edit).
        probs = F.softmax(logits, dim=-1)
        max_prob = torch.max(probs, dim=-1)
        cutoff_prob = max_prob.values < settings.EDITOR_CUTOFF_THRESH
        labels = logits.argmax(-1)
        labels[cutoff_prob] = model.config.label2id["equal"]
        labels = labels.squeeze().tolist()
        if len(labels) == settings.EDITOR_MAX_LENGTH:
            # A single sequence squeezes to 1D; re-wrap for uniform flattening.
            labels = [labels]
        labels = list(chain.from_iterable(labels))
        token_masks.extend(labels)

    # BUG FIX: the original line was missing its closing parenthesis.
    flat_input_ids = list(chain.from_iterable(input_ids))
    assert len(token_masks) == len(flat_input_ids)
    # Keep labels only for real byte tokens (byt5_tokenize shifts bytes by +3
    # and pads with 0), leaving one label per utf-8 byte of the input.
    token_masks = [mask for mask, token in zip(token_masks, flat_input_ids) if token >= 2]
    assert len(token_masks) == len(list(text.encode("utf-8")))

    # Replay the labels character by character to build the edited text.
    edit_stats = defaultdict(int)
    out_text = []
    start = 0
    for i, char in enumerate(text):
        char_token_length = char_token_lengths[i]
        masks = token_masks[start: start + char_token_length]
        labels = [model.config.id2label[mask] for mask in masks]
        if all(l == "delete" for l in labels):
            # Only actually delete whitespace; visible characters are kept.
            if char.strip():
                out_text.append(char)
            else:
                edit_stats["delete"] += 1
        elif labels[0] == "newline-1":
            out_text.append("\n")
            out_text.append(char)
            edit_stats["newline-1"] += 1
        elif labels[0] == "space-1":
            out_text.append(" ")
            out_text.append(char)
            edit_stats["space-1"] += 1
        else:
            out_text.append(char)
            edit_stats["equal"] += 1
        start += char_token_length

    out_text = "".join(out_text)
    return out_text, edit_stats
# .\marker\marker\postprocessors\t5.py
from transformers import T5Config, T5PreTrainedModel
import torch
from torch import nn
from copy import deepcopy
from typing import Optional, Tuple, Union, List
from itertools import chain
from transformers.modeling_outputs import TokenClassifierOutput
from transformers.models.t5.modeling_t5 import T5Stack
from transformers.utils.model_parallel_utils import get_device_map, assert_device_map
def byt5_tokenize(text: str, max_length: int, pad_token_id: int = 0):
    """Byte-level tokenization in the ByT5 style: each utf-8 byte -> byte + 3.

    Args:
        text: input string.
        max_length: fixed length of each output batch row.
        pad_token_id: id used to right-pad the final row.

    Returns:
        dict with "input_ids" and "attention_mask" (lists of max_length rows)
        and "char_token_lengths" (tokens produced per input character, for
        mapping token labels back onto characters).
    """
    byte_codes = []
    for char in text:
        # +3 offset leaves room for the special ids (presumably pad/eos/unk).
        byte_codes.append([byte + 3 for byte in char.encode('utf-8')])

    tokens = list(chain.from_iterable(byte_codes))
    char_token_lengths = [len(b) for b in byte_codes]

    batched_tokens = []
    attention_mask = []
    for i in range(0, len(tokens), max_length):
        batched_tokens.append(tokens[i:i + max_length])
        # BUG FIX: this append was missing its closing parenthesis.
        attention_mask.append([1] * len(batched_tokens[-1]))

        # Right-pad the final (short) row up to max_length.
        if len(batched_tokens[-1]) < max_length:
            batched_tokens[-1] += [pad_token_id] * (max_length - len(batched_tokens[-1]))
            attention_mask[-1] += [0] * (max_length - len(attention_mask[-1]))

    return {"input_ids": batched_tokens, "attention_mask": attention_mask, "char_token_lengths": char_token_lengths}
class T5ForTokenClassification(T5PreTrainedModel):
    """Encoder-only T5 with a linear token-classification head.

    Only the T5 encoder stack is instantiated (no decoder); each token's
    hidden state is passed through dropout and a linear layer to produce
    per-token logits over `config.num_labels`.
    """
    _keys_to_ignore_on_load_missing = [r"encoder.embed_tokens.weight"]

    def __init__(self, config: T5Config):
        super().__init__(config)
        self.model_dim = config.d_model
        # Shared token embedding, reused by the encoder stack.
        self.shared = nn.Embedding(config.vocab_size, config.d_model)
        # Configure a pure encoder: no decoder behavior, no KV cache.
        encoder_config = deepcopy(config)
        encoder_config.is_decoder = False
        encoder_config.is_encoder_decoder = False
        encoder_config.use_cache = False
        self.encoder = T5Stack(encoder_config, self.shared)
        # Fall back to the generic dropout rate when the config has no
        # classifier-specific one.
        classifier_dropout = (config.classifier_dropout if hasattr(config, 'classifier_dropout') else config.dropout_rate)
        self.dropout = nn.Dropout(classifier_dropout)
        self.classifier = nn.Linear(config.d_model, config.num_labels)
        self.post_init()
        self.model_parallel = False
        self.device_map = None

    def parallelize(self, device_map=None):
        # Spread encoder blocks across all visible CUDA devices (or use the
        # caller-provided map); the classifier lives with the first device.
        self.device_map = (get_device_map(len(self.encoder.block), range(torch.cuda.device_count())) if device_map is None else device_map)
        assert_device_map(self.device_map, len(self.encoder.block))
        self.encoder.parallelize(self.device_map)
        self.classifier.to(self.encoder.first_device)
        self.model_parallel = True

    def deparallelize(self):
        # Undo parallelize(): move everything back to CPU and free CUDA memory.
        self.encoder.deparallelize()
        self.encoder = self.encoder.to("cpu")
        self.classifier = self.classifier.to("cpu")
        self.model_parallel = False
        self.device_map = None
        torch.cuda.empty_cache()

    def get_input_embeddings(self):
        return self.shared

    def set_input_embeddings(self, new_embeddings):
        # Keep the shared embedding and the encoder's reference in sync.
        self.shared = new_embeddings
        self.encoder.set_input_embeddings(new_embeddings)

    def get_encoder(self):
        return self.encoder

    def _prune_heads(self, heads_to_prune):
        # heads_to_prune: {layer_index: [head indices to remove]}
        for layer, heads in heads_to_prune.items():
            self.encoder.block[layer].layer[0].SelfAttention.prune_heads(heads)

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.FloatTensor], TokenClassifierOutput]:
        """Encode the inputs and return per-token classification logits.

        NOTE(review): `labels` is accepted but never used — `loss` is always
        None here; confirm whether loss computation was intentionally omitted.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            inputs_embeds=inputs_embeds,
            head_mask=head_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)
        loss = None
        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output
        return TokenClassifierOutput(loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions)
# .\marker\marker\schema.py
from collections import Counter
from typing import List, Optional, Tuple
from pydantic import BaseModel, field_validator
import ftfy
from marker.bbox import boxes_intersect_pct, multiple_boxes_intersect
from marker.settings import settings
def find_span_type(span, page_blocks):
    """Return the type of the first layout block that overlaps `span`.

    Falls back to "Text" when no block intersects the span's bbox.
    """
    for candidate in page_blocks:
        if boxes_intersect_pct(span.bbox, candidate.bbox):
            return candidate.block_type
    return "Text"
class BboxElement(BaseModel):
    """Base model for anything positioned by an [x0, y0, x1, y1] bbox."""
    bbox: List[float]

    @field_validator('bbox')
    @classmethod
    def check_4_elements(cls, v: List[float]) -> List[float]:
        # Enforce the [x0, y0, x1, y1] shape at construction time.
        if len(v) != 4:
            raise ValueError('bbox must have 4 elements')
        return v

    @property
    def height(self):
        return self.bbox[3] - self.bbox[1]

    @property
    def width(self):
        return self.bbox[2] - self.bbox[0]

    @property
    def x_start(self):
        return self.bbox[0]

    @property
    def y_start(self):
        return self.bbox[1]

    @property
    def area(self):
        return self.width * self.height
class BlockType(BboxElement):
    # Layout-model label for a detected region (e.g. "Text", "Table", "Formula").
    block_type: str
class Span(BboxElement):
    """A run of text with uniform font attributes inside a line."""
    text: str
    span_id: str
    font: str
    color: int
    ascender: Optional[float] = None
    descender: Optional[float] = None
    block_type: Optional[str] = None  # layout label, assigned later by add_block_types
    selected: bool = True

    @field_validator('text')
    @classmethod
    def fix_unicode(cls, text: str) -> str:
        # Repair mojibake / mis-decoded characters as the span is constructed.
        return ftfy.fix_text(text)
class Line(BboxElement):
    """An ordered sequence of spans on one visual line."""
    spans: List[Span]

    @property
    def prelim_text(self):
        # Span texts concatenated with no separator.
        return "".join(span.text for span in self.spans)

    @property
    def start(self):
        # Left edge of the first span.
        return self.spans[0].bbox[0]
class Block(BboxElement):
    """A group of lines on a single page."""
    lines: List[Line]
    pnum: int

    @property
    def prelim_text(self):
        return "\n".join(line.prelim_text for line in self.lines)

    def contains_equation(self, equation_boxes=None):
        """True when any span is a Formula, or the block overlaps an equation box."""
        conditions = [s.block_type == "Formula" for l in self.lines for s in l.spans]
        if equation_boxes:
            conditions += [multiple_boxes_intersect(self.bbox, equation_boxes)]
        return any(conditions)

    def filter_spans(self, bad_span_ids):
        """Remove spans whose id is blacklisted; drop lines left empty."""
        kept_lines = []
        for line in self.lines:
            kept_spans = [span for span in line.spans if span.span_id not in bad_span_ids]
            line.spans = kept_spans
            if kept_spans:
                kept_lines.append(line)
        self.lines = kept_lines

    def filter_bad_span_types(self):
        """Remove spans labeled with an unwanted block type; drop empty lines."""
        kept_lines = []
        for line in self.lines:
            kept_spans = [span for span in line.spans if span.block_type not in settings.BAD_SPAN_TYPES]
            line.spans = kept_spans
            if kept_spans:
                kept_lines.append(line)
        self.lines = kept_lines

    def most_common_block_type(self):
        type_counts = Counter(s.block_type for l in self.lines for s in l.spans)
        return type_counts.most_common(1)[0][0]

    def set_block_type(self, block_type):
        for line in self.lines:
            for span in line.spans:
                span.block_type = block_type
class Page(BboxElement):
    """All blocks of one page, plus page-level metadata filled in later."""
    blocks: List[Block]
    pnum: int
    column_count: Optional[int] = None  # set by add_column_counts
    rotation: Optional[int] = None  # rotation degrees of the page

    def get_nonblank_lines(self):
        lines = self.get_all_lines()
        nonblank_lines = [l for l in lines if l.prelim_text.strip()]
        return nonblank_lines

    def get_all_lines(self):
        lines = [l for b in self.blocks for l in b.lines]
        return lines

    def get_nonblank_spans(self) -> List[Span]:
        # Spans with non-whitespace text, in reading order of blocks/lines.
        lines = [l for b in self.blocks for l in b.lines]
        spans = [s for l in lines for s in l.spans if s.text.strip()]
        return spans

    def add_block_types(self, page_block_types):
        """Assign one detected block type per line to all of its spans.

        Lines beyond the detected count default to "Text".
        """
        if len(page_block_types) != len(self.get_all_lines()):
            print(f"Warning: Number of detected lines {len(page_block_types)} does not match number of lines {len(self.get_all_lines())}")

        i = 0
        for block in self.blocks:
            for line in block.lines:
                if i < len(page_block_types):
                    line_block_type = page_block_types[i].block_type
                else:
                    line_block_type = "Text"
                i += 1
                for span in line.spans:
                    span.block_type = line_block_type

    def get_font_stats(self):
        # Font-name frequencies over non-blank spans.
        fonts = [s.font for s in self.get_nonblank_spans()]
        font_counts = Counter(fonts)
        return font_counts

    def get_line_height_stats(self):
        heights = [l.bbox[3] - l.bbox[1] for l in self.get_nonblank_lines()]
        height_counts = Counter(heights)
        return height_counts

    def get_line_start_stats(self):
        starts = [l.bbox[0] for l in self.get_nonblank_lines()]
        start_counts = Counter(starts)
        return start_counts

    def get_min_line_start(self):
        """Leftmost start of any non-blank Text line; raises IndexError if none."""
        starts = [l.bbox[0] for l in self.get_nonblank_lines() if l.spans[0].block_type == "Text"]
        if len(starts) == 0:
            raise IndexError("No lines found")
        return min(starts)

    @property
    def prelim_text(self):
        return "\n".join([b.prelim_text for b in self.blocks])
class MergedLine(BboxElement):
    """A line after span merging: flat text plus the fonts it came from."""
    text: str
    fonts: List[str]

    def most_common_font(self):
        # Ties resolve to the font Counter encountered first.
        return Counter(self.fonts).most_common(1)[0][0]
class MergedBlock(BboxElement):
    """A block after line merging, keeping each constituent line's type."""
    lines: List[MergedLine]
    pnum: int
    block_types: List[str]

    def most_common_block_type(self):
        # Ties resolve to the type Counter encountered first.
        return Counter(self.block_types).most_common(1)[0][0]
class FullyMergedBlock(BaseModel):
    # Final text chunk after all merging: content plus its block type, no bbox.
    text: str
    block_type: str
# .\marker\marker\segmentation.py
from concurrent.futures import ThreadPoolExecutor
# Fix: two import statements were fused together ("Listfrom") — syntax error.
from typing import List

from transformers import LayoutLMv3ForTokenClassification
from marker.bbox import unnormalize_box
from transformers.models.layoutlmv3.image_processing_layoutlmv3 import normalize_box
import io
from PIL import Image
from transformers import LayoutLMv3Processor
import numpy as np
from marker.settings import settings
from marker.schema import Page, BlockType
import torch
from math import isclose
# Allow arbitrarily large page renders (disables PIL's decompression-bomb guard).
Image.MAX_IMAGE_PIXELS = None
# Shared layout processor; apply_ocr=False because line text and boxes come
# from the PDF extraction rather than from the processor's own OCR.
processor = LayoutLMv3Processor.from_pretrained(settings.LAYOUT_MODEL_NAME, apply_ocr=False)
# Encoding keys that vary per overflow chunk vs. those shared per page image.
CHUNK_KEYS = ["input_ids", "attention_mask", "bbox", "offset_mapping"]
NO_CHUNK_KEYS = ["pixel_values"]
def load_layout_model():
    """Load the LayoutLMv3 segmenter and attach its block-type label maps."""
    layout_model = LayoutLMv3ForTokenClassification.from_pretrained(
        settings.LAYOUT_MODEL_NAME,
        torch_dtype=settings.MODEL_DTYPE,
    ).to(settings.TORCH_DEVICE_MODEL)

    label_names = [
        "Caption",
        "Footnote",
        "Formula",
        "List-item",
        "Page-footer",
        "Page-header",
        "Picture",
        "Section-header",
        "Table",
        "Text",
        "Title",
    ]
    layout_model.config.id2label = dict(enumerate(label_names))
    layout_model.config.label2id = {v: k for k, v in layout_model.config.id2label.items()}
    return layout_model
def detect_document_block_types(doc, blocks: List[Page], layoutlm_model, batch_size=settings.LAYOUT_BATCH_SIZE):
    """Predict a BlockType for every line of every page in the document."""
    encodings, metadata, sample_lengths = get_features(doc, blocks)
    raw_predictions = predict_block_types(encodings, layoutlm_model, batch_size)
    block_types = match_predictions_to_boxes(encodings, raw_predictions, metadata, sample_lengths, layoutlm_model)
    # One list of line types per page.
    assert len(block_types) == len(blocks)
    return block_types
def get_provisional_boxes(pred, box, is_subword, start_idx=0):
    """Drop subword-token positions, then skip the first `start_idx` entries.

    The skipped prefix corresponds to the overlap with the previous chunk.
    Returns (predictions, boxes) filtered in lockstep by `is_subword`.
    """
    prov_predictions = [p for i, p in enumerate(pred) if not is_subword[i]][start_idx:]
    prov_boxes = [b for i, b in enumerate(box) if not is_subword[i]][start_idx:]
    return prov_predictions, prov_boxes
def get_page_encoding(page, page_blocks: Page):
    """Encode one page (rendered image + line boxes/text) for the layout model.

    Returns (list_encoding, other_data): one encoding dict per overflow chunk,
    plus the original normalized boxes and page dimensions. Returns ([], [])
    for pages with no lines.

    Raises:
        ValueError: when a clamped line box has zero width or height.
    """
    if len(page_blocks.get_all_lines()) == 0:
        return [], []

    page_box = page_blocks.bbox
    pwidth = page_blocks.width
    pheight = page_blocks.height

    pix = page.get_pixmap(dpi=settings.LAYOUT_DPI, annots=False, clip=page_blocks.bbox)
    png = pix.pil_tobytes(format="PNG")
    png_image = Image.open(io.BytesIO(png))
    rgb_image = png_image.convert('RGB')
    rgb_width, rgb_height = rgb_image.size

    # The rendered image must preserve the page's aspect ratio.
    assert isclose(rgb_width / pwidth, rgb_height / pheight, abs_tol=2e-2)

    lines = page_blocks.get_all_lines()

    boxes = []
    text = []
    for line in lines:
        box = line.bbox
        # Clamp each line box to the page bounding box.
        if box[0] < page_box[0]:
            box[0] = page_box[0]
        if box[1] < page_box[1]:
            box[1] = page_box[1]
        if box[2] > page_box[2]:
            box[2] = page_box[2]
        if box[3] > page_box[3]:
            box[3] = page_box[3]

        # Degenerate boxes cannot be normalized into the model's space.
        if box[2] <= box[0]:
            print("Zero width box found, cannot convert properly")
            raise ValueError
        if box[3] <= box[1]:
            print("Zero height box found, cannot convert properly")
            raise ValueError
        boxes.append(box)
        text.append(line.prelim_text)

    # Scale boxes into LayoutLM's 0-1000 coordinate space.
    boxes = [normalize_box(box, pwidth, pheight) for box in boxes]
    for box in boxes:
        assert(len(box) == 4)
        assert(max(box)) <= 1000
        assert(min(box)) >= 0

    # Long pages overflow into multiple chunks with LAYOUT_CHUNK_OVERLAP stride.
    encoding = processor(
        rgb_image,
        text=text,
        boxes=boxes,
        return_offsets_mapping=True,
        truncation=True,
        return_tensors="pt",
        stride=settings.LAYOUT_CHUNK_OVERLAP,
        padding="max_length",
        max_length=settings.LAYOUT_MODEL_MAX,
        return_overflowing_tokens=True
    )
    offset_mapping = encoding.pop('offset_mapping')
    overflow_to_sample_mapping = encoding.pop('overflow_to_sample_mapping')

    bbox = list(encoding["bbox"])
    input_ids = list(encoding["input_ids"])
    attention_mask = list(encoding["attention_mask"])
    pixel_values = list(encoding["pixel_values"])
    assert len(bbox) == len(input_ids) == len(attention_mask) == len(pixel_values) == len(offset_mapping)

    # Re-pack the batched tensors into one dict per chunk.
    list_encoding = []
    for i in range(len(bbox)):
        list_encoding.append({
            "bbox": bbox[i],
            "input_ids": input_ids[i],
            "attention_mask": attention_mask[i],
            "pixel_values": pixel_values[i],
            "offset_mapping": offset_mapping[i]
        })

    other_data = {
        "original_bbox": boxes,
        "pwidth": pwidth,
        "pheight": pheight,
    }
    return list_encoding, other_data
def get_features(doc, blocks):
    """Encode every page, tracking how many chunks each page produced."""
    encodings = []
    metadata = []
    sample_lengths = []
    for pnum, page_blocks in enumerate(blocks):
        page_encoding, page_metadata = get_page_encoding(doc[pnum], page_blocks)
        encodings.extend(page_encoding)
        metadata.append(page_metadata)
        sample_lengths.append(len(page_encoding))
    return encodings, metadata, sample_lengths
def predict_block_types(encodings, layoutlm_model, batch_size):
    """Run the layout model over all chunks, returning per-token label ids."""
    all_predictions = []
    for start in range(0, len(encodings), batch_size):
        stop = min(start + batch_size, len(encodings))
        batch = encodings[start:stop]
        model_in = {
            key: torch.stack([sample[key] for sample in batch]).to(layoutlm_model.device)
            for key in ["bbox", "input_ids", "attention_mask", "pixel_values"]
        }
        # Pixel values must match the model dtype (e.g. bfloat16 on cuda).
        model_in["pixel_values"] = model_in["pixel_values"].to(layoutlm_model.dtype)
        with torch.inference_mode():
            logits = layoutlm_model(**model_in).logits
            predictions = logits.argmax(-1).squeeze().tolist()
        if len(predictions) == settings.LAYOUT_MODEL_MAX:
            # A batch of one squeezes down to a flat token list; re-wrap it.
            predictions = [predictions]
        all_predictions.extend(predictions)
    return all_predictions
def match_predictions_to_boxes(encodings, predictions, metadata, sample_lengths, layoutlm_model) -> List[List[BlockType]]:
    """Map per-token predictions back to per-line BlockType lists, one per page.

    NOTE(review): this body appears truncated — it validates its inputs and
    then returns an empty list without consuming them, which would also trip
    the `len(block_types) == len(blocks)` assertion in
    detect_document_block_types. The matching logic seems to be missing from
    this copy of the file; confirm against the original source.
    """
    assert len(encodings) == len(predictions) == sum(sample_lengths)
    assert len(metadata) == len(sample_lengths)
    page_start = 0  # unused in this (apparently truncated) body
    page_block_types = []
    return page_block_types
# .\marker\marker\settings.py
import os
# Fix: two import statements were fused together ("Dictfrom") — syntax error.
from typing import Optional, List, Dict

from dotenv import find_dotenv
from pydantic import computed_field
from pydantic_settings import BaseSettings
import fitz as pymupdf
import torch
class Settings(BaseSettings):
    """All marker configuration, overridable via environment / local.env."""

    # Explicit device override; when None the device is auto-detected.
    TORCH_DEVICE: Optional[str] = None

    @computed_field
    @property
    def TORCH_DEVICE_MODEL(self) -> str:
        """Resolved device string: the override, else cuda > mps > cpu."""
        if self.TORCH_DEVICE is not None:
            return self.TORCH_DEVICE
        if torch.cuda.is_available():
            return "cuda"
        if torch.backends.mps.is_available():
            return "mps"
        return "cpu"

    INFERENCE_RAM: int = 40  # GB of VRAM available for inference
    VRAM_PER_TASK: float = 2.5  # GB of VRAM budgeted per task
    DEFAULT_LANG: str = "English"

    SUPPORTED_FILETYPES: Dict = {
        "application/pdf": "pdf",
        "application/epub+zip": "epub",
        "application/x-mobipocket-ebook": "mobi",
        "application/vnd.ms-xpsdocument": "xps",
        "application/x-fictionbook+xml": "fb2"
    }

    # Text extraction
    TEXT_FLAGS: int = pymupdf.TEXTFLAGS_DICT & ~pymupdf.TEXT_PRESERVE_LIGATURES & ~pymupdf.TEXT_PRESERVE_IMAGES
    INVALID_CHARS: List[str] = [chr(0xfffd), "�"]

    # OCR
    OCR_DPI: int = 400
    TESSDATA_PREFIX: str = ""
    TESSERACT_LANGUAGES: Dict = {
        "English": "eng",
        "Spanish": "spa",
        "Portuguese": "por",
        "French": "fra",
        "German": "deu",
        "Russian": "rus",
        "Chinese": "chi_sim",
        "Japanese": "jpn",
        "Korean": "kor",
        "Hindi": "hin",
    }
    TESSERACT_TIMEOUT: int = 20
    SPELLCHECK_LANGUAGES: Dict = {
        "English": "en",
        "Spanish": "es",
        "Portuguese": "pt",
        "French": "fr",
        "German": "de",
        "Russian": "ru",
        "Chinese": None,
        "Japanese": None,
        "Korean": None,
        "Hindi": None,
    }
    OCR_ALL_PAGES: bool = False
    OCR_PARALLEL_WORKERS: int = 2
    OCR_ENGINE: str = "ocrmypdf"

    # Texify (equations)
    TEXIFY_MODEL_MAX: int = 384
    TEXIFY_TOKEN_BUFFER: int = 256
    TEXIFY_DPI: int = 96
    # NOTE(review): at class-definition time TORCH_DEVICE_MODEL is the
    # computed-field descriptor, not a string, so this comparison is always
    # False and the batch size is always 6 — confirm whether that is intended.
    TEXIFY_BATCH_SIZE: int = 2 if TORCH_DEVICE_MODEL == "cpu" else 6
    TEXIFY_MODEL_NAME: str = "vikp/texify"

    # Layout
    BAD_SPAN_TYPES: List[str] = ["Caption", "Footnote", "Page-footer", "Page-header", "Picture"]
    LAYOUT_MODEL_MAX: int = 512
    LAYOUT_CHUNK_OVERLAP: int = 64
    LAYOUT_DPI: int = 96
    LAYOUT_MODEL_NAME: str = "vikp/layout_segmenter"
    LAYOUT_BATCH_SIZE: int = 8

    # Column ordering
    ORDERER_BATCH_SIZE: int = 32
    ORDERER_MODEL_NAME: str = "vikp/column_detector"

    # Post-edit model
    EDITOR_BATCH_SIZE: int = 4
    EDITOR_MAX_LENGTH: int = 1024
    EDITOR_MODEL_NAME: str = "vikp/pdf_postprocessor_t5"
    ENABLE_EDITOR_MODEL: bool = False
    EDITOR_CUTOFF_THRESH: float = 0.9

    # Ray
    RAY_CACHE_PATH: Optional[str] = None
    RAY_CORES_PER_WORKER: int = 1

    # Debug
    DEBUG: bool = False
    DEBUG_DATA_FOLDER: Optional[str] = None
    DEBUG_LEVEL: int = 0

    @computed_field
    @property
    def CUDA(self) -> bool:
        # BUG FIX: was `"cuda" in self.TORCH_DEVICE`, which raises TypeError
        # when TORCH_DEVICE is its default None. Use the resolved device, which
        # equals TORCH_DEVICE whenever an override is set.
        return "cuda" in self.TORCH_DEVICE_MODEL

    @computed_field
    @property
    def MODEL_DTYPE(self) -> torch.dtype:
        if self.TORCH_DEVICE_MODEL == "cuda":
            return torch.bfloat16
        else:
            return torch.float32

    @computed_field
    @property
    def TEXIFY_DTYPE(self) -> torch.dtype:
        return torch.float32 if self.TORCH_DEVICE_MODEL == "cpu" else torch.float16

    class Config:
        env_file = find_dotenv("local.env")
        extra = "ignore"
# Singleton settings instance, importable everywhere; values come from the
# environment / local.env via pydantic-settings.
settings = Settings()
# .\marker\scripts\verify_benchmark_scores.py
import json
import argparse
def verify_scores(file_path):
    """Check the benchmark results JSON; raise when either score is <= 0.4.

    Raises:
        ValueError: when multicolcnn.pdf or switch_trans.pdf scored too low.
    """
    with open(file_path, 'r') as file:
        data = json.load(file)

    file_scores = data["marker"]["files"]
    multicolcnn_score = file_scores["multicolcnn.pdf"]["score"]
    switch_trans_score = file_scores["switch_trans.pdf"]["score"]

    if multicolcnn_score <= 0.4 or switch_trans_score <= 0.4:
        raise ValueError("One or more scores are below the required threshold of 0.4")
if __name__ == "__main__":
    # CLI entry point: verify the scores file given on the command line.
    cli = argparse.ArgumentParser(description="Verify benchmark scores")
    cli.add_argument("file_path", type=str, help="Path to the json file")
    cli_args = cli.parse_args()
    verify_scores(cli_args.file_path)