Skip to content

Commit

Permalink
fix: destination would fail when a source reporte
Browse files Browse the repository at this point in the history
d columns without any type
fixes glideapps/glide#29913
  • Loading branch information
activescott committed Sep 23, 2024
1 parent 3b28bb5 commit 894856c
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def create_table_client_for_stream(stream_name, columns):
columns = []
properties = configured_stream.stream.json_schema["properties"]
for (prop_name, prop) in properties.items():
prop_type = prop["type"]
prop_type = prop["type"] if "type" in prop else ""
logger.debug(f"Found column/property '{prop_name}' with type '{prop_type}' in stream {configured_stream.stream.name}.") # nopep8
columns.append(
Column(prop_name, airbyteTypeToGlideType(prop_type))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,68 @@ def test_write_simple(self, mock_factory: Callable):
self.assertEqual(
"commit", mock_bigtable.mock_calls[3][CALL_METHOD_NAME_INDEX])


@patch.object(GlideBigTableFactory, "create")
def test_write_source_schema_without_type(self, mock_factory: Callable):
"""
This tests a case where the source schema has a property without a type.
This happened with Github's "issues" stream for a property.
"""
mock_bigtable = CreateMockGlideBigTable()
mock_factory.return_value = mock_bigtable

destination = DestinationGlide()

# create a schema with a property that has no type:
my_schema = {
"type": "object",
"properties": {
"key_str": {"type": "string"},
"no_type_col": {}
},
}

generator = destination.write(
config=create_default_config(),
configured_catalog=create_configured_catalog_default(self.test_table_name,
table_schema=my_schema),
input_messages=[
AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(
stream=self.test_table_name, data={"key_str": f"row {0}", "no_type_col": f"row {0}"}, emitted_at=int(datetime.now().timestamp()) * 1000
),
),
]
)

# invoke the generator to get the results:
result = list(generator)
self.assertEqual(0, len(result))

# ensure it called init, multiple add_rows, followed by commit:
self.assertEqual(3, len(mock_bigtable.mock_calls))
# NOTE: the call objects in Mock.mock_calls, are three-tuples of (name, positional args, keyword args).
CALL_METHOD_NAME_INDEX = 0
EXPECTED_CALLS = ["init", "add_row", "commit"]
# now loop through each expected call and make sure it was found in the list of actual calls made by the destination:
for expected_call in EXPECTED_CALLS:
found = False
for actual_call in mock_bigtable.mock_calls:
if actual_call[CALL_METHOD_NAME_INDEX] == expected_call:
found = True
break
self.assertTrue(found, f"Expected call {expected_call} not found in actual calls")

# get the columns we passed into the big table during init and verify the type defaulted to string:
ARGS_INDEX = 1
init_call_args = mock_bigtable.mock_calls[0][ARGS_INDEX]
ARGS_INDEX_COLUMNS = 2
columns = init_call_args[ARGS_INDEX_COLUMNS]
col_no_type = [col for col in columns if col.id() == "no_type_col"]
self.assertEqual(col_no_type[0].type(), "string")


@patch.object(GlideBigTableFactory, "create")
def test_write_with_checkpoints(self, mock_factory: Callable):
"""
Expand Down

0 comments on commit 894856c

Please sign in to comment.