From f15d3bb7f3ce7bd7d0b18517b03564cea4da5e6e Mon Sep 17 00:00:00 2001 From: Puneet Saraswat <61435908+saraswatpuneet@users.noreply.github.com> Date: Fri, 29 Mar 2024 15:25:27 -0500 Subject: [PATCH] some rudimentary logic for realtime news (#273) * some rudimentary logic for realtime news * Improved news ingestor * Changed file name --------- Co-authored-by: Ansh5461 --- querent/collectors/news/news_collector.py | 125 ++++++++++++++++----- querent/config/ingestor/ingestor_config.py | 1 + querent/ingestors/ingestor_manager.py | 1 + querent/ingestors/texts/text_ingestor.py | 2 +- querent/storage/gcs_query.py | 21 ++++ tests/ingestors/test_news_ingestor.py | 92 +++++++++++++++ 6 files changed, 216 insertions(+), 26 deletions(-) create mode 100644 querent/storage/gcs_query.py create mode 100644 tests/ingestors/test_news_ingestor.py diff --git a/querent/collectors/news/news_collector.py b/querent/collectors/news/news_collector.py index 54b03401..6e0fed44 100644 --- a/querent/collectors/news/news_collector.py +++ b/querent/collectors/news/news_collector.py @@ -1,13 +1,19 @@ import asyncio +import datetime +import uuid from newsapi import NewsApiClient from typing import AsyncGenerator from querent.common.types.collected_bytes import CollectedBytes -from querent.config.collector.collector_config import CollectorConfig, CollectorBackend, NewsCollectorConfig +from querent.config.collector.collector_config import ( + CollectorBackend, + NewsCollectorConfig, +) from querent.collectors.collector_base import Collector from querent.collectors.collector_factory import CollectorFactory from querent.common.uri import Uri from querent.logging.logger import setup_logger + class NewsCollector(Collector): """Enhanced class for collecting news articles using NewsAPI with advanced search capabilities and pagination.""" @@ -26,9 +32,79 @@ async def disconnect(self): async def poll(self) -> AsyncGenerator[CollectedBytes, None]: current_page = 1 - total_pages = float('inf') # Assume an infinite number of pages initially + total_pages = float("inf") # Assume an infinite number of pages initially + + # Adjust to_date for special cases to the current date + should_keep_going = False + if self.config.to_date in ["now", "latest", "", "present"]: + should_keep_going = True + + if should_keep_going: + while should_keep_going: + if should_keep_going: + self.config.to_date = datetime.datetime.now().strftime("%Y-%m-%d") + while current_page <= total_pages: + try: + response = self.newsapi.get_everything( + q=self.config.query, + sources=self.config.sources, + domains=self.config.domains, + exclude_domains=self.config.exclude_domains, + from_param=self.config.from_date, + to=self.config.to_date, + language=self.config.language, + sort_by=self.config.sort_by, + page_size=self.config.page_size, + page=current_page, + ) + if response["status"] == "ok": + articles = response.get("articles", []) + if not articles: + # If no articles found, consider sleeping for a shorter duration + await asyncio.sleep(3600) # Example: 1 hour + continue + + for article in articles: + article_data = { + "source": article.get('source', {}).get('name'), + "author": article.get('author'), + "title": article.get('title'), + "description": article.get('description'), + "url": article.get('url'), + "urlToImage": article.get('urlToImage'), + "publishedAt": article.get('publishedAt'), + "content": article.get('content') + } + publish_date = article.get('publishedAt').split('T')[0] + + title = article['title'] + yield 
CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8")) + yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True) + + total_results = response.get("totalResults", 0) + total_pages = ( + total_results + self.config.page_size - 1 + ) // self.config.page_size + current_page += 1 + else: + self.logger.error( + f"News API request failed: {response.get('message')}" + ) + break + except Exception as e: + self.logger.error(f"Error fetching news articles: {e}") + yield CollectedBytes(file="Error", data=None, error=e) + break - while current_page <= total_pages: + # After exhausting the current batch, reset for next polling cycle + current_page = 1 + total_pages = float("inf") + if not should_keep_going: + break + # Adjust the to_date for the next cycle + self.config.from_date = self.config.to_date + await asyncio.sleep(86400) + else: try: response = self.newsapi.get_everything( q=self.config.query, @@ -40,35 +116,34 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]: language=self.config.language, sort_by=self.config.sort_by, page_size=self.config.page_size, - page=current_page + page=current_page, ) + if response["status"] == "ok": + articles = response.get("articles", []) + if not articles: + return - if response['status'] == 'ok': - articles = response.get('articles', []) for article in articles: article_data = { - "source": article.get('source', {}).get('name'), - "author": article.get('author'), - "title": article.get('title'), - "description": article.get('description'), - "url": article.get('url'), - "urlToImage": article.get('urlToImage'), - "publishedAt": article.get('publishedAt'), - "content": article.get('content') + "source": article.get('source', {}).get('name'), + "author": article.get('author'), + "title": article.get('title'), + "description": article.get('description'), + "url": article.get('url'), + "urlToImage": article.get('urlToImage'), + "publishedAt": article.get('publishedAt'), + "content": article.get('content') } - yield CollectedBytes(file=article["title"], data=str(article_data).encode("utf-8")) - yield CollectedBytes(file=article["title"], data=None, error=None, eof=True) - - total_results = response.get('totalResults', 0) - total_pages = (total_results + self.config.page_size - 1) // self.config.page_size # Calculate total pages - current_page += 1 - else: - self.logger.error(f"News API request failed: {response.get('message')}") - break + + publish_date = article.get('publishedAt').split('T')[0] + title = article['title'] + yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8")) + yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True) except Exception as e: self.logger.error(f"Error fetching news articles: {e}") - yield CollectedBytes(file="Error", data=None, error=e) - break + + + class NewsCollectorFactory(CollectorFactory): def backend(self) -> CollectorBackend: diff --git a/querent/config/ingestor/ingestor_config.py b/querent/config/ingestor/ingestor_config.py index 02710d1c..6d5b459b 100644 --- a/querent/config/ingestor/ingestor_config.py +++ b/querent/config/ingestor/ingestor_config.py @@ -32,3 +32,4 @@ class IngestorBackend(str, Enum): Email = "email" Slack = "slack" Jira = "jira" + News = "news" diff --git a/querent/ingestors/ingestor_manager.py b/querent/ingestors/ingestor_manager.py index 4ff4d0a6..f9b86196 100644 --- 
a/querent/ingestors/ingestor_manager.py +++ b/querent/ingestors/ingestor_manager.py @@ -109,6 +109,7 @@ def __init__( IngestorBackend.Slack.value: TextIngestorFactory(is_token_stream=True), IngestorBackend.Email.value: EmailIngestorFactory(), IngestorBackend.Jira.value: JsonIngestorFactory(), + IngestorBackend.News.value: TextIngestorFactory(is_token_stream=True) # Add more mappings as needed } self.file_caches = LRUCache(maxsize=cache_size) diff --git a/querent/ingestors/texts/text_ingestor.py b/querent/ingestors/texts/text_ingestor.py index c3921458..97e73bcd 100644 --- a/querent/ingestors/texts/text_ingestor.py +++ b/querent/ingestors/texts/text_ingestor.py @@ -10,7 +10,7 @@ class TextIngestorFactory(IngestorFactory): - SUPPORTED_EXTENSIONS = {"txt", "slack", ""} + SUPPORTED_EXTENSIONS = {"txt", "slack", "", "news"} def __init__(self, is_token_stream=False): self.is_token_stream = is_token_stream diff --git a/querent/storage/gcs_query.py b/querent/storage/gcs_query.py new file mode 100644 index 00000000..92ce1e59 --- /dev/null +++ b/querent/storage/gcs_query.py @@ -0,0 +1,21 @@ +from google.cloud import storage + +def list_gcs_files(bucket_name): + """Lists all the files in a GCS bucket.""" + # Create a storage client. + storage_client = storage.Client() + + # Get the GCS bucket. + bucket = storage_client.get_bucket(bucket_name) + + # List all objects in the bucket. + blobs = bucket.list_blobs() + + # Print the names of the objects in the bucket. + for blob in blobs: + print(blob.name) + +if __name__ == "__main__": + # Set your GCS bucket name here. + bucket_name = 'querent-test' + list_gcs_files(bucket_name) diff --git a/tests/ingestors/test_news_ingestor.py b/tests/ingestors/test_news_ingestor.py new file mode 100644 index 00000000..46b5074b --- /dev/null +++ b/tests/ingestors/test_news_ingestor.py @@ -0,0 +1,92 @@ +# import asyncio +# from pathlib import Path +# import pytest +# import os +# import uuid +# from dotenv import load_dotenv + +# from querent.config.collector.collector_config import ( +# FSCollectorConfig, +# DriveCollectorConfig, +# NewsCollectorConfig, +# SlackCollectorConfig, +# ) +# from querent.common.uri import Uri +# from querent.ingestors.ingestor_manager import IngestorFactoryManager +# from querent.collectors.collector_resolver import CollectorResolver +# from querent.common.types.ingested_tokens import IngestedTokens + + +# load_dotenv() + +# # Assuming you've set your News API key in an environment variable called NEWS_API_KEY +# NEWS_API_KEY = os.getenv("NEWS_API_KEY") + +# def news_collector_config(): +# # Example configuration for collecting news about "technology" +# return NewsCollectorConfig( +# config_source={ +# "id": str(uuid.uuid4()), +# "name": "News API", +# "api_key":NEWS_API_KEY, +# "query":"Tesla", +# "from_date":"2024-03-01", +# "to_date":"2024-03-10", +# "language":"en", +# "sort_by":"publishedAt", +# "page_size":5, +# "page":1, +# "config": {}, +# "uri": "news://" +# } +# ) + + +# @pytest.mark.asyncio +# async def test_multiple_collectors_all_async(): +# # Set up the collectors +# collectors = [ +# CollectorResolver().resolve( +# Uri("news://"), +# news_collector_config(), +# ) +# ] + +# for collector in collectors: +# await collector.connect() + +# # Set up the result queue +# result_queue = asyncio.Queue() + +# # Create the IngestorFactoryManager +# ingestor_factory_manager = IngestorFactoryManager( +# collectors=collectors, result_queue=result_queue +# ) + +# # Start the ingest_all_async in a separate task +# ingest_task = 
asyncio.create_task(ingestor_factory_manager.ingest_all_async()) + +# # Wait for the task to complete +# await asyncio.gather(ingest_task) + +# # Optionally, check the result_queue for ingested data +# counter = 0 +# unique_files = set() +# messages = 0 +# while not result_queue.empty(): +# ingested_data = await result_queue.get() +# if ingested_data is not None: +# if ( +# isinstance(ingested_data, IngestedTokens) +# and ingested_data.is_token_stream +# ): +# messages += 1 +# else: +# unique_files.add(ingested_data.file) +# counter += 1 +# assert counter > 0 + + + +# if __name__ == "__main__": +# asyncio.run(test_multiple_collectors_all_async())
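
For context, the commented-out test above drives the new collector through IngestorFactoryManager; the same NewsCollectorConfig can also drive NewsCollector.poll() directly. Below is a minimal sketch of that, assuming the config fields and the news:// URI scheme shown in the test, and assuming CollectedBytes exposes the data/error/eof values that the diff passes to its constructor; NEWS_API_KEY and the date range are placeholders, not values from this patch.

import asyncio
import os
import uuid

from querent.collectors.collector_resolver import CollectorResolver
from querent.common.uri import Uri
from querent.config.collector.collector_config import NewsCollectorConfig


async def poll_news_once():
    # Mirror the commented-out test: a bounded from/to date range, so the
    # collector takes the single-request branch rather than the realtime loop.
    config = NewsCollectorConfig(
        config_source={
            "id": str(uuid.uuid4()),
            "name": "News API",
            "api_key": os.getenv("NEWS_API_KEY"),
            "query": "Tesla",
            "from_date": "2024-03-01",
            "to_date": "2024-03-10",
            "language": "en",
            "sort_by": "publishedAt",
            "page_size": 5,
            "page": 1,
            "config": {},
            "uri": "news://",
        }
    )

    collector = CollectorResolver().resolve(Uri("news://"), config)
    await collector.connect()
    try:
        async for collected in collector.poll():
            if collected.error is not None:
                # Error markers are yielded with file="Error" and data=None.
                print(f"error while collecting: {collected.error}")
            elif collected.eof:
                # End-of-article marker for the "<query>_<publish_date>.news" file.
                print(f"finished {collected.file}")
            else:
                print(f"{collected.file}: {len(collected.data)} bytes")
    finally:
        await collector.disconnect()


if __name__ == "__main__":
    asyncio.run(poll_news_once())

With a bounded to_date like this, poll() issues a single NewsAPI request per cycle; only the realtime branch (to_date of "now", "latest", "present", or empty) pages through results using (totalResults + page_size - 1) // page_size and then sleeps before re-polling.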