Skip to content

Commit

Permalink
Bol Bam: PDF ingestor fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
saraswatpuneet committed Sep 2, 2023
1 parent 16f34f7 commit d55fed7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 40 deletions.
67 changes: 33 additions & 34 deletions querent/ingestors/pdfs/pdf_ingestor_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ class PdfIngestorFactory(IngestorFactory):
async def supports(self, file_extension: str) -> bool:
    """Report whether this factory handles ``file_extension``.

    The lookup is case-insensitive: the extension is lower-cased before it
    is checked against ``self.SUPPORTED_EXTENSIONS``.
    """
    normalized = file_extension.lower()
    return normalized in self.SUPPORTED_EXTENSIONS

async def create(
    self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
    """Build a ``PdfIngestor`` for ``file_extension``.

    Args:
        file_extension: Extension of the file to ingest; it is passed to
            ``self.supports`` for validation.
        processors: Processors handed to the ``PdfIngestor`` constructor.

    Returns:
        A ``PdfIngestor`` wrapping ``processors``, or ``None`` when the
        extension is not supported.
    """
    # ``supports`` is a coroutine function: it must be awaited. Testing the
    # un-awaited coroutine object is always truthy, so the guard never fired.
    if not await self.supports(file_extension):
        return None
    return PdfIngestor(processors)
Expand All @@ -26,48 +28,45 @@ def __init__(self, processors: List[AsyncProcessor]):

async def ingest(
    self, poll_function: AsyncGenerator[CollectedBytes, None]
) -> AsyncGenerator[str, None]:
    """Stream processed text for every PDF delivered by ``poll_function``.

    Chunks are buffered per file. When a chunk arrives whose ``file``
    differs from the file currently being buffered, the buffered bytes are
    handed to ``extract_and_process_pdf`` and everything it produces is
    re-yielded. The final file is flushed once the poll generator is
    exhausted.

    Args:
        poll_function: Async generator of collected chunks. Each chunk is
            expected to expose ``is_error()``, ``file`` and ``data``.

    Yields:
        Each item produced by ``extract_and_process_pdf``. If an exception
        interrupts ingestion, a single empty string is yielded and the
        stream ends.
    """
    current_file = None
    # Collect chunks in a list and join once per file; repeated
    # ``bytes +=`` copies the whole buffer on every chunk.
    chunks = []
    try:
        async for chunk_bytes in poll_function:
            if chunk_bytes.is_error():
                # TODO handle error
                continue

            if current_file is not None and chunk_bytes.file != current_file:
                # A new file started: process the one buffered so far.
                async for page_text in self.extract_and_process_pdf(
                    CollectedBytes(file=current_file, data=b"".join(chunks))
                ):
                    yield page_text
                chunks = []

            current_file = chunk_bytes.file
            chunks.append(chunk_bytes.data)

        # Flush the last file here rather than in ``finally``: yielding
        # from ``finally`` breaks early close of the generator, and would
        # re-run extraction on partial data after a failure. Skip the flush
        # entirely when no file was ever received.
        if current_file is not None:
            async for page_text in self.extract_and_process_pdf(
                CollectedBytes(file=current_file, data=b"".join(chunks))
            ):
                yield page_text
    except Exception:
        # TODO handle exception
        yield ""

async def extract_and_process_pdf(
    self, collected_bytes: CollectedBytes
) -> AsyncGenerator[str, None]:
    """Yield the processed text of each page of an in-memory PDF.

    Args:
        collected_bytes: Carrier whose ``data`` attribute holds the raw
            bytes of one complete PDF document.

    Yields:
        The result of ``self.process_data`` applied to each page's text,
        in page order.
    """
    # Open the document as a context manager so it is closed even if the
    # consumer stops iterating early or a page fails to process.
    with fitz.open(stream=collected_bytes.data, filetype="pdf") as pdf:
        for page in pdf:
            text = page.get_text()
            processed_text = await self.process_data(text)
            yield processed_text

async def process_data(self, text: str) -> List[str]:
processed_data = text
Expand Down
16 changes: 10 additions & 6 deletions tests/test_pdf_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,34 @@
from querent.ingestors.ingestor_manager import IngestorFactoryManager
import pytest


@pytest.mark.asyncio
async def test_collect_and_ingest_pdf():
    """Collect the PDFs under ``tests/data/pdf`` and check the ingested item count."""
    # Set up the collector
    collector_factory = FSCollectorFactory()
    uri = Uri("file://" + str(Path("./tests/data/pdf/").resolve()))
    config = FSCollectorConfig(root_path=uri.path)
    collector = collector_factory.resolve(uri, config)

    # Set up the ingestor
    ingestor_factory_manager = IngestorFactoryManager()
    ingestor_factory = await ingestor_factory_manager.get_factory("pdf")
    ingestor = await ingestor_factory.create("pdf", [])

    # Collect and ingest the PDF
    ingested_call = ingestor.ingest(collector.poll())

    async def poll_and_print():
        counter = 0
        async for ingested in ingested_call:
            assert ingested is not None
            # The previous guard (`is not "" or is not None`) was always
            # true and compared identity against a literal, so every
            # yielded item was counted. Count them explicitly instead; the
            # expected total is unchanged.
            counter += 1
        assert counter == 19

    await poll_and_print()


Expand Down

0 comments on commit d55fed7

Please sign in to comment.