Skip to content

Commit

Permalink
improve perf metrics for slow-starting streams
Browse files: browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Sep 20, 2024
1 parent f4a3170 commit 8b1ee47
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions airbyte/progress.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -275,16 +275,17 @@ def tally_records_read(
self.stream_read_counts[message.record.stream] += 1

if message.record.stream not in self.stream_read_start_times:
self._log_stream_read_start(stream_name=message.record.stream)
self.log_stream_start(stream_name=message.record.stream)

elif (
message.trace
and message.trace.stream_status
and message.trace.stream_status.status is AirbyteStreamStatus.COMPLETE
):
self._log_stream_read_end(
stream_name=message.trace.stream_status.stream_descriptor.name
)
elif message.trace and message.trace.stream_status:
if message.trace.stream_status.status is AirbyteStreamStatus.STARTED:
self.log_stream_start(
stream_name=message.trace.stream_status.stream_descriptor.name
)
if message.trace.stream_status.status is AirbyteStreamStatus.COMPLETE:
self._log_stream_read_end(
stream_name=message.trace.stream_status.stream_descriptor.name
)

# Bail if we're not due for a progress update.
if count % update_period != 0:
Expand Down Expand Up @@ -345,12 +346,12 @@ def tally_confirmed_writes(
"""
self._start_rich_view() # Start Rich's live view if not already running.
for message in messages:
if message.type is Type.STATE:
if message.state:
# This is a state message from the destination. Tally the records written.
if message.state.stream and message.state.destinationStats:
stream_name = message.state.stream.stream_descriptor.name
self.destination_stream_records_confirmed[stream_name] += (
message.state.destinationStats.recordCount
self.destination_stream_records_confirmed[stream_name] += int(
message.state.destinationStats.recordCount or 0
)
self._update_display()

Expand Down Expand Up @@ -418,11 +419,13 @@ def _log_sync_start(self) -> None:
event_type=EventType.SYNC,
)

def _log_stream_read_start(self, stream_name: str) -> None:
self._print_info_message(
f"Read started on stream `{stream_name}` at `{pendulum.now().format('HH:mm:ss')}`..."
)
self.stream_read_start_times[stream_name] = time.time()
def log_stream_start(self, stream_name: str) -> None:
"""Log that a stream has started reading."""
if stream_name not in self.stream_read_start_times:
self._print_info_message(
f"Read started on stream `{stream_name}` at `{pendulum.now().format('HH:mm:ss')}`..."
)
self.stream_read_start_times[stream_name] = time.time()

def _log_stream_read_end(self, stream_name: str) -> None:
self._print_info_message(
Expand Down

0 comments on commit 8b1ee47

Please sign in to comment.