diff --git a/README.md b/README.md index f4a8f584..42d40a64 100644 --- a/README.md +++ b/README.md @@ -8,19 +8,17 @@ The Asynchronous Data Dynamo and Graph Neural Network Catalyst ## Unlock Insights, Asynchronous Scaling, and Forge a Knowledge-Driven Future -🚀 **Async at its Core**: Querent thrives in an asynchronous world. With asynchronous processing, we handle multiple data sources seamlessly, eliminating bottlenecks for utmost efficiency. +🚀 **Asynchronous Processing**: Querent excels in handling data from multiple sources concurrently with asynchronous processing, eliminating bottlenecks and maximizing efficiency. -💡 **Knowledge Graphs Made Easy**: Constructing intricate knowledge graphs is a breeze. Querent's robust architecture simplifies building comprehensive knowledge graphs, enabling you to uncover hidden data relationships. +💡 **Effortless Knowledge Graph Construction**: Querent's robust architecture simplifies building comprehensive knowledge graphs, enabling you to uncover hidden data relationships. -🌐 **Scalability Redefined**: Scaling your data operations is effortless with Querent. We scale horizontally, empowering you to process multiple data streams without breaking a sweat. +🌐 **Seamless Scalability**: Easily scale your data operations with Querent's horizontal scaling capabilities, allowing for the smooth processing of multiple data streams. -🔬 **GNN Integration**: Querent seamlessly integrates with Graph Neural Networks (GNNs), enabling advanced data analysis, recommendation systems, and predictive modeling. +🔍 **Data-Driven Insights**: Extract actionable information and make data-informed decisions with ease. -🔍 **Data-Driven Insights**: Dive deep into data-driven insights with Querent's tools. Extract actionable information and make data-informed decisions with ease. +🧠 **Advanced Language Model Utilization**: Utilize state-of-the-art language models (LLMs) for natural language processing tasks, enabling Querent to tackle complex text-based challenges. -🧠 **Leverage Language Models**: Utilize state-of-the-art language models (LLMs) for text data. Querent empowers natural language processing, tackling complex text-based tasks. - -📈 **Efficient Memory Usage**: Querent is mindful of memory constraints. Our framework uses memory-efficient techniques, ensuring you can handle large datasets economically. +📈 **Memory-Efficient Framework**: Querent is designed to handle large datasets economically, using memory-efficient techniques to ensure optimal performance even under memory constraints. ## Table of Contents @@ -29,72 +27,38 @@ The Asynchronous Data Dynamo and Graph Neural Network Catalyst - [Table of Contents](#table-of-contents) - [Introduction](#introduction) - [Features](#features) - - [Getting Started](#getting-started) - - [Prerequisites](#prerequisites) - - [Installation](#installation) - [Usage](#usage) - [Configuration](#configuration) - [Querent: an asynchronous engine for LLMs](#querent-an-asynchronous-engine-for-llms) - - [Ease of Use](#ease-of-use) + - [Getting Started](#getting-started) + - [Prerequisites](#prerequisites) + - [Installation](#installation) + - [Setup DB](#setup-db) + - [Example](#example) + - [Perform Similarity Search](#performing-similarity-search) + - [Graph Traversal](#traversing-the-data) + - [Benefits](#additional-benefits) - [Contributing](#contributing) - [License](#license) + + ## Introduction -Querent is designed to simplify and optimize data collection and processing workflows.
Whether you need to scrape web data, ingest files, preprocess text, or create complex knowledge graphs, Querent offers a flexible framework for building and scaling these processes. +Querent is designed to simplify and optimize data collection and processing workflows. Whether you need to ingest files, preprocess text, or create complex knowledge graphs from local data, Querent offers a flexible framework for building and scaling these processes. ## Features -- **Collectors:** Gather data from various sources asynchronously, including web scraping and file collection. +- **Collectors:** Gather local data from file sources asynchronously. - **Ingestors:** Process collected data efficiently with custom transformations and filtering. - **Processors:** Apply asynchronous data processing, including text preprocessing, cleaning, and feature extraction. -- **Engines:** Execute a suite of LLM engines to extract insights from data, leveraging parallel processing for enhanced efficiency. - -- **Storage:** Store processed data in various storage systems, such as databases or cloud storage. - -- **Workflow Management:** Efficiently manage and scale data workflows with task orchestration. - -- **Scalability:** Querent is designed to scale horizontally, handling large volumes of data with ease. - -## Getting Started - -Let's get Querent up and running on your local machine. - -### Prerequisites - -- Python 3.9+ -- Virtual environment (optional but recommended) - -### Installation - -1. Create a virtual environment (recommended): - - ```bash - python -m venv venv - source venv/bin/activate # On Windows, use `venv\Scripts\activate` - ``` -2. Install latest Querent Workflow Orchestrator package: +- **Engines:** Leverage a Language Model (LLM) engine to convert textual data into knowledge triples (Subject, Predicate, Object) based on attention matrix scores. - ```bash - pip install querent - ``` - -3. Install the project dependencies: +- **Storage:** Store processed data in a PostgreSQL storage system. - ```bash - python3 -m spacy download en_core_web_lg - ``` -4. Apt install the project dependencies: - ```bash - sudo apt install tesseract-ocr - sudo apt install libtesseract-dev - sudo apt-get install ffmpeg - sudo apt install antiword - ``` ## Usage @@ -104,17 +68,10 @@ Querent provides a flexible framework that adapts to your specific data collecti 2. **Collecting Data:** Implement collector classes to gather data from chosen sources. Handle errors and edge cases gracefully. -3. **Processing Data:** Create ingestors and processors to clean, transform, and filter collected data. Apply custom logic to meet your requirements. +3. **Processing Data:** Create ingestors and processors to clean, transform, and filter collected data. 4. **Storage:** Choose your storage system (e.g., databases) and configure connections. Store processed data efficiently. -5. **Task Orchestration:** For large tasks, implement a task orchestrator to manage and distribute the workload. - -6. **Scaling:** To handle scalability, consider running multiple instances of collectors and ingestors in parallel. - -7. **Monitoring:** Implement monitoring and logging to track task progress, detect errors, and ensure smooth operation. - -8. **Documentation:** Maintain thorough project documentation to make it easy for others (and yourself) to understand and contribute. ## Configuration @@ -161,139 +118,148 @@ sequenceDiagram ``` -## Ease of Use +## Getting Started + +Let's get Querent up and running on your local machine. 
+ +### Prerequisites + +- Python 3.9+ +- Virtual environment (optional but recommended) + +### Installation + +1. Create a virtual environment (recommended): + + ```bash + python -m venv venv + source venv/bin/activate # On Windows, use `venv\Scripts\activate` + ``` +2. Install the latest Querent Workflow Orchestrator package: + + ```bash + pip install querent + ``` + +3. Download the spaCy language model: + + ```bash + python3 -m spacy download en_core_web_lg + ``` + +4. Install the system dependencies with apt: + ```bash + sudo apt install tesseract-ocr + sudo apt install libtesseract-dev + sudo apt-get install ffmpeg + sudo apt install antiword + ``` +5. Install PyTorch: + ```bash + pip install torch + ``` +6. Install Docker: refer to the [official documentation](https://docs.docker.com/engine/install/) + +### Setup DB + +1. **Download the Docker Compose file** - [Postgres Docker Compose file](tests/tutorial/docker-compose.yaml) + +2. **Run the Postgres instance** - Navigate to the directory containing the Docker Compose file and run: + +```bash + docker compose up +``` + +### Example -With Querent, creating scalable workflows with any LLM is just a few lines of code. +1. **Download the example file with fixed entities** - [Example file](tests/tutorial/example_fixed_entities.py). Also download the [example PDF](tests/data/readme_assets/example.pdf) and place it in a directory. + +2. **Run the example file** - This script loads a BERT-based model and uses its attention weights to identify semantic triples in the data. In `example_fixed_entities.py`, update the directory path to point to where `example.pdf` is stored. If running on your own files, also modify the fixed entities and their respective types. The script produces semantic triples (Subject, Predicate, Object) from the user-provided data.
Execute the below: ```python -import pytest -import uuid -from pathlib import Path -import asyncio - -from querent.callback.event_callback_interface import EventCallbackInterface -from querent.common.types.ingested_tokens import IngestedTokens -from querent.common.types.ingested_code import IngestedCode -from querent.common.types.ingested_images import IngestedImages -from querent.common.types.ingested_messages import IngestedMessages -from querent.common.types.querent_event import EventState, EventType -from querent.common.types.querent_queue import QuerentQueue -from querent.core.base_engine import BaseEngine -from querent.querent.querent import Querent -from querent.querent.resource_manager import ResourceManager -from querent.collectors.collector_resolver import CollectorResolver -from querent.common.uri import Uri -from querent.config.collector.collector_config import FSCollectorConfig -from querent.ingestors.ingestor_manager import IngestorFactoryManager - -# Create input and output queues -input_queue = QuerentQueue() -resource_manager = ResourceManager() - - -# Define a simple mock LLM engine for testing -class MockLLMEngine(BaseEngine): - def __init__(self, input_queue: QuerentQueue): - super().__init__(input_queue) - - async def process_tokens(self, data: IngestedTokens): - if data is None or data.is_error(): - # the LLM developer can raise an error here or do something else - # the developers of Querent can customize the behavior of Querent - # to handle the error in a way that is appropriate for the use case - self.set_termination_event() - return - # Set the state of the LLM - # At any given point during the execution of the LLM, the LLM developer - # can set the state of the LLM using the set_state method - # The state of the LLM is stored in the state attribute of the LLM - # The state of the LLM is published to subscribers of the LLM - current_state = EventState(EventType.Graph, 1.0, "anything", "dummy.txt") - await self.set_state(new_state=current_state) - - async def process_code(self, data: IngestedCode): - pass - - async def process_messages(self, data: IngestedMessages): - return super().process_messages(data) - - async def process_images(self, data: IngestedImages): - return super().process_images(data) - - def validate(self): - return True - - -@pytest.mark.asyncio -async def test_example_workflow_with_querent(): - # Initialize some collectors to collect the data - directory_path = "path/to/your/data/directory" - collectors = [ - CollectorResolver().resolve( - Uri("file://" + str(Path(directory_path).resolve())), - FSCollectorConfig(root_path=directory_path, id=str(uuid.uuid4())), - ) - ] - - # Connect to the collector - for collector in collectors: - await collector.connect() - - # Set up the result queue - result_queue = asyncio.Queue() - - # Create the IngestorFactoryManager - ingestor_factory_manager = IngestorFactoryManager( - collectors=collectors, result_queue=result_queue - ) - - # Start the ingest_all_async in a separate task - ingest_task = asyncio.create_task(ingestor_factory_manager.ingest_all_async()) - - ### A Typical Use Case ### - # Create an engine to harness the LLM - llm_mocker = MockLLMEngine(input_queue) - - # Define a callback function to subscribe to state changes - class StateChangeCallback(EventCallbackInterface): - async def handle_event(self, event_type: EventType, event_state: EventState): - print(f"New state: {event_state}") - print(f"New state type: {event_type}") - assert event_state.event_type == EventType.Graph - - # Subscribe to 
state change events - # This pattern is ideal as we can expose multiple events for each use case of the LLM - llm_mocker.subscribe(EventType.Graph, StateChangeCallback()) - - ## one can also subscribe to other events, e.g. EventType.CHAT_COMPLETION ... - - # Create a Querent instance with a single MockLLM - # here we see the simplicity of the Querent - # massive complexity is hidden in the Querent, - # while being highly configurable, extensible, and scalable - # async architecture helps to scale to multiple querenters - # How async architecture works: - # 1. Querent starts a worker task for each querenter - # 2. Querenter starts a worker task for each worker - # 3. Each worker task runs in a loop, waiting for input data - # 4. When input data is received, the worker task processes the data - # 5. The worker task notifies subscribers of state changes - # 6. The worker task repeats steps 3-5 until termination - querent = Querent( - [llm_mocker], - resource_manager=resource_manager, - ) - # Start the querent - querent_task = asyncio.create_task(querent.start()) - await asyncio.gather(ingest_task, querent_task) - - -if __name__ == "__main__": - asyncio.run(test_example_workflow_with_querent()) + python3 example_fixed_entities.py +``` + + +3. **Example Output** - Two tables are initialized when the above script is run + +- **Metadata table** - + +| id | event_id | subject | subject_type | predicate | object | object_type | sentence | file | doc_source | score | +|----|--------------------------------------|----------|--------------|-----------|--------|-------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------|---------------------------------------------------------------------|---------| +| 1 | 298b4df3-a2f1-4721-b78d-9099309257c2 | coach | person | athlete | health | method | coach torres, with her innovative approach to student-athlete health and her emphasis on holistic training methods, has significantly influenced the physical and mental preparedness of greenwood's athletes. | /home/user/querent-main/readme_assets/example.pdf | file:///home/user/querent-main/readme_assets | 0.159262 | + + +- **Embedding table** - + +| id | event_id | embeddings | +|----|--------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------| +| 1 | 298b4df3-a2f1-4721-b78d-9099309257c2 | [-0.00637318,0.0032276064,-0.016642869,0.018911008,-0.004372431,0.035932742,0.010418983,-0.00960234,0.009969827,-0.021499356,...] | + + +## Performing Similarity Search + +Users can perform similarity searches in the embedding table to find relevant documents based on the vector embeddings. Here’s how you can do it: + +1. Convert your query into a vector embedding using the same embedding model used for creating the embeddings in the embedding table. +2. Find similar matches: Perform a similarity search in the embedding table to find the top N similar embeddings. + +3. Retrieve relevant data: Use the `event_id` from the similar embeddings to fetch the corresponding data from the metadata table. 
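Below is a minimal SQL sketch of steps 2 and 3, assuming the `embedding` and `metadata` tables created by the tutorial script and pgvector's `<=>` cosine-distance operator; the bracketed vector literal is a placeholder for the 384-dimensional query embedding produced in step 1 (for example via `EmbeddingStore` in `tests/tutorial/postgres_utility.py`):

```sql
-- Rank stored embeddings by cosine similarity to the query embedding,
-- then join back to the metadata table via event_id to retrieve the triples.
-- '[0.012, -0.034, ...]' stands in for the full 384-dimension query vector.
SELECT m.subject, m.predicate, m.object, m.sentence,
       1 - (e.embeddings <=> '[0.012, -0.034, ...]') AS cosine_similarity
FROM public.embedding e
JOIN public.metadata m ON m.event_id = e.event_id
ORDER BY e.embeddings <=> '[0.012, -0.034, ...]'
LIMIT 5;
```

A similar flow is implemented in Python by the `find_similar_embeddings` and `fetch_metadata_by_ids` helpers in `tests/tutorial/postgres_utility.py`.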
+ +This approach is highly useful when dealing with thousands of files, as it essentially creates pointers to knowledge, making it easy to retrieve relevant information efficiently. + +## Traversing the Data +Querent allows you to traverse the data using SQL queries, enabling you to explore inward and outward edges from either the subject or object. Here's how: + +1. Get Outward Edges: Find all relationships where a given entity is the subject. +```sql +SELECT * FROM public.metadata +WHERE subject = 'your_entity'; +``` + +2. Get Inward Edges: Find all relationships where a given entity is the object. +```sql +SELECT * FROM public.metadata +WHERE object = 'your_entity'; +``` +3. Find Shortest Path Based on Score: Use a recursive query to find the lowest-cost path between two entities, where the cost is the cumulative score along the path. +```sql +WITH RECURSIVE Path (id, event_id, subject, object, score, path, depth) AS ( + SELECT id, event_id, subject, object, score, ARRAY[subject, object]::VARCHAR[], 1 + FROM public.metadata + WHERE subject = 'start_entity' + UNION ALL + SELECT m.id, m.event_id, m.subject, m.object, p.score + m.score, p.path || m.object, p.depth + 1 + FROM metadata m + JOIN Path p ON m.subject = p.object + WHERE p.depth < 10 -- Limit depth to prevent infinite recursion + AND NOT (m.object = ANY(p.path)) -- Avoid cycles +) +SELECT * +FROM Path +WHERE 'end_entity' = ANY(path) +ORDER BY score ASC +LIMIT 1; ``` + +## Additional Benefits + +1. Preparing Factual Data: The extracted triples can be used to prepare factual data for fine-tuning or training large language models (LLMs). + +2. GNN Use Cases: Graph Neural Networks (GNNs) can use the extracted entities and relationships for downstream tasks such as link prediction and node classification. + +3. AI Use Cases: Enable advanced AI functionalities like cross-document summarization, entity recognition, and trend analysis across a large corpus of documents. + +4. Replacing the Need for a Dedicated Graph Database: By using PostgreSQL and the embedded vectors, you can achieve efficient graph traversal and relationship mapping without the overhead of a dedicated graph database. This reduces complexity and cost. + +5. Scalability: This method scales well with the number of documents, making it suitable for large datasets. + +This system not only enhances data retrieval and analysis but also provides a robust foundation for various AI and machine learning applications. + ## Contributing Contributions to Querent are welcome! Please follow our [contribution guidelines](CONTRIBUTING.md) to get started.
diff --git a/querent/core/transformers/bert_ner_opensourcellm.py b/querent/core/transformers/bert_ner_opensourcellm.py index 510c84eb..87f1b837 100644 --- a/querent/core/transformers/bert_ner_opensourcellm.py +++ b/querent/core/transformers/bert_ner_opensourcellm.py @@ -1,4 +1,5 @@ import json +import uuid from transformers import AutoConfig, AutoTokenizer import transformers import time @@ -298,7 +299,6 @@ async def process_tokens(self, data: IngestedTokens): try: doc_entity_pairs = [] doc_source = data.doc_source - if not BERTLLM.validate_ingested_tokens(data): self.set_termination_event() return @@ -311,7 +311,7 @@ async def process_tokens(self, data: IngestedTokens): doc_entity_pairs = self._get_entity_pairs(content) if not doc_entity_pairs: return - + doc_entity_pairs = self._process_entity_types(doc_entity_pairs) if not self.entity_context_extractor and not self.predicate_context_extractor: pairs_withattn = self.attn_scores_instance.extract_and_append_attention_weights(doc_entity_pairs) @@ -341,6 +341,7 @@ def _prepare_content(self, data): else: content = clean_text file = data.get_file_path() + return content, file def _get_entity_pairs(self, content): @@ -414,8 +415,9 @@ async def _process_embedding_triples(self, embedding_triples, file, doc_source): for triple in embedding_triples: if self.termination_event.is_set(): return + event_id = str(uuid.uuid4()) - graph_json = json.dumps(TripleToJsonConverter.convert_graphjson(triple)) + graph_json = json.dumps(TripleToJsonConverter.convert_graphjson(triple, event_id=event_id)) if graph_json: current_state = EventState( event_type=EventType.Graph, @@ -436,7 +438,7 @@ async def _process_embedding_triples(self, embedding_triples, file, doc_source): base_weights=[predicate_score, predicate_score, 3], normalize_weights=True # Normalize weights to ensure they sum to 1 ) - vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson(triple=triple, embeddings=final_emb)) + vector_json = json.dumps(TripleToJsonConverter.convert_vectorjson(triple=triple, embeddings=final_emb,event_id=event_id)) if vector_json: current_state = EventState( event_type=EventType.Vector, diff --git a/querent/kg/rel_helperfunctions/attn_based_relationship_filter.py b/querent/kg/rel_helperfunctions/attn_based_relationship_filter.py index f9a2b877..85c1a75f 100644 --- a/querent/kg/rel_helperfunctions/attn_based_relationship_filter.py +++ b/querent/kg/rel_helperfunctions/attn_based_relationship_filter.py @@ -172,9 +172,10 @@ def process_tokens(ner_instance : NER_LLM, extractor, filtered_triples, nlp_mode updated_triples = [] for subject, predicate_metadata, object in filtered_triples: try: - context = predicate_metadata['current_sentence'].replace("\n"," ") + context = predicate_metadata['current_sentence'].replace("\n"," ").lower() head_positions = ner_instance.find_subword_indices(context, predicate_metadata['entity1_nn_chunk']) tail_positions = ner_instance.find_subword_indices(context, predicate_metadata['entity2_nn_chunk']) + if head_positions[0][0] > tail_positions[0][0]: head_entity = {'entity': object, 'noun_chunk':predicate_metadata['entity2_nn_chunk'], 'entity_label':predicate_metadata['entity2_label'] } tail_entity = {'entity': subject, 'noun_chunk':predicate_metadata['entity1_nn_chunk'], 'entity_label':predicate_metadata['entity1_label']} @@ -188,20 +189,18 @@ def process_tokens(ner_instance : NER_LLM, extractor, filtered_triples, nlp_mode attention_matrix = extractor.inference_attention(model_input) token_idx_with_word = 
ner_instance.tokenize_sentence_with_positions(context) spacy_doc = nlp_model(context) - filter = IndividualFilter(True, 0.02, token_idx_with_word, spacy_doc) - + filter = IndividualFilter(True, 0.01, token_idx_with_word, spacy_doc) + ## HEAD Entity Based Attention Search candidate_paths = perform_search(entity_pair.head_entity['start_idx'], attention_matrix, entity_pair, search_candidates=5, require_contiguous=True, max_relation_length=8, num_initial_tokens=extractor.num_start_tokens()) candidate_paths = remove_duplicates(candidate_paths) filtered_results = filter.filter(candidates=candidate_paths,e_pair=entity_pair) predicate_he, score_he = get_best_relation(filtered_results) - ##TAIL ENTITY Based Attention Search candidate_paths = perform_search(entity_pair.tail_entity['start_idx'], attention_matrix, entity_pair, search_candidates=5, require_contiguous=True, max_relation_length=8, num_initial_tokens=extractor.num_start_tokens()) candidate_paths = remove_duplicates(candidate_paths) filtered_results = filter.filter(candidates=candidate_paths,e_pair=entity_pair) predicate_te, score_te = get_best_relation(filtered_results) - if score_he > score_te and (score_he >= 0.1 or score_te >= 0.1): triple = create_semantic_triple(head_entity=head_entity['noun_chunk'], tail_entity=tail_entity['noun_chunk'], diff --git a/querent/kg/rel_helperfunctions/triple_to_json.py b/querent/kg/rel_helperfunctions/triple_to_json.py index 950d3b4a..f577bfdc 100644 --- a/querent/kg/rel_helperfunctions/triple_to_json.py +++ b/querent/kg/rel_helperfunctions/triple_to_json.py @@ -31,7 +31,7 @@ def _parse_json_str(json_str): raise ValueError(f"Error decoding JSON: {e}") @staticmethod - def convert_graphjson(triple): + def convert_graphjson(triple, event_id = None): try: subject, json_str, object_ = triple predicate_info = TripleToJsonConverter._parse_json_str(json_str) @@ -39,6 +39,7 @@ def convert_graphjson(triple): return {} json_object = { + "event_id": event_id, "subject": TripleToJsonConverter._normalize_text(subject, replace_space=True), "subject_type": TripleToJsonConverter._normalize_text(predicate_info.get("subject_type", "Unlabeled"), replace_space=True), "object": TripleToJsonConverter._normalize_text(object_, replace_space=True), @@ -67,7 +68,7 @@ def dynamic_weighted_average_embeddings(embeddings, base_weights, normalize_weig return weighted_sum @staticmethod - def convert_vectorjson(triple, blob = None, embeddings=None): + def convert_vectorjson(triple, blob = None, embeddings=None, event_id = None): try: subject, json_str, object_ = triple data = TripleToJsonConverter._parse_json_str(json_str) @@ -76,6 +77,7 @@ def convert_vectorjson(triple, blob = None, embeddings=None): id_format = f"{TripleToJsonConverter._normalize_text(subject,replace_space=True)}-{TripleToJsonConverter._normalize_text(data.get('predicate', ''),replace_space=True)}-{TripleToJsonConverter._normalize_text(object_,replace_space=True)}" json_object = { + "event_id": event_id, "id": TripleToJsonConverter._normalize_text(id_format), "embeddings": embeddings.tolist(), "size": len(embeddings.tolist()), diff --git a/tests/data/readme_assets/example.pdf b/tests/data/readme_assets/example.pdf new file mode 100644 index 00000000..312fc479 Binary files /dev/null and b/tests/data/readme_assets/example.pdf differ diff --git a/tests/tutorial/__init__.py b/tests/tutorial/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/tutorial/docker-compose.yaml b/tests/tutorial/docker-compose.yaml new file mode 100644 index 
00000000..b1b7382e --- /dev/null +++ b/tests/tutorial/docker-compose.yaml @@ -0,0 +1,23 @@ +version: '3' +services: + postgres: + image: pgvector/pgvector:pg16 + environment: + - POSTGRES_USER=querent + - POSTGRES_PASSWORD=querent + - POSTGRES_DB=querent_test + volumes: + - ./quester/storage/sql/:/docker-entrypoint-initdb.d + ports: + - "5432:5432" + networks: + - querent + healthcheck: + test: ["CMD-SHELL", "pg_isready", "-d", "querent_test"] + interval: 30s + timeout: 60s + retries: 5 + start_period: 80s + +networks: + querent: diff --git a/tests/tutorial/example_fixed_entities.py b/tests/tutorial/example_fixed_entities.py new file mode 100644 index 00000000..4226161d --- /dev/null +++ b/tests/tutorial/example_fixed_entities.py @@ -0,0 +1,102 @@ +import asyncio +from asyncio import Queue +import json +from pathlib import Path +from querent.callback.event_callback_interface import EventCallbackInterface +from querent.collectors.fs.fs_collector import FSCollectorFactory +from querent.common.types.querent_event import EventState, EventType +from querent.config.collector.collector_config import FSCollectorConfig +from querent.common.uri import Uri +from querent.config.core.llm_config import LLM_Config +from querent.ingestors.ingestor_manager import IngestorFactoryManager +import uuid +import numpy as np +from querent.core.transformers.bert_ner_opensourcellm import BERTLLM +from querent.querent.resource_manager import ResourceManager +from querent.querent.querent import Querent +from postgres_utility import DatabaseManager + +async def ingest_all_async(): + db_manager = DatabaseManager( + dbname="querent_test", + user="querent", + password="querent", + host="localhost", + port="5432" + ) + + db_manager.connect_db() + db_manager.create_tables() + directories = ["/home/nishantg/querent-main/querent/tests/data/readme_assets"] + collectors = [ + FSCollectorFactory().resolve( + Uri("file://" + str(Path(directory).resolve())), + FSCollectorConfig(config_source={ + "id": str(uuid.uuid4()), + "root_path": directory, + "name": "Local-config", + "config": {}, + "backend": "localfile", + "uri": "file://", + }), + ) + for directory in directories + ] + + result_queue = asyncio.Queue() + + ingestor_factory_manager = IngestorFactoryManager( + collectors=collectors, result_queue=result_queue + ) + ingest_task = asyncio.create_task(ingestor_factory_manager.ingest_all_async()) + resource_manager = ResourceManager() + bert_llm_config = LLM_Config( + # ner_model_name="English", + rel_model_type = "bert", + rel_model_path = 'bert-base-uncased', + fixed_entities = [ + "university", "greenwood", "liam zheng", "department", "Metroville", + "Emily Stanton", "Coach", "health", "training", "athletes" + ], + sample_entities = [ + "organization", "organization", "person", "department", "city", + "person", "person", "method", "method", "person" + ], + is_confined_search = True + ) + llm_instance = BERTLLM(result_queue, bert_llm_config) + + class StateChangeCallback(EventCallbackInterface): + def handle_event(self, event_type: EventType, event_state: EventState): + if event_state['event_type'] == EventType.Graph: + triple = json.loads(event_state['payload']) + db_manager.insert_metadata( + event_id=triple['event_id'], + subject=triple['subject'], + subject_type=triple['subject_type'], + predicate=triple['predicate'], + object=triple['object'], + object_type=triple['object_type'], + sentence=triple['sentence'], + file=event_state['file'], + doc_source=event_state['doc_source'], + score=triple['score'] +) + elif 
event_state['event_type'] == EventType.Vector: + triple_v = json.loads(event_state['payload']) + db_manager.insert_embedding( + event_id=triple_v['event_id'], + embeddings=triple_v['embeddings'], + ) + + llm_instance.subscribe(EventType.Graph, StateChangeCallback()) + llm_instance.subscribe(EventType.Vector, StateChangeCallback()) + querent = Querent( + [llm_instance], + resource_manager=resource_manager, + ) + querent_task = asyncio.create_task(querent.start()) + await asyncio.gather(ingest_task, querent_task) + +if __name__ == "__main__": + asyncio.run(ingest_all_async()) diff --git a/tests/tutorial/postgres_utility.py b/tests/tutorial/postgres_utility.py new file mode 100644 index 00000000..7597b587 --- /dev/null +++ b/tests/tutorial/postgres_utility.py @@ -0,0 +1,199 @@ +import psycopg2 +from psycopg2 import sql +from psycopg2.extras import Json + +from querent.kg.rel_helperfunctions.embedding_store import EmbeddingStore +import numpy as np + +class DatabaseManager: + def __init__(self, dbname, user, password, host, port): + self.dbname = dbname + self.user = user + self.password = password + self.host = host + self.port = port + self.connection = None + + def connect_db(self): + try: + self.connection = psycopg2.connect( + dbname=self.dbname, + user=self.user, + password=self.password, + host=self.host, + port=self.port + ) + print("Database connection established") + except Exception as e: + print(f"Error connecting to database: {e}") + + def create_tables(self): + create_metadata_table_query = """ + CREATE TABLE IF NOT EXISTS metadata ( + id SERIAL PRIMARY KEY, + event_id UUID, + subject VARCHAR(255), + subject_type VARCHAR(255), + predicate VARCHAR(255), + object VARCHAR(255), + object_type VARCHAR(255), + sentence TEXT, + file VARCHAR(255), + doc_source VARCHAR(255), + score FLOAT + ); + """ + + create_embedding_table_query = """ + CREATE TABLE IF NOT EXISTS embedding ( + id SERIAL PRIMARY KEY, + event_id UUID, + embeddings VECTOR(384) + ); + """ + + try: + with self.connection.cursor() as cursor: + cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;") # Enable pgvector extension + cursor.execute(create_metadata_table_query) + cursor.execute(create_embedding_table_query) + self.connection.commit() + print("Tables created successfully") + except Exception as e: + print(f"Error creating tables: {e}") + self.connection.rollback() + + def insert_metadata(self, event_id, subject, subject_type, predicate, object, object_type, sentence, file, doc_source, score): + insert_query = """ + INSERT INTO metadata (event_id, subject, subject_type, predicate, object, object_type, sentence, file, doc_source, score) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + RETURNING id; + """ + try: + with self.connection.cursor() as cursor: + cursor.execute(insert_query, (event_id, subject, subject_type, predicate, object, object_type, sentence, file, doc_source, score)) + metadata_id = cursor.fetchone()[0] + self.connection.commit() + return metadata_id + except Exception as e: + print(f"Error inserting metadata: {e}") + self.connection.rollback() + + def insert_embedding(self,event_id, embeddings): + insert_query = """ + INSERT INTO embedding (event_id, embeddings) + VALUES (%s, %s); + """ + try: + with self.connection.cursor() as cursor: + cursor.execute(insert_query, (event_id, embeddings)) + self.connection.commit() + except Exception as e: + print(f"Error inserting embedding: {e}") + self.connection.rollback() + + def close_connection(self): + if self.connection: + self.connection.close() + 
print("Database connection closed") + + def find_similar_embeddings(self, sentence_embedding, top_k=3, similarity_threshold=0.9): + # print("Senetence embeddi ---", sentence_embedding) + emb = sentence_embedding + query = f""" + SELECT id, 1 - (embeddings <=> '{emb}') AS cosine_similarity + FROM public.embedding + ORDER BY cosine_similarity DESC + LIMIT {top_k}; + """ + try: + with self.connection.cursor() as cursor: + cursor.execute(query, (sentence_embedding, top_k)) + results = cursor.fetchall() + for result in results: + print("Result -----------", result) + filtered_results = [result for result in results if result[1] >= similarity_threshold] + return filtered_results + except Exception as e: + print(f"Error in finding similar embeddings: {e}") + return [] + + def fetch_metadata_by_ids(self, metadata_ids): + print("metafataaaa ids-----", metadata_ids) + + query = """ + SELECT * FROM public.metadata WHERE id IN %s; + """ + try: + with self.connection.cursor() as cursor: + cursor.execute(query, (tuple(metadata_ids),)) + results = cursor.fetchall() + return results + except Exception as e: + print(f"Error fetching metadata: {e}") + return [] + + +# Usage example +if __name__ == "__main__": + db_manager = DatabaseManager( + dbname="querent_test", + user="querent", + password="querent", + host="localhost", + port="5432" + ) + + db_manager.connect_db() + db_manager.create_tables() + + # # Example data insertion + # metadata_id = db_manager.insert_metadata( + # subject='the_environmental_sciences_department', + # subject_type='i_org', + # predicate='have_be_advocate_clean_energy_use', + # object='dr__emily_stanton', + # object_type='i_per', + # sentence='This is an example sentence.', + # file='example_file', + # doc_source='example_source' + # ) + + # db_manager.insert_embedding( + # subject_emb=[0.1, 0.2, 0.3], # Example vectors + # object_emb=[0.4, 0.5, 0.6], + # predicate_emb=[0.7, 0.8, 0.9], + # sentence_emb=[1.0, 1.1, 1.2], + # metadata_id=metadata_id + # ) + # db_manager.update_database_with_averages() + query_1 = "What is gas injection ?" + # query_1 = "What is eagle ford shale porosity and permiability ?" + # query_1 = "What is austin chalk formation ?" + # query_1 = "What type of source rock does austin chalk reservoir have ?" + # query_1 = "What are some of the important characteristics of Gulf of Mexico basin ?" + # query_1 = "Which wells are producing oil ?" 
+ create_emb = EmbeddingStore() + query_1_emb = create_emb.get_embeddings([query_1])[0] +# Find similar embeddings in the database + similar_embeddings = db_manager.find_similar_embeddings(query_1_emb, top_k=10) + # Extract metadata IDs from the results + metadata_ids = [result[0] for result in similar_embeddings] + +# Fetch metadata for these IDs + # metadata_results = db_manager.fetch_metadata_by_ids(metadata_ids) + # print(metadata_results) + # traverser_bfs_results = db_manager.traverser_bfs(metadata_ids=metadata_ids) + # print(traverser_bfs_results) + # print(db_manager.show_detailed_relationship_paths(traverser_bfs_results)) + # print(db_manager.suggest_queries_based_on_edges(traverser_bfs_results)) + + + ## Second Query + # print("2nd Query ---------------------------------------------------") + # user_choice = [27, 29, 171] + # traverser_bfs_results = db_manager.traverser_bfs(metadata_ids=user_choice) + # print(traverser_bfs_results) + # print(db_manager.show_detailed_relationship_paths(traverser_bfs_results)) + # print(db_manager.suggest_queries_based_on_edges(traverser_bfs_results)) + db_manager.close_connection() \ No newline at end of file diff --git a/tests/workflows/test_multiple_collectors.py b/tests/workflows/test_multiple_collectors.py index cc8cc003..aa7f242d 100644 --- a/tests/workflows/test_multiple_collectors.py +++ b/tests/workflows/test_multiple_collectors.py @@ -120,7 +120,7 @@ async def test_multiple_collectors_all_async(): ): messages += 1 counter += 1 - assert counter == 112 + assert counter == 94 assert messages > 0