From a4b557237a3ac846170fa474649954ba5b9d2abd Mon Sep 17 00:00:00 2001 From: AmineDiro Date: Tue, 30 Jul 2024 17:09:45 +0200 Subject: [PATCH] async load (#70) --- megaparse/Converter.py | 153 +++++++++++++++++++++++++++-------------- 1 file changed, 103 insertions(+), 50 deletions(-) diff --git a/megaparse/Converter.py b/megaparse/Converter.py index 77250a8..11d1cfe 100644 --- a/megaparse/Converter.py +++ b/megaparse/Converter.py @@ -1,66 +1,67 @@ import asyncio -from enum import Enum import os -from docx.document import Document as DocumentObject +from collections import Counter +from enum import Enum +from pathlib import Path +from typing import List, Set + +import pandas as pd from docx import Document +from docx.oxml.table import CT_Tbl +from docx.oxml.text.paragraph import CT_P from docx.table import Table from docx.text.paragraph import Paragraph -from docx.oxml.text.paragraph import CT_P -from docx.oxml.table import CT_Tbl from docx.text.run import Run -from typing import List -from pathlib import Path -from collections import Counter +from langchain_community.document_loaders.base import BaseLoader +from langchain_core.documents import Document as LangChainDocument +from llama_index.core.schema import Document as LlamaDocument +from llama_parse import LlamaParse +from llama_parse.utils import Language, ResultType from pptx import Presentation -from pptx.presentation import Presentation as PresentationObject from pptx.enum.shapes import MSO_SHAPE_TYPE -from typing import List, Set -from llama_parse import LlamaParse -from llama_parse.utils import ResultType, Language -from llama_index.core.schema import Document as LlamaDocument + from megaparse.markdown_processor import MarkdownProcessor -from megaparse.unstructured_convertor import ModelEnum, UnstructuredParser -from pathlib import Path -from llama_index.core import download_loader -from unstructured.partition.auto import partition -import pandas as pd from megaparse.multimodal_convertor.megaparse_vision import MegaParseVision -from langchain_core.documents import Document as LangChainDocument -from langchain_community.document_loaders.base import BaseLoader +from megaparse.unstructured_convertor import ModelEnum, UnstructuredParser + class Converter: def __init__(self) -> None: pass - async def convert(self, file_path: str| Path) -> LangChainDocument: + async def convert(self, file_path: str | Path) -> LangChainDocument: raise NotImplementedError("Subclasses should implement this method") def save_md(self, md_content: str, file_path: Path | str) -> None: with open(file_path, "w") as f: f.write(md_content) + class XLSXConverter(Converter): def __init__(self) -> None: pass - async def convert(self, file_path: str| Path) -> LangChainDocument: + async def convert(self, file_path: str | Path) -> LangChainDocument: if isinstance(file_path, str): file_path = Path(file_path) - xls = pd.ExcelFile(file_path) #type: ignore + xls = pd.ExcelFile(file_path) # type: ignore sheets = pd.read_excel(xls) target_text = self.table_to_text(sheets) - return LangChainDocument(page_content=target_text, metadata={"filename": file_path.name, "type": "xlsx"}) - - def convert_tab(self, file_path: str|Path, tab_name: str) -> str: + return LangChainDocument( + page_content=target_text, + metadata={"filename": file_path.name, "type": "xlsx"}, + ) + + def convert_tab(self, file_path: str | Path, tab_name: str) -> str: if isinstance(file_path, str): file_path = Path(file_path) xls = pd.ExcelFile(str(file_path)) - sheets = pd.read_excel(xls, tab_name) - target_text = self.table_to_text(sheets) + sheets = pd.read_excel(xls, tab_name) + target_text = self.table_to_text(sheets) return target_text - + def table_to_text(self, df): text_rows = [] for _, row in df.iterrows(): @@ -68,13 +69,13 @@ def table_to_text(self, df): if row_text: text_rows.append("|" + row_text + "|") return "\n".join(text_rows) - + class DOCXConverter(Converter): def __init__(self) -> None: self.header_handled = False - async def convert(self, file_path: str|Path) -> LangChainDocument: + async def convert(self, file_path: str | Path) -> LangChainDocument: if isinstance(file_path, str): file_path = Path(file_path) doc = Document(str(file_path)) @@ -92,7 +93,10 @@ async def convert(self, file_path: str|Path) -> LangChainDocument: md_content += self._handle_table(Table(element, doc)) # Add more handlers here (image, header, footer, etc) - return LangChainDocument(page_content="\n".join(md_content), metadata={"filename": file_path.name, "type": "docx"}) + return LangChainDocument( + page_content="\n".join(md_content), + metadata={"filename": file_path.name, "type": "docx"}, + ) def _handle_header(self, header) -> str: if not self.header_handled: @@ -164,7 +168,7 @@ def __init__(self, add_images=False) -> None: self.header_handled = False self.add_images = add_images - async def convert(self, file_path: str|Path) -> LangChainDocument: + async def convert(self, file_path: str | Path) -> LangChainDocument: if isinstance(file_path, str): file_path = Path(file_path) prs = Presentation(str(file_path)) @@ -193,7 +197,10 @@ async def convert(self, file_path: str|Path) -> LangChainDocument: slide_md_str = f"## Slide {i+1}\n{slide_md_str}" md_content.append(slide_md_str) - return LangChainDocument(page_content = "\n".join(md_content), metadata = {"filename": file_path.name, "type": "pptx"}) + return LangChainDocument( + page_content="\n".join(md_content), + metadata={"filename": file_path.name, "type": "pptx"}, + ) def _handle_header(self, placeholders) -> str: if not self.header_handled: @@ -240,6 +247,7 @@ def save_md(self, md_content: str, file_path: Path | str) -> None: class MethodEnum(str, Enum): """Method to use for the conversion""" + LLAMA_PARSE = "llama_parse" UNSTRUCTURED = "unstructured" MEGAPARSE_VISION = "megaparse_vision" @@ -250,8 +258,8 @@ def __init__( self, llama_parse_api_key: str, method: MethodEnum | str = MethodEnum.UNSTRUCTURED, - model = ModelEnum.NONE, - strategy = "fast", + model=ModelEnum.NONE, + strategy="fast", ) -> None: self.strategy = strategy self.llama_parse_api_key = llama_parse_api_key @@ -262,7 +270,7 @@ def __init__( raise ValueError(f"Method {method} not supported") self.method = method - async def _llama_parse(self, api_key: str, file_path: str|Path): + async def _llama_parse(self, api_key: str, file_path: str | Path): parsing_instructions = "Do not take into account the page breaks (no --- between pages), do not repeat the header and the footer so the tables are merged. Keep the same format for similar tables." self.parser = LlamaParse( api_key=str(api_key), @@ -279,21 +287,32 @@ async def _llama_parse(self, api_key: str, file_path: str|Path): parsed_md = parsed_md + text_content return parsed_md - def _unstructured_parse(self, file_path: str | Path, model: ModelEnum = ModelEnum.NONE): + def _unstructured_parse( + self, file_path: str | Path, model: ModelEnum = ModelEnum.NONE + ): unstructured_parser = UnstructuredParser() - return unstructured_parser.convert(file_path, model= model, strategy=self.strategy) - + return unstructured_parser.convert( + file_path, model=model, strategy=self.strategy + ) + async def _lmm_parse(self, file_path: str | Path): lmm_parser = MegaParseVision() return await lmm_parser.parse(file_path) - async def convert(self, file_path: str | Path, model: ModelEnum = ModelEnum.NONE, gpt4o_cleaner=False) -> LangChainDocument: + async def convert( + self, + file_path: str | Path, + model: ModelEnum = ModelEnum.NONE, + gpt4o_cleaner=False, + ) -> LangChainDocument: if isinstance(file_path, str): file_path = Path(file_path) parsed_md = "" if self.method == MethodEnum.LLAMA_PARSE: - assert self.llama_parse_api_key is not None, "LLama Parse API key is required for this method" + assert ( + self.llama_parse_api_key is not None + ), "LLama Parse API key is required for this method" parsed_md = await self._llama_parse(self.llama_parse_api_key, file_path) elif self.method == MethodEnum.MEGAPARSE_VISION: parsed_md = await self._lmm_parse(file_path) @@ -303,7 +322,10 @@ async def convert(self, file_path: str | Path, model: ModelEnum = ModelEnum.NONE raise ValueError(f"Method {self.method} not supported") if not gpt4o_cleaner: - return LangChainDocument(page_content=parsed_md, metadata={"filename": file_path.name, "type": "pdf"}) + return LangChainDocument( + page_content=parsed_md, + metadata={"filename": file_path.name, "type": "pdf"}, + ) else: md_processor = MarkdownProcessor( parsed_md, @@ -311,7 +333,10 @@ async def convert(self, file_path: str | Path, model: ModelEnum = ModelEnum.NONE remove_pagination=True, ) md_cleaned = md_processor.process(gpt4o_cleaner=gpt4o_cleaner) - return LangChainDocument(page_content=md_cleaned, metadata={"filename": file_path.name, "type": "pdf"}) + return LangChainDocument( + page_content=md_cleaned, + metadata={"filename": file_path.name, "type": "pdf"}, + ) def save_md(self, md_content: str, file_path: Path | str) -> None: with open(file_path, "w") as f: @@ -319,13 +344,36 @@ def save_md(self, md_content: str, file_path: Path | str) -> None: class MegaParse(BaseLoader): - def __init__(self, file_path: str| Path, llama_parse_api_key: str | None = None, strategy = "fast") -> None: + def __init__( + self, + file_path: str | Path, + llama_parse_api_key: str | None = None, + strategy="fast", + ) -> None: if isinstance(file_path, str): file_path = Path(file_path) self.file_path = file_path self.llama_parse_api_key = llama_parse_api_key self.strategy = strategy + async def aload(self, **kwargs) -> LangChainDocument: + file_extension: str = os.path.splitext(self.file_path)[1] + if file_extension == ".docx": + converter = DOCXConverter() + elif file_extension == ".pptx": + converter = PPTXConverter() + elif file_extension == ".pdf": + converter = PDFConverter( + llama_parse_api_key=str(self.llama_parse_api_key), + strategy=self.strategy, + ) + elif file_extension == ".xlsx": + converter = XLSXConverter() + else: + raise ValueError(f"Unsupported file extension: {file_extension}") + + return await converter.convert(self.file_path, **kwargs) + def load(self, **kwargs) -> LangChainDocument: file_extension: str = os.path.splitext(self.file_path)[1] if file_extension == ".docx": @@ -333,16 +381,19 @@ def load(self, **kwargs) -> LangChainDocument: elif file_extension == ".pptx": converter = PPTXConverter() elif file_extension == ".pdf": - converter = PDFConverter(llama_parse_api_key=str(self.llama_parse_api_key),strategy=self.strategy) + converter = PDFConverter( + llama_parse_api_key=str(self.llama_parse_api_key), + strategy=self.strategy, + ) elif file_extension == ".xlsx": converter = XLSXConverter() else: print(self.file_path, file_extension) raise ValueError(f"Unsupported file extension: {file_extension}") - + loop = asyncio.get_event_loop() return loop.run_until_complete(converter.convert(self.file_path, **kwargs)) - + def load_tab(self, tab_name: str, **kwargs) -> LangChainDocument: file_extension: str = os.path.splitext(self.file_path)[1] if file_extension == ".xlsx": @@ -350,10 +401,12 @@ def load_tab(self, tab_name: str, **kwargs) -> LangChainDocument: else: print(self.file_path, file_extension) raise ValueError(f"Unsupported file extension for tabs: {file_extension}") - - result = converter.convert_tab(self.file_path, tab_name= tab_name) - return LangChainDocument(page_content=result, metadata={"filename": self.file_path.name, "type": "xlsx"}) + result = converter.convert_tab(self.file_path, tab_name=tab_name) + return LangChainDocument( + page_content=result, + metadata={"filename": self.file_path.name, "type": "xlsx"}, + ) def save_md(self, md_content: str, file_path: Path | str) -> None: os.makedirs(os.path.dirname(file_path), exist_ok=True)