Skip to content

Commit

Permalink
Merge branch 'master' into artem1205/source-facebook-marketing-OC-2765
Browse files Browse the repository at this point in the history
  • Loading branch information
artem1205 authored Aug 21, 2023
2 parents 5200126 + 2b80138 commit cd97f40
Show file tree
Hide file tree
Showing 18 changed files with 284 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def validate_all_tags_are_keyvalue_pairs(
def is_major_version(version: str) -> bool:
"""Check whether the version is of format N.0.0"""
semver_version = semver.Version.parse(version)
return semver_version.minor == 0 and semver_version.patch == 0
return semver_version.minor == 0 and semver_version.patch == 0 and semver_version.prerelease is None


def validate_major_version_bump_has_breaking_change_entry(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
metadataSpecVersion: 1.0
data:
name: AlloyDB for PostgreSQL
definitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
connectorType: source
dockerRepository: airbyte/image-exists-1
githubIssueLabel: source-alloydb-strict-encrypt
dockerImageTag: 2.0.0-dev.cf3628ccf3
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
connectorSubtype: database
releaseStage: generally_available
license: MIT
releases:
breakingChanges:
2.0.0:
upgradeDeadline: 2023-08-22
message: "This version changes the connector’s authentication method from `ApiKey` to `oAuth`, per the [API guide](https://amazon-sqs.com/api/someguide)."
tags:
- language:java
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ COPY source_google_sheets ./source_google_sheets
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.4
LABEL io.airbyte.version=0.3.5
LABEL io.airbyte.name=airbyte/source-google-sheets
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7
dockerImageTag: 0.3.4
dockerImageTag: 0.3.5
dockerRepository: airbyte/source-google-sheets
githubIssueLabel: source-google-sheets
icon: google-sheets.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
) from err
raise Exception(f"Could not run discovery: {reason}")

def read(
def _read(
self,
logger: AirbyteLogger,
config: json,
Expand Down Expand Up @@ -206,7 +206,22 @@ def read(
else:
logger.info(f"Skipping syncing sheet {sheet}: {reason}")

logger.info(f"Finished syncing spreadsheet {spreadsheet_id}")
def read(
self,
logger: AirbyteLogger,
config: json,
catalog: ConfiguredAirbyteCatalog,
state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
) -> Generator[AirbyteMessage, None, None]:
try:
yield from self._read(logger, config, catalog, state)
except errors.HttpError as e:
if e.status_code == 429:
logger.info(f"Stopped syncing process due to rate limits. {e.reason}")
else:
logger.info(f"{e.status_code}: {e.reason}")
finally:
logger.info(f"Finished syncing spreadsheet {Helpers.get_spreadsheet_id(config['spreadsheet_id'])}")

@staticmethod
def get_credentials(config):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging

import pytest
import requests
from airbyte_cdk.models.airbyte_protocol import (
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
SyncMode,
)
from airbyte_cdk.utils import AirbyteTracedException
from apiclient import errors
from source_google_sheets import SourceGoogleSheets
from source_google_sheets.client import GoogleSheetsClient
from source_google_sheets.helpers import SCOPES
from source_google_sheets.helpers import SCOPES, Helpers


def test_invalid_credentials_error_message(invalid_config):
Expand Down Expand Up @@ -58,3 +67,29 @@ def test_discover_403_error(mocker, invalid_config):
expected_message = ("Forbidden when requesting spreadsheet with id invalid_spreadsheet_id. The caller does not have right permissions. "
"See docs for more details here: https://cloud.google.com/service-infrastructure/docs/service-control/reference/rpc/google.api/servicecontrol.v1#code")
assert e.value.args[0] == expected_message


def test_read_429_error(mocker, invalid_config, caplog):
source = SourceGoogleSheets()
resp = requests.Response()
resp.status = 429
resp.reason = "Request a higher quota limit"
mocker.patch.object(GoogleSheetsClient, "__init__", lambda s, credentials, scopes=SCOPES: None)
mocker.patch.object(GoogleSheetsClient, "get", return_value=mocker.Mock)
mocker.patch.object(Helpers, "get_sheets_in_spreadsheet", side_effect=errors.HttpError(resp=resp, content=b''))

sheet1 = "soccer_team"
sheet1_columns = frozenset(["arsenal", "chelsea", "manutd", "liverpool"])
sheet1_schema = {"properties": {c: {"type": "string"} for c in sheet1_columns}}
catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream(name=sheet1, json_schema=sheet1_schema, supported_sync_modes=["full_refresh"]),
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.overwrite,
),
]
)
records = list(source.read(logger=logging.getLogger("airbyte"), config=invalid_config, catalog=catalog))
assert [] == records
assert "Stopped syncing process due to rate limits. Request a higher quota limit" in caplog.text

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ acceptance_tests:
path: "integration_tests/expected_records.jsonl"
fail_on_extra_columns: true
timeout_seconds: 3600
ignored_fields:
campaign_groups:
- name: "lastModified"
bypass_reason: "Volatile data"
empty_streams:
- name: ad_member_company_size_analytics
bypass_reason: "Empty stream; Retention period is 2y"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-stripe/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=3.17.3
LABEL io.airbyte.version=3.17.4
LABEL io.airbyte.name=airbyte/source-stripe
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 3.17.3
dockerImageTag: 3.17.4
dockerRepository: airbyte/source-stripe
githubIssueLabel: source-stripe
icon: stripe.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,35 @@
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from requests import HTTPError

STRIPE_ERROR_CODES = {
"more_permissions_required": "This is most likely due to insufficient permissions on the credentials in use. "
"Try to grant required permissions/scopes or re-authenticate",
"account_invalid": "The card, or account the card is connected to, is invalid. You need to contact your card issuer "
"to check that the card is working correctly.",
"oauth_not_supported": "Please use a different authentication method.",
}


class StripeAvailabilityStrategy(HttpAvailabilityStrategy):
def handle_http_error(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
) -> Tuple[bool, Optional[str]]:
status_code = error.response.status_code
if status_code not in [400, 403]:
raise error
parsed_error = error.response.json()
error_code = parsed_error.get("error", {}).get("code")
error_message = STRIPE_ERROR_CODES.get(error_code, parsed_error.get("error", {}).get("message"))
if not error_message:
raise error
doc_ref = self._visit_docs_message(logger, source)
reason = f"The endpoint {error.response.url} returned {status_code}: {error.response.reason}. {error_message}. {doc_ref} "
response_error_message = stream.parse_response_error_message(error.response)
if response_error_message:
reason += response_error_message
return False, reason


class StripeSubStreamAvailabilityStrategy(HttpAvailabilityStrategy):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,27 +666,7 @@
"type": ["null", "string"]
},
"shipping_address": {
"type": ["null", "object"],
"properties": {
"city": {
"type": ["null", "string"]
},
"country": {
"type": ["null", "string"]
},
"line1": {
"type": ["null", "string"]
},
"line2": {
"type": ["null", "string"]
},
"postal_code": {
"type": ["null", "string"]
},
"state": {
"type": ["null", "string"]
}
}
"$ref": "address.json"
}
}
},
Expand All @@ -701,27 +681,7 @@
"type": ["null", "object"],
"properties": {
"billing_address": {
"type": ["null", "object"],
"properties": {
"city": {
"type": ["null", "string"]
},
"country": {
"type": ["null", "string"]
},
"line1": {
"type": ["null", "string"]
},
"line2": {
"type": ["null", "string"]
},
"postal_code": {
"type": ["null", "string"]
},
"state": {
"type": ["null", "string"]
}
}
"$ref": "address.json"
},
"email": {
"type": ["null", "string"]
Expand All @@ -730,27 +690,7 @@
"type": ["null", "string"]
},
"shipping_address": {
"type": ["null", "object"],
"properties": {
"city": {
"type": ["null", "string"]
},
"country": {
"type": ["null", "string"]
},
"line1": {
"type": ["null", "string"]
},
"line2": {
"type": ["null", "string"]
},
"postal_code": {
"type": ["null", "string"]
},
"state": {
"type": ["null", "string"]
}
}
"$ref": "address.json"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,8 @@
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from source_stripe.availability_strategy import StripeSubStreamAvailabilityStrategy

STRIPE_ERROR_CODES: List = [
# stream requires additional permissions
"more_permissions_required",
# account_id doesn't have the access to the stream
"account_invalid",
]
from source_stripe.availability_strategy import StripeAvailabilityStrategy, StripeSubStreamAvailabilityStrategy

STRIPE_API_VERSION = "2022-11-15"


Expand All @@ -30,6 +24,10 @@ class StripeStream(HttpStream, ABC):
DEFAULT_SLICE_RANGE = 365
transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

@property
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return StripeAvailabilityStrategy()

def __init__(self, start_date: int, account_id: str, slice_range: int = DEFAULT_SLICE_RANGE, **kwargs):
super().__init__(**kwargs)
self.account_id = account_id
Expand Down Expand Up @@ -66,27 +64,6 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
response_json = response.json()
yield from response_json.get("data", []) # Stripe puts records in a container array "data"

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
try:
yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code
parsed_error = e.response.json()
error_code = parsed_error.get("error", {}).get("code")
error_message = parsed_error.get("message")
# if the API Key doesn't have required permissions to particular stream, this stream will be skipped
if status_code == 403 and error_code in STRIPE_ERROR_CODES:
self.logger.warn(f"Stream {self.name} is skipped, due to {error_code}. Full message: {error_message}")
pass
else:
self.logger.error(f"Syncing stream {self.name} is failed, due to {error_code}. Full message: {error_message}")


class BasePaginationStripeStream(StripeStream, ABC):
def request_params(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import pytest
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator


@pytest.fixture(autouse=True)
def disable_cache(mocker):
mocker.patch(
"source_stripe.streams.Customers.use_cache",
new_callable=mocker.PropertyMock,
return_value=False
)
mocker.patch(
"source_stripe.streams.Transfers.use_cache",
new_callable=mocker.PropertyMock,
return_value=False
)
mocker.patch(
"source_stripe.streams.Subscriptions.use_cache",
new_callable=mocker.PropertyMock,
return_value=False
)
mocker.patch(
"source_stripe.streams.SubscriptionItems.use_cache",
new_callable=mocker.PropertyMock,
return_value=False
)


@pytest.fixture(name="config")
def config_fixture():
config = {"client_secret": "sk_test(live)_<secret>",
"account_id": "<account_id>", "start_date": "2020-05-01T00:00:00Z"}
return config


@pytest.fixture(name="stream_args")
def stream_args_fixture():
authenticator = TokenAuthenticator("sk_test(live)_<secret>")
args = {
"authenticator": authenticator,
"account_id": "<account_id>",
"start_date": 1588315041,
"slice_range": 365,
}
return args
Loading

0 comments on commit cd97f40

Please sign in to comment.