Skip to content

Commit

Permalink
fix: remove erroneous open() around tqdm progress bar and skip on_finished callback for multi-part inserts
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks committed Nov 12, 2024
1 parent 6bf306d commit 44ef500
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 10 deletions.
8 changes: 4 additions & 4 deletions python/hsfs/core/ingestion_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ def remaining_entries(self) -> int:
return self._remaining_entries

def wait_for_completion(self):
with open(tqdm(total=self.total_entries,
bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt} | Elapsed Time: {elapsed} | Remaining Time: {remaining}",
desc="Ingestion run progress",
mininterval=1)) as progress_bar:
with tqdm(total=self.total_entries,
bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt} | Elapsed Time: {elapsed} | Remaining Time: {remaining}",
desc="Ingestion run progress",
mininterval=1) as progress_bar:
while True:
progress_bar.n = self.total_entries - self.remaining_entries
progress_bar.refresh()
Expand Down
13 changes: 7 additions & 6 deletions python/hsfs/core/kafka_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,13 @@ def on_finished():

result = func(instance, feature_group, *args, **kwargs)

if stream:
# Start the monitoring thread
monitor_thread = Thread(target=wait_for_query, args=(result, on_finished))
monitor_thread.start()
else:
on_finished()
if not feature_group._multi_part_insert:
if stream:
# Start the monitoring thread
monitor_thread = Thread(target=wait_for_query, args=(result, on_finished))
monitor_thread.start()
else:
on_finished()

return result
return wrapper
Expand Down
8 changes: 8 additions & 0 deletions python/tests/engine/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -3503,6 +3503,7 @@ def test_materialization_kafka(self, mocker):
# Arrange
mocker.patch("hsfs.core.kafka_engine.get_kafka_config", return_value={})
mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema")
mock_save_ingestion_run = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi.save_ingestion_run")
mocker.patch("hsfs.core.kafka_engine.get_encoder_func")
mocker.patch("hsfs.core.kafka_engine.encode_complex_features")
mock_python_engine_kafka_produce = mocker.patch(
Expand Down Expand Up @@ -3556,11 +3557,13 @@ def test_materialization_kafka(self, mocker):
args="defaults",
await_termination=False,
)
assert mock_save_ingestion_run.call_count == 1

def test_materialization_kafka_first_job_execution(self, mocker):
# Arrange
mocker.patch("hsfs.core.kafka_engine.get_kafka_config", return_value={})
mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema")
mock_save_ingestion_run = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi.save_ingestion_run")
mocker.patch("hsfs.core.kafka_engine.get_encoder_func")
mocker.patch("hsfs.core.kafka_engine.encode_complex_features")
mock_python_engine_kafka_produce = mocker.patch(
Expand Down Expand Up @@ -3614,11 +3617,13 @@ def test_materialization_kafka_first_job_execution(self, mocker):
args="defaults -initialCheckPointString tests_offsets",
await_termination=False,
)
assert mock_save_ingestion_run.call_count == 1

def test_materialization_kafka_skip_offsets(self, mocker):
# Arrange
mocker.patch("hsfs.core.kafka_engine.get_kafka_config", return_value={})
mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema")
mock_save_ingestion_run = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi.save_ingestion_run")
mocker.patch("hsfs.core.kafka_engine.get_encoder_func")
mocker.patch("hsfs.core.kafka_engine.encode_complex_features")
mock_python_engine_kafka_produce = mocker.patch(
Expand Down Expand Up @@ -3671,11 +3676,13 @@ def test_materialization_kafka_skip_offsets(self, mocker):
args="defaults -initialCheckPointString tests_offsets",
await_termination=False,
)
assert mock_save_ingestion_run.call_count == 1

def test_materialization_kafka_topic_doesnt_exist(self, mocker):
# Arrange
mocker.patch("hsfs.core.kafka_engine.get_kafka_config", return_value={})
mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema")
mock_save_ingestion_run = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi.save_ingestion_run")
mocker.patch("hsfs.core.kafka_engine.get_encoder_func")
mocker.patch("hsfs.core.kafka_engine.encode_complex_features")
mock_python_engine_kafka_produce = mocker.patch(
Expand Down Expand Up @@ -3725,6 +3732,7 @@ def test_materialization_kafka_topic_doesnt_exist(self, mocker):
args="defaults -initialCheckPointString tests_offsets",
await_termination=False,
)
assert mock_save_ingestion_run.call_count == 1

