diff --git a/querent/config/ingestor_config.py b/querent/config/ingestor_config.py index 5fe94501..410e1dbe 100644 --- a/querent/config/ingestor_config.py +++ b/querent/config/ingestor_config.py @@ -1,12 +1,14 @@ from enum import Enum from typing import Optional -from pydantic import BaseModel, Field class IngestorBackend(str, Enum): PDF = "pdf" TEXT = "txt" DOCX = "docx" + DOC = "doc" + PPT = "ppt" + PPTX = "pptx" CSV = "csv" XLSX = "xlsx" JSON = "json" diff --git a/querent/ingestors/audio/audio_ingestors.py b/querent/ingestors/audio/audio_ingestors.py index 69923cc1..fbc682ea 100644 --- a/querent/ingestors/audio/audio_ingestors.py +++ b/querent/ingestors/audio/audio_ingestors.py @@ -19,7 +19,7 @@ async def supports(self, file_extension: str) -> bool: async def create( self, file_extension: str, processors: List[AsyncProcessor] ) -> BaseIngestor: - if not self.supports(file_extension): + if not await self.supports(file_extension): return None return AudioIngestor(processors) diff --git a/querent/ingestors/doc/doc_ingestor.py b/querent/ingestors/doc/doc_ingestor.py new file mode 100644 index 00000000..84e079e8 --- /dev/null +++ b/querent/ingestors/doc/doc_ingestor.py @@ -0,0 +1,89 @@ +from typing import List, AsyncGenerator +import pytextract +import tempfile +import os +import pytextract + +from querent.processors.async_processor import AsyncProcessor +from querent.ingestors.ingestor_factory import IngestorFactory +from querent.ingestors.base_ingestor import BaseIngestor +from querent.config.ingestor_config import IngestorBackend +from querent.common.types.collected_bytes import CollectedBytes + + +class DocIngestorFactory(IngestorFactory): + SUPPORTED_EXTENSIONS = {"doc", "docx"} + + async def supports(self, file_extension: str) -> bool: + return file_extension.lower() in self.SUPPORTED_EXTENSIONS + + async def create( + self, file_extension: str, processors: List[AsyncProcessor] + ) -> BaseIngestor: + if not await self.supports(file_extension): + return None + 
return DocIngestor(processors) + + +class DocIngestor(BaseIngestor): + def __init__(self, processors: List[AsyncProcessor]): + super().__init__(IngestorBackend.DOC) + self.processors = processors + + async def ingest( + self, poll_function: AsyncGenerator[CollectedBytes, None] + ) -> AsyncGenerator[str, None]: + current_file = None + collected_bytes = b"" + try: + async for chunk_bytes in poll_function: + if chunk_bytes.is_error(): + # TODO handle error + continue + if current_file is None: + current_file = chunk_bytes.file + elif current_file != chunk_bytes.file: + # we have a new file, process the old one + async for text in self.extract_and_process_doc( + CollectedBytes(file=current_file, data=collected_bytes) + ): + yield text + collected_bytes = b"" + current_file = chunk_bytes.file + collected_bytes += chunk_bytes.data + except Exception as e: + # TODO handle exception + yield "" + finally: + # process the last file + async for text in self.extract_and_process_doc( + CollectedBytes(file=current_file, data=collected_bytes) + ): + yield text + pass + + async def extract_and_process_doc( + self, collected_bytes: CollectedBytes + ) -> AsyncGenerator[str, None]: + text = await self.extract_text_from_doc(collected_bytes) + # print(text) + processed_text = await self.process_data(text) + yield processed_text + + async def extract_text_from_doc(self, collected_bytes: CollectedBytes) -> str: + suffix = "." 
+ collected_bytes.extension + with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_file: + temp_file.write(collected_bytes.data) + + temp_file_path = temp_file.name + try: + txt = pytextract.process(temp_file_path).decode("utf-8") + return txt + finally: + os.remove(temp_file_path) + + async def process_data(self, text: str) -> List[str]: + processed_data = text + for processor in self.processors: + processed_data = await processor.process(processed_data) + return processed_data diff --git a/querent/ingestors/ingestor_manager.py b/querent/ingestors/ingestor_manager.py index fe1d9c71..4ed7e1f0 100644 --- a/querent/ingestors/ingestor_manager.py +++ b/querent/ingestors/ingestor_manager.py @@ -10,6 +10,7 @@ from querent.ingestors.audio.audio_ingestors import AudioIngestorFactory from querent.ingestors.json.json_ingestor import JsonIngestorFactory from querent.ingestors.images.image_ingestor import ImageIngestorFactory +from querent.ingestors.doc.doc_ingestor import DocIngestorFactory class IngestorFactoryManager: @@ -24,6 +25,8 @@ def __init__(self): IngestorBackend.JSON.value: JsonIngestorFactory(), IngestorBackend.JPG.value: ImageIngestorFactory(), IngestorBackend.PNG.value: ImageIngestorFactory(), + IngestorBackend.DOCX.value: DocIngestorFactory(), + IngestorBackend.DOC.value: DocIngestorFactory(), # Ingestor.TEXT.value: TextIngestor(), # Add more mappings as needed } diff --git a/querent/ingestors/json/json_ingestor.py b/querent/ingestors/json/json_ingestor.py index 6500d0e3..0bc04069 100644 --- a/querent/ingestors/json/json_ingestor.py +++ b/querent/ingestors/json/json_ingestor.py @@ -16,7 +16,7 @@ async def supports(self, file_extension: str) -> bool: async def create( self, file_extension: str, processors: List[AsyncProcessor] ) -> BaseIngestor: - if not self.supports(file_extension): + if not await self.supports(file_extension): return None return JsonIngestor(processors) diff --git a/querent/ingestors/pdfs/pdf_ingestor_v1.py 
b/querent/ingestors/pdfs/pdf_ingestor_v1.py index 8ea726fa..a93c0382 100644 --- a/querent/ingestors/pdfs/pdf_ingestor_v1.py +++ b/querent/ingestors/pdfs/pdf_ingestor_v1.py @@ -16,7 +16,7 @@ async def supports(self, file_extension: str) -> bool: async def create( self, file_extension: str, processors: List[AsyncProcessor] ) -> BaseIngestor: - if not self.supports(file_extension): + if not await self.supports(file_extension): return None return PdfIngestor(processors) diff --git a/querent/ingestors/texts/text_ingestor.py b/querent/ingestors/texts/text_ingestor.py index 187156c0..f2b79dac 100644 --- a/querent/ingestors/texts/text_ingestor.py +++ b/querent/ingestors/texts/text_ingestor.py @@ -15,7 +15,7 @@ async def supports(self, file_extension: str) -> bool: async def create( self, file_extension: str, processors: List[AsyncProcessor] ) -> BaseIngestor: - if not self.supports(file_extension): + if not await self.supports(file_extension): return None return TextIngestor(processors) diff --git a/requirements.txt b/requirements.txt index c8a51886..68aa7a5f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -160,3 +160,4 @@ pydub SpeechRecognition pytesseract pillow +pytextract diff --git a/tests/data/doc/7283738976.docx b/tests/data/doc/7283738976.docx new file mode 100755 index 00000000..87edc7da Binary files /dev/null and b/tests/data/doc/7283738976.docx differ diff --git a/tests/data/doc/demo-file.doc b/tests/data/doc/demo-file.doc new file mode 100755 index 00000000..09fd37e9 Binary files /dev/null and b/tests/data/doc/demo-file.doc differ diff --git a/tests/test_audio_ingestor.py b/tests/test_audio_ingestor.py index df98e17f..f6d0fe2b 100644 --- a/tests/test_audio_ingestor.py +++ b/tests/test_audio_ingestor.py @@ -29,10 +29,10 @@ async def poll_and_print(): counter = 0 async for ingested in ingested_call: assert ingested is not None - if len(ingested) == 0: + if len(ingested) != 0: counter += 1 - assert counter == 1 + assert counter == 1 await poll_and_print() 
diff --git a/tests/test_doc_ingestor.py b/tests/test_doc_ingestor.py new file mode 100644 index 00000000..76709ced --- /dev/null +++ b/tests/test_doc_ingestor.py @@ -0,0 +1,41 @@ +"""Test cases for doc ingestors""" +from pathlib import Path +import pytest +import asyncio + +from querent.collectors.fs.fs_collector import FSCollectorFactory +from querent.config.collector_config import FSCollectorConfig +from querent.common.uri import Uri +from querent.ingestors.ingestor_manager import IngestorFactoryManager + + +@pytest.mark.asyncio +async def test_collect_and_ingest_audio(): + collector_factory = FSCollectorFactory() + uri = Uri("file://" + str(Path("./tests/data/doc/").resolve())) + config = FSCollectorConfig(root_path=uri.path) + collector = collector_factory.resolve(uri, config) + + ingestor_factory_manager = IngestorFactoryManager() + ingestor_factory = await ingestor_factory_manager.get_factory("doc") + + ingestor = await ingestor_factory.create("doc", []) + + # Collect and ingest the DOC/DOCX files + ingested_call = ingestor.ingest(collector.poll()) + counter = 0 + + async def poll_and_print(): + counter = 0 + async for ingested in ingested_call: + assert ingested is not None + if len(ingested) != 0: + counter += 1 + + assert counter == 2 + + await poll_and_print() + + +if __name__ == "__main__": + asyncio.run(test_collect_and_ingest_audio()) diff --git a/tests/test_image_ingestor.py b/tests/test_image_ingestor.py index 5b1a9098..370c82fa 100644 --- a/tests/test_image_ingestor.py +++ b/tests/test_image_ingestor.py @@ -25,7 +25,7 @@ async def poll_and_print(): counter = 0 async for ingested in ingested_call: assert ingested is not None - if len(ingested) == 0: + if len(ingested) != 0: counter += 1 assert counter == 1