🐛 Source Shopify: fixed store redirection, add bulk checkpointing (
bazarnov authored Jul 29, 2024
1 parent 4dc4200 commit 34f5328
Showing 19 changed files with 639 additions and 203 deletions.
@@ -4,6 +4,11 @@ acceptance_tests:
spec:
tests:
- spec_path: "source_shopify/spec.json"
backward_compatibility_tests_config:
# This is an intentional change.
# Added the new fields `job_checkpoint_interval` and `job_product_variants_include_pres_prices`
# to allow the User to override these values.
disable_for_version: 2.4.14
connection:
tests:
- config_path: "secrets/config.json"
@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerImageTag: 2.4.15
dockerImageTag: 2.4.16
dockerRepository: airbyte/source-shopify
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
githubIssueLabel: source-shopify
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-shopify/poetry.lock

Some generated files are not rendered by default.

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.4.15"
version = "2.4.16"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <[email protected]>",]
@@ -52,6 +52,11 @@ class BulkJobCreationFailedConcurrentError(BaseBulkException):

failure_type: FailureType = FailureType.transient_error

class BulkJobRedirectToOtherShopError(BaseBulkException):
"""Raised when the response contains another shop name"""

failure_type: FailureType = FailureType.transient_error

class BulkJobConcurrentError(BaseBulkException):
"""Raised when failing the job after hitting too many BulkJobCreationFailedConcurrentError."""

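For illustration, a minimal sketch of how the new exception class might be raised — the stand-in class and the `raise_if_redirected` helper below are hypothetical, not the connector's actual call site:

# Hypothetical sketch: raise the new error when a BULK response points to a
# different shop than the one configured. The real class subclasses
# BaseBulkException with FailureType.transient_error, so the error is retryable.
class BulkJobRedirectToOtherShopError(Exception):
    """Raised when the response contains another shop name."""

def raise_if_redirected(response_shop: str, configured_shop: str) -> None:
    if response_shop != configured_shop:
        raise BulkJobRedirectToOtherShopError(
            f"Response belongs to `{response_shop}`, expected `{configured_shop}`."
        )

try:
    raise_if_redirected("other-shop", "airbyte-test")
except BulkJobRedirectToOtherShopError as err:
    print(err)  # Response belongs to `other-shop`, expected `airbyte-test`.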

Large diffs are not rendered by default.

@@ -79,7 +79,11 @@ def prepare(query: str) -> str:

@dataclass
class ShopifyBulkQuery:
shop_id: int
config: Mapping[str, Any]

@property
def shop_id(self) -> int:
return self.config.get("shop_id")
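With `shop_id` now resolved from `config` instead of being a class attribute, a single query class can serve any shop per connection. A minimal sketch of the pattern (the sample config payload is illustrative):

from dataclasses import dataclass
from typing import Any, Mapping

@dataclass
class ExampleBulkQuery:
    config: Mapping[str, Any]

    @property
    def shop_id(self) -> int:
        # resolved lazily from the user-supplied connector config
        return self.config.get("shop_id")

assert ExampleBulkQuery(config={"shop_id": 12345}).shop_id == 12345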

@property
def tools(self) -> BulkTools:
@@ -112,6 +116,14 @@ def sort_key(self) -> Optional[str]:
"""
return None

@property
def supports_checkpointing(self) -> bool:
"""
The presence of `sort_key = "UPDATED_AT"` on a query instance usually means that
the server-side BULK Job results are fetched and ordered correctly, making them suitable for checkpointing.
"""
return self.sort_key == "UPDATED_AT"
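A hedged sketch of the behavior this property encodes — query classes sorted by `UPDATED_AT` opt in to checkpointing, while anything else (e.g. an `ID`-sorted query) opts out; the class names below are illustrative:

class UpdatedAtSortedQuery:
    sort_key = "UPDATED_AT"

    @property
    def supports_checkpointing(self) -> bool:
        return self.sort_key == "UPDATED_AT"

class IdSortedQuery(UpdatedAtSortedQuery):
    sort_key = "ID"  # results are not ordered by the cursor field

assert UpdatedAtSortedQuery().supports_checkpointing is True
assert IdSortedQuery().supports_checkpointing is False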

@property
def query_nodes(self) -> Optional[Union[List[Field], List[str]]]:
"""
@@ -2382,8 +2394,7 @@ class ProductVariant(ShopifyBulkQuery):
"""
{
productVariants(
query: "updated_at:>='2019-04-13T00:00:00+00:00' AND updated_at:<='2024-04-30T12:16:17.273363+00:00'"
sortKey: UPDATED_AT
query: "updatedAt:>='2019-04-13T00:00:00+00:00' AND updatedAt:<='2024-04-30T12:16:17.273363+00:00'"
) {
edges {
node {
@@ -2457,64 +2468,76 @@ class ProductVariant(ShopifyBulkQuery):
"""

query_name = "productVariants"
sort_key = "ID"

prices_fields: List[str] = ["amount", "currencyCode"]
presentment_prices_fields: List[Field] = [
Field(
name="edges",
fields=[
Field(
name="node",
fields=["__typename", Field(name="price", fields=prices_fields), Field(name="compareAtPrice", fields=prices_fields)],
)
],
)
]
@property
def _should_include_presentment_prices(self) -> bool:
return self.config.get("job_product_variants_include_pres_prices", True)

option_value_fields: List[Field] = [
"id",
"name",
Field(name="hasVariants", alias="has_variants"),
Field(name="swatch", fields=["color", Field(name="image", fields=["id"])]),
]
option_fields: List[Field] = [
"name",
"value",
Field(name="optionValue", alias="option_value", fields=option_value_fields),
]
@property
def query_nodes(self) -> Optional[Union[List[Field], List[str]]]:

# main query
query_nodes: List[Field] = [
"__typename",
"id",
"title",
"price",
"sku",
"position",
"inventoryPolicy",
"compareAtPrice",
"inventoryManagement",
"createdAt",
"updatedAt",
"taxable",
"barcode",
"weight",
"weightUnit",
"inventoryQuantity",
"requiresShipping",
"availableForSale",
"displayName",
"taxCode",
Field(name="selectedOptions", alias="options", fields=option_fields),
Field(name="weight", alias="grams"),
Field(name="image", fields=[Field(name="id", alias="image_id")]),
Field(name="inventoryQuantity", alias="old_inventory_quantity"),
Field(name="product", fields=[Field(name="id", alias="product_id")]),
Field(name="fulfillmentService", fields=[Field(name="handle", alias="fulfillment_service")]),
Field(name="inventoryItem", fields=[Field(name="id", alias="inventory_item_id")]),
Field(name="presentmentPrices", fields=presentment_prices_fields),
]
prices_fields: List[str] = ["amount", "currencyCode"]
presentment_prices_fields: List[Field] = [
Field(
name="edges",
fields=[
Field(
name="node",
fields=[
"__typename",
Field(name="price", fields=prices_fields),
Field(name="compareAtPrice", fields=prices_fields),
],
)
],
)
]
option_value_fields: List[Field] = [
"id",
"name",
Field(name="hasVariants", alias="has_variants"),
Field(name="swatch", fields=["color", Field(name="image", fields=["id"])]),
]
option_fields: List[Field] = [
"name",
"value",
Field(name="optionValue", alias="option_value", fields=option_value_fields),
]
presentment_prices = (
[Field(name="presentmentPrices", fields=presentment_prices_fields)] if self._should_include_presentment_prices else []
)

query_nodes: List[Field] = [
"__typename",
"id",
"title",
"price",
"sku",
"position",
"inventoryPolicy",
"compareAtPrice",
"inventoryManagement",
"createdAt",
"updatedAt",
"taxable",
"barcode",
"weight",
"weightUnit",
"inventoryQuantity",
"requiresShipping",
"availableForSale",
"displayName",
"taxCode",
Field(name="selectedOptions", alias="options", fields=option_fields),
Field(name="weight", alias="grams"),
Field(name="image", fields=[Field(name="id", alias="image_id")]),
Field(name="inventoryQuantity", alias="old_inventory_quantity"),
Field(name="product", fields=[Field(name="id", alias="product_id")]),
Field(name="fulfillmentService", fields=[Field(name="handle", alias="fulfillment_service")]),
Field(name="inventoryItem", fields=[Field(name="id", alias="inventory_item_id")]),
] + presentment_prices

return query_nodes
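The effect of the new option, reduced to a sketch with trimmed field lists (the real method builds the full `Field` tree shown above):

from typing import Any, List, Mapping

def build_query_nodes(config: Mapping[str, Any]) -> List[str]:
    # `presentmentPrices` is appended only while the user keeps the
    # `job_product_variants_include_pres_prices` option enabled (the default).
    include_presentment = config.get("job_product_variants_include_pres_prices", True)
    presentment = ["presentmentPrices"] if include_presentment else []
    return ["id", "title", "price"] + presentment

assert "presentmentPrices" in build_query_nodes({})
assert "presentmentPrices" not in build_query_nodes({"job_product_variants_include_pres_prices": False})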

record_composition = {
"new_record": "ProductVariant",
@@ -29,6 +29,8 @@ def __post_init__(self) -> None:
self.composition: Optional[Mapping[str, Any]] = self.query.record_composition
self.record_process_components: Optional[Callable[[MutableMapping], MutableMapping]] = self.query.record_process_components
self.components: List[str] = self.composition.get("record_components", []) if self.composition else []
# counts how many records have been composed
self.record_composed: int = 0

@property
def tools(self) -> BulkTools:
@@ -127,8 +129,12 @@ def produce_records(self, filename: str) -> Iterable[MutableMapping[str, Any]]:
"""

with open(filename, "r") as jsonl_file:
# reset the counter
self.record_composed = 0

for record in self.process_line(jsonl_file):
yield self.tools.fields_names_to_snake_case(record)
self.record_composed += 1
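A stripped-down sketch of the counter's lifecycle — reset before each result file is processed, incremented per yielded record, so a caller can read it to decide when a checkpoint interval has been reached (the in-memory record source here is illustrative; the real method reads a JSONL file):

from typing import Any, Iterable, MutableMapping

class RecordProducerSketch:
    def __init__(self) -> None:
        self.record_composed: int = 0

    def produce_records(self, records: Iterable[MutableMapping[str, Any]]) -> Iterable[MutableMapping[str, Any]]:
        self.record_composed = 0  # reset the counter per result file
        for record in records:
            yield record
            self.record_composed += 1

producer = RecordProducerSketch()
list(producer.produce_records([{"id": 1}, {"id": 2}, {"id": 3}]))
assert producer.record_composed == 3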

def read_file(self, filename: str, remove_file: Optional[bool] = True) -> Iterable[Mapping[str, Any]]:
try:
@@ -24,25 +24,20 @@ def bulk_retry_on_exception(logger: logging.Logger, more_exceptions: Optional[Tu
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(self, *args, **kwargs) -> Any:
# mandatory class attributes
max_retries = self._job_max_retries
stream_name = self.stream_name
backoff_time = self._job_backoff_time

current_retries = 0
while True:
try:
return func(self, *args, **kwargs)
except BULK_RETRY_ERRORS or more_exceptions as ex:
current_retries += 1
if current_retries > max_retries:
if current_retries > self._job_max_retries:
logger.error("Exceeded retry limit. Giving up.")
raise
else:
logger.warning(
f"Stream `{stream_name}`: {ex}. Retrying {current_retries}/{max_retries} after {backoff_time} seconds."
f"Stream `{self.http_client.name}`: {ex}. Retrying {current_retries}/{self._job_max_retries} after {self._job_backoff_time} seconds."
)
sleep(backoff_time)
sleep(self._job_backoff_time)
except ShopifyBulkExceptions.BulkJobCreationFailedConcurrentError:
if self._concurrent_attempt == self._concurrent_max_retry:
message = f"The BULK Job couldn't be created at this time, since another job is running."
@@ -51,9 +46,13 @@ def wrapper(self, *args, **kwargs) -> Any:

self._concurrent_attempt += 1
logger.warning(
f"Stream: `{self.stream_name}`, the BULK concurrency limit has reached. Waiting {self._concurrent_interval} sec before retry, attempt: {self._concurrent_attempt}.",
f"Stream: `{self.http_client.name}`, the BULK concurrency limit has reached. Waiting {self._concurrent_interval} sec before retry, attempt: {self._concurrent_attempt}.",
)
sleep(self._concurrent_interval)
except ShopifyBulkExceptions.BulkJobRedirectToOtherShopError:
logger.warning(
f"Stream: `{self.http_client.name}`, the `shop name` differs from the provided in `input configuration`. Switching to the `{self._tools.shop_name_from_url(self.base_url)}`.",
)

return wrapper
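For reference, the retry pattern above distilled into a standalone sketch — bounded retries with a fixed backoff on transient errors; the `TransientError` class and the numeric defaults are illustrative, not the connector's actual values:

import logging
from functools import wraps
from time import sleep

class TransientError(Exception):
    pass

def retry_on_transient(logger: logging.Logger, max_retries: int = 3, backoff_time: float = 1.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_retries = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except TransientError as ex:
                    current_retries += 1
                    if current_retries > max_retries:
                        logger.error("Exceeded retry limit. Giving up.")
                        raise
                    logger.warning(f"{ex}. Retrying {current_retries}/{max_retries} after {backoff_time} seconds.")
                    sleep(backoff_time)
        return wrapper
    return decorator

@retry_on_transient(logging.getLogger("demo"), max_retries=2, backoff_time=0.1)
def flaky() -> str:
    return "ok"

assert flaky() == "ok"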

@@ -52,6 +52,16 @@ def filename_from_url(job_result_url: str) -> str:
f"Could not extract the `filename` from `result_url` provided, details: {job_result_url}",
)

@staticmethod
def shop_name_from_url(url: str) -> str:
match = re.search(r"https://(.*?)(\.myshopify)", url)
if match:
return match.group(1)
else:
# safety net: if there is an error parsing the url,
# or no match is found, return the url as-is
return url
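For example, applied to a typical shop admin URL (the sample URLs below are illustrative):

import re

def shop_name_from_url(url: str) -> str:
    match = re.search(r"https://(.*?)(\.myshopify)", url)
    if match:
        return match.group(1)
    # safety net: return the input unchanged when no match is found
    return url

assert shop_name_from_url("https://airbyte-test.myshopify.com/admin") == "airbyte-test"
assert shop_name_from_url("not-a-shop-url") == "not-a-shop-url"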

@staticmethod
def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Mapping[str, Any]:
"""
@@ -98,12 +98,27 @@
"description": "Defines which API type (REST/BULK) to use to fetch `Transactions` data. If you are a `Shopify Plus` user, leave the default value to speed up the fetch.",
"default": false
},
"job_product_variants_include_pres_prices": {
"type": "boolean",
"title": "Add `Presentment prices` to Product Variants",
"description": "If enabled, the `Product Variants` stream attempts to include `Presentment prices` field (may affect the performance).",
"default": true
},
"job_termination_threshold": {
"type": "integer",
"title": "BULK Job termination threshold",
"description": "The max time in seconds, after which the single BULK Job should be `CANCELED` and retried. The bigger the value the longer the BULK Job is allowed to run.",
"default": 3600,
"minimum": 1
"default": 7200,
"minimum": 3600,
"maximum": 21600
},
"job_checkpoint_interval": {
"type": "integer",
"title": "BULK Job checkpoint (rows collected)",
"description": "The threshold, after which the single BULK Job should be checkpointed.",
"default": 100000,
"minimum": 15000,
"maximum": 200000
}
}
},
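Put together, a user config exercising the new options within their allowed ranges might look like the sketch below (other required connector fields, e.g. the shop name and credentials, are omitted):

config = {
    "job_product_variants_include_pres_prices": False,  # skip Presentment prices for speed
    "job_termination_threshold": 7200,                  # seconds; allowed range 3600..21600
    "job_checkpoint_interval": 100000,                  # rows collected; allowed range 15000..200000
}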