Added doc source (#292)
* Added doc_source in collectors and ingestors

* Added doc_source in engines

* Updated test case assertions

* Updated doc_source

* Changed version

* Stringified event_type

* trying queue implementation

* added prints

* workflow use queue now

* workflow use queue now

* workflow use queue now

* Fixes

* Added enum in eventtype

* Removed enum

* added prints

* changes in file buffer

* corrected doc ingestor

* removed prints and minor fix

* Final fixes

* Fixed assertion

* Fixed storage files

* Final fixes

* cleanup

* cleanup

* cleanups

---------

Co-authored-by: ngupta10 <[email protected]>
Co-authored-by: Puneet Saraswat <[email protected]>
3 people authored Apr 18, 2024
1 parent 2559a2e commit 32b08f7
Showing 51 changed files with 188 additions and 327 deletions.
3 changes: 0 additions & 3 deletions querent/__init__.py
@@ -19,9 +19,6 @@
# System Callbacks
from .callback import *

- # System Insights
- from .insights import *
-
# System Logging and Utilities
from .core import *
from .utils import *
4 changes: 2 additions & 2 deletions querent/collectors/aws/aws_collector.py
@@ -52,8 +52,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
for obj in response.get("Contents", []):
file = self.download_object_as_byte_stream(obj["Key"])
async for chunk in self.read_chunks(file):
- yield CollectedBytes(file=obj["Key"], data=chunk, error=None)
- yield CollectedBytes(file=obj["Key"], data=None, error=None, eof=True)
+ yield CollectedBytes(file=obj["Key"], data=chunk, error=None, doc_source=f"s3://{self.bucket_name}/{self.region}")
+ yield CollectedBytes(file=obj["Key"], data=None, error=None, eof=True, doc_source=f"s3://{self.bucket_name}/{self.region}")

except PermissionError as exc:
self.logger.error(f"Getting Permission Error on file {file}, as {exc}")
4 changes: 2 additions & 2 deletions querent/collectors/azure/azure_collector.py
@@ -57,8 +57,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
self.container_client, blob.name
)
async for chunk in self.read_chunks(file):
- yield CollectedBytes(file=blob.name, data=chunk, error=None)
- yield CollectedBytes(file=blob.name, data=None, error=None, eof=True)
+ yield CollectedBytes(file=blob.name, data=chunk, error=None, doc_source=f"azure://{self.container_name}/{self.account_url}")
+ yield CollectedBytes(file=blob.name, data=None, error=None, eof=True, doc_source=f"azure://{self.container_name}/{self.account_url}")
except Exception as e:
# Handle exceptions gracefully, e.g., log the error
self.logger.error(f"Error polling Azure Blob Storage: {e}")
4 changes: 2 additions & 2 deletions querent/collectors/drive/google_drive_collector.py
@@ -88,8 +88,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
self.logger.info("No files found in Google Drive")
for file in files:
async for chunk in self.read_chunks(file["id"]):
- yield CollectedBytes(data=chunk, file=file["name"])
- yield CollectedBytes(data=None, file=file["name"], eof=True)
+ yield CollectedBytes(data=chunk, file=file["name"], doc_source=f"drive://{self.folder_to_crawl}")
+ yield CollectedBytes(data=None, file=file["name"], eof=True, doc_source=f"drive://{self.folder_to_crawl}")
except Exception as e:
raise common_errors.PollingError(
f"Failed to poll Google Drive: {str(e)}"
4 changes: 2 additions & 2 deletions querent/collectors/dropbox/dropbox_collector.py
@@ -69,8 +69,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:

file_content_bytes = response.content
async for chunk in self.stream_blob(file_content_bytes):
- yield CollectedBytes(file=entry.name, data=chunk)
- yield CollectedBytes(file=entry.name, data=None, eof=True)
+ yield CollectedBytes(file=entry.name, data=chunk, doc_source=f"dropbox://{self.folder_path}")
+ yield CollectedBytes(file=entry.name, data=None, eof=True, doc_source=f"dropbox://{self.folder_path}")
except dropbox.exceptions.ApiError as e:
self.logger.error(f"Error polling Dropbox: {e}")
raise common_errors.PollingError(
2 changes: 2 additions & 0 deletions querent/collectors/email/email_collector.py
@@ -57,11 +57,13 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
yield CollectedBytes(
data=message,
file=f"{self.config.imap_username}:{self.config.imap_folder}/{i}.email",
+ doc_source=f"email://{self.config.imap_server}/{self.config.imap_folder}"
)
yield CollectedBytes(
data=None,
file=f"{self.config.imap_username}:{self.config.imap_folder}/{i}.email",
eof=True,
+ doc_source=f"email://{self.config.imap_server}/{self.config.imap_folder}"
)
except imaplib.IMAP4.error as e:
self.logger.error(f"Error fetching emails from IMAP server: {e}")
4 changes: 2 additions & 2 deletions querent/collectors/fs/fs_collector.py
@@ -41,9 +41,9 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
async with aiofiles.open(file_path, "rb") as file:
file_path_str = str(file_path)
async for chunk in self.read_chunks(file):
- yield CollectedBytes(file=file_path_str, data=chunk, error=None)
+ yield CollectedBytes(file=file_path_str, data=chunk, error=None, doc_source=f"file://{self.root_dir}")
yield CollectedBytes(
- file=file_path_str, data=None, error=None, eof=True
+ file=file_path_str, data=None, error=None, eof=True, doc_source=f"file://{self.root_dir}"
)
except PermissionError as exc:
raise common_errors.PermissionError(
4 changes: 2 additions & 2 deletions querent/collectors/gcs/gcs_collector.py
@@ -55,8 +55,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
blobs = list(bucket.list_blobs()) # Convert to a list
for blob in blobs:
async for chunk in self.stream_blob(blob):
- yield CollectedBytes(file=blob.name, data=chunk, error=None)
- yield CollectedBytes(file=blob.name, data=None, error=None, eof=True)
+ yield CollectedBytes(file=blob.name, data=chunk, error=None, doc_source=f"gcs://{self.bucket_name}")
+ yield CollectedBytes(file=blob.name, data=None, error=None, eof=True, doc_source=f"gcs://{self.bucket_name}")
except Exception as e:
# Handle exceptions gracefully, e.g., log the error
self.logger.error(f"Error connecting to GCS: {e}")
4 changes: 2 additions & 2 deletions querent/collectors/github/github_collector.py
@@ -48,9 +48,9 @@ async def fetch_files_in_folder(self, api_url):
file_response.raise_for_status()
file_contents = await file_response.read()
# Assume process_data() is correctly implemented for your file type
- yield CollectedBytes(file=item["name"], data=file_contents)
+ yield CollectedBytes(file=item["name"], data=file_contents, doc_source=f"github://{self.repository}")

- yield CollectedBytes(file = item["name"], data = None, eof = True)
+ yield CollectedBytes(file = item["name"], data = None, eof = True, doc_source=f"github://{self.repository}")
elif item["type"] == "dir":
# Recursively fetch files in subfolders
async for sub_item in self.fetch_files_in_folder(item["url"]):
4 changes: 2 additions & 2 deletions querent/collectors/jira/jira_collector.py
@@ -82,10 +82,10 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
for issue in issues:
json_issue = json.dumps(issue.raw).encode("utf-8")
yield CollectedBytes(
- data=json_issue, file=f"jira_issue_{issue.key}.json.jira"
+ data=json_issue, file=f"jira_issue_{issue.key}.json.jira", doc_source=f"jira://{self.config.jira_server}/{self.config.jira_project}"
)
yield CollectedBytes(
- data=None, file=f"jira_issue_{issue.key}.json.jira", eof=True
+ data=None, file=f"jira_issue_{issue.key}.json.jira", eof=True, doc_source=f"jira://{self.config.jira_server}/{self.config.jira_project}"
)

except common_errors.ConnectionError as e:
10 changes: 5 additions & 5 deletions querent/collectors/news/news_collector.py
@@ -78,8 +78,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
publish_date = article.get('publishedAt').split('T')[0]

title = article['title']
- yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8"))
- yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True)
+ yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8"), doc_source=f"news://{self.config.query}")
+ yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True, doc_source=f"news://{self.config.query}")

total_results = response.get("totalResults", 0)
total_pages = (
@@ -93,7 +93,7 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
break
except Exception as e:
self.logger.error(f"Error fetching news articles: {e}")
- yield CollectedBytes(file="Error", data=None, error=e)
+ yield CollectedBytes(file="Error", data=None, error=e, doc_source=f"news://{self.config.query}")
break

# After exhausting the current batch, reset for next polling cycle
@@ -137,8 +137,8 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:

publish_date = article.get('publishedAt').split('T')[0]
title = article['title']
- yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8"))
- yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True)
+ yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8"), doc_source=f"news://{self.config.query}")
+ yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True, doc_source=f"news://{self.config.query}")
except Exception as e:
self.logger.error(f"Error fetching news articles: {e}")

2 changes: 2 additions & 0 deletions querent/collectors/slack/slack_collector.py
@@ -63,6 +63,7 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
yield CollectedBytes(
file=f"slack://{self.channel}.slack",
data=bytes(message["text"] + "\n\n", "utf-8"),
+ doc_source = f"slack://{self.channel}"
)

if not response["has_more"]:
@@ -71,6 +72,7 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
data=None,
error=None,
eof=True,
+ doc_source = f"slack://{self.channel}"
)
break
else:
2 changes: 1 addition & 1 deletion querent/collectors/webscaper/web_scraper_collector.py
@@ -29,7 +29,7 @@ async def poll(self):
while urls_to_scrape:
url = urls_to_scrape.pop()
content = await self.scrape_website(url)
- yield CollectedBytes(file=None, data=content.data, error=None)
+ yield CollectedBytes(file=None, data=content.data, error=None, doc_source=self.website_url)
# Find and add links from this page to the list of URLs to scrape
new_urls = self.extract_links(url)
urls_to_scrape.extend(new_urls)
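Across the collector changes above, every CollectedBytes now carries a scheme-prefixed doc_source identifying where the bytes were collected from. As a quick reference, a sketch of the formats introduced in this commit; all concrete values below are illustrative placeholders, not real configuration:

```python
# doc_source formats used by the collectors above (placeholder values).
doc_source_formats = {
    "aws":        "s3://my-bucket/us-east-1",                    # s3://{bucket_name}/{region}
    "azure":      "azure://my-container/<account_url>",          # azure://{container_name}/{account_url}
    "drive":      "drive://<folder_to_crawl>",                   # drive://{folder_to_crawl}
    "dropbox":    "dropbox:///team/reports",                     # dropbox://{folder_path}
    "email":      "email://imap.example.com/INBOX",              # email://{imap_server}/{imap_folder}
    "fs":         "file:///data/documents",                      # file://{root_dir}
    "gcs":        "gcs://my-bucket",                             # gcs://{bucket_name}
    "github":     "github://owner/repo",                         # github://{repository}
    "jira":       "jira://https://example.atlassian.net/PROJ",   # jira://{jira_server}/{jira_project}
    "news":       "news://climate change",                       # news://{query}
    "slack":      "slack://C0123456789",                         # slack://{channel}
    "webscraper": "https://example.com",                         # the configured website_url itself
}
```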
3 changes: 2 additions & 1 deletion querent/common/types/collected_bytes.py
@@ -2,11 +2,12 @@


class CollectedBytes:
- def __init__(self, file: str, data: bytes, error: str = None, eof: bool = False):
+ def __init__(self, file: str, data: bytes, error: str = None, eof: bool = False, doc_source = str):
self.data = data
self.error = error
self.file = file
self.eof = eof
+ self.doc_source = doc_source
if self.file:
file = str(file)
self.extension = file.split(".")[-1]
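A minimal sketch of constructing the extended CollectedBytes; the bucket, region, and key values are hypothetical. Note that the committed signature declares the new parameter as `doc_source = str`, defaulting to the `str` type itself rather than a string value, so callers are expected to pass doc_source explicitly, as every collector in this change does.

```python
from querent.common.types.collected_bytes import CollectedBytes

# Hypothetical values, for illustration only.
bucket_name, region, key = "my-bucket", "us-east-1", "reports/q1.pdf"

# A data chunk tagged with the source it was collected from.
chunk_cb = CollectedBytes(
    file=key,
    data=b"%PDF-1.7 ...",
    error=None,
    doc_source=f"s3://{bucket_name}/{region}",
)

# The matching end-of-file marker carries the same doc_source.
eof_cb = CollectedBytes(
    file=key,
    data=None,
    error=None,
    eof=True,
    doc_source=f"s3://{bucket_name}/{region}",
)

print(chunk_cb.doc_source, chunk_cb.extension, eof_cb.eof)  # s3://my-bucket/us-east-1 pdf True
```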
17 changes: 7 additions & 10 deletions querent/common/types/file_buffer.py
@@ -31,21 +31,19 @@ def __init__(self):

def add_chunk(self, filename, chunk):
try:
- if chunk is None:
- return self.end_file(filename)

if filename not in self.file_chunks:
self.file_chunks[filename] = {}

# Automatically assign a chunk ID

+ if chunk is None:
+ return self.end_file(filename)

chunk_id = len(self.file_chunks[filename])
self.file_chunks[filename][chunk_id] = chunk

return filename, None

except Exception as e:
self.logger.error(f"Error adding a chunk: {e}")
- raise Exception(f"An error occurred while adding a chunk: {e}")
+ return filename, None

def end_file(self, filename):
try:
@@ -56,10 +54,9 @@ def end_file(self, filename):
del self.file_chunks[filename] # Clear the file entry
return filename, full_content
else:
- raise Exception(f"No chunks found for file: {filename}")
+ return filename, None
except Exception as e:
self.logger.error(f"Error ending the file: {e}")
- raise Exception(f"An error occurred while ending the file: {e}")
+ return filename, None

def get_content(self, filename):
try:
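A short sketch of the revised FileBuffer behaviour: a None chunk now finalizes the file, and error or missing-file paths return (filename, None) instead of raising. This assumes end_file concatenates the buffered chunks in insertion order, which the unexpanded part of the method appears to do; the filenames and chunks are illustrative.

```python
from querent.common.types.file_buffer import FileBuffer

buffer = FileBuffer()

# Chunks are keyed by filename and get auto-assigned chunk IDs.
buffer.add_chunk("report.txt", "first part ")
buffer.add_chunk("report.txt", "second part")

# A None chunk signals end-of-file and returns the assembled content.
filename, full_content = buffer.add_chunk("report.txt", None)
print(filename, full_content)  # e.g. report.txt first part second part

# Ending a file with no buffered chunks no longer raises; it returns (filename, None).
print(buffer.end_file("missing.txt"))  # ('missing.txt', None)
```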
3 changes: 2 additions & 1 deletion querent/common/types/ingested_code.py
@@ -1,10 +1,11 @@
class IngestedCode:
"""Class for ingested code type of data"""

- def __init__(self, file: str, data: [str], error: str = None) -> None:
+ def __init__(self, file: str, data: [str], doc_source: str, error: str = None) -> None:
self.data = data
self.error = error
self.file = file
+ self.doc_source = doc_source
file = str(file)
self.extension = file.rsplit(".", maxsplit=1)[-1]

2 changes: 2 additions & 0 deletions querent/common/types/ingested_images.py
@@ -12,6 +12,7 @@ def __init__(
coordinates: list = [],
ocr_text: list = [],
error: str = None,
+ doc_source = str,
) -> None:
self.file = file
self.text = text
@@ -21,6 +22,7 @@ def __init__(
self.page_num = page_num
self.coordinates = coordinates
self.ocr_text = ocr_text
+ self.doc_source = doc_source
file = str(file)
self.extension = file.split(".")[-1]
self.file_id = file.split("/")[-1].split(".")[0]
5 changes: 3 additions & 2 deletions querent/common/types/ingested_tokens.py
@@ -1,13 +1,14 @@
- from typing import Union
+ from typing import Union, List


class IngestedTokens:
def __init__(
- self, file: str, data: [str], error: str = None, is_token_stream=False
+ self, file: str, data: List[str], error: str = None, is_token_stream=False, doc_source=str
) -> None:
self.data = data
self.error = error
self.is_token_stream = is_token_stream
+ self.doc_source = doc_source
if file:
self.file = file
file = str(file)
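A sketch of how an ingestor might now emit the extended IngestedTokens; the file name and doc_source values are illustrative.

```python
from querent.common.types.ingested_tokens import IngestedTokens

tokens = IngestedTokens(
    file="reports/q1.pdf",
    data=["Quarterly revenue grew 12% year over year."],
    error=None,
    is_token_stream=False,
    doc_source="file:///data/reports",  # where the document was collected from
)
print(tokens.file, tokens.doc_source)
```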
3 changes: 2 additions & 1 deletion querent/common/types/querent_event.py
@@ -9,9 +9,10 @@ class EventType:


class EventState:
- def __init__(self, event_type: EventType, timestamp: float, payload: Any, file: str):
+ def __init__(self, event_type: EventType, timestamp: float, payload: Any, file: str, doc_source: str):
self.event_type = event_type
self.timestamp = timestamp
self.payload = payload
- self.file = file
+ self.file = file
+ self.doc_source = doc_source
4 changes: 4 additions & 0 deletions querent/common/types/querent_queue.py
@@ -114,6 +114,10 @@ def empty(self):
"""
return self.queue.empty()

+ def qsize(self):
+
+ return self.queue.qsize()
+
def __aiter__(self):
return self
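A sketch of the new qsize passthrough in use. The class name QuerentQueue and an awaitable put are assumptions based on the module path and the async iterator shown elsewhere in the class; neither appears in this hunk.

```python
import asyncio

from querent.common.types.querent_queue import QuerentQueue  # class name assumed


async def main():
    queue = QuerentQueue()
    await queue.put("an ingested item")  # assumes an async put, as asyncio-style queues provide
    print(queue.qsize())                 # 1 — thin passthrough to the underlying queue's qsize()
    print(queue.empty())                 # False


asyncio.run(main())
```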
5 changes: 3 additions & 2 deletions querent/core/base_engine.py
@@ -180,10 +180,11 @@ async def _listen_for_state_changes(self):
if new_state.payload == "Terminate":
break
new_state = {
- "event_type": new_state.event_type,
+ "event_type": str(new_state.event_type),
"timestamp": new_state.timestamp,
"payload": new_state.payload,
- "file": new_state.file
+ "file": new_state.file,
+ "doc_source": new_state.doc_source,
}
await self._notify_subscribers(new_state["event_type"], new_state)
else:
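For context, a sketch of what a subscriber now receives from _listen_for_state_changes: a plain dict whose event_type has been stringified and which carries the new doc_source field. The callback signature below is assumed for illustration and is not part of this diff.

```python
# Hypothetical subscriber callback; the engine forwards plain dicts shaped like this.
async def on_event(event_type: str, event: dict):
    # event_type arrives stringified, e.g. "Graph" or "Vector".
    print(event["event_type"], event["file"], event["doc_source"])
    graph_or_vector_json = event["payload"]  # JSON string produced upstream
```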
8 changes: 5 additions & 3 deletions querent/core/transformers/bert_ner_opensourcellm.py
@@ -145,6 +145,7 @@ async def process_tokens(self, data: IngestedTokens):
doc_entity_pairs = []
number_sentences = 0
try:
+ doc_source = data.doc_source
if not BERTLLM.validate_ingested_tokens(data):
self.set_termination_event()
return
@@ -199,7 +200,6 @@ async def process_tokens(self, data: IngestedTokens):
else:
filtered_triples = pairs_with_predicates
if not filtered_triples:
- self.logger.debug("No entity pairs")
return
elif not self.skip_inferences:
relationships = self.semantic_extractor.process_tokens(filtered_triples)
@@ -212,17 +212,19 @@ async def process_tokens(self, data: IngestedTokens):
if not self.termination_event.is_set():
graph_json = json.dumps(TripleToJsonConverter.convert_graphjson(triple))
if graph_json:
- current_state = EventState(EventType.Graph,1.0, graph_json, file)
+ current_state = EventState(EventType.Graph,1.0, graph_json, file, doc_source=doc_source)
await self.set_state(new_state=current_state)
vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson(triple))
if vector_json:
- current_state = EventState(EventType.Vector,1.0, vector_json, file)
+ current_state = EventState(EventType.Vector,1.0, vector_json, file, doc_source=doc_source)
await self.set_state(new_state=current_state)
else:
return
else:
return
+ else:
+ return filtered_triples, file
else:
return
except Exception as e:
self.logger.debug(f"Invalid {self.__class__.__name__} configuration. Unable to process tokens. {e}")