Glide destination updates #12
Changes from all commits: `5a7fcab`, `ea07a23`, `73b4095`, `6f2770f`, `84845db`, `e585cdb`, `78ea57b`
```diff
@@ -1,4 +1,5 @@
 from abc import ABC, abstractmethod
+import uuid
 from .log import LOG_LEVEL_DEFAULT
 import logging
 import requests
```
```diff
@@ -18,6 +19,8 @@
     "json",
 ]
 
+DEFAULT_BATCH_SIZE = 1500
+
 class Column(dict):
     """
     Represents a Column in the glide API.
```
```diff
@@ -46,6 +49,14 @@ def __repr__(self):
 
 
 class GlideBigTableBase(ABC):
+    """
+    An API client for interacting with a Glide Big Table. The intention is to
+    create a new table or update an existing table including the table's schema
+    and the table's rows.
+
+    The protocol is to call `init`, then `add_row` or `add_rows` one or more times, and finally, `commit`, in that order.
+    """
+
     def headers(self) -> Dict[str, str]:
         return {
             "Content-Type": "application/json",
```
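The docstring added above pins down the new calling protocol. As a quick aid for reviewers, here is a minimal sketch of it, assuming only the names in this diff; the `Column` construction and the row dicts are illustrative guesses, since neither is shown here:

```python
# Sketch of the new protocol: init, then add_row/add_rows, then commit.
# Column construction and the rows are hypothetical; Column subclasses dict.
rows = [{"name": "alice"}, {"name": "bob"}]
columns = [Column({"id": "name", "type": "string"})]

table = GlideBigTableRestStrategy()
table.init(api_key="my-api-key", table_name="my-table", columns=columns)
for row in rows:
    table.add_row(row)  # buffered; flushed to the stash every batch_size rows
table.commit()          # flushes the tail, then creates or overwrites the table
```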
```diff
@@ -55,29 +66,25 @@ def headers(self) -> Dict[str, str]:
     def url(self, path: str) -> str:
         return f"{self.api_host}/{self.api_path_root + '/' if self.api_path_root != '' else ''}{path}"
 
-    """
-    An API client for interacting with a Glide Big Table. The intention is to
-    create a new table or update an existing table including the table's schema
-    and the table's rows.
-
-    The protocol is to call `init`, `set_schema`, `add_rows` one or more times, and `commit` in that order.
-    """
 
-    def init(self, api_key, table_name, api_host="https://api.glideapps.com", api_path_root=""):
+    def init(self, api_key, table_name, columns, api_host="https://api.glideapps.com", api_path_root="", batch_size = DEFAULT_BATCH_SIZE):
         """
         Sets the connection information for the table.
         """
-        self.api_host = api_host
         # todo: validate args
         self.api_key = api_key
+        self.api_host = api_host
         self.api_path_root = api_path_root
 
         self.table_name = table_name
-        # todo: validate args
+        self.columns = columns
+
+        # TODO: to optimize batch size for variable number and size of columns, we could estimate row byte size based on the first row and choose a batch size based on that.
+        self.batch_size = batch_size
+
     @abstractmethod
-    def set_schema(self, columns: List[Column]) -> None:
+    def add_row(self, row: BigTableRow) -> None:
         """
-        set_schemas the table with the given schema.
-        Each column is a json-schema property where the key is the column name and the type is the .
+        Adds a row to the table.
         """
         pass
```
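The TODO above sketches a follow-up: derive the batch size from an estimated row byte size instead of the fixed `DEFAULT_BATCH_SIZE`. A minimal sketch of that idea, with an illustrative payload budget that is not part of this PR:

```python
import json

TARGET_BATCH_BYTES = 4 * 1024 * 1024  # illustrative budget, not from this PR

def estimate_batch_size(first_row, default=1500, max_rows=10_000):
    """Pick a batch size so a batch of rows shaped like first_row stays under the budget."""
    row_bytes = len(json.dumps(first_row).encode("utf-8"))
    if row_bytes == 0:
        return default
    return max(1, min(max_rows, TARGET_BATCH_BYTES // row_bytes))
```

`init` could then be passed this estimate as its `batch_size` argument once the first record has been seen.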
```diff
@@ -108,71 +115,47 @@ def create(cls) -> GlideBigTableBase:
         return GlideBigTableRestStrategy()
 
 class GlideBigTableRestStrategy(GlideBigTableBase):
-    def reset(self):
-        self.stash_id = None
-        self.stash_serial = 0
-
     def __init__(self):
         super().__init__()
-        self.reset()
-
-    def set_schema(self, columns: List[Column]) -> None:
-        logger.debug(f"set_schema columns: {columns}")
-        if columns is None:
-            raise ValueError("columns must be provided")
-        if len(columns) == 0:
-            raise ValueError("columns must be provided")
-        self.reset()
-        self.columns = columns
-        # Create stash we can stash records into for later
-        r = requests.post(
-            self.url(f"stashes"),
-            headers=self.headers(),
-        )
-        try:
-            r.raise_for_status()
-        except Exception as e:
-            raise Exception(f"failed to create stash. Response was '{r.text}'") from e  # nopep8
-
-        result = r.json()
-        self.stash_id = result["data"]["stashID"]
+        self.stash_id = str(uuid.uuid4())
+        self.stash_serial = 0
         logger.info(f"Created stash for records with id '{self.stash_id}'")
+        self.buffer = []
 
-    def raise_if_set_schema_not_called(self):
-        if self.stash_id is None:
-            raise ValueError(
-                "set_schema must be called before add_rows or commit")
+    def _flush_buffer(self):
+        rows = self.buffer
+        if len(rows) == 0:
+            return
+        self.buffer = []
 
-    def _add_row_batch(self, rows: List[BigTableRow]) -> None:
-        logger.debug(f"Adding rows batch with size {len(rows)}")
+        path = f"stashes/{self.stash_id}/{self.stash_serial}"
+        logger.debug(f"Flushing {len(rows)} rows to {path} ...")
         r = requests.post(
-            self.url(f"stashes/{self.stash_id}/{self.stash_serial}"),
+            self.url(path),
             headers=self.headers(),
             json=rows
         )
         try:
             r.raise_for_status()
         except Exception as e:
-            raise Exception(f"failed to add rows batch for serial '{self.stash_serial}'. Response was '{r.text}'") from e  # nopep8
+            raise Exception(f"Failed to post rows batch to {path} : {r.text}") from e  # nopep8
 
-        logger.info(f"Added {len(rows)} rows as batch for serial '{self.stash_serial}' successfully.")  # nopep8
+        logger.info(f"Successfully posted {len(rows)} rows to {path}")  # nopep8
         self.stash_serial += 1
 
+    def add_row(self, row: BigTableRow) -> None:
+        self.buffer.append(row)
+        if len(self.buffer) >= self.batch_size:
+            self._flush_buffer()
+
     def add_rows(self, rows: Iterator[BigTableRow]) -> None:
-        self.raise_if_set_schema_not_called()
-        # TODO: to optimize batch size for variable number and size of columns, we could estimate row byte size based on the first row and choose a batch size based on that.
-        BATCH_SIZE = 500
-        batch = []
-        for i in range(0, len(rows), BATCH_SIZE):
-            batch = rows[i:i + min(BATCH_SIZE, len(rows) - i)]
-            self._add_row_batch(batch)
+        self.buffer.extend(rows)
+        if len(self.buffer) >= self.batch_size:
+            self._flush_buffer()
 
     def create_table_from_stash(self) -> None:
-        logger.info(f"Creating new table for table name '{self.table_name}'...")  # nopep8
+        logger.info(f"Creating new table '{self.table_name}' ...")  # nopep8
         r = requests.post(
-            self.url(f"tables"),
+            self.url(f"tables?onSchemaError=dropColumns"),
```
> **Review comment:** This does not appear to be documented: https://apidocs.glideapps.com/api-reference/v2/tables/post-tables

> **Reply:** The docs update was awaiting PR review on the monolith for a few days, but it should hopefully go out to prod today.
```diff
             headers=self.headers(),
             json={
                 "name": self.table_name,
```
```diff
@@ -187,14 +170,14 @@ def create_table_from_stash(self) -> None:
         try:
             r.raise_for_status()
         except Exception as e:
-            raise Exception(f"failed to create table '{self.table_name}'. Response was '{r.text}'.") from e  # nopep8
+            raise Exception(f"Failed to create table '{self.table_name}' : {r.text}") from e  # nopep8
 
-        logger.info(f"Creating table '{self.table_name}' succeeded.")
+        logger.info(f"Successfully created table '{self.table_name}'")
 
     def overwrite_table_from_stash(self, table_id) -> None:
         # overwrite the specified table's schema and rows with the stash:
         r = requests.put(
-            self.url(f"tables/{table_id}"),
+            self.url(f"tables/{table_id}?onSchemaError=dropColumns"),
             headers=self.headers(),
             json={
                 "schema": {
```
```diff
@@ -208,10 +191,9 @@ def overwrite_table_from_stash(self, table_id) -> None:
         try:
             r.raise_for_status()
         except Exception as e:
-            raise Exception(f"failed to overwrite table '{table_id}'. Response was '{r.text}'") from e  # nopep8
+            raise Exception(f"Failed to overwrite table '{table_id}' : {r.text}") from e  # nopep8
 
     def commit(self) -> None:
-        self.raise_if_set_schema_not_called()
         # first see if the table already exists
         r = requests.get(
             self.url(f"tables"),
```
```diff
@@ -220,7 +202,7 @@ def commit(self) -> None:
         try:
             r.raise_for_status()
         except Exception as e:
-            raise Exception(f"Failed to get table list. Response was '{r.text}'.") from e  # nopep8
+            raise Exception(f"Failed to get table list: {r.text}") from e  # nopep8
 
         found_table_id = None
         # confirm if table exists:
```
```diff
@@ -234,6 +216,10 @@
                 logger.info(f"Found existing table to reuse for table name '{self.table_name}' with ID '{found_table_id}'.")  # nopep8
                 break
+
+        # flush any remaining buffer to the stash
+        self._flush_buffer()
+
         # commit the stash to the table
         if found_table_id != None:
             self.overwrite_table_from_stash(found_table_id)
         else:
```
---

```diff
@@ -1,7 +1,7 @@
 {
   "documentationUrl": "https://docs.airbyte.com/integrations/destinations/glide",
   "supported_destination_sync_modes": ["overwrite"],
-  "supportsIncremental": true,
+  "supportsIncremental": false,
```
> **Review comment:** Good catch on the whole incremental & state record management. I think I followed a pattern from the first day of developing it and never really got back to thinking it through or testing it.
"connectionSpecification": { | ||
"$schema": "http://json-schema.org/draft-07/schema#", | ||
"title": "Destination Glide", | ||
|
> **Review comment:** combining init & set_schema is a nice simplification 🙇‍♂️