From 073666bbd32ffdff0e23f8c3d12ad6d3790c20b7 Mon Sep 17 00:00:00 2001 From: Ansh5461 Date: Thu, 25 Apr 2024 18:41:13 +0530 Subject: [PATCH] Trying ingested images --- querent/core/base_engine.py | 19 +- .../transformers/bert_ner_opensourcellm.py | 6 +- .../fixed_entities_set_opensourcellm.py | 124 ++++++++++++- ..._llm_bert_ner_or_fixed_entities_set_ner.py | 31 +++- querent/ingestors/doc/doc_ingestor.py | 16 +- querent/ingestors/pdfs/pdf_ingestor_v1.py | 10 +- .../ner_llm_transformer.py | 171 +++++++++++++++++- tests/workflows/ingested_images_test.py | 117 ++++++++++++ 8 files changed, 474 insertions(+), 20 deletions(-) create mode 100644 tests/workflows/ingested_images_test.py diff --git a/querent/core/base_engine.py b/querent/core/base_engine.py index 2d471aa5..8d68f1a4 100644 --- a/querent/core/base_engine.py +++ b/querent/core/base_engine.py @@ -4,6 +4,7 @@ from querent.callback.event_callback_interface import EventCallbackInterface from querent.common.types.ingested_images import IngestedImages from querent.common.types.ingested_messages import IngestedMessages +from querent.common.types.ingested_table import IngestedTables from querent.common.types.ingested_tokens import IngestedTokens from querent.common.types.ingested_code import IngestedCode from querent.common.types.querent_event import EventState, EventType @@ -120,6 +121,18 @@ async def process_code(self, data: IngestedCode): """ raise NotImplementedError + @abstractmethod + async def process_tables(self, data: IngestedTables): + """ + Process tables asynchronously. + Args: + data (IngestedTables): The input data to process. + Returns: + EventState: The state of the event is set with the event type and the timestamp + of the event and set using `self.set_state(event_state)`. + """ + pass + @abstractmethod async def process_images(self, data: IngestedImages): """ @@ -229,9 +242,13 @@ async def _inner_worker(): elif isinstance(data, IngestedTokens): await self.process_tokens(data) elif isinstance(data, IngestedImages): + print("Got an image from queue--------------------------------------------------------------------------\n\n", data.ocr_text) await self.process_images(data) elif isinstance(data, IngestedCode): await self.process_code(data) + elif isinstance(data, IngestedTables): + continue + # await self.process_tables(data) elif data is None: none_counter += 1 if none_counter >= 2: @@ -241,7 +258,7 @@ async def _inner_worker(): else: raise Exception( - f"Invalid data type {type(data)} for {self.__class__.__name__}. Supported type: {IngestedTokens, IngestedMessages}" + f"Invalid data type {type(data)} for {self.__class__.__name__}. Supported type: {IngestedTokens, IngestedMessages, IngestedTables, IngestedImages}" ) except Exception as e: self.logger.error( diff --git a/querent/core/transformers/bert_ner_opensourcellm.py b/querent/core/transformers/bert_ner_opensourcellm.py index d035bdf9..93532423 100644 --- a/querent/core/transformers/bert_ner_opensourcellm.py +++ b/querent/core/transformers/bert_ner_opensourcellm.py @@ -1,6 +1,7 @@ import json from unidecode import unidecode from transformers import AutoTokenizer +from querent.common.types.ingested_table import IngestedTables from querent.kg.ner_helperfunctions.fixed_predicate import FixedPredicateExtractor from querent.common.types.ingested_images import IngestedImages from querent.config.core.opensource_llm_config import Opensource_LLM_Config @@ -112,8 +113,11 @@ def validate(self) -> bool: def process_messages(self, data: IngestedMessages): return super().process_messages(data) - def process_images(self, data: IngestedImages): + async def process_images(self, data: IngestedImages): return super().process_images(data) + + async def process_tables(self, data: IngestedTables): + return super().process_tables(data) async def process_code(self, data: IngestedCode): return super().process_code(data) diff --git a/querent/core/transformers/fixed_entities_set_opensourcellm.py b/querent/core/transformers/fixed_entities_set_opensourcellm.py index e9a8ab76..dcf318fa 100644 --- a/querent/core/transformers/fixed_entities_set_opensourcellm.py +++ b/querent/core/transformers/fixed_entities_set_opensourcellm.py @@ -1,6 +1,7 @@ import json from unidecode import unidecode from transformers import AutoTokenizer +from querent.common.types.ingested_table import IngestedTables from querent.kg.ner_helperfunctions.fixed_predicate import FixedPredicateExtractor from querent.common.types.ingested_images import IngestedImages from querent.config.core.opensource_llm_config import Opensource_LLM_Config @@ -94,8 +95,119 @@ def validate(self) -> bool: def process_messages(self, data: IngestedMessages): return super().process_messages(data) - def process_images(self, data: IngestedImages): - return super().process_images(data) + def process_tables(self, data: IngestedTables): + pass + + async def process_images(self, data: IngestedImages): + doc_entity_pairs = [] + doc_entity_pairs_ocr = [] + entities_list = [] + final_entities_list = [] + number_sentences = 0 + try: + doc_source = data.doc_source + if not Fixed_Entities_LLM.validate_ingested_images(data): + self.set_termination_event() + return + if data.ocr_text: + ocr_text = ' '.join(data.ocr_text) + else: + ocr_text = data.ocr_text + + if data.text: + clean_text = ' '.join(data.text) + else: + clean_text = data.text + + file, content = data.file, clean_text + + ocr_content = ocr_text + + if ocr_content: + if self.fixed_entities: + ocr_content = self.entity_context_extractor.find_entity_sentences(ocr_content) + ocr_tokens = self.ner_llm_instance._tokenize_and_chunk(ocr_content) + for tokenized_sentence, original_sentence, sentence_idx in ocr_tokens: + (entities, entity_pairs,) = self.ner_llm_instance.extract_entities_from_sentence(original_sentence, sentence_idx, [s[1] for s in ocr_tokens],self.isConfinedSearch, self.fixed_entities, self.sample_entities) + print("Entities ---------", entities) + print("Entities pairs ---------------------------", entity_pairs) + if entity_pairs: + doc_entity_pairs_ocr.append(self.ner_llm_instance.transform_entity_pairs(entity_pairs)) + else: + continue + number_sentences = number_sentences + 1 + + print("Doc entity pairs --------", doc_entity_pairs) + + if len(doc_entity_pairs_ocr) == 0 and len(ocr_content) != 0: + if content: + if self.fixed_entities: + content = self.entity_context_extractor.find_entity_sentences(content) + tokens = self.ner_llm_instance._tokenize_and_chunk(content) + doc_entity_pairs_ocr = self.ner_llm_instance.extract_entities_from_sentence_for_given_sentence(ocr_content, sentence_idx, [s[1] for s in tokens],self.isConfinedSearch, self.fixed_entities, self.sample_entities) + print("doc_entity_pairs_ocr-----------------------", doc_entity_pairs_ocr) + for tokenized_sentence, original_sentence, sentence_idx in tokens: + #return list of entities from document, and entity pair + print("Here in side fo loop") + (entities, entity_pairs,) = self.ner_llm_instance.extract_entities_from_chunk(original_sentence, sentence_idx, [s[1] for s in tokens],self.isConfinedSearch, self.fixed_entities, self.sample_entities) + print("Entity pairs found from content", entity_pairs) + print("Entities found from content", entities) + if entity_pairs: + + doc_entity_pairs.append(self.ner_llm_instance.transform_entity_pairs(entity_pairs)) + entities_list.append(entities) + number_sentences = number_sentences + 1 + #process those entities and ocr entity here + #if FE, then find the one most occuring + #if not FE, find the entity pair, where 1 entity is OCR text, and other is any other entity, which is most occuring, or which has higher confidence + final_entities_list = self.ner_llm_instance.create_subject_object_sentence_tuples(doc_entity_pairs_ocr, entities_list) + + + elif len(ocr_content) == 0: + #highest confidence entity pair from page text + sample_entity_pair = [{'entity': 'Image', 'label': 'image_data', 'score': 1.0, 'start_idx': 1, 'noun_chunk': 'image', 'noun_chunk_length': 1}] + final_entities_list = self.ner_llm_instance.create_subject_object_sentence_tuples(sample_entity_pair, entities_list) + + + print("Final entities ------", final_entities_list) + #- + + if self.sample_entities: + doc_entity_pairs = self.entity_context_extractor.process_entity_types(doc_entities=final_entities_list) + if doc_entity_pairs and any(doc_entity_pairs): + doc_entity_pairs = self.ner_llm_instance.remove_duplicates(final_entities_list) + filtered_triples = process_data(doc_entity_pairs, file) + if not filtered_triples: + self.logger.debug("No entity pairs") + return + elif not self.skip_inferences: + relationships = self.semantic_extractor.process_tokens(filtered_triples) + self.logger.debug(f"length of relationships {len(relationships)}") + relationships = self.semantictriplefilter.filter_triples(relationships) + if len(relationships) > 0: + embedding_triples = self.create_emb.generate_embeddings(relationships) + if self.sample_relationships: + embedding_triples = self.predicate_context_extractor.process_predicate_types(embedding_triples) + for triple in embedding_triples: + if not self.termination_event.is_set(): + graph_json = json.dumps(TripleToJsonConverter.convert_graphjson(triple)) + if graph_json: + current_state = EventState(EventType.Graph,1.0, graph_json, file, doc_source=doc_source) + await self.set_state(new_state=current_state) + vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson(triple)) + if vector_json: + current_state = EventState(EventType.Vector,1.0, vector_json, file, doc_source=doc_source) + await self.set_state(new_state=current_state) + else: + return + else: + return + else: + return filtered_triples, file + else: + return + except Exception as e: + self.logger.debug(f"Invalid {self.__class__.__name__} configuration. Unable to process tokens. {e}") async def process_code(self, data: IngestedCode): return super().process_code(data) @@ -107,6 +219,14 @@ def validate_ingested_tokens(data: IngestedTokens) -> bool: return False return True + + @staticmethod + def validate_ingested_images(data: IngestedImages) -> bool: + if data.is_error(): + + return False + + return True def count_entity_pairs(self, doc_entity_pairs): total_pairs = 0 diff --git a/querent/core/transformers/gpt_llm_bert_ner_or_fixed_entities_set_ner.py b/querent/core/transformers/gpt_llm_bert_ner_or_fixed_entities_set_ner.py index 6855b7a3..7afa77f6 100644 --- a/querent/core/transformers/gpt_llm_bert_ner_or_fixed_entities_set_ner.py +++ b/querent/core/transformers/gpt_llm_bert_ner_or_fixed_entities_set_ner.py @@ -1,6 +1,7 @@ import asyncio import json import re +from querent.common.types.ingested_table import IngestedTables from querent.core.transformers.fixed_entities_set_opensourcellm import Fixed_Entities_LLM from querent.kg.ner_helperfunctions.fixed_predicate import FixedPredicateExtractor from querent.config.core.gpt_llm_config import GPTConfig @@ -96,11 +97,29 @@ def validate(self) -> bool: def process_messages(self, data: IngestedMessages): return super().process_messages(data) - def process_images(self, data: IngestedImages): - return super().process_messages(data) + async def process_images(self, data: IngestedImages): + try: + if not GPTLLM.validate_ingested_images(data): + self.set_termination_event() + return + + doc_source = data.doc_source + relationships = [] + unique_keys = set() + result = await self.llm_instance.process_images(data) + if not result: + return + + return None + + except Exception as e: + self.logger.debug(f"Invalid {self.__class__.__name__} configuration. Unable to process tokens. {e}") async def process_code(self, data: IngestedCode): return super().process_messages(data) + + async def process_tables(self, data: IngestedTables): + return super().process_tables(data) @staticmethod def validate_ingested_tokens(data: IngestedTokens) -> bool: @@ -109,6 +128,14 @@ def validate_ingested_tokens(data: IngestedTokens) -> bool: return False return True + + @staticmethod + def validate_ingested_images(data: IngestedImages) -> bool: + if data.is_error(): + + return False + + return True def extract_semantic_triples(self, chat_completion): # Extract the message content from the ChatCompletion message_content = chat_completion.choices[0].message.content.replace('\n', '') diff --git a/querent/ingestors/doc/doc_ingestor.py b/querent/ingestors/doc/doc_ingestor.py index c4385078..66b41bad 100644 --- a/querent/ingestors/doc/doc_ingestor.py +++ b/querent/ingestors/doc/doc_ingestor.py @@ -99,14 +99,14 @@ async def extract_text_from_doc(self, collected_bytes: CollectedBytes, doc_sourc ) i = 1 - for table in doc.tables: - table_data = [] - for row in table.rows: - row_data = [] - for cell in row.cells: - row_data.append(cell.text.strip()) - table_data.append(row_data) - yield IngestedTables(file=collected_bytes.file, table = table_data, page_num = i, text = text, error=None) + # for table in doc.tables: + # table_data = [] + # for row in table.rows: + # row_data = [] + # for cell in row.cells: + # row_data.append(cell.text.strip()) + # table_data.append(row_data) + # yield IngestedTables(file=collected_bytes.file, table = table_data, page_num = i, text = text, error=None) i = 1 for rel in doc.part.rels.values(): diff --git a/querent/ingestors/pdfs/pdf_ingestor_v1.py b/querent/ingestors/pdfs/pdf_ingestor_v1.py index e42650a3..5b6f548e 100644 --- a/querent/ingestors/pdfs/pdf_ingestor_v1.py +++ b/querent/ingestors/pdfs/pdf_ingestor_v1.py @@ -14,6 +14,7 @@ import fitz from PIL import Image import io +import base64 import pybase64 import pytesseract @@ -111,8 +112,8 @@ async def extract_and_process_pdf( doc_source=doc_source, ) - async for table in self.extract_table(collected_bytes): - yield table + # async for table in self.extract_table(collected_bytes): + # yield table async for imgae_data in self.extract_img(loader, collected_bytes.file, collected_bytes.data): yield imgae_data @@ -170,8 +171,8 @@ async def extract_img(self, doc, file_path, data): yield IngestedImages( file=file_path, - image=pybase64.b64encode(data), - image_name=f"{str(uuid.UUID)}.{image_ext}", + image=base64.b64encode(image_data).decode('utf-8'), + image_name=f"{str(uuid.uuid4())}.{image_ext}", page_num=page_num, text=[text_content], coordinates=None, @@ -183,6 +184,7 @@ async def get_ocr_from_image(self, image): try: image = Image.open(io.BytesIO(image)) text = pytesseract.image_to_string(image) + print("Got text from images ---------------------------") except Exception as e: self.logger.error("Exception-{e}") raise e diff --git a/querent/kg/ner_helperfunctions/ner_llm_transformer.py b/querent/kg/ner_helperfunctions/ner_llm_transformer.py index 94bcc3c6..2370ff20 100644 --- a/querent/kg/ner_helperfunctions/ner_llm_transformer.py +++ b/querent/kg/ner_helperfunctions/ner_llm_transformer.py @@ -9,6 +9,8 @@ from querent.kg.ner_helperfunctions.dependency_parsing import Dependency_Parsing from unidecode import unidecode import re +from collections import Counter + """ @@ -267,7 +269,7 @@ def extract_binary_pairs(self, entities: List[dict], tokens: List[str], all_sent self.logger.error(f"Error extracting binary pairs: {e}") return binary_pairs - def extract_fixed_entities_from_chunk(self, chunk: List[str], fixed_entities: List[str], entity_types: List[str], default_score=1.0): + def extract_fixed_entities_from_chunk(self, chunk: List[str], fixed_entities: List[str], entity_types: List[str], default_score=None): results = [] merged_chunk = [] # List to hold merged tokens current_word = "" # String to accumulate current word pieces @@ -302,10 +304,12 @@ def extract_fixed_entities_from_chunk(self, chunk: List[str], fixed_entities: Li if start_idx is not None: label = entity_types[normalized_entities.index(entity)] if normalized_entities.index(entity) < len(entity_types) else 'Unknown' + # Calculate score if not provided + score = default_score if default_score is not None else 1.0 # Example default score calculation results.append({ "entity": entity, "label": label, - "score": default_score, + "score": score, "start_idx": start_idx }) except Exception as e: @@ -314,6 +318,169 @@ def extract_fixed_entities_from_chunk(self, chunk: List[str], fixed_entities: Li return sorted(results, key=lambda x: x['start_idx']) + def extract_entities_from_sentence_for_given_sentence(self, sentence: str, sentence_idx: int, all_sentences: List[str], fixed_entities_flag: bool, fixed_entities: List[str],entity_types: List[str]): + print("Extracting entity pair") + try: + tokens = self.tokenize_sentence(sentence) + chunks = self.get_chunks(tokens) + all_page_entities = [] + + for chunk in chunks: + if fixed_entities_flag == False: + entities = self.extract_entities_from_chunk(chunk) + else: + entities = self.extract_fixed_entities_from_chunk(chunk,fixed_entities, entity_types) + all_page_entities.append(entities) + + return all_page_entities + + except Exception as e: + self.logger.error(f"Error extracting entities for an ocr sentence: {e}") + + def get_max_score_entity_pair(self, entities_list): + max_score = 0 + best_pair = None + for i in range(0, len(entities_list), 2): + pair_score = entities_list[i]['score'] + entities_list[i+1]['score'] + if pair_score > max_score: + max_score = pair_score + best_pair = (entities_list[i], entities_list[i+1]) + return best_pair + + def compare_and_retrieve(self, entities_list, binary_pairs, single_entity): + best_pair = self.get_max_score_entity_pair(entities_list) + + found_pair = False + for pair, context in binary_pairs: + if (single_entity['entity'] == best_pair[0]['entity'] or single_entity['entity'] == best_pair[1]['entity']): + found_pair = True + if found_pair and ( + (pair[0]['entity'] == best_pair[0]['entity'] and pair[1]['entity'] == best_pair[1]['entity']) or + (pair[1]['entity'] == best_pair[0]['entity'] and pair[0]['entity'] == best_pair[1]['entity'])): + return { + "entity_pair": best_pair, + "context": context + } + if not found_pair: + return {"message": "No matching entity found in the best pair based on the single entity input."} + return None + + def find_most_frequent_pair_with_entity(self, binary_pairs, ocr_entities): + + result = [] + for single_entity in ocr_entities: + + # Filter pairs to include only those containing the single entity + filtered_pairs = [] + for pair, context in binary_pairs: + if single_entity['entity'] in [pair[0]['entity'], pair[1]['entity']]: + # Since dictionaries are unhashable, convert pair to a tuple of sorted tuples to count occurrences + sorted_pair = tuple(sorted((pair[0]['entity'], pair[1]['entity']))) + filtered_pairs.append((sorted_pair, context)) + + # Count occurrences of each pair + pair_counter = Counter([pair for pair, _ in filtered_pairs]) + + # Find the pair with the maximum occurrence + if not pair_counter: + return None + + most_frequent_pair, count = pair_counter.most_common(1)[0] + + # Retrieve the context information for the most frequent pair + for pair, context in filtered_pairs: + if tuple(sorted((pair[0]['entity'], pair[1]['entity']))) == most_frequent_pair: + """ + + + """ + result.append({ + "most_frequent_pair": most_frequent_pair, + "count": count, + "context": context + }) + + return result + + def find_highest_scoring_pair(self, binary_pairs): + max_score = 0 # Initialize with a value lower than the lowest possible score (scores are usually non-negative). + highest_scoring_pair = None + highest_scoring_context = None + + # Iterate through each pair and calculate the total score + for pair, context in binary_pairs: + total_score = pair[0]['score'] + pair[1]['score'] + if total_score > max_score: + max_score = total_score + highest_scoring_pair = pair + highest_scoring_context = context + + # Return the highest scoring pair along with its context and total score + if highest_scoring_pair: + return { + "highest_scoring_pair": highest_scoring_pair, + "total_score": max_score, + "context": highest_scoring_context + } + else: + return {"message": "No pairs found or all pairs have zero or negative scores."} + + + def find_most_frequent_entity_pair(self, binary_pairs): + # Initialize a counter to keep track of entity pair occurrences + pair_counter = Counter() + + # Iterate through each pair and count each unique pair + for pair, context in binary_pairs: + # Create a canonical form for the pair (sorted by entity name to ensure (A,B) and (B,A) are treated the same) + sorted_pair = tuple(sorted((pair[0]['entity'], pair[1]['entity']))) + pair_counter[sorted_pair] += 1 + + # Find the pair with the highest occurrence + if not pair_counter: + return None + + most_frequent_pair, count = pair_counter.most_common(1)[0] + + # Collect all contexts where the most frequent pair appears + contexts = [context for pair, context in binary_pairs if tuple(sorted((pair[0]['entity'], pair[1]['entity']))) == most_frequent_pair] + + # Return the most frequent pair along with its occurrence count and all associated contexts + return { + "most_frequent_pair": most_frequent_pair, + "occurrence_count": count, + "contexts": contexts + } + + def create_subject_object_sentence_tuples(self, ocr_entities, entity_list): + # Prepare the list to hold the result tuples + results = [] + + for single_entity in ocr_entities: + + # Iterate through each entity in the list + for entity in entity_list: + # Create a tuple with the single entity as 'subject', the current entity as 'object', and use the 'sentence' from the object entity + if 'sentence' in entity: + result_tuple = ( + single_entity, + entity['sentence'], + entity + ) + + results.append(result_tuple) + else: + # Handle cases where 'sentence' might not be present in the entity dictionary + result_tuple = ( + single_entity, + entity, + "No sentence available" + ) + results.append(result_tuple) + + return results + + def extract_entities_from_sentence(self, sentence: str, sentence_idx: int, all_sentences: List[str], fixed_entities_flag: bool, fixed_entities: List[str],entity_types: List[str]): diff --git a/tests/workflows/ingested_images_test.py b/tests/workflows/ingested_images_test.py new file mode 100644 index 00000000..16ec7f61 --- /dev/null +++ b/tests/workflows/ingested_images_test.py @@ -0,0 +1,117 @@ +import asyncio +from asyncio import Queue +import json +from pathlib import Path +from querent.callback.event_callback_interface import EventCallbackInterface +from querent.collectors.fs.fs_collector import FSCollectorFactory +from querent.common.types.ingested_tokens import IngestedTokens +from querent.common.types.querent_event import EventState, EventType +from querent.config.collector.collector_config import FSCollectorConfig +from querent.common.uri import Uri +from querent.config.core.llm_config import LLM_Config +from querent.core.transformers.fixed_entities_set_opensourcellm import Fixed_Entities_LLM +from querent.ingestors.ingestor_manager import IngestorFactoryManager +import pytest +import uuid +from querent.common.types.file_buffer import FileBuffer +from querent.core.transformers.bert_ner_opensourcellm import BERTLLM +from querent.processors.text_cleanup_processor import TextCleanupProcessor +from querent.querent.resource_manager import ResourceManager +from querent.querent.querent import Querent +import time +# from querent.storage.milvus_vectorevent_storage import MilvusDBConnection +from querent.config.core.gpt_llm_config import GPTConfig +from querent.core.transformers.gpt_llm_bert_ner_or_fixed_entities_set_ner import GPTLLM + +@pytest.mark.asyncio +async def test_ingest_all_async(): + # Set up the collectors + # db_conn = DatabaseConnection(dbname="postgres", + # user="postgres", + # password="querent", + # host="localhost", + # port="5432") + # # ml_conn = MilvusDBConnection() + directories = [ "/home/ansh/pyg-trail/testing-ocr"] + collectors = [ + FSCollectorFactory().resolve( + Uri("file://" + str(Path(directory).resolve())), + FSCollectorConfig(config_source={ + "id": str(uuid.uuid4()), + "root_path": directory, + "name": "Local-config", + "config": {}, + "backend": "localfile", + "uri": "file://", + }), + ) + for directory in directories + ] + + # Set up the result queue + result_queue = asyncio.Queue() + text_cleanup_processor = TextCleanupProcessor() + # Create the IngestorFactoryManager + ingestor_factory_manager = IngestorFactoryManager( + collectors=collectors, result_queue=result_queue, processors=[text_cleanup_processor] + ) + ingest_task = asyncio.create_task(ingestor_factory_manager.ingest_all_async()) + resource_manager = ResourceManager() + gpt_llm_config = GPTConfig( + ner_model_name="botryan96/GeoBERT", + # rel_model_path="/home/nishantg/Downloads/openhermes-2.5-mistral-7b.Q5_K_M.gguf", + enable_filtering=True, + openai_api_key="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" + ,filter_params={ + 'score_threshold': 0.5, + 'attention_score_threshold': 0.1, + 'similarity_threshold': 0.5, + 'min_cluster_size': 5, + 'min_samples': 3, + 'cluster_persistence_threshold':0.2 + } + ,user_context="Query: Your task is to analyze and interpret the context to construct semantic triples. Please Identify the entity which is the subject and the entity which is object based on the context, and determine the meaningful relationship or predicate linking the subject entity to the object entity. Determine whether the entity labels provided match the subject type and object type and correct if needed. Also provide the predicate type. Answer:" + ,fixed_entities =["Carbonate", "Clastic", "Porosity", "Permeability", "Oil saturation", "Water saturation", "Gas saturation", "Depth", "Size", "Temperature", "Pressure", "Oil viscosity", "Gas-oil ratio", "Water cut", "Recovery factor", "Enhanced recovery technique", "Horizontal drilling", "Hydraulic fracturing", "Water injection", "Gas injection", "Steam injection", "Seismic activity", "Structural deformation", "Faulting", "Cap rock integrity", "Compartmentalization", "Connectivity", "Production rate", "Depletion rate", "Exploration technique", "Drilling technique", "Completion technique", "Environmental impact", "Regulatory compliance", "Economic analysis", "Market analysis", "oil well", "gas well", "oil field", "Gas field", "eagle ford shale", "ghawar", "johan sverdrup", "karachaganak", "maracaibo"], + sample_entities = ["rock_type", "rock_type", "reservoir_property", "reservoir_property", "reservoir_property", "reservoir_property", "reservoir_property", "reservoir_characteristic", "reservoir_characteristic", "reservoir_characteristic", "reservoir_characteristic", "reservoir_property", "reservoir_property", "production_metric", "production_metric", "recovery_technique", "drilling_technique", "recovery_technique", "recovery_technique", "recovery_technique", "recovery_technique", "geological_feature", "geological_feature", "geological_feature", "reservoir_feature", "reservoir_feature", "reservoir_feature", "production_metric", "production_metric", "exploration_method", "drilling_method", "completion_method", "environmental_aspect", "regulatory_aspect", "economic_aspect", "economic_aspect", "hydrocarbon_source", "hydrocarbon_source", "hydrocarbon_source", "hydrocarbon_source", "reservoir", "reservoir", "reservoir", "reservoir", "reservoir"] + , is_confined_search = True + , huggingface_token = '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' + ) + llm_instance = GPTLLM(result_queue, gpt_llm_config) + class StateChangeCallback(EventCallbackInterface): + def handle_event(self, event_type: EventType, event_state: EventState): + # assert event_state.event_type == EventType.Graph + if event_state['event_type'] == EventType.Graph : + triple = json.loads(event_state['payload']) + print("file---------------------",event_state['file'], "----------------", type(event_state['file'])) + print("triple: {}".format(triple)) + graph_event_data = { + 'subject': triple['subject'], + 'subject_type': triple['subject_type'], + 'object': triple['object'], + 'object_type': triple['object_type'], + 'predicate': triple['predicate'], + 'predicate_type': triple['predicate_type'], + 'sentence': triple['sentence'], + 'document_id': event_state['file'] + } + # db_conn.insert_graph_event(graph_event_data) + assert isinstance(triple['subject'], str) and triple['subject'] + # else : + # vector_triple = json.loads(event_state.payload) + # print("Inside Vector event ---------------------------------", vector_triple) + # milvus_coll = ml_conn.create_collection(collection_name=vector_triple['namespace'],dim = 384) + # ml_conn.insert_vector_event(id = vector_triple['id'], embedding= vector_triple['embeddings'], namespace= vector_triple['namespace'], document=event_state.file, collection= milvus_coll ) + llm_instance.subscribe(EventType.Graph, StateChangeCallback()) + # llm_instance.subscribe(EventType.Vector, StateChangeCallback()) + querent = Querent( + [llm_instance], + resource_manager=resource_manager, + ) + querent_task = asyncio.create_task(querent.start()) + await asyncio.gather(ingest_task, querent_task) + # db_conn.close() + +if __name__ == "__main__": + + # Run the async function + asyncio.run(test_ingest_all_async())