Skip to content

Commit

Permalink
[airbyte-cdk] add running stream status with rate limit reason to bac…
Browse files Browse the repository at this point in the history
…koff approach (#40681)
  • Loading branch information
lazebnyi authored Jul 10, 2024
1 parent c7375a8 commit f00ed4a
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 5 deletions.
2 changes: 2 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
AirbyteStreamState,
AirbyteStreamStatus,
AirbyteStreamStatusTraceMessage,
AirbyteStreamStatusReason,
AirbyteStreamStatusReasonType,
AirbyteTraceMessage,
AuthFlowType,
ConfiguredAirbyteCatalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
error_message="Request timeout.",
),
429: ErrorResolution(
response_action=ResponseAction.RETRY,
response_action=ResponseAction.RATE_LIMITED,
failure_type=FailureType.transient_error,
error_message="Too many requests.",
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class ResponseAction(Enum):
RETRY = "RETRY"
FAIL = "FAIL"
IGNORE = "IGNORE"
RATE_LIMITED = "RATE_LIMITED"


@dataclass
Expand Down
21 changes: 18 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import requests
import requests_cache
from airbyte_cdk.models import Level
from airbyte_cdk.models import AirbyteStreamStatus, AirbyteStreamStatusReason, AirbyteStreamStatusReasonType, Level, StreamDescriptor
from airbyte_cdk.sources.http_config import MAX_CONNECTION_POOL_SIZE
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.call_rate import APIBudget, CachedLimiterSession, LimiterSession
Expand All @@ -27,6 +27,7 @@
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
from airbyte_cdk.sources.streams.http.rate_limiting import http_client_default_backoff_handler, user_defined_backoff_handler
from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from requests.auth import AuthBase

Expand Down Expand Up @@ -250,14 +251,28 @@ def _send(
"Receiving response", extra={"headers": response.headers, "status": response.status_code, "body": response.text}
)

# Request/repsonse logging for declarative cdk.
# Request/response logging for declarative cdk
if log_formatter is not None and response is not None and self._message_repository is not None:
formatter = log_formatter
self._message_repository.log_message(
Level.DEBUG,
lambda: formatter(response), # type: ignore # log_formatter is always cast to a callable
)

# Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
if error_resolution.response_action == ResponseAction.RATE_LIMITED:
# TODO: Update to handle with message repository when concurrent message repository is ready
reasons = [AirbyteStreamStatusReason(type=AirbyteStreamStatusReasonType.RATE_LIMITED)]
message = stream_status_as_airbyte_message(StreamDescriptor(name=self._name), AirbyteStreamStatus.RUNNING, reasons).json(
exclude_unset=True
)

# Simply printing the stream status is a temporary solution and can cause future issues. Currently, the _send method is
# wrapped with backoff decorators, and we can only emit messages by iterating record_iterator in the abstract source at the
# end of the retry decorator behavior. This approach does not allow us to emit messages in the queue before exiting the
# backoff retry loop.
print(message)

if error_resolution.response_action == ResponseAction.FAIL:
if response:
error_message = f"'{request.method}' request to '{request.url}' failed with status code '{response.status_code}' and error message '{self._error_message_parser.parse_response_error_message(response)}'"
Expand All @@ -281,7 +296,7 @@ def _send(
self._logger.info(error_resolution.error_message or log_message)

# TODO: Consider dynamic retry count depending on subsequent error codes
elif error_resolution.response_action == ResponseAction.RETRY:
elif error_resolution.response_action == ResponseAction.RETRY or error_resolution.response_action == ResponseAction.RATE_LIMITED:
user_defined_backoff_time = None
for backoff_strategy in self._backoff_strategies:
backoff_time = backoff_strategy.backoff_time(
Expand Down
9 changes: 8 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/utils/stream_status_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@


from datetime import datetime
from typing import List, Optional, Union

from airbyte_cdk.models import (
AirbyteMessage,
AirbyteStream,
AirbyteStreamStatus,
AirbyteStreamStatusReason,
AirbyteStreamStatusTraceMessage,
AirbyteTraceMessage,
StreamDescriptor,
Expand All @@ -17,7 +19,11 @@
from airbyte_cdk.models import Type as MessageType


def as_airbyte_message(stream: AirbyteStream, current_status: AirbyteStreamStatus) -> AirbyteMessage:
def as_airbyte_message(
stream: Union[AirbyteStream, StreamDescriptor],
current_status: AirbyteStreamStatus,
reasons: Optional[List[AirbyteStreamStatusReason]] = None,
) -> AirbyteMessage:
"""
Builds an AirbyteStreamStatusTraceMessage for the provided stream
"""
Expand All @@ -30,6 +36,7 @@ def as_airbyte_message(stream: AirbyteStream, current_status: AirbyteStreamStatu
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name=stream.name, namespace=stream.namespace),
status=current_status,
reasons=reasons,
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,25 @@ def backoff_time(self, *args, **kwargs):
with pytest.raises(UserDefinedBackoffException):
http_client.send_request(http_method="get", url="https://test_base_url.com/v1/endpoint", request_kwargs={})
assert mocked_send.call_count == 2


def test_send_emit_stream_status_with_rate_limit_reason(capsys):
    """
    Verify that one stream status message is printed to stdout per rate-limited attempt.

    Every mocked response has status code 429, so each call to ``requests.Session.send``
    is expected to print exactly one message before the backoff logic eventually raises
    ``UserDefinedBackoffException``.
    """

    class BackoffStrategy:
        def backoff_time(self, *args, **kwargs):
            # Tiny delay so the retries finish almost instantly.
            return 0.001

    http_client = HttpClient(
        name="test",
        logger=MagicMock(),
        error_handler=HttpStatusErrorHandler(logger=MagicMock()),
        backoff_strategy=BackoffStrategy(),
    )

    mocked_response = MagicMock(spec=requests.Response)
    mocked_response.status_code = 429
    mocked_response.headers = {}
    mocked_response.ok = False

    with patch.object(requests.Session, "send", return_value=mocked_response) as mocked_send:
        with pytest.raises(UserDefinedBackoffException):
            http_client.send_request(http_method="get", url="https://test_base_url.com/v1/endpoint", request_kwargs={})

        # Each message is emitted with its own print() call, i.e. one line per message.
        # Splitting on line boundaries (rather than on arbitrary whitespace) keeps the count
        # correct even if the serialized JSON contains spaces.
        trace_messages = capsys.readouterr().out.splitlines()
        assert len(trace_messages) == mocked_send.call_count

0 comments on commit f00ed4a

Please sign in to comment.