
Commit

some rudimentary logic for realtime news (#273)
* some rudimentary logic for realtime news

* Improved news ingestor

* Changed file name

---------

Co-authored-by: Ansh5461 <[email protected]>
saraswatpuneet and Ansh5461 authored Mar 29, 2024
1 parent 894f84d commit f15d3bb
Showing 6 changed files with 216 additions and 26 deletions.
125 changes: 100 additions & 25 deletions querent/collectors/news/news_collector.py
@@ -1,13 +1,19 @@
import asyncio
import datetime
import uuid
from newsapi import NewsApiClient
from typing import AsyncGenerator
from querent.common.types.collected_bytes import CollectedBytes
from querent.config.collector.collector_config import CollectorConfig, CollectorBackend, NewsCollectorConfig
from querent.config.collector.collector_config import (
CollectorBackend,
NewsCollectorConfig,
)
from querent.collectors.collector_base import Collector
from querent.collectors.collector_factory import CollectorFactory
from querent.common.uri import Uri
from querent.logging.logger import setup_logger


class NewsCollector(Collector):
"""Enhanced class for collecting news articles using NewsAPI with advanced search capabilities and pagination."""

@@ -26,9 +32,79 @@ async def disconnect(self):

async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
current_page = 1
total_pages = float('inf') # Assume an infinite number of pages initially
total_pages = float("inf") # Assume an infinite number of pages initially

# Special to_date values mean "keep polling": to_date is refreshed to the current date each cycle
should_keep_going = False
if self.config.to_date in ["now", "latest", "", "present"]:
should_keep_going = True

if should_keep_going:
while should_keep_going:
if should_keep_going:
self.config.to_date = datetime.datetime.now().strftime("%Y-%m-%d")
while current_page <= total_pages:
try:
response = self.newsapi.get_everything(
q=self.config.query,
sources=self.config.sources,
domains=self.config.domains,
exclude_domains=self.config.exclude_domains,
from_param=self.config.from_date,
to=self.config.to_date,
language=self.config.language,
sort_by=self.config.sort_by,
page_size=self.config.page_size,
page=current_page,
)
if response["status"] == "ok":
articles = response.get("articles", [])
if not articles:
# If no articles found, consider sleeping for a shorter duration
await asyncio.sleep(3600) # Example: 1 hour
continue

for article in articles:
article_data = {
"source": article.get('source', {}).get('name'),
"author": article.get('author'),
"title": article.get('title'),
"description": article.get('description'),
"url": article.get('url'),
"urlToImage": article.get('urlToImage'),
"publishedAt": article.get('publishedAt'),
"content": article.get('content')
}
publish_date = article.get('publishedAt').split('T')[0]

title = article['title']
yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8"))
yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True)

total_results = response.get("totalResults", 0)
total_pages = (
total_results + self.config.page_size - 1
) // self.config.page_size
current_page += 1
else:
self.logger.error(
f"News API request failed: {response.get('message')}"
)
break
except Exception as e:
self.logger.error(f"Error fetching news articles: {e}")
yield CollectedBytes(file="Error", data=None, error=e)
break

while current_page <= total_pages:
# After exhausting the current batch, reset for next polling cycle
current_page = 1
total_pages = float("inf")
if not should_keep_going:
break
# Adjust the to_date for the next cycle
self.config.from_date = self.config.to_date
await asyncio.sleep(86400)
else:
try:
response = self.newsapi.get_everything(
q=self.config.query,
@@ -40,35 +116,34 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
language=self.config.language,
sort_by=self.config.sort_by,
page_size=self.config.page_size,
page=current_page
page=current_page,
)
if response["status"] == "ok":
articles = response.get("articles", [])
if not articles:
return

if response['status'] == 'ok':
articles = response.get('articles', [])
for article in articles:
article_data = {
"source": article.get('source', {}).get('name'),
"author": article.get('author'),
"title": article.get('title'),
"description": article.get('description'),
"url": article.get('url'),
"urlToImage": article.get('urlToImage'),
"publishedAt": article.get('publishedAt'),
"content": article.get('content')
"source": article.get('source', {}).get('name'),
"author": article.get('author'),
"title": article.get('title'),
"description": article.get('description'),
"url": article.get('url'),
"urlToImage": article.get('urlToImage'),
"publishedAt": article.get('publishedAt'),
"content": article.get('content')
}
yield CollectedBytes(file=article["title"], data=str(article_data).encode("utf-8"))
yield CollectedBytes(file=article["title"], data=None, error=None, eof=True)

total_results = response.get('totalResults', 0)
total_pages = (total_results + self.config.page_size - 1) // self.config.page_size # Calculate total pages
current_page += 1
else:
self.logger.error(f"News API request failed: {response.get('message')}")
break

publish_date = article.get('publishedAt').split('T')[0]
title = article['title']
yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=str(article_data).encode("utf-8"))
yield CollectedBytes(file=f"{self.config.query}_{publish_date}.news", data=None, error=None, eof=True)
except Exception as e:
self.logger.error(f"Error fetching news articles: {e}")
yield CollectedBytes(file="Error", data=None, error=e)
break




class NewsCollectorFactory(CollectorFactory):
def backend(self) -> CollectorBackend:
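For context, a minimal usage sketch of the new collector, mirroring the commented-out test added at the bottom of this commit; the query, dates, and the collect_news helper are illustrative, and a real run needs a valid NewsAPI key.

import asyncio
import uuid

from querent.collectors.collector_resolver import CollectorResolver
from querent.common.uri import Uri
from querent.config.collector.collector_config import NewsCollectorConfig


async def collect_news():
    # Illustrative configuration; values mirror the test further down in this commit.
    config = NewsCollectorConfig(
        config_source={
            "id": str(uuid.uuid4()),
            "name": "News API",
            "api_key": "<NEWS_API_KEY>",
            "query": "Tesla",
            "from_date": "2024-03-01",
            # "now", "latest", "" or "present" would switch the collector into the
            # continuous polling mode added in this commit; a fixed date terminates.
            "to_date": "2024-03-10",
            "language": "en",
            "sort_by": "publishedAt",
            "page_size": 5,
            "page": 1,
            "config": {},
            "uri": "news://",
        }
    )
    collector = CollectorResolver().resolve(Uri("news://"), config)
    await collector.connect()
    # poll() yields CollectedBytes items: article payloads plus eof markers.
    async for item in collector.poll():
        if item.data is not None:
            print(item.file)

# asyncio.run(collect_news())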
1 change: 1 addition & 0 deletions querent/config/ingestor/ingestor_config.py
@@ -32,3 +32,4 @@ class IngestorBackend(str, Enum):
Email = "email"
Slack = "slack"
Jira = "jira"
News = "news"
1 change: 1 addition & 0 deletions querent/ingestors/ingestor_manager.py
@@ -109,6 +109,7 @@ def __init__(
IngestorBackend.Slack.value: TextIngestorFactory(is_token_stream=True),
IngestorBackend.Email.value: EmailIngestorFactory(),
IngestorBackend.Jira.value: JsonIngestorFactory(),
IngestorBackend.News.value: TextIngestorFactory(is_token_stream=True)
# Add more mappings as needed
}
self.file_caches = LRUCache(maxsize=cache_size)
2 changes: 1 addition & 1 deletion querent/ingestors/texts/text_ingestor.py
@@ -10,7 +10,7 @@


class TextIngestorFactory(IngestorFactory):
SUPPORTED_EXTENSIONS = {"txt", "slack", ""}
SUPPORTED_EXTENSIONS = {"txt", "slack", "", "news"}

def __init__(self, is_token_stream=False):
self.is_token_stream = is_token_stream
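Together with the IngestorBackend.News mapping above, this lets the .news payloads emitted by the collector flow through the text ingestor as a token stream. A small sketch of that assumption:

from querent.ingestors.texts.text_ingestor import TextIngestorFactory

# Files such as "Tesla_2024-03-05.news" now resolve to the text ingestor,
# configured as a token stream (the same way Slack payloads are handled).
factory = TextIngestorFactory(is_token_stream=True)
assert "news" in TextIngestorFactory.SUPPORTED_EXTENSIONS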
21 changes: 21 additions & 0 deletions querent/storage/gcs_query.py
@@ -0,0 +1,21 @@
from google.cloud import storage

def list_gcs_files(bucket_name):
"""Lists all the files in a GCS bucket."""
# Create a storage client.
storage_client = storage.Client()

# Get the GCS bucket.
bucket = storage_client.get_bucket(bucket_name)

# List all objects in the bucket.
blobs = bucket.list_blobs()

# Print the names of the objects in the bucket.
for blob in blobs:
print(blob.name)

if __name__ == "__main__":
# Set your GCS bucket name here.
bucket_name = 'querent-test'
list_gcs_files(bucket_name)
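The helper relies on Application Default Credentials being available (for example via GOOGLE_APPLICATION_CREDENTIALS). A variant with an optional prefix filter, as a sketch; the "news/" prefix is illustrative:

from google.cloud import storage

def list_gcs_files_with_prefix(bucket_name, prefix=None):
    """List object names in a GCS bucket, optionally narrowed to a prefix."""
    client = storage.Client()  # authenticates via Application Default Credentials
    for blob in client.list_blobs(bucket_name, prefix=prefix):
        print(blob.name)

if __name__ == "__main__":
    list_gcs_files_with_prefix("querent-test", prefix="news/")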
92 changes: 92 additions & 0 deletions tests/ingestors/test_news_ingestor.py
@@ -0,0 +1,92 @@
# import asyncio
# from pathlib import Path
# import pytest
# import os
# import uuid
# from dotenv import load_dotenv

# from querent.config.collector.collector_config import (
# FSCollectorConfig,
# DriveCollectorConfig,
# NewsCollectorConfig,
# SlackCollectorConfig,
# )
# from querent.common.uri import Uri
# from querent.ingestors.ingestor_manager import IngestorFactoryManager
# from querent.collectors.collector_resolver import CollectorResolver
# from querent.common.types.ingested_tokens import IngestedTokens


# load_dotenv()

# # Assuming you've set your News API key in an environment variable called NEWS_API_KEY
# NEWS_API_KEY = os.getenv("NEWS_API_KEY")

# def news_collector_config():
# # Example configuration for collecting news about "technology"
# return NewsCollectorConfig(
# config_source={
# "id": str(uuid.uuid4()),
# "name": "News API",
# "api_key":NEWS_API_KEY,
# "query":"Tesla",
# "from_date":"2024-03-01",
# "to_date":"2024-03-10",
# "language":"en",
# "sort_by":"publishedAt",
# "page_size":5,
# "page":1,
# "config": {},
# "uri": "news://"
# }
# )


# @pytest.mark.asyncio
# async def test_multiple_collectors_all_async():
# # Set up the collectors
# collectors = [
# CollectorResolver().resolve(
# Uri("news://"),
# news_collector_config(),
# )
# ]

# for collector in collectors:
# await collector.connect()

# # Set up the result queue
# result_queue = asyncio.Queue()

# # Create the IngestorFactoryManager
# ingestor_factory_manager = IngestorFactoryManager(
# collectors=collectors, result_queue=result_queue
# )

# # Start the ingest_all_async in a separate task
# ingest_task = asyncio.create_task(ingestor_factory_manager.ingest_all_async())

# # Wait for the task to complete
# await asyncio.gather(ingest_task)

# # Optionally, check the result_queue for ingested data
# counter = 0
# unique_files = set()
# messages = 0
# while not result_queue.empty():
# ingested_data = await result_queue.get()
# if ingested_data is not None:
# if (
# isinstance(ingested_data, IngestedTokens)
# and ingested_data.is_token_stream
# ):
# messages += 1
# else:
# unique_files.add(ingested_data.file)
# counter += 1
# assert counter > 0



# if __name__ == "__main__":
# asyncio.run(test_multiple_collectors_all_async())