def test_test(self, mocker):
fg = feature_group.FeatureGroup(
Expand Down
2 changes: 2 additions & 0 deletions python/tests/engine/test_python_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def test_write_dataframe_kafka(self, mocker, dataframe_fixture_times):
avro_schema_mock = mocker.patch(
"hsfs.feature_group.FeatureGroup._get_encoded_avro_schema"
)
mock_save_ingestion_run = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi.save_ingestion_run")
avro_schema = (
'{"type":"record","name":"test_fg","namespace":"test_featurestore.db","fields":'
'[{"name":"primary_key","type":["null","long"]},{"name":"event_date","type":'
Expand Down Expand Up @@ -113,3 +114,4 @@ def test_write_dataframe_kafka(self, mocker, dataframe_fixture_times):
}

assert reference_record == record
assert mock_save_ingestion_run.call_count == 1
4 changes: 4 additions & 0 deletions python/tests/engine/test_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,7 @@ def test_save_stream_dataframe_checkpoint_dir(self, mocker, backend_fixtures):
mock_storage_connector_api = mocker.patch(
"hsfs.core.storage_connector_api.StorageConnectorApi"
)
mock_save_ingestion_run = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi.save_ingestion_run")
json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"]
sc = storage_connector.StorageConnector.from_response_json(json)
mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc
Expand Down Expand Up @@ -1288,6 +1289,7 @@ def test_save_stream_dataframe_checkpoint_dir(self, mocker, backend_fixtures):
mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count
== 1
)
assert mock_save_ingestion_run.call_count == 1

def test_save_stream_dataframe_await_termination(self, mocker, backend_fixtures):
# Arrange
Expand Down Expand Up @@ -1590,6 +1592,7 @@ def test_save_online_dataframe(self, mocker, backend_fixtures):
mock_storage_connector_api = mocker.patch(
"hsfs.core.storage_connector_api.StorageConnectorApi"
)
mock_save_ingestion_run = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi.save_ingestion_run")
json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"]
sc = storage_connector.StorageConnector.from_response_json(json)
mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc
Expand Down Expand Up @@ -1685,6 +1688,7 @@ def test_save_online_dataframe(self, mocker, backend_fixtures):
mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.write.format.return_value.options.return_value.option.return_value.save.call_count
== 1
)
assert mock_save_ingestion_run.call_count == 1

def test_serialize_to_avro(self, mocker):
# Arrange
Expand Down
6 changes: 6 additions & 0 deletions python/tests/test_feature_group_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class TestFeatureGroupWriter:
def test_fg_writer_context_manager(self, mocker, dataframe_fixture_basic):
mock_insert = mocker.patch("hsfs.feature_group.FeatureGroup.insert")
mock_save_ingestion_run = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi.save_ingestion_run")

fg = feature_group.FeatureGroup(
name="test",
Expand All @@ -45,12 +46,14 @@ def test_fg_writer_context_manager(self, mocker, dataframe_fixture_basic):
validation_options={"fetch_expectation_suite": False},
)
assert fg._multi_part_insert is False
assert mock_save_ingestion_run.call_count == 0

def test_fg_writer_cache_management(self, mocker, dataframe_fixture_basic):
engine = python.Engine()
mocker.patch("hsfs.engine.get_instance", return_value=engine)
mocker.patch("hopsworks_common.client.get_instance")
mocker.patch("hsfs.core.kafka_engine.kafka_get_offsets", return_value="")
mock_save_ingestion_run = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi.save_ingestion_run")
producer, feature_writers, writer_m = (
mocker.MagicMock(),
mocker.MagicMock(),
Expand Down Expand Up @@ -104,12 +107,14 @@ def test_fg_writer_cache_management(self, mocker, dataframe_fixture_basic):
assert fg._feature_writers is None
assert fg._kafka_headers is None
assert fg._writer is None
assert mock_save_ingestion_run.call_count == 0

def test_fg_writer_without_context_manager(self, mocker, dataframe_fixture_basic):
engine = python.Engine()
mocker.patch("hsfs.engine.get_instance", return_value=engine)
mocker.patch("hopsworks_common.client.get_instance")
mocker.patch("hsfs.core.kafka_engine.kafka_get_offsets", return_value="")
mock_save_ingestion_run = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi.save_ingestion_run")
producer, feature_writers, writer_m = (
mocker.MagicMock(),
mocker.MagicMock(),
Expand Down Expand Up @@ -161,3 +166,4 @@ def test_fg_writer_without_context_manager(self, mocker, dataframe_fixture_basic
assert fg._feature_writers is None
assert fg._kafka_headers is None
assert fg._writer is None
assert mock_save_ingestion_run.call_count == 0

0 comments on commit 44ef500

Please sign in to comment.