Skip to content

Commit

Permalink
Added doc ingestor
Browse files Browse the repository at this point in the history
  • Loading branch information
Ansh5461 committed Sep 11, 2023
1 parent 7b73554 commit 88ef164
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 8 deletions.
4 changes: 3 additions & 1 deletion querent/config/ingestor_config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field


class IngestorBackend(str, Enum):
PDF = "pdf"
TEXT = "txt"
DOCX = "docx"
DOC = "doc"
PPT = "ppt"
PPTX = "pptx"
CSV = "csv"
XLSX = "xlsx"
JSON = "json"
Expand Down
2 changes: 1 addition & 1 deletion querent/ingestors/audio/audio_ingestors.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async def supports(self, file_extension: str) -> bool:
async def create(
self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
if not self.supports(file_extension):
if not await self.supports(file_extension):
return None
return AudioIngestor(processors)

Expand Down
89 changes: 89 additions & 0 deletions querent/ingestors/doc/doc_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from typing import List, AsyncGenerator
import pytextract
import tempfile
import os
import pytextract

from querent.processors.async_processor import AsyncProcessor
from querent.ingestors.ingestor_factory import IngestorFactory
from querent.ingestors.base_ingestor import BaseIngestor
from querent.config.ingestor_config import IngestorBackend
from querent.common.types.collected_bytes import CollectedBytes


class DocIngestorFactory(IngestorFactory):
SUPPORTED_EXTENSIONS = {"doc", "docx"}

async def supports(self, file_extension: str) -> bool:
return file_extension.lower() in self.SUPPORTED_EXTENSIONS

async def create(
self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
if not await self.supports(file_extension):
return None
return DocIngestor(processors)


class DocIngestor(BaseIngestor):
def __init__(self, processors: List[AsyncProcessor]):
super().__init__(IngestorBackend.DOC)
self.processors = processors

async def ingest(
self, poll_function: AsyncGenerator[CollectedBytes, None]
) -> AsyncGenerator[str, None]:
current_file = None
collected_bytes = b""
try:
async for chunk_bytes in poll_function:
if chunk_bytes.is_error():
# TODO handle error
continue
if current_file is None:
current_file = chunk_bytes.file
elif current_file != chunk_bytes.file:
# we have a new file, process the old one
async for text in self.extract_and_process_doc(
CollectedBytes(file=current_file, data=collected_bytes)
):
yield text
collected_bytes = b""
current_file = chunk_bytes.file
collected_bytes += chunk_bytes.data
except Exception as e:
# TODO handle exception
yield ""
finally:
# process the last file
async for text in self.extract_and_process_doc(
CollectedBytes(file=current_file, data=collected_bytes)
):
yield text
pass

async def extract_and_process_doc(
self, collected_bytes: CollectedBytes
) -> AsyncGenerator[str, None]:
text = await self.extract_text_from_doc(collected_bytes)
# print(text)
processed_text = await self.process_data(text)
yield processed_text

async def extract_text_from_doc(self, collected_bytes: CollectedBytes) -> str:
suffix = "." + collected_bytes.extension
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_file:
temp_file.write(collected_bytes.data)

temp_file_path = temp_file.name
try:
txt = pytextract.process(temp_file_path).decode("utf-8")
return txt
finally:
os.remove(temp_file_path)

async def process_data(self, text: str) -> List[str]:
processed_data = text
for processor in self.processors:
processed_data = await processor.process(processed_data)
return processed_data
3 changes: 3 additions & 0 deletions querent/ingestors/ingestor_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from querent.ingestors.audio.audio_ingestors import AudioIngestorFactory
from querent.ingestors.json.json_ingestor import JsonIngestorFactory
from querent.ingestors.images.image_ingestor import ImageIngestorFactory
from querent.ingestors.doc.doc_ingestor import DocIngestorFactory


class IngestorFactoryManager:
Expand All @@ -24,6 +25,8 @@ def __init__(self):
IngestorBackend.JSON.value: JsonIngestorFactory(),
IngestorBackend.JPG.value: ImageIngestorFactory(),
IngestorBackend.PNG.value: ImageIngestorFactory(),
IngestorBackend.DOCX.value: DocIngestorFactory(),
IngestorBackend.DOC.value: DocIngestorFactory(),
# Ingestor.TEXT.value: TextIngestor(),
# Add more mappings as needed
}
Expand Down
2 changes: 1 addition & 1 deletion querent/ingestors/json/json_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def supports(self, file_extension: str) -> bool:
async def create(
self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
if not self.supports(file_extension):
if not await self.supports(file_extension):
return None
return JsonIngestor(processors)

Expand Down
2 changes: 1 addition & 1 deletion querent/ingestors/pdfs/pdf_ingestor_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def supports(self, file_extension: str) -> bool:
async def create(
self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
if not self.supports(file_extension):
if not await self.supports(file_extension):
return None
return PdfIngestor(processors)

Expand Down
2 changes: 1 addition & 1 deletion querent/ingestors/texts/text_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async def supports(self, file_extension: str) -> bool:
async def create(
self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
if not self.supports(file_extension):
if not await self.supports(file_extension):
return None

return TextIngestor(processors)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,4 @@ pydub
SpeechRecognition
pytesseract
pillow
pytextract
Binary file added tests/data/doc/7283738976.docx
Binary file not shown.
Binary file added tests/data/doc/demo-file.doc
Binary file not shown.
4 changes: 2 additions & 2 deletions tests/test_audio_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ async def poll_and_print():
counter = 0
async for ingested in ingested_call:
assert ingested is not None
if len(ingested) == 0:
if len(ingested) != 0:
counter += 1

assert counter == 1
assert counter == 1

await poll_and_print()

Expand Down
41 changes: 41 additions & 0 deletions tests/test_doc_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Test cases for audio ingestors"""
from pathlib import Path
import pytest
import asyncio

from querent.collectors.fs.fs_collector import FSCollectorFactory
from querent.config.collector_config import FSCollectorConfig
from querent.common.uri import Uri
from querent.ingestors.ingestor_manager import IngestorFactoryManager


@pytest.mark.asyncio
async def test_collect_and_ingest_audio():
collector_factory = FSCollectorFactory()
uri = Uri("file://" + str(Path("./tests/data/doc/").resolve()))
config = FSCollectorConfig(root_path=uri.path)
collector = collector_factory.resolve(uri, config)

ingestor_factory_manager = IngestorFactoryManager()
ingestor_factory = await ingestor_factory_manager.get_factory("doc")

ingestor = await ingestor_factory.create("doc", [])

# Collect and ingest the PDF
ingested_call = ingestor.ingest(collector.poll())
counter = 0

async def poll_and_print():
counter = 0
async for ingested in ingested_call:
assert ingested is not None
if len(ingested) != 0:
counter += 1

assert counter == 2

await poll_and_print()


if __name__ == "__main__":
asyncio.run(test_collect_and_ingest_audio())
2 changes: 1 addition & 1 deletion tests/test_image_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def poll_and_print():
counter = 0
async for ingested in ingested_call:
assert ingested is not None
if len(ingested) == 0:
if len(ingested) != 0:
counter += 1
assert counter == 1

Expand Down

0 comments on commit 88ef164

Please sign in to comment.