Glide destination updates #12

Merged 7 commits on Aug 6, 2024
41 changes: 26 additions & 15 deletions airbyte-integrations/connectors/destination-glide/README.md
@@ -5,37 +5,48 @@ For information about how to use this connector within Airbyte, see [the documen

## Development

The active todo list is at [./todo.md].
The gist of the Glide-specific code is in `/destination_glide/destination.py` and `/destination_glide/glide.py`.
The active todo list is at `./todo.md`.

### Prerequisites
The gist of the Glide-specific code is in `destination_glide/destination.py` and `destination_glide/glide.py`.

- Python (`^3.9`, tested recently with `3.12.3`)
- Poetry (`^1.7`, tested recently with `1.8.3_1`)
### Setup

I used homebrew for installing these prerequisites on macOS.
1. Ensure you have the following prerequisites installed:

### Unit Tests
- Python (`^3.9`, tested recently with `3.12.3`)
- Poetry (`^1.7`, tested recently with `1.8.3_1`)

The unit tests for that code are in `/destination-glide/unit_tests`. To run them run:
You can use homebrew to install these on macOS.

2. Once you have the above, run:

```sh
./scripts/test-unit.sh
```
poetry install
```

### Running the Tests

### Integration Tests
Create the file `secrets/config.json`. It must conform to the configuration specification in `destination_glide/spec.json`, which also specifies the configuration UI within the Airbyte product itself for configuring the destination.

The destination has a configuration in `/secrets/config.json`. That file must confirm to the configuration specification in `/destination_glide/spec.json`. It should be something like:
It should be something like:

```json
{
"api_host": "http://localhost:5005",
"api_path_root": "api",
"api_host": "https://api.staging.glideapps.com",
"api_path_root": "",
"api_key": "decafbad-1234-1234-1234-decafbad"
}
```

The spec also specifies the configuration UI within the Airbyte product itself for configuring the destination.
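
To sanity-check a config outside of a full Airbyte deployment, you can usually invoke the connector's `check` command directly. This is a hedged sketch that assumes the conventional `main.py` entry point used by Airbyte Python connectors; the repo's own `./scripts` may be the preferred route:

```sh
poetry run python main.py check --config secrets/config.json
```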
#### Unit Tests

The unit tests for that code are in `destination-glide/unit_tests`. To run them, run:

```sh
./scripts/test-unit.sh
```

#### Integration Tests

Airbyte provides a set of simple integration tests that can be triggered with the following scripts:

airbyte-integrations/connectors/destination-glide/destination_glide/destination.py
@@ -79,10 +79,10 @@ def write(
# configure the table based on the stream catalog:
# choose a strategy based on config:

def create_table_client_for_stream(stream_name):
def create_table_client_for_stream(stream_name, columns):
# TODO: sanitize stream_name chars and length for GBT name
glide = GlideBigTableFactory.create()
glide.init(api_key, stream_name, api_host, api_path_root)
glide.init(api_key, stream_name, columns, api_host, api_path_root)
return glide

table_clients = {}
@@ -91,8 +91,6 @@ def create_table_client_for_stream(stream_name):
if configured_stream.destination_sync_mode != DestinationSyncMode.overwrite:
raise Exception(f'Only destination sync mode overwrite is supported, but received "{configured_stream.destination_sync_mode}".') # nopep8 because https://github.com/hhatto/autopep8/issues/712

glide = create_table_client_for_stream(
configured_stream.stream.name)
# upsert the GBT with schema to set_schema for dumping the data into it
columns = []
properties = configured_stream.stream.json_schema["properties"]
@@ -103,11 +101,10 @@ def create_table_client_for_stream(stream_name):
Column(prop_name, airbyteTypeToGlideType(prop_type))
)
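# Illustrative example (hypothetical stream, added for clarity): a json_schema whose
# "properties" is {"id": {"type": "number"}, "name": {"type": "string"}} would yield
# columns roughly like [Column("id", ...), Column("name", ...)], where the concrete
# Glide type string is whatever airbyteTypeToGlideType maps each Airbyte type to.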

glide.set_schema(columns)
glide = create_table_client_for_stream(configured_stream.stream.name, columns)
table_clients[configured_stream.stream.name] = glide

# stream the records into the GBT:
buffers = defaultdict(list)
logger.debug("Processing messages...")
for message in input_messages:
logger.debug(f"processing message {message.type}...")
@@ -120,36 +117,16 @@ def create_table_client_for_stream(stream_name):
continue

# add to buffer
record_data = message.record.data
record_id = str(uuid.uuid4())
stream_buffer = buffers[stream_name]
stream_buffer.append(
(record_id, datetime.datetime.now().isoformat(), record_data))
client = table_clients[stream_name]
client.add_row(message.record.data)
logger.debug("buffering record complete.")

elif message.type == Type.STATE:
# `Type.State` is a signal from the source that we should save the previous batch of `Type.RECORD` messages to the destination.
# It is a checkpoint that enables partial success.
# See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#state--checkpointing
logger.info(f"Writing buffered records to Glide API from {len(buffers.keys())} streams...") # nopep8
for stream_name in buffers.keys():
stream_buffer = buffers[stream_name]
logger.info(f"Saving buffered records to Glide API (stream: '{stream_name}', record count: '{len(stream_buffer)}')...") # nopep8
DATA_INDEX = 2
data_rows = [row_tuple[DATA_INDEX]
for row_tuple in stream_buffer]
if len(data_rows) > 0:
if stream_name not in table_clients:
raise Exception(
f"Stream '{stream_name}' not found in table_clients")
glide = table_clients[stream_name]
glide.add_rows(data_rows)
stream_buffer.clear()
logger.info(f"Saving buffered records to Glide API complete.") # nopep8 because https://github.com/hhatto/autopep8/issues/712

# dump all buffers now as we just wrote them to the table:
buffers = defaultdict(list)
yield message
# FIXME: I don't think partial success applies to us since we only support overwrite mode anyway?
logger.info(f"Ignoring state message: {message.state}")
else:
logger.warn(f"Ignoring unknown Airbyte input message type: {message.type}") # nopep8 because https://github.com/hhatto/autopep8/issues/712

@@ -158,7 +135,8 @@ def create_table_client_for_stream(stream_name):
glide.commit()
logger.info(f"Committed stream '{stream_name}' to Glide.")

pass
# see https://stackoverflow.com/a/36863998
yield from ()
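# (`yield from ()` keeps `write` a generator function even though it no longer yields any state messages back to Airbyte.)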

def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""
airbyte-integrations/connectors/destination-glide/destination_glide/glide.py
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
import uuid
from .log import LOG_LEVEL_DEFAULT
import logging
import requests
@@ -18,6 +19,8 @@
"json",
]

DEFAULT_BATCH_SIZE = 1500

class Column(dict):
"""
Represents a Column in the glide API.
@@ -46,6 +49,14 @@ def __repr__(self):


class GlideBigTableBase(ABC):
"""
An API client for interacting with a Glide Big Table. The intention is to
create a new table or update an existing table including the table's schema
and the table's rows.

The protocol is to call `init`, then `add_row` or `add_rows` one or more times, and finally, `commit`, in that order.
"""

def headers(self) -> Dict[str, str]:
return {
"Content-Type": "application/json",
@@ -55,29 +66,25 @@ def headers(self) -> Dict[str, str]:
def url(self, path: str) -> str:
return f"{self.api_host}/{self.api_path_root + '/' if self.api_path_root != '' else ''}{path}"

"""
An API client for interacting with a Glide Big Table. The intention is to
create a new table or update an existing table including the table's schema
and the table's rows.

The protocol is to call `init`, `set_schema`, `add_rows` one or more times, and `commit` in that order.
"""

def init(self, api_key, table_name, api_host="https://api.glideapps.com", api_path_root=""):
def init(self, api_key, table_name, columns, api_host="https://api.glideapps.com", api_path_root="", batch_size = DEFAULT_BATCH_SIZE):
Collaborator: combining init & set_schema is a nice simplification 🙇‍♂️

"""
Sets the connection information for the table.
"""
self.api_host = api_host
# todo: validate args
self.api_key = api_key
self.api_host = api_host
self.api_path_root = api_path_root

self.table_name = table_name
# todo: validate args
self.columns = columns

# TODO: to optimize batch size for variable number and size of columns, we could estimate row byte size based on the first row and choose a batch size based on that.
self.batch_size = batch_size

@abstractmethod
def set_schema(self, columns: List[Column]) -> None:
def add_row(self, row: BigTableRow) -> None:
"""
set_schemas the table with the given schema.
Each column is a json-schema property where the key is the column name and the type is the .
Adds a row to the table.
"""
pass

@@ -108,71 +115,47 @@ def create(cls) -> GlideBigTableBase:
return GlideBigTableRestStrategy()

class GlideBigTableRestStrategy(GlideBigTableBase):
def reset(self):
self.stash_id = None
self.stash_serial = 0

def __init__(self):
super().__init__()
self.reset()

def set_schema(self, columns: List[Column]) -> None:
logger.debug(f"set_schema columns: {columns}")
if columns is None:
raise ValueError("columns must be provided")
if len(columns) == 0:
raise ValueError("columns must be provided")
self.reset()
self.columns = columns
# Create stash we can stash records into for later
r = requests.post(
self.url(f"stashes"),
headers=self.headers(),
)
try:
r.raise_for_status()
except Exception as e:
raise Exception(f"failed to create stash. Response was '{r.text}'") from e # nopep8

result = r.json()
self.stash_id = result["data"]["stashID"]
self.stash_id = str(uuid.uuid4())
self.stash_serial = 0
logger.info(f"Created stash for records with id '{self.stash_id}'")
self.buffer = []

def raise_if_set_schema_not_called(self):
if self.stash_id is None:
raise ValueError(
"set_schema must be called before add_rows or commit")
def _flush_buffer(self):
rows = self.buffer
if len(rows) == 0:
return
self.buffer = []

def _add_row_batch(self, rows: List[BigTableRow]) -> None:
logger.debug(f"Adding rows batch with size {len(rows)}")
path = f"stashes/{self.stash_id}/{self.stash_serial}"
logger.debug(f"Flushing {len(rows)} rows to {path} ...")
r = requests.post(
self.url(f"stashes/{self.stash_id}/{self.stash_serial}"),
self.url(path),
headers=self.headers(),
json=rows

)
try:
r.raise_for_status()
except Exception as e:
raise Exception(f"failed to add rows batch for serial '{self.stash_serial}'. Response was '{r.text}'") from e # nopep8
raise Exception(f"Failed to post rows batch to {path} : {r.text}") from e # nopep8

logger.info(f"Added {len(rows)} rows as batch for serial '{self.stash_serial}' successfully.") # nopep8
logger.info(f"Successfully posted {len(rows)} rows to {path}") # nopep8
self.stash_serial += 1

def add_row(self, row: BigTableRow) -> None:
self.buffer.append(row)
if len(self.buffer) >= self.batch_size:
self._flush_buffer()
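# Note: rows accumulate in self.buffer and are flushed to the stash endpoint in
# batches of batch_size; commit() flushes any remainder before creating or
# overwriting the table.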

def add_rows(self, rows: Iterator[BigTableRow]) -> None:
self.raise_if_set_schema_not_called()
# TODO: to optimize batch size for variable number and size of columns, we could estimate row byte size based on the first row and choose a batch size based on that.
BATCH_SIZE = 500
batch = []
for i in range(0, len(rows), BATCH_SIZE):
batch = rows[i:i + min(BATCH_SIZE, len(rows) - i)]
self._add_row_batch(batch)
self.buffer.extend(rows)
if len(self.buffer) >= self.batch_size:
self._flush_buffer()

def create_table_from_stash(self) -> None:
logger.info(f"Creating new table for table name '{self.table_name}'...") # nopep8
logger.info(f"Creating new table '{self.table_name}' ...") # nopep8
r = requests.post(
self.url(f"tables"),
self.url(f"tables?onSchemaError=dropColumns"),
Author: The docs update was awaiting PR review on the monolith for a few days, but it should hopefully go out to prod today.

headers=self.headers(),
json={
"name": self.table_name,
@@ -187,14 +170,14 @@ def create_table_from_stash(self) -> None:
try:
r.raise_for_status()
except Exception as e:
raise Exception(f"failed to create table '{self.table_name}'. Response was '{r.text}'.") from e # nopep8
raise Exception(f"Failed to create table '{self.table_name}' : {r.text}") from e # nopep8

logger.info(f"Creating table '{self.table_name}' succeeded.")
logger.info(f"Successfully created table '{self.table_name}'")

def overwrite_table_from_stash(self, table_id) -> None:
# overwrite the specified table's schema and rows with the stash:
r = requests.put(
self.url(f"tables/{table_id}"),
self.url(f"tables/{table_id}?onSchemaError=dropColumns"),
headers=self.headers(),
json={
"schema": {
@@ -208,10 +191,9 @@ def overwrite_table_from_stash(self, table_id) -> None:
try:
r.raise_for_status()
except Exception as e:
raise Exception(f"failed to overwrite table '{table_id}'. Response was '{r.text}'") from e # nopep8
raise Exception(f"Failed to overwrite table '{table_id}' : {r.text}") from e # nopep8

def commit(self) -> None:
self.raise_if_set_schema_not_called()
# first see if the table already exists
r = requests.get(
self.url(f"tables"),
@@ -220,7 +202,7 @@ def commit(self) -> None:
try:
r.raise_for_status()
except Exception as e:
raise Exception(f"Failed to get table list. Response was '{r.text}'.") from e # nopep8
raise Exception(f"Failed to get table list: {r.text}") from e # nopep8

found_table_id = None
# confirm if table exists:
@@ -234,6 +216,10 @@
logger.info(f"Found existing table to reuse for table name '{self.table_name}' with ID '{found_table_id}'.") # nopep8
break

# flush any remaining buffer to the stash
self._flush_buffer()

# commit the stash to the table
if found_table_id != None:
self.overwrite_table_from_stash(found_table_id)
else:
airbyte-integrations/connectors/destination-glide/destination_glide/spec.json
@@ -1,7 +1,7 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/glide",
"supported_destination_sync_modes": ["overwrite"],
"supportsIncremental": true,
"supportsIncremental": false,
Collaborator: Good catch on the whole incremental & state record management. I think I followed a pattern from the first day of developing it and never really got back to thinking it through or testing it.

"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Destination Glide",