
Use the rate limits from the header to better control the waiting times (#3)

Co-authored-by: sx3eanne <[email protected]>
anneum and UnibwSparta committed Dec 21, 2023
1 parent 6d85d6b commit 63d5bee
Showing 9 changed files with 374 additions and 229 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sparta-twitterapi"
version = "0.1.53"
version = "0.1.54"
description = ""
authors = ["Andreas Neumeier <[email protected]>", "Jasmin Riedl <[email protected]>", "Lukas Metzner <[email protected]>", "Benedikt Radtke <[email protected]>"]
readme = "README.md"
70 changes: 70 additions & 0 deletions src/sparta/twitterapi/rate_limiter.py
@@ -0,0 +1,70 @@
import asyncio
import logging
import time
from typing import Dict, Optional

logger = logging.getLogger(__name__)


class RateLimiter:
"""A utility class for handling rate limiting in API requests.
This class provides methods to check and handle rate limits as provided
by the API, typically through HTTP headers. It tracks the number of requests
remaining before hitting the rate limit and the time when the rate limit
will be reset.
Attributes:
remaining (Optional[int]): The number of requests left for the rate limit window.
Defaults to None.
reset_time (Optional[int]): The UTC epoch time in seconds when the rate limit
will be reset. Defaults to None.
Methods:
wait_for_limit_reset: Asynchronously waits until the rate limit is reset if
the limit has been reached.
update_limits(headers): Updates the rate limit remaining and reset time based
on the HTTP headers from a response.
should_wait(): Determines whether it is necessary to wait for the rate limit
reset based on the remaining requests.
"""

def __init__(self) -> None:
self.remaining: Optional[int] = None
self.reset_time: Optional[int] = None

async def wait_for_limit_reset(self) -> None:
"""Asynchronously waits until the rate limit is reset.
This method calculates the time to wait based on the current time and the
reset time of the rate limit. It then pauses execution for that duration,
effectively throttling the rate of API requests.
Note:
Logs a warning with the wait time and sleeps for at least one second, even if the reset time has already passed.
"""
if self.reset_time is not None:
wait_time = max(self.reset_time - int(time.time()), 1)  # wait at least 1 second
logger.warning(f"Rate limit exceeded. Waiting {wait_time} seconds.")
await asyncio.sleep(wait_time)

def update_limits(self, headers: Dict[str, str]) -> None:
"""Updates the rate limit information based on the response headers.
This method extracts the 'x-rate-limit-remaining' and 'x-rate-limit-reset'
values from the headers and updates the internal state of the rate limiter.
Args:
headers (Dict[str, str]): A dictionary of HTTP headers from an API response.
"""
self.remaining = int(headers.get("x-rate-limit-remaining", 1))
self.reset_time = int(headers.get("x-rate-limit-reset", 0))

def should_wait(self) -> bool:
"""Determines if waiting for rate limit reset is necessary.
Returns:
bool: True if the remaining number of requests is zero and it's necessary
to wait until the rate limit is reset. False otherwise.
"""
return self.remaining == 0
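
A minimal usage sketch for the new class. The fetch loop, the fetch_page helper, and its URL parameter are illustrative assumptions; only update_limits, should_wait, and wait_for_limit_reset come from this commit.

import aiohttp

from sparta.twitterapi.rate_limiter import RateLimiter


async def fetch_page(session: aiohttp.ClientSession, url: str, limiter: RateLimiter) -> dict:
    """Hypothetical helper: fetch one page while honoring the rate limit."""
    while True:
        if limiter.should_wait():
            # No requests left in the current window: sleep until reset.
            await limiter.wait_for_limit_reset()
        async with session.get(url) as response:
            # Feed x-rate-limit-remaining / x-rate-limit-reset back into the limiter.
            limiter.update_limits(dict(response.headers))
            if response.status == 429:
                await limiter.wait_for_limit_reset()
                continue
            response.raise_for_status()
            return await response.json()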
107 changes: 53 additions & 54 deletions src/sparta/twitterapi/tweets/full_search.py
@@ -42,7 +42,6 @@
"""

import asyncio
import json
import logging
import os
from datetime import datetime
@@ -52,6 +51,7 @@

from sparta.twitterapi.models.tweet_response import TweetResponse
from sparta.twitterapi.models.twitter_v2_spec import Get2TweetsCountsAllResponse, SearchCount
from sparta.twitterapi.rate_limiter import RateLimiter
from sparta.twitterapi.tweets.constants import EXPANSIONS, MEDIA_FIELDS, PLACE_FIELDS, POLL_FIELDS, TWEET_FIELDS, USER_FIELDS

logger = logging.getLogger(__name__)
@@ -70,29 +70,29 @@ async def get_full_search(
until_id: str = None,
sort_order: str = None,
) -> AsyncGenerator[TweetResponse, None]:
"""Returns Tweets that match a search query. If no time or id parameters are specified, the last 30 days are assumed.
"""Asynchronously retrieves tweets that match a specified search query.
This function queries the Twitter API to find tweets matching the given search criteria. It handles rate limiting using an internal instance of
RateLimiter, automatically pausing requests if the rate limit is exceeded.
Args:
query (str): One query/rule/filter for matching Tweets. Refer to https://t.co/rulelength to identify the max query length.
How to build a rule: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule
start_time (datetime): The oldest UTC timestamp from which the Tweets will be provided. Timestamp is in second granularity and is inclusive
(i.e. 12:00:01 includes the first second of the minute). Defaults to None.
end_time (datetime): The newest, most recent UTC timestamp to which the Tweets will be provided. Timestamp is in second granularity and is exclusive
(i.e. 12:00:01 excludes the first second of the minute). Defaults to None.
since_id (str, optional): Returns results with a Tweet ID greater than (that is, more recent than) the specified ID. Defaults to None.
until_id (str, optional): Returns results with a Tweet ID less than (that is, older than) the specified ID. Defaults to None.
sort_order (str, optional): This order in which to return results. Possible options: recency and relevancy. Defaults to None.
query (str): The search query for matching Tweets. Refer to Twitter API documentation for details on query format and limitations.
start_time (datetime, optional): The oldest UTC timestamp from which tweets will be provided. Inclusive and in second granularity.
end_time (datetime, optional): The newest UTC timestamp to which tweets will be provided. Exclusive and in second granularity.
since_id (str, optional): Returns results with a Tweet ID greater than this ID.
until_id (str, optional): Returns results with a Tweet ID less than this ID.
sort_order (str, optional): The order in which to return results (e.g., 'recency' or 'relevancy').
Yields:
TweetResponse: An object representing the tweet data for each tweet that matches the query.
Raises:
Exception: Cannot get the search result due to an http error.
Returns:
AsyncGenerator[TweetResponse, None]: AsyncGenerator that yields TweetResponses.
Exception: If an HTTP error occurs that prevents retrieving the tweets or if the query parameters are invalid.
Yields:
Iterator[AsyncGenerator[TweetResponse, None]]: A TweetResponse Object.
Note:
The function automatically handles pagination of results using the 'next_token' provided by Twitter's API response.
"""
rate_limiter = RateLimiter()
async with aiohttp.ClientSession(headers=headers) as session:
params: Dict[str, str] = {
"query": query,
@@ -120,34 +120,29 @@ async def get_full_search(
logger.debug(f"search recent params={params}")
async with session.get("https://api.twitter.com/2/tweets/search/all", params=params) as response:
if response.status == 400:
logger.error(f"Cannot search tweets (HTTP {response.status}): {await response.text()}")
logger.error(f"Cannot search full tweets (HTTP {response.status}): {await response.text()}")
raise Exception

rate_limiter.update_limits(dict(response.headers))

if response.status == 429:
logger.warn(f"{response.status} Too Many Requests. Sleep for 5 second...")
await asyncio.sleep(5)
await rate_limiter.wait_for_limit_reset()
continue

if not response.ok:
logger.error(f"Cannot search tweets (HTTP {response.status}): {await response.text()}")
await asyncio.sleep(5)
logger.error(f"Cannot search full tweets (HTTP {response.status}): {await response.text()}")
await asyncio.sleep(10)
continue

response_text = await response.text()
response_json = json.loads(response_text)
response_json = await response.json()

for tweet in response_json.get("data", []):
yield TweetResponse(tweet=tweet, includes=response_json.get("includes", {}))

# try:
# Get2TweetsSearchAllResponse.model_validate(response_json)
# except Exception as e:
# logger.warn(f"Inconsistent twitter OpenAPI documentation {e}")
# # logger.warn(response_text)

if "next_token" in response_json["meta"]:
params["next_token"] = response_json["meta"]["next_token"]
if "next_token" in response_json.get("meta"):
params["next_token"] = response_json.get("meta").get("next_token")
else:
return
break


async def get_full_search_count(
@@ -158,29 +158,32 @@ async def get_full_search_count(
until_id: str = None,
granularity: str = "hour",
) -> AsyncGenerator[SearchCount, None]:
"""Returns the number of tweets that match a query according to a granularity (e.g. hourwise) over a given time period. If no time or id parameters are
specified, the last 30 days are assumed.
"""Asynchronously retrieves the count of tweets matching a specified search query, aggregated according to a specified granularity (e.g., hourly).
This function queries the Twitter API to count tweets matching the given search criteria and aggregates the counts based on the specified granularity.
It handles rate limiting using an internal instance of RateLimiter, automatically pausing requests if the rate limit is exceeded.
Args:
query (str): One query/rule/filter for matching Tweets. Refer to https://t.co/rulelength to identify the max query length.
How to build a rule: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule
start_time (datetime): The oldest UTC timestamp from which the Tweets will be provided. Timestamp is in second granularity and is inclusive
(i.e. 12:00:01 includes the first second of the minute). Defaults to None.
end_time (datetime): The newest, most recent UTC timestamp to which the Tweets will be provided. Timestamp is in second granularity and is exclusive
(i.e. 12:00:01 excludes the first second of the minute). Defaults to None.
since_id (str, optional): Returns results with a Tweet ID greater than (that is, more recent than) the specified ID. Defaults to None.
until_id (str, optional): Returns results with a Tweet ID less than (that is, older than) the specified ID. Defaults to None.
granularity (str, optional): The granularity for the search counts results. Defaults to 'hour'
Returns:
AsyncGenerator[SearchCount, None]: AsyncGenerator that yields a Twitter SearchCounts.
query (str): The search query for matching Tweets. Refer to Twitter API documentation for query format and limitations.
start_time (datetime, optional): The oldest UTC timestamp from which tweet counts will be provided. Inclusive and in second granularity.
end_time (datetime, optional): The newest UTC timestamp to which tweet counts will be provided. Exclusive and in second granularity.
since_id (str, optional): Returns results with a Tweet ID greater than this ID.
until_id (str, optional): Returns results with a Tweet ID less than this ID.
granularity (str, optional): The granularity for the search counts results ('minute', 'hour', or 'day'). Defaults to 'hour'.
Yields:
Iterator[AsyncGenerator[SearchCount, None]]: A Twitter SearchCount Object.
SearchCount: An object representing the tweet count data for each interval according to the specified granularity.
Raises:
Exception: If an invalid granularity is specified or if an HTTP error occurs that prevents retrieving the tweet counts.
Note:
The function automatically handles pagination of results using the 'next_token' provided by Twitter's API response.
"""
if granularity not in ["minute", "hour", "day"]:
raise Exception(f"Wrong granularity. Given granularity: {granularity}. Possible values = minute, hour, day")

rate_limiter = RateLimiter()
async with aiohttp.ClientSession(headers=headers) as session:
params: Dict[str, str] = {
"query": query,
@@ -200,17 +198,18 @@ async def get_full_search_count(
logger.debug(f"search recent params={params}")
async with session.get("https://api.twitter.com/2/tweets/counts/all", params=params) as response:
if response.status == 400:
logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
logger.error(f"Cannot get full tweet count (HTTP {response.status}): {await response.text()}")
raise Exception

rate_limiter.update_limits(dict(response.headers))

if response.status == 429:
logger.warn("429 Too Many Requests.")
await asyncio.sleep(5)
await rate_limiter.wait_for_limit_reset()
continue

if not response.ok:
logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
await asyncio.sleep(5)
logger.error(f"Cannot search full tweet counts (HTTP {response.status}): {await response.text()}")
await asyncio.sleep(10)
continue

counts = Get2TweetsCountsAllResponse.model_validate_json(await response.text())
@@ -221,4 +220,4 @@
if counts.meta and counts.meta.next_token:
params["next_token"] = counts.meta.next_token
else:
return
break
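
A sketch of how the updated generators might be consumed; the query, date range, and printed fields are illustrative assumptions. Pagination and 429 waits now happen inside the generators via RateLimiter instead of fixed sleeps.

import asyncio
from datetime import datetime, timezone

from sparta.twitterapi.tweets.full_search import get_full_search, get_full_search_count


async def main() -> None:
    start = datetime(2023, 12, 1, tzinfo=timezone.utc)
    end = datetime(2023, 12, 2, tzinfo=timezone.utc)

    # Hourly counts first (assumed SearchCount fields: start, tweet_count).
    async for count in get_full_search_count("sparta", start_time=start, end_time=end, granularity="hour"):
        print(count.start, count.tweet_count)

    # Then the matching tweets themselves.
    async for tweet in get_full_search("sparta", start_time=start, end_time=end):
        print(tweet.tweet["id"])


if __name__ == "__main__":
    asyncio.run(main())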
45 changes: 24 additions & 21 deletions src/sparta/twitterapi/tweets/quote_tweets.py
@@ -25,7 +25,6 @@
"""

import asyncio
import json
import logging
import os
from datetime import datetime
@@ -34,6 +33,7 @@
import aiohttp

from sparta.twitterapi.models.tweet_response import TweetResponse
from sparta.twitterapi.rate_limiter import RateLimiter

# from sparta.twitterapi.models.twitter_v2_spec import Get2TweetsIdQuoteTweetsResponse
from sparta.twitterapi.tweets.constants import EXPANSIONS, MEDIA_FIELDS, PLACE_FIELDS, POLL_FIELDS, TWEET_FIELDS, USER_FIELDS
@@ -53,27 +53,29 @@ async def get_quote_tweets(
since_id: str = None,
until_id: str = None,
) -> AsyncGenerator[TweetResponse, None]:
"""Returns quoted Tweets.
"""Asynchronously retrieves tweets quoting a specified tweet.
This function queries the Twitter API to find tweets that are quote tweets of the tweet corresponding to the given ID. It handles rate limiting using an
internal instance of RateLimiter, automatically pausing requests if the rate limit is exceeded. The function also supports time-based filtering and
pagination.
Args:
id (str): A tweet ID for which to retrieve quote tweets.
start_time (datetime): The oldest UTC timestamp from which the Tweets will be provided. Timestamp is in second granularity and is inclusive
(i.e. 12:00:01 includes the first second of the minute). Defaults to None.
end_time (datetime): The newest, most recent UTC timestamp to which the Tweets will be provided. Timestamp is in second granularity and is exclusive
(i.e. 12:00:01 excludes the first second of the minute). Defaults to None.
since_id (str, optional): Returns results with a Tweet ID greater than (that is, more recent than) the specified ID. Defaults to None.
until_id (str, optional): Returns results with a Tweet ID less than (that is, older than) the specified ID. Defaults to None.
id (str): The ID of the tweet for which to retrieve quote tweets.
start_time (datetime, optional): The oldest UTC timestamp from which quote tweets will be provided. Inclusive and in second granularity.
end_time (datetime, optional): The newest UTC timestamp to which quote tweets will be provided. Exclusive and in second granularity.
since_id (str, optional): Returns quote tweets with an ID greater than this ID.
until_id (str, optional): Returns quote tweets with an ID less than this ID.
Yields:
TweetResponse: An object representing a tweet that quotes the specified tweet.
Raises:
Exception: Cannot get the search result due to an http error.
Returns:
AsyncGenerator[TweetResponse, None]: AsyncGenerator that yields TweetResponses.
Exception: If an HTTP error occurs that prevents retrieving the quote tweets or if the tweet ID is invalid.
Yields:
Iterator[AsyncGenerator[TweetResponse, None]]: A TweetResponse Object.
Note:
The function automatically handles pagination of results using the 'next_token' provided by Twitter's API response.
"""
rate_limiter = RateLimiter()
async with aiohttp.ClientSession(headers=headers) as session:
params: Dict[str, str] = {
"tweet.fields": TWEET_FIELDS,
@@ -101,17 +103,18 @@ async def get_quote_tweets(
logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
raise Exception

rate_limiter.update_limits(dict(response.headers))

if response.status == 429:
logger.warn(f"{response.status} Too Many Requests. Sleep for 1 minute...")
await asyncio.sleep(60)
await rate_limiter.wait_for_limit_reset()
continue

if not response.ok:
logger.error(f"Cannot search recent tweets (HTTP {response.status}): {await response.text()}")
await asyncio.sleep(5)
await asyncio.sleep(10)
continue

response_text = await response.text()
response_json = json.loads(response_text)
response_json = await response.json()

for tweet in response_json.get("data", []):
yield TweetResponse(tweet=tweet, includes=response_json.get("includes", {}))
@@ -125,7 +128,7 @@ async def get_quote_tweets(
if "errors" in response_json:
logger.warn(f'Errors: {response_json["errors"]}')
if "next_token" in response_json.get("meta", {}):
params["pagination_token"] = response_json["meta"]["next_token"]
params["pagination_token"] = response_json.get("meta")
else:
break
except Exception as e:
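
And a sketch of the corresponding call site for the quote-tweet generator; the tweet ID is a made-up placeholder, and the printed fields assume TweetResponse.tweet holds the raw tweet dict from the API payload.

import asyncio

from sparta.twitterapi.tweets.quote_tweets import get_quote_tweets


async def main() -> None:
    # Iterate over all tweets quoting the given (hypothetical) tweet ID.
    async for quote in get_quote_tweets("1234567890123456789"):
        print(quote.tweet.get("id"), quote.tweet.get("text"))


if __name__ == "__main__":
    asyncio.run(main())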