Skip to content

Commit

Permalink
async load (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
AmineDiro committed Jul 30, 2024
1 parent a497992 commit a4b5572
Showing 1 changed file with 103 additions and 50 deletions.
153 changes: 103 additions & 50 deletions megaparse/Converter.py
Original file line number Diff line number Diff line change
@@ -1,80 +1,81 @@
import asyncio
from enum import Enum
import os
from docx.document import Document as DocumentObject
from collections import Counter
from enum import Enum
from pathlib import Path
from typing import List, Set

import pandas as pd
from docx import Document
from docx.oxml.table import CT_Tbl
from docx.oxml.text.paragraph import CT_P
from docx.table import Table
from docx.text.paragraph import Paragraph
from docx.oxml.text.paragraph import CT_P
from docx.oxml.table import CT_Tbl
from docx.text.run import Run
from typing import List
from pathlib import Path
from collections import Counter
from langchain_community.document_loaders.base import BaseLoader
from langchain_core.documents import Document as LangChainDocument
from llama_index.core.schema import Document as LlamaDocument
from llama_parse import LlamaParse
from llama_parse.utils import Language, ResultType
from pptx import Presentation
from pptx.presentation import Presentation as PresentationObject
from pptx.enum.shapes import MSO_SHAPE_TYPE
from typing import List, Set
from llama_parse import LlamaParse
from llama_parse.utils import ResultType, Language
from llama_index.core.schema import Document as LlamaDocument

from megaparse.markdown_processor import MarkdownProcessor
from megaparse.unstructured_convertor import ModelEnum, UnstructuredParser
from pathlib import Path
from llama_index.core import download_loader
from unstructured.partition.auto import partition
import pandas as pd
from megaparse.multimodal_convertor.megaparse_vision import MegaParseVision
from langchain_core.documents import Document as LangChainDocument
from langchain_community.document_loaders.base import BaseLoader
from megaparse.unstructured_convertor import ModelEnum, UnstructuredParser


class Converter:
    """Abstract base class for file-to-markdown converters.

    Subclasses implement :meth:`convert`; :meth:`save_md` is a shared helper
    for persisting the produced markdown.
    """

    def __init__(self) -> None:
        pass

    async def convert(self, file_path: str | Path) -> LangChainDocument:
        """Convert ``file_path`` into a LangChain document.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError("Subclasses should implement this method")

    def save_md(self, md_content: str, file_path: Path | str) -> None:
        """Write ``md_content`` to ``file_path``, overwriting any existing file."""
        with open(file_path, "w") as f:
            f.write(md_content)


class XLSXConverter(Converter):
    """Converter for .xlsx workbooks.

    Each sheet row is rendered as a pipe-delimited text line, so the output
    reads like a markdown table body (no header separator row).
    """

    def __init__(self) -> None:
        pass

    async def convert(self, file_path: str | Path) -> LangChainDocument:
        """Convert the first sheet of the workbook to a LangChain document.

        Args:
            file_path: Path to the .xlsx file.

        Returns:
            Document whose ``page_content`` is the pipe-delimited sheet text,
            with ``filename`` and ``type: "xlsx"`` metadata.
        """
        if isinstance(file_path, str):
            file_path = Path(file_path)
        xls = pd.ExcelFile(file_path)  # type: ignore
        # read_excel with no sheet argument returns only the first sheet.
        sheets = pd.read_excel(xls)

        target_text = self.table_to_text(sheets)

        return LangChainDocument(
            page_content=target_text,
            metadata={"filename": file_path.name, "type": "xlsx"},
        )

    def convert_tab(self, file_path: str | Path, tab_name: str) -> str:
        """Render a single named sheet (tab) of the workbook as text."""
        if isinstance(file_path, str):
            file_path = Path(file_path)
        xls = pd.ExcelFile(str(file_path))
        sheets = pd.read_excel(xls, tab_name)
        target_text = self.table_to_text(sheets)
        return target_text

    def table_to_text(self, df) -> str:
        """Render a DataFrame as ``|v1 | v2|`` lines.

        NaN cells are dropped; rows that end up empty are skipped entirely.
        """
        text_rows = []
        for _, row in df.iterrows():
            row_text = " | ".join(str(value) for value in row.values if pd.notna(value))
            if row_text:
                text_rows.append("|" + row_text + "|")
        return "\n".join(text_rows)


class DOCXConverter(Converter):
def __init__(self) -> None:
self.header_handled = False

async def convert(self, file_path: str|Path) -> LangChainDocument:
async def convert(self, file_path: str | Path) -> LangChainDocument:
if isinstance(file_path, str):
file_path = Path(file_path)
doc = Document(str(file_path))
Expand All @@ -92,7 +93,10 @@ async def convert(self, file_path: str|Path) -> LangChainDocument:
md_content += self._handle_table(Table(element, doc))
# Add more handlers here (image, header, footer, etc)

return LangChainDocument(page_content="\n".join(md_content), metadata={"filename": file_path.name, "type": "docx"})
return LangChainDocument(
page_content="\n".join(md_content),
metadata={"filename": file_path.name, "type": "docx"},
)

def _handle_header(self, header) -> str:
if not self.header_handled:
Expand Down Expand Up @@ -164,7 +168,7 @@ def __init__(self, add_images=False) -> None:
self.header_handled = False
self.add_images = add_images

async def convert(self, file_path: str|Path) -> LangChainDocument:
async def convert(self, file_path: str | Path) -> LangChainDocument:
if isinstance(file_path, str):
file_path = Path(file_path)
prs = Presentation(str(file_path))
Expand Down Expand Up @@ -193,7 +197,10 @@ async def convert(self, file_path: str|Path) -> LangChainDocument:
slide_md_str = f"## Slide {i+1}\n{slide_md_str}"
md_content.append(slide_md_str)

return LangChainDocument(page_content = "\n".join(md_content), metadata = {"filename": file_path.name, "type": "pptx"})
return LangChainDocument(
page_content="\n".join(md_content),
metadata={"filename": file_path.name, "type": "pptx"},
)

def _handle_header(self, placeholders) -> str:
if not self.header_handled:
Expand Down Expand Up @@ -240,6 +247,7 @@ def save_md(self, md_content: str, file_path: Path | str) -> None:

class MethodEnum(str, Enum):
    """Method to use for the conversion.

    Inherits from ``str`` so members compare equal to their string values.
    """

    LLAMA_PARSE = "llama_parse"
    UNSTRUCTURED = "unstructured"
    MEGAPARSE_VISION = "megaparse_vision"
Expand All @@ -250,8 +258,8 @@ def __init__(
self,
llama_parse_api_key: str,
method: MethodEnum | str = MethodEnum.UNSTRUCTURED,
model = ModelEnum.NONE,
strategy = "fast",
model=ModelEnum.NONE,
strategy="fast",
) -> None:
self.strategy = strategy
self.llama_parse_api_key = llama_parse_api_key
Expand All @@ -262,7 +270,7 @@ def __init__(
raise ValueError(f"Method {method} not supported")
self.method = method

async def _llama_parse(self, api_key: str, file_path: str|Path):
async def _llama_parse(self, api_key: str, file_path: str | Path):
parsing_instructions = "Do not take into account the page breaks (no --- between pages), do not repeat the header and the footer so the tables are merged. Keep the same format for similar tables."
self.parser = LlamaParse(
api_key=str(api_key),
Expand All @@ -279,21 +287,32 @@ async def _llama_parse(self, api_key: str, file_path: str|Path):
parsed_md = parsed_md + text_content
return parsed_md

def _unstructured_parse(self, file_path: str | Path, model: ModelEnum = ModelEnum.NONE):
def _unstructured_parse(
self, file_path: str | Path, model: ModelEnum = ModelEnum.NONE
):
unstructured_parser = UnstructuredParser()
return unstructured_parser.convert(file_path, model= model, strategy=self.strategy)

return unstructured_parser.convert(
file_path, model=model, strategy=self.strategy
)

async def _lmm_parse(self, file_path: str | Path):
lmm_parser = MegaParseVision()
return await lmm_parser.parse(file_path)

async def convert(self, file_path: str | Path, model: ModelEnum = ModelEnum.NONE, gpt4o_cleaner=False) -> LangChainDocument:
async def convert(
self,
file_path: str | Path,
model: ModelEnum = ModelEnum.NONE,
gpt4o_cleaner=False,
) -> LangChainDocument:
if isinstance(file_path, str):
file_path = Path(file_path)

parsed_md = ""
if self.method == MethodEnum.LLAMA_PARSE:
assert self.llama_parse_api_key is not None, "LLama Parse API key is required for this method"
assert (
self.llama_parse_api_key is not None
), "LLama Parse API key is required for this method"
parsed_md = await self._llama_parse(self.llama_parse_api_key, file_path)
elif self.method == MethodEnum.MEGAPARSE_VISION:
parsed_md = await self._lmm_parse(file_path)
Expand All @@ -303,57 +322,91 @@ async def convert(self, file_path: str | Path, model: ModelEnum = ModelEnum.NONE
raise ValueError(f"Method {self.method} not supported")

if not gpt4o_cleaner:
return LangChainDocument(page_content=parsed_md, metadata={"filename": file_path.name, "type": "pdf"})
return LangChainDocument(
page_content=parsed_md,
metadata={"filename": file_path.name, "type": "pdf"},
)
else:
md_processor = MarkdownProcessor(
parsed_md,
strict=True,
remove_pagination=True,
)
md_cleaned = md_processor.process(gpt4o_cleaner=gpt4o_cleaner)
return LangChainDocument(page_content=md_cleaned, metadata={"filename": file_path.name, "type": "pdf"})
return LangChainDocument(
page_content=md_cleaned,
metadata={"filename": file_path.name, "type": "pdf"},
)

def save_md(self, md_content: str, file_path: Path | str) -> None:
with open(file_path, "w") as f:
f.write(md_content)


class MegaParse(BaseLoader):
def __init__(self, file_path: str| Path, llama_parse_api_key: str | None = None, strategy = "fast") -> None:
def __init__(
self,
file_path: str | Path,
llama_parse_api_key: str | None = None,
strategy="fast",
) -> None:
if isinstance(file_path, str):
file_path = Path(file_path)
self.file_path = file_path
self.llama_parse_api_key = llama_parse_api_key
self.strategy = strategy

async def aload(self, **kwargs) -> LangChainDocument:
file_extension: str = os.path.splitext(self.file_path)[1]
if file_extension == ".docx":
converter = DOCXConverter()
elif file_extension == ".pptx":
converter = PPTXConverter()
elif file_extension == ".pdf":
converter = PDFConverter(
llama_parse_api_key=str(self.llama_parse_api_key),
strategy=self.strategy,
)
elif file_extension == ".xlsx":
converter = XLSXConverter()
else:
raise ValueError(f"Unsupported file extension: {file_extension}")

return await converter.convert(self.file_path, **kwargs)

def load(self, **kwargs) -> LangChainDocument:
file_extension: str = os.path.splitext(self.file_path)[1]
if file_extension == ".docx":
converter = DOCXConverter()
elif file_extension == ".pptx":
converter = PPTXConverter()
elif file_extension == ".pdf":
converter = PDFConverter(llama_parse_api_key=str(self.llama_parse_api_key),strategy=self.strategy)
converter = PDFConverter(
llama_parse_api_key=str(self.llama_parse_api_key),
strategy=self.strategy,
)
elif file_extension == ".xlsx":
converter = XLSXConverter()
else:
print(self.file_path, file_extension)
raise ValueError(f"Unsupported file extension: {file_extension}")

loop = asyncio.get_event_loop()
return loop.run_until_complete(converter.convert(self.file_path, **kwargs))

def load_tab(self, tab_name: str, **kwargs) -> LangChainDocument:
file_extension: str = os.path.splitext(self.file_path)[1]
if file_extension == ".xlsx":
converter = XLSXConverter()
else:
print(self.file_path, file_extension)
raise ValueError(f"Unsupported file extension for tabs: {file_extension}")

result = converter.convert_tab(self.file_path, tab_name= tab_name)
return LangChainDocument(page_content=result, metadata={"filename": self.file_path.name, "type": "xlsx"})

result = converter.convert_tab(self.file_path, tab_name=tab_name)
return LangChainDocument(
page_content=result,
metadata={"filename": self.file_path.name, "type": "xlsx"},
)

def save_md(self, md_content: str, file_path: Path | str) -> None:
os.makedirs(os.path.dirname(file_path), exist_ok=True)
Expand Down

0 comments on commit a4b5572

Please sign in to comment.