From 63d5beef899cf3fc63e58d09c1c583892ad47489 Mon Sep 17 00:00:00 2001
From: anneum <60262966+anneum@users.noreply.github.com>
Date: Thu, 21 Dec 2023 15:24:55 +0100
Subject: [PATCH] Use the rate limits from the header to better control the
 waiting times (#3)

Co-authored-by: sx3eanne
---
 pyproject.toml                                |   2 +-
 src/sparta/twitterapi/rate_limiter.py         |  70 +++++++++++
 src/sparta/twitterapi/tweets/full_search.py   | 107 ++++++++--------
 src/sparta/twitterapi/tweets/quote_tweets.py  |  45 +++----
 src/sparta/twitterapi/tweets/recent_search.py | 110 ++++++++--------
 src/sparta/twitterapi/tweets/retweets.py      |  47 +++----
 src/sparta/twitterapi/tweets/tweets.py        |  52 ++++----
 src/sparta/twitterapi/users/follower.py       |  53 ++++++--
 src/sparta/twitterapi/users/user.py           | 117 +++++++++++-------
 9 files changed, 374 insertions(+), 229 deletions(-)
 create mode 100644 src/sparta/twitterapi/rate_limiter.py

diff --git a/pyproject.toml b/pyproject.toml
index 4968907..60f4b4e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "sparta-twitterapi"
-version = "0.1.53"
+version = "0.1.54"
 description = ""
 authors = ["Andreas Neumeier ", "Jasmin Riedl ", "Lukas Metzner ", "Benedikt Radtke "]
 readme = "README.md"
diff --git a/src/sparta/twitterapi/rate_limiter.py b/src/sparta/twitterapi/rate_limiter.py
new file mode 100644
index 0000000..007f8b2
--- /dev/null
+++ b/src/sparta/twitterapi/rate_limiter.py
@@ -0,0 +1,70 @@
+import asyncio
+import logging
+import time
+from typing import Dict, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class RateLimiter:
+    """A utility class for handling rate limiting in API requests.
+
+    This class provides methods to check and handle rate limits as provided
+    by the API, typically through HTTP headers. It tracks the number of requests
+    remaining before hitting the rate limit and the time when the rate limit
+    will be reset.
+
+    Attributes:
+        remaining (Optional[int]): The number of requests left for the rate limit window.
+            Defaults to None.
+        reset_time (Optional[int]): The UTC epoch time in seconds when the rate limit
+            will be reset. Defaults to None.
+
+    Methods:
+        wait_for_limit_reset: Asynchronously waits until the rate limit is reset if
+            the limit has been reached.
+        update_limits(headers): Updates the rate limit remaining and reset time based
+            on the HTTP headers from a response.
+        should_wait(): Determines whether it is necessary to wait for the rate limit
+            reset based on the remaining requests.
+    """
+
+    def __init__(self) -> None:
+        self.remaining: Optional[int] = None
+        self.reset_time: Optional[int] = None
+
+    async def wait_for_limit_reset(self) -> None:
+        """Asynchronously waits until the rate limit is reset.
+
+        This method calculates the time to wait based on the current time and the
+        reset time of the rate limit. It then pauses execution for that duration,
+        effectively throttling the rate of API requests.
+
+        Note:
+            A warning is logged with the computed wait time; the method always waits
+            at least one second, even if the reset time has already passed.
+        """
+        if self.reset_time is not None:
+            wait_time = max(self.reset_time - int(time.time()), 1)  # Wait at least 1 second
+            logger.warning(f"Rate limit exceeded. Waiting {wait_time} seconds.")
+            await asyncio.sleep(wait_time)
+
+    def update_limits(self, headers: Dict[str, str]) -> None:
+        """Updates the rate limit information based on the response headers.
+
+        This method extracts the 'x-rate-limit-remaining' and 'x-rate-limit-reset'
+        values from the headers and updates the internal state of the rate limiter.
+
+        Args:
+            headers (Dict[str, str]): A dictionary of HTTP headers from an API response.
+        """
+        self.remaining = int(headers.get("x-rate-limit-remaining", 1))
+        self.reset_time = int(headers.get("x-rate-limit-reset", 0))
+
+    def should_wait(self) -> bool:
+        """Determines if waiting for rate limit reset is necessary.
+
+        Returns:
+            bool: True if the remaining number of requests is zero and it's necessary
+                to wait until the rate limit is reset. False otherwise.
+        """
+        return self.remaining == 0
diff --git a/src/sparta/twitterapi/tweets/full_search.py b/src/sparta/twitterapi/tweets/full_search.py
index 53c3343..2853d3c 100644
--- a/src/sparta/twitterapi/tweets/full_search.py
+++ b/src/sparta/twitterapi/tweets/full_search.py
@@ -42,7 +42,6 @@
 """
 
 import asyncio
-import json
 import logging
 import os
 from datetime import datetime
@@ -52,6 +51,7 @@
 
 from sparta.twitterapi.models.tweet_response import TweetResponse
 from sparta.twitterapi.models.twitter_v2_spec import Get2TweetsCountsAllResponse, SearchCount
+from sparta.twitterapi.rate_limiter import RateLimiter
 from sparta.twitterapi.tweets.constants import EXPANSIONS, MEDIA_FIELDS, PLACE_FIELDS, POLL_FIELDS, TWEET_FIELDS, USER_FIELDS
 
 logger = logging.getLogger(__name__)
@@ -70,29 +70,29 @@ async def get_full_search(
     until_id: str = None,
     sort_order: str = None,
 ) -> AsyncGenerator[TweetResponse, None]:
-    """Returns Tweets that match a search query. If no time or id parameters are specified, the last 30 days are assumed.
+    """Asynchronously retrieves tweets that match a specified search query.
+
+    This function queries the Twitter API to find tweets matching the given search criteria. It handles rate limiting using an internal instance of
+    RateLimiter, automatically pausing requests if the rate limit is exceeded.
 
     Args:
-        query (str): One query/rule/filter for matching Tweets. Refer to https://t.co/rulelength to identify the max query length.
-            How to build a rule: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule
-        start_time (datetime): The oldest UTC timestamp from which the Tweets will be provided. Timestamp is in second granularity and is inclusive
-            (i.e. 12:00:01 includes the first second of the minute). Defaults to None.
-        end_time (datetime): The newest, most recent UTC timestamp to which the Tweets will be provided. Timestamp is in second granularity and is exclusive
-            (i.e. 12:00:01 excludes the first second of the minute). Defaults to None.
-        since_id (str, optional): Returns results with a Tweet ID greater than (that is, more recent than) the specified ID. Defaults to None.
-        until_id (str, optional): Returns results with a Tweet ID less than (that is, older than) the specified ID. Defaults to None.
-        sort_order (str, optional): This order in which to return results. Possible options: recency and relevancy. Defaults to None.
+        query (str): The search query for matching Tweets. Refer to Twitter API documentation for details on query format and limitations.
+        start_time (datetime, optional): The oldest UTC timestamp from which tweets will be provided. Inclusive and in second granularity.
+        end_time (datetime, optional): The newest UTC timestamp to which tweets will be provided. Exclusive and in second granularity.
+        since_id (str, optional): Returns results with a Tweet ID greater than this ID.
+        until_id (str, optional): Returns results with a Tweet ID less than this ID.
+        sort_order (str, optional): The order in which to return results (e.g., 'recency' or 'relevancy').
+
+    Yields:
+        TweetResponse: An object representing the tweet data for each tweet that matches the query.
 
     Raises:
-        Exception: Cannot get the search result due to an http error.
-
-    Returns:
-        AsyncGenerator[TweetResponse, None]: AsyncGenerator that yields TweetResponses.
+        Exception: If an HTTP error occurs that prevents retrieving the tweets or if the query parameters are invalid.
 
-    Yields:
-        Iterator[AsyncGenerator[TweetResponse, None]]: A TweetResponse Object.
+    Note:
+        The function automatically handles pagination of results using the 'next_token' provided by Twitter's API response.
     """
+    rate_limiter = RateLimiter()
     async with aiohttp.ClientSession(headers=headers) as session:
         params: Dict[str, str] = {
             "query": query,
@@ -120,34 +120,29 @@ async def get_full_search(
             logger.debug(f"search recent params={params}")
             async with session.get("https://api.twitter.com/2/tweets/search/all", params=params) as response:
                 if response.status == 400:
-                    logger.error(f"Cannot search tweets (HTTP {response.status}): {await response.text()}")
+                    logger.error(f"Cannot search full tweets (HTTP {response.status}): {await response.text()}")
                     raise Exception
 
+                rate_limiter.update_limits(dict(response.headers))
+
                 if response.status == 429:
-                    logger.warn(f"{response.status} Too Many Requests. Sleep for 5 second...")
-                    await asyncio.sleep(5)
+                    await rate_limiter.wait_for_limit_reset()
                    continue
+
                 if not response.ok:
-                    logger.error(f"Cannot search tweets (HTTP {response.status}): {await response.text()}")
-                    await asyncio.sleep(5)
+                    logger.error(f"Cannot search full tweets (HTTP {response.status}): {await response.text()}")
+                    await asyncio.sleep(10)
                     continue
 
-                response_text = await response.text()
-                response_json = json.loads(response_text)
+                response_json = await response.json()
 
                 for tweet in response_json.get("data", []):
                     yield TweetResponse(tweet=tweet, includes=response_json.get("includes", {}))
 
-                # try:
-                #     Get2TweetsSearchAllResponse.model_validate(response_json)
-                # except Exception as e:
-                #     logger.warn(f"Inconsistent twitter OpenAPI documentation {e}")
-                #     # logger.warn(response_text)
-
-                if "next_token" in response_json["meta"]:
+                if "next_token" in response_json.get("meta", {}):
                     params["next_token"] = response_json["meta"]["next_token"]
                 else:
-                    return
+                    break
 
 
 async def get_full_search_count(
     query: str,
     start_time: datetime = None,
     end_time: datetime = None,
     since_id: str = None,
     until_id: str = None,
     granularity: str = "hour",
 ) -> AsyncGenerator[SearchCount, None]:
-    """Returns the number of tweets that match a query according to a granularity (e.g. hourwise) over a given time period. If no time or id parameters are
-    specified, the last 30 days are assumed.
+    """Asynchronously retrieves the count of tweets matching a specified search query, aggregated according to a specified granularity (e.g., hourly).
+
+    This function queries the Twitter API to count tweets matching the given search criteria and aggregates the counts based on the specified granularity.
+    It handles rate limiting using an internal instance of RateLimiter, automatically pausing requests if the rate limit is exceeded.
 
     Args:
-        query (str): One query/rule/filter for matching Tweets. Refer to https://t.co/rulelength to identify the max query length.
-            How to build a rule: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule
-        start_time (datetime): The oldest UTC timestamp from which the Tweets will be provided. Timestamp is in second granularity and is inclusive
-            (i.e. 12:00:01 includes the first second of the minute). Defaults to None.
-        end_time (datetime): The newest, most recent UTC timestamp to which the Tweets will be provided. Timestamp is in second granularity and is exclusive
-            (i.e. 12:00:01 excludes the first second of the minute). Defaults to None.
-        since_id (str, optional): Returns results with a Tweet ID greater than (that is, more recent than) the specified ID. Defaults to None.
-        until_id (str, optional): Returns results with a Tweet ID less than (that is, older than) the specified ID. Defaults to None.
-        granularity (str, optional): The granularity for the search counts results. Defaults to 'hour'
-
-    Returns:
-        AsyncGenerator[SearchCount, None]: AsyncGenerator that yields a Twitter SearchCounts.
+        query (str): The search query for matching Tweets. Refer to Twitter API documentation for query format and limitations.
+        start_time (datetime, optional): The oldest UTC timestamp from which tweet counts will be provided. Inclusive and in second granularity.
+        end_time (datetime, optional): The newest UTC timestamp to which tweet counts will be provided. Exclusive and in second granularity.
+        since_id (str, optional): Returns results with a Tweet ID greater than this ID.
+        until_id (str, optional): Returns results with a Tweet ID less than this ID.
+        granularity (str, optional): The granularity for the search counts results ('minute', 'hour', or 'day'). Defaults to 'hour'.
 
     Yields:
-        Iterator[AsyncGenerator[SearchCount, None]]: A Twitter SearchCount Object.
+        SearchCount: An object representing the tweet count data for each interval according to the specified granularity.
+
+    Raises:
+        Exception: If an invalid granularity is specified or if an HTTP error occurs that prevents retrieving the tweet counts.
+
+    Note:
+        The function automatically handles pagination of results using the 'next_token' provided by Twitter's API response.
     """
 
     if granularity not in ["minute", "hour", "day"]:
         raise Exception(f"Wrong granularity. Given granularity: {granularity}. Possible values = minute, hour, day")
+    rate_limiter = RateLimiter()
     async with aiohttp.ClientSession(headers=headers) as session:
         params: Dict[str, str] = {
             "query": query,
@@ -200,17 +198,18 @@ async def get_full_search_count(
         logger.debug(f"search recent params={params}")
         async with session.get("https://api.twitter.com/2/tweets/counts/all", params=params) as response:
             if response.status == 400:
-                logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
+                logger.error(f"Cannot get full tweet counts (HTTP {response.status}): {await response.text()}")
                 raise Exception
 
+            rate_limiter.update_limits(dict(response.headers))
+
             if response.status == 429:
-                logger.warn("429 Too Many Requests.")
-                await asyncio.sleep(5)
+                await rate_limiter.wait_for_limit_reset()
                 continue
 
             if not response.ok:
-                logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
-                await asyncio.sleep(5)
+                logger.error(f"Cannot get full tweet counts (HTTP {response.status}): {await response.text()}")
+                await asyncio.sleep(10)
                 continue
 
             counts = Get2TweetsCountsAllResponse.model_validate_json(await response.text())
@@ -221,4 +220,4 @@
             if counts.meta and counts.meta.next_token:
                 params["next_token"] = counts.meta.next_token
             else:
-                return
+                break
diff --git a/src/sparta/twitterapi/tweets/quote_tweets.py b/src/sparta/twitterapi/tweets/quote_tweets.py
index 5878d77..da8ae3f 100644
--- a/src/sparta/twitterapi/tweets/quote_tweets.py
+++ b/src/sparta/twitterapi/tweets/quote_tweets.py
@@ -25,7 +25,6 @@
 """
 
 import asyncio
-import json
 import logging
 import os
 from datetime import datetime
@@ -34,6 +33,7 @@
 import aiohttp
 
 from sparta.twitterapi.models.tweet_response import TweetResponse
+from sparta.twitterapi.rate_limiter import RateLimiter
 
 # from sparta.twitterapi.models.twitter_v2_spec import Get2TweetsIdQuoteTweetsResponse
 from sparta.twitterapi.tweets.constants import EXPANSIONS, MEDIA_FIELDS, PLACE_FIELDS, POLL_FIELDS, TWEET_FIELDS, USER_FIELDS
@@ -53,27 +53,29 @@ async def get_quote_tweets(
     since_id: str = None,
     until_id: str = None,
 ) -> AsyncGenerator[TweetResponse, None]:
-    """Returns quoted Tweets.
+    """Asynchronously retrieves tweets quoting a specified tweet.
+
+    This function queries the Twitter API to find tweets that are quote tweets of the tweet corresponding to the given ID. It handles rate limiting using an
+    internal instance of RateLimiter, automatically pausing requests if the rate limit is exceeded. The function also supports time-based filtering and
+    pagination.
 
     Args:
-        id (str): A tweet ID for which to retrieve quote tweets.
-        start_time (datetime): The oldest UTC timestamp from which the Tweets will be provided. Timestamp is in second granularity and is inclusive
-            (i.e. 12:00:01 includes the first second of the minute). Defaults to None.
-        end_time (datetime): The newest, most recent UTC timestamp to which the Tweets will be provided. Timestamp is in second granularity and is exclusive
-            (i.e. 12:00:01 excludes the first second of the minute). Defaults to None.
-        since_id (str, optional): Returns results with a Tweet ID greater than (that is, more recent than) the specified ID. Defaults to None.
-        until_id (str, optional): Returns results with a Tweet ID less than (that is, older than) the specified ID. Defaults to None.
+        id (str): The ID of the tweet for which to retrieve quote tweets.
+        start_time (datetime, optional): The oldest UTC timestamp from which quote tweets will be provided. Inclusive and in second granularity.
+        end_time (datetime, optional): The newest UTC timestamp to which quote tweets will be provided. Exclusive and in second granularity.
+        since_id (str, optional): Returns quote tweets with an ID greater than this ID.
+        until_id (str, optional): Returns quote tweets with an ID less than this ID.
+
+    Yields:
+        TweetResponse: An object representing a tweet that quotes the specified tweet.
 
     Raises:
-        Exception: Cannot get the search result due to an http error.
-
-    Returns:
-        AsyncGenerator[TweetResponse, None]: AsyncGenerator that yields TweetResponses.
+        Exception: If an HTTP error occurs that prevents retrieving the quote tweets or if the tweet ID is invalid.
 
-    Yields:
-        Iterator[AsyncGenerator[TweetResponse, None]]: A TweetResponse Object.
+    Note:
+        The function automatically handles pagination of results using the 'next_token' provided by Twitter's API response.
     """
+    rate_limiter = RateLimiter()
     async with aiohttp.ClientSession(headers=headers) as session:
         params: Dict[str, str] = {
             "tweet.fields": TWEET_FIELDS,
@@ -101,17 +103,18 @@ async def get_quote_tweets(
                     logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
                     raise Exception
 
+                rate_limiter.update_limits(dict(response.headers))
+
                 if response.status == 429:
-                    logger.warn(f"{response.status} Too Many Requests. Sleep for 1 minute...")
-                    await asyncio.sleep(60)
+                    await rate_limiter.wait_for_limit_reset()
                     continue
+
                 if not response.ok:
                     logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
-                    await asyncio.sleep(5)
+                    await asyncio.sleep(10)
                     continue
 
-                response_text = await response.text()
-                response_json = json.loads(response_text)
+                response_json = await response.json()
 
                 for tweet in response_json.get("data", []):
                     yield TweetResponse(tweet=tweet, includes=response_json.get("includes", {}))
@@ -125,7 +128,7 @@ async def get_quote_tweets(
                     if "errors" in response_json:
                         logger.warn(f'Errors: {response_json["errors"]}')
                     if "next_token" in response_json.get("meta", {}):
-                        params["pagination_token"] = response_json["meta"]["next_token"]
+                        params["pagination_token"] = response_json.get("meta", {}).get("next_token", "")
                     else:
                         break
             except Exception as e:
diff --git a/src/sparta/twitterapi/tweets/recent_search.py b/src/sparta/twitterapi/tweets/recent_search.py
index acd10f7..07aebd5 100644
--- a/src/sparta/twitterapi/tweets/recent_search.py
+++ b/src/sparta/twitterapi/tweets/recent_search.py
@@ -52,7 +52,8 @@
 import aiohttp
 
 from sparta.twitterapi.models.tweet_response import TweetResponse
-from sparta.twitterapi.models.twitter_v2_spec import Get2TweetsCountsRecentResponse, Get2TweetsSearchRecentResponse, SearchCount
+from sparta.twitterapi.models.twitter_v2_spec import Get2TweetsCountsRecentResponse, SearchCount
+from sparta.twitterapi.rate_limiter import RateLimiter
 from sparta.twitterapi.tweets.constants import EXPANSIONS, MEDIA_FIELDS, PLACE_FIELDS, POLL_FIELDS, TWEET_FIELDS, USER_FIELDS
 
 logger = logging.getLogger(__name__)
@@ -71,28 +72,31 @@ async def get_recent_search(
     until_id: str = None,
     sort_order: str = None,
 ) -> AsyncGenerator[TweetResponse, None]:
-    """Get tweets from the last 7 days (maximum) that match the query.
+    """Asynchronously retrieves tweets from the last 7 days that match the specified query.
+
+    This function queries the Twitter API to find tweets matching the given search criteria within the last 7 days. It uses an internal instance of
+    RateLimiter to handle rate limiting, automatically pausing requests if the rate limit is exceeded.
 
     Args:
-        query (str): One query/rule/filter for matching Tweets. Refer to https://t.co/rulelength to identify the max query length.
-            How to build a rule: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule
-        start_time (datetime): The oldest UTC timestamp from which the Tweets will be provided. Timestamp is in second granularity and is inclusive
-            (i.e. 12:00:01 includes the first second of the minute). Defaults to None.
-        end_time (datetime): The newest, most recent UTC timestamp to which the Tweets will be provided. Timestamp is in second granularity and is exclusive
-            (i.e. 12:00:01 excludes the first second of the minute). Defaults to None.
-        since_id (str, optional): Returns results with a Tweet ID greater than (that is, more recent than) the specified ID. Defaults to None.
-        until_id (str, optional): Returns results with a Tweet ID less than (that is, older than) the specified ID. Defaults to None.
-        sort_order (str, optional): This order in which to return results. Possible options: recency and relevancy. Defaults to None.
+        query (str): The search query for matching Tweets. Refer to https://t.co/rulelength to identify the max query length. How to build a rule:
+            https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule
+        start_time (datetime, optional): The oldest UTC timestamp from which tweets will be provided. Inclusive and in second granularity.
+        end_time (datetime, optional): The newest UTC timestamp to which tweets will be provided. Exclusive and in second granularity.
+        since_id (str, optional): Returns results with a Tweet ID greater than this ID.
+        until_id (str, optional): Returns results with a Tweet ID less than this ID.
+        sort_order (str, optional): The order in which to return results (e.g., 'recency' or 'relevancy'). Defaults to None.
 
-    Raises:
-        Exception: Cannot get the search result due to an http error.
+    Yields:
+        TweetResponse: An object representing the tweet data for each tweet that matches the query.
 
-    Returns:
-        AsyncGenerator[TweetResponse, None]: AsyncGenerator that yields TweetResponses.
+    Raises:
+        Exception: If an HTTP error occurs that prevents retrieving the tweets or if the query parameters are invalid.
 
-    Yields:
-        Iterator[AsyncGenerator[TweetResponse, None]]: A TweetResponse Object.
+    Note:
+        The function automatically handles pagination of results using the 'next_token' provided by Twitter's API response.
     """
+
+    rate_limiter = RateLimiter()
     next_token: Optional[str] = None
     async with aiohttp.ClientSession(headers=headers) as session:
         params: Dict[str, str] = {
@@ -127,31 +131,26 @@ async def get_recent_search(
                     logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
                     raise Exception
 
+                rate_limiter.update_limits(dict(response.headers))
+
                 if response.status == 429:
-                    logger.warn("429 Too Many Requests. Sleep for 5 second...")
-                    await asyncio.sleep(5)
+                    await rate_limiter.wait_for_limit_reset()
                     continue
+
                 if not response.ok:
                     logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
-                    await asyncio.sleep(5)
+                    await asyncio.sleep(10)
                     continue
 
-                response_text = await response.text()
-                response_json = json.loads(response_text)
+                response_json = await response.json()
 
                 for tweet in response_json.get("data", []):
                     yield TweetResponse(tweet=tweet, includes=response_json.get("includes", {}))
 
-                try:
-                    Get2TweetsSearchRecentResponse.model_validate(response_json)
-                except Exception as e:
-                    logger.warn(f"Inconsistent twitter OpenAPI documentation {e}")
-                    # logger.warn(response_text)
-
-                if "next_token" in response_json["meta"]:
+                if "next_token" in response_json.get("meta", {}):
                     params["next_token"] = response_json["meta"]["next_token"]
                 else:
-                    return
+                    break
 
 
 async def get_recent_search_count(
     query: str,
     start_time: datetime = None,
     end_time: datetime = None,
     since_id: str = None,
     until_id: str = None,
     granularity: str = None,
 ) -> AsyncGenerator[SearchCount, None]:
-    """Returns the number of tweets that match a search query that match a query according to a granularity (e.g. hourwise) over a given time period (maximum
-    from the last 7 days).
+    """Asynchronously retrieves the count of tweets matching a specified search query from the last 7 days, aggregated according to a specified granularity.
+
+    This function queries the Twitter API to count tweets matching the given search criteria within the last 7 days and aggregates the counts based on the
+    specified granularity. It uses an internal instance of RateLimiter to handle rate limiting, automatically pausing requests if the rate limit is exceeded.
 
     Args:
-        query (str): One query/rule/filter for matching Tweets. Refer to https://t.co/rulelength to identify the max query length.
-            How to build a rule: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule
-        start_time (datetime): The oldest UTC timestamp from which the Tweets will be provided. Timestamp is in second granularity and is inclusive
-            (i.e. 12:00:01 includes the first second of the minute). Defaults to None.
-        end_time (datetime): The newest, most recent UTC timestamp to which the Tweets will be provided. Timestamp is in second granularity and is exclusive
-            (i.e. 12:00:01 excludes the first second of the minute). Defaults to None.
-        since_id (str, optional): Returns results with a Tweet ID greater than (that is, more recent than) the specified ID. Defaults to None.
-        until_id (str, optional): Returns results with a Tweet ID less than (that is, older than) the specified ID. Defaults to None.
-        granularity (str, optional): The granularity for the search counts results. Defaults to 'hour'
-
-    Returns:
-        AsyncGenerator[SearchCount, None]: AsyncGenerator that yields a Twitter SearchCounts.
+        query (str): The search query for matching Tweets. Refer to https://t.co/rulelength to identify the max query length. How to build a rule:
+            https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule
+        start_time (datetime, optional): The oldest UTC timestamp from which tweet counts will be provided. Inclusive and in second granularity.
+        end_time (datetime, optional): The newest UTC timestamp to which tweet counts will be provided. Exclusive and in second granularity.
+        since_id (str, optional): Returns results with a Tweet ID greater than this ID.
+        until_id (str, optional): Returns results with a Tweet ID less than this ID.
+        granularity (str): The granularity for the search counts results. Must be 'minute', 'hour', or 'day'.
 
     Yields:
-        Iterator[AsyncGenerator[SearchCount, None]]: A Twitter SearchCount Object.
+        SearchCount: An object representing the tweet count data for each interval according to the specified granularity.
+
+    Raises:
+        Exception: If an invalid granularity is specified, if an HTTP error occurs that prevents retrieving the tweet counts, or if the query parameters are
+            invalid.
+
+    Note:
+        The function automatically handles pagination of results using the 'next_token' provided by Twitter's API response.
     """
+
     if granularity not in ["minute", "hour", "day"]:
         raise Exception(f"Wrong granularity. Given granularity: {granularity}. Possible values = minute, hour, day")
+    rate_limiter = RateLimiter()
     async with aiohttp.ClientSession(headers=headers) as session:
         params: Dict[str, str] = {
             "query": query,
@@ -204,17 +209,18 @@ async def get_recent_search_count(
         logger.debug(f"search recent params={params}")
         async with session.get("https://api.twitter.com/2/tweets/counts/recent", params=params) as response:
             if response.status == 400:
-                logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
+                logger.error(f"Cannot get recent tweet counts (HTTP {response.status}): {await response.text()}")
                 raise Exception
 
+            rate_limiter.update_limits(dict(response.headers))
+
             if response.status == 429:
-                logger.warn("429 Too Many Requests.")
-                await asyncio.sleep(5)
+                await rate_limiter.wait_for_limit_reset()
                 continue
 
             if not response.ok:
-                logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
-                await asyncio.sleep(5)
+                logger.error(f"Cannot get recent tweet counts (HTTP {response.status}): {await response.text()}")
+                await asyncio.sleep(10)
                 continue
 
             response_text = await response.text()
@@ -228,4 +234,4 @@
             if counts.meta and counts.meta.next_token:
                 params["next_token"] = counts.meta.next_token
             else:
-                return
+                break
diff --git a/src/sparta/twitterapi/tweets/retweets.py b/src/sparta/twitterapi/tweets/retweets.py
index 0ebcdbb..c3ef0c0 100644
--- a/src/sparta/twitterapi/tweets/retweets.py
+++ b/src/sparta/twitterapi/tweets/retweets.py
@@ -20,7 +20,6 @@
 """
 
 import asyncio
-import json
 import logging
 import os
 from typing import AsyncGenerator, Dict, Optional
@@ -28,6 +27,7 @@
 import aiohttp
 
 from sparta.twitterapi.models.twitter_v2_spec import Get2TweetsIdRetweetedByResponse, User
+from sparta.twitterapi.rate_limiter import RateLimiter
 from sparta.twitterapi.tweets.constants import TWEET_FIELDS, USER_EXPANSIONS, USER_FIELDS
 
 logger = logging.getLogger(__name__)
@@ -39,21 +39,25 @@
 
 
 async def get_retweets(id: str) -> AsyncGenerator[User, None]:
-    """Get tweets from the last 7 days (maximum) that match the query.
+    """Asynchronously retrieves users who have retweeted a specified tweet.
+
+    This function queries the Twitter API to find users who have retweeted the tweet corresponding to the given ID. It handles rate limiting using an internal
+    instance of RateLimiter, automatically pausing requests if the rate limit is exceeded. The function also handles pagination automatically if there are more
+    results than can be returned in a single response.
 
     Args:
-        id (str): "A single Tweet ID.
+        id (str): The ID of the tweet for which to retrieve retweeters.
 
-    Raises:
-        Exception: Cannot get the search result due to an http error.
-        Exception: User not found error.
+    Yields:
+        User: An object representing a Twitter user who has retweeted the specified tweet.
 
-    Returns:
-        AsyncGenerator[User, None]: AsyncGenerator that yields Twitter User objects.
+    Raises:
+        Exception: If an HTTP error occurs that prevents retrieving the retweets or if the tweet ID is invalid.
 
-    Yields:
-        Iterator[AsyncGenerator[User, None]]: A Twitter User object.
+    Note:
+        The function automatically handles pagination of results using the 'next_token' provided by Twitter's API response.
     """
+    rate_limiter = RateLimiter()
     pagination_token: Optional[str] = None
     async with aiohttp.ClientSession(headers=headers) as session:
         params: Dict[str, str] = {
@@ -70,20 +74,21 @@ async def get_retweets(id: str) -> AsyncGenerator[User, None]:
         logger.debug(f"search recent params={params}")
         async with session.get(f"https://api.twitter.com/2/tweets/{id}/retweeted_by", params=params) as response:
             if response.status == 400:
-                logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
+                logger.error(f"Cannot search retweets (HTTP {response.status}): {await response.text()}")
                 raise Exception
 
+            rate_limiter.update_limits(dict(response.headers))
+
             if response.status == 429:
-                logger.warn("429 Too Many Requests. Sleep for 5 second...")
-                await asyncio.sleep(5)
+                await rate_limiter.wait_for_limit_reset()
                 continue
+
             if not response.ok:
-                logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
-                await asyncio.sleep(5)
+                logger.error(f"Cannot search retweets (HTTP {response.status}): {await response.text()}")
+                await asyncio.sleep(10)
                 continue
 
-            response_text = await response.text()
-            response_json = json.loads(response_text)
+            response_json = await response.json()
 
             for user in response_json.get("data", []):
                 yield User.model_validate(user)
@@ -91,10 +96,10 @@ async def get_retweets(id: str) -> AsyncGenerator[User, None]:
             try:
                 Get2TweetsIdRetweetedByResponse.model_validate(response_json)
             except Exception as e:
-                logger.warn(f"Inconsistent twitter OpenAPI documentation {e}")
+                logger.warning(f"Inconsistent twitter OpenAPI documentation {e}")
                 # logger.warn(response_text)
 
-            if "next_token" in response_json["meta"]:
+            if "next_token" in response_json.get("meta", {}):
                 params["pagination_token"] = response_json["meta"]["next_token"]
             else:
-                return
+                break
diff --git a/src/sparta/twitterapi/tweets/tweets.py b/src/sparta/twitterapi/tweets/tweets.py
index 8f91ad2..14f8c0a 100644
--- a/src/sparta/twitterapi/tweets/tweets.py
+++ b/src/sparta/twitterapi/tweets/tweets.py
@@ -22,7 +22,6 @@
     print(json.dumps(tweet_response.includes))
 """
 
-import json
 import logging
 import os
 from typing import AsyncGenerator, Dict, List
@@ -30,6 +29,7 @@
 import aiohttp
 
 from sparta.twitterapi.models.tweet_response import TweetResponse
+from sparta.twitterapi.rate_limiter import RateLimiter
 from sparta.twitterapi.tweets.constants import EXPANSIONS, MEDIA_FIELDS, PLACE_FIELDS, POLL_FIELDS, TWEET_FIELDS, USER_FIELDS
 
 logger = logging.getLogger(__name__)
@@ -41,20 +41,24 @@
 
 
 async def get_tweets_by_id(ids: List[str]) -> AsyncGenerator[TweetResponse, None]:
-    """Returns Tweets specified by the requested ID.
+    """Asynchronously retrieves tweets by their IDs.
 
-    Args:
-        ids (List[str]): A list of Tweet IDs. Up to 100 are allowed in a single request.
+    This function handles the retrieval of tweets from Twitter's API based on a list of tweet IDs. It respects the rate limiting by utilizing an internal
+    RateLimiter instance. If the rate limit is exceeded, the function will automatically wait until it can proceed with requests.
 
-    Raises:
-        Exception: Cannot get the search result due to an http error.
+    Args:
+        ids (List[str]): A list of tweet IDs for which to retrieve tweets. Up to 100 IDs can be included in a single request.
 
     Returns:
-        AsyncGenerator[TweetResponse, None]: AsyncGenerator that yields TweetResponses.
+        AsyncGenerator[TweetResponse, None]: An asynchronous generator that yields TweetResponse objects for each tweet.
+
+    Raises:
+        Exception: If an HTTP error occurs that prevents retrieving the tweets.
 
     Yields:
-        Iterator[AsyncGenerator[TweetResponse, None]]: A TweetResponse Object.
+        TweetResponse: An object representing the tweet data for each given tweet ID.
     """
+    rate_limiter = RateLimiter()
     async with aiohttp.ClientSession(headers=headers) as session:
         params: Dict[str, str] = {
             "ids": ",".join(ids),
@@ -66,18 +70,20 @@ async def get_tweets_by_id(ids: List[str]) -> AsyncGenerator[TweetResponse, None]:
             "place.fields": PLACE_FIELDS,
         }
         logger.debug(f"search recent params={params}")
-        async with session.get("https://api.twitter.com/2/tweets", params=params) as response:
-            if not response.ok:
-                raise Exception(f"Cannot search tweets {params} (HTTP {response.status}): {await response.text()}")
-
-            response_text = await response.text()
-            response_json = json.loads(response_text)
-
-            for tweet in response_json.get("data", []):
-                yield TweetResponse(tweet=tweet, includes=response_json.get("includes", {}))
-            try:
-                # Get2TweetsResponse.model_validate(response_json)
-                pass
-            except Exception as e:
-                logger.warn(f"Inconsistent twitter OpenAPI documentation {e}")
-                logger.warn(response_text)
+        while True:
+            async with session.get("https://api.twitter.com/2/tweets", params=params) as response:
+                rate_limiter.update_limits(dict(response.headers))
+
+                if response.status == 429:
+                    await rate_limiter.wait_for_limit_reset()
+                    continue
+
+                if not response.ok:
+                    raise Exception(f"Cannot search tweets {params} (HTTP {response.status}): {await response.text()}")
+
+                response_json = await response.json()
+
+                for tweet in response_json.get("data", []):
+                    yield TweetResponse(tweet=tweet, includes=response_json.get("includes", {}))
+
+                break
diff --git a/src/sparta/twitterapi/users/follower.py b/src/sparta/twitterapi/users/follower.py
index cd7b6e8..8655e4b 100644
--- a/src/sparta/twitterapi/users/follower.py
+++ b/src/sparta/twitterapi/users/follower.py
@@ -29,7 +29,7 @@
     print(user.model_dump_json())
 """
 
-import json
+import asyncio
 import logging
 import os
 from typing import AsyncGenerator, Dict
@@ -37,6 +37,7 @@
 import aiohttp
 
 from sparta.twitterapi.models.twitter_v2_spec import Get2UsersIdFollowersResponse, User
+from sparta.twitterapi.rate_limiter import RateLimiter
 from sparta.twitterapi.tweets.constants import USER_FIELDS
 
 logger = logging.getLogger(__name__)
@@ -64,6 +65,7 @@ async def get_followers_by_id(id: str, max_resulsts: int = 1000) -> AsyncGenerat
 
     Yields:
         Iterator[AsyncGenerator[User, None]]: A Twitter User object.
""" + rate_limiter = RateLimiter() async with aiohttp.ClientSession(headers=headers) as session: params: Dict[str, str] = { "user.fields": USER_FIELDS, @@ -75,25 +77,39 @@ async def get_followers_by_id(id: str, max_resulsts: int = 1000) -> AsyncGenerat while True: logger.debug(f"Search users params={params}") async with session.get(f"https://api.twitter.com/2/users/{id}/followers", params=params) as response: + if response.status == 400: + logger.error(f"Cannot get followers for user (HTTP {response.status}): {await response.text()}") + raise Exception + + rate_limiter.update_limits(dict(response.headers)) + + if response.status == 429: + await rate_limiter.wait_for_limit_reset() + continue + if not response.ok: - raise Exception(f"Cannot get followers for user with params: {params} (HTTP {response.status}): {await response.text()}") + logger.error(f"Cannot get followers for user with params: {params} (HTTP {response.status}): {await response.text()}") + await asyncio.sleep(10) + continue + + response_json = await response.json() - response_json = json.loads(await response.text()) try: users = Get2UsersIdFollowersResponse.model_validate(response_json) except Exception as e: logger.warn(f"Inconsistent twitter OpenAPI documentation {e}") logger.warn(response_json) + if not users.data: raise Exception(users) for user in users.data: yield user - if "next_token" in response_json["meta"]: - params["pagination_token"] = response_json["meta"]["next_token"] + if "next_token" in response_json.get("meta"): + params["pagination_token"] = response_json.get("meta").get("next_token") else: - return + break async def get_following_by_id(id: str, max_resulsts: int = 1000) -> AsyncGenerator[User, None]: @@ -113,6 +129,7 @@ async def get_following_by_id(id: str, max_resulsts: int = 1000) -> AsyncGenerat Yields: Iterator[AsyncGenerator[User, None]]: A Twitter User object. 
""" + rate_limiter = RateLimiter() async with aiohttp.ClientSession(headers=headers) as session: params: Dict[str, str] = { "user.fields": USER_FIELDS, @@ -124,22 +141,36 @@ async def get_following_by_id(id: str, max_resulsts: int = 1000) -> AsyncGenerat while True: logger.debug(f"Search users params={params}") async with session.get(f"https://api.twitter.com/2/users/{id}/following", params=params) as response: + if response.status == 400: + logger.error(f"Cannot get followed users (HTTP {response.status}): {await response.text()}") + raise Exception + + rate_limiter.update_limits(dict(response.headers)) + + if response.status == 429: + await rate_limiter.wait_for_limit_reset() + continue + if not response.ok: - raise Exception(f"Cannot get followers for user with params: {params} (HTTP {response.status}): {await response.text()}") + logger.error(f"Cannot get followed users with params: {params} (HTTP {response.status}): {await response.text()}") + await asyncio.sleep(10) + continue + + response_json = await response.json() - response_json = json.loads(await response.text()) try: users = Get2UsersIdFollowersResponse.model_validate(response_json) except Exception as e: logger.warn(f"Inconsistent twitter OpenAPI documentation {e}") logger.warn(response_json) + if not users.data: raise Exception(users) for user in users.data: yield user - if "next_token" in response_json["meta"]: - params["pagination_token"] = response_json["meta"]["next_token"] + if "next_token" in response_json.get("meta"): + params["pagination_token"] = response_json.get("meta").get("next_token") else: - return + break diff --git a/src/sparta/twitterapi/users/user.py b/src/sparta/twitterapi/users/user.py index 5ec12ca..e9a0532 100644 --- a/src/sparta/twitterapi/users/user.py +++ b/src/sparta/twitterapi/users/user.py @@ -36,6 +36,7 @@ import aiohttp from sparta.twitterapi.models.twitter_v2_spec import Get2UsersByResponse, Get2UsersResponse, User +from sparta.twitterapi.rate_limiter import RateLimiter from sparta.twitterapi.tweets.constants import USER_FIELDS logger = logging.getLogger(__name__) @@ -47,76 +48,100 @@ async def get_users_by_username(usernames: List[str]) -> AsyncGenerator[User, None]: - """Retrieves information about users specified by their username. + """Asynchronously retrieves information about users specified by their usernames. + + This function queries the Twitter API to get information about users based on a list of Twitter usernames. It handles rate limiting using an internal + instance of RateLimiter, automatically pausing requests if the rate limit is exceeded. Args: - usernames (List[str]): A List of usernames (Twitter handle). Up to 100 are allowed in a single request. + usernames (List[str]): A list of Twitter usernames (handles). Up to 100 usernames are allowed in a single request. - Raises: - Exception: Cannot get the search result due to an http error. - Exception: User not found error. + Yields: + User: An object representing a Twitter user for each provided username. - Returns: - AsyncGenerator[User, None]: AsyncGenerator that yields Twitter User objects. + Raises: + Exception: If an HTTP error occurs that prevents retrieving the user information or if any of the usernames are invalid. - Yields: - Iterator[AsyncGenerator[User, None]]: A Twitter User object. + Note: + The function automatically handles pagination of results using the 'next_token' provided by Twitter's API response. 
""" + rate_limiter = RateLimiter() async with aiohttp.ClientSession(headers=headers) as session: params: Dict[str, str] = { "usernames": ",".join(usernames), "user.fields": USER_FIELDS, } logger.debug(f"Search users params={params}") - async with session.get("https://api.twitter.com/2/users/by", params=params) as response: - if not response.ok: - raise Exception(f"Cannot search users {params} (HTTP {response.status}): {await response.text()}") - - response_json = await response.text() - try: - users = Get2UsersByResponse.model_validate_json(response_json) - except Exception as e: - logger.warn(f"Inconsistent twitter OpenAPI documentation {e}") - logger.warn(response_json) - if not users.data: - raise Exception(users) - for user in users.data: - yield user + while True: + async with session.get("https://api.twitter.com/2/users/by", params=params) as response: + rate_limiter.update_limits(dict(response.headers)) + + if response.status == 429: + await rate_limiter.wait_for_limit_reset() + continue + + if not response.ok: + raise Exception(f"Cannot search users {params} (HTTP {response.status}): {await response.text()}") + + response_json = await response.json() + try: + users = Get2UsersByResponse.model_validate(response_json) + except Exception as e: + logger.warn(f"Inconsistent twitter OpenAPI documentation {e}") + logger.warn(response_json) + if not users.data: + raise Exception(users) + for user in users.data: + yield user + + break async def get_users_by_ids(ids: List[str]) -> AsyncGenerator[User, None]: - """Retrieves information about users specified by their ID. + """Asynchronously retrieves information about users specified by their usernames. + + This function queries the Twitter API to get information about users based on a list of Twitter user ids. It handles rate limiting using an internal + instance of RateLimiter, automatically pausing requests if the rate limit is exceeded. Args: - ids (List[str]): A list of User IDs. Up to 100 are allowed in a single request. + usernames (List[str]): A list of Twitter user ids. Up to 100 usernames are allowed in a single request. - Raises: - Exception: Cannot get the search result due to an http error. - Exception: User not found error. + Yields: + User: An object representing a Twitter user for each provided username. - Returns: - AsyncGenerator[User, None]: AsyncGenerator that yields Twitter User objects. + Raises: + Exception: If an HTTP error occurs that prevents retrieving the user information or if any of the usernames are invalid. - Yields: - Iterator[AsyncGenerator[User, None]]: A Twitter User object. + Note: + The function automatically handles pagination of results using the 'next_token' provided by Twitter's API response. 
""" + rate_limiter = RateLimiter() async with aiohttp.ClientSession(headers=headers) as session: params: Dict[str, str] = { "ids": ",".join(ids), "user.fields": USER_FIELDS, } logger.debug(f"Search users params={params}") - async with session.get("https://api.twitter.com/2/users", params=params) as response: - if not response.ok: - raise Exception(f"Cannot search users {params} (HTTP {response.status}): {await response.text()}") - - response_json = await response.text() - try: - users = Get2UsersResponse.model_validate_json(response_json) - except Exception as e: - logger.warn(f"Inconsistent twitter OpenAPI documentation {e}") - logger.warn(response_json) - if not users.data: - raise Exception(users) - for user in users.data: - yield user + while True: + async with session.get("https://api.twitter.com/2/users", params=params) as response: + rate_limiter.update_limits(dict(response.headers)) + + if response.status == 429: + await rate_limiter.wait_for_limit_reset() + continue + + if not response.ok: + raise Exception(f"Cannot search users {params} (HTTP {response.status}): {await response.text()}") + + response_json = await response.json() + try: + users = Get2UsersResponse.model_validate(response_json) + except Exception as e: + logger.warn(f"Inconsistent twitter OpenAPI documentation {e}") + logger.warn(response_json) + if not users.data: + raise Exception(users) + for user in users.data: + yield user + + break