Skip to content

Commit

Permalink
fix: race condition when creating metrics at the same time that a str…
Browse files Browse the repository at this point in the history
…eam is removed
  • Loading branch information
marcosschroh committed Jul 25, 2023
1 parent bab1531 commit f76be2c
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
2 changes: 1 addition & 1 deletion kstreams/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ def add_stream(self, stream: Stream) -> None:
stream.rebalance_listener.engine = self # type: ignore

async def remove_stream(self, stream: Stream) -> None:
await stream.stop()
self._streams.remove(stream)
await stream.stop()
self.monitor.clean_stream_consumer_metrics(stream)

def stream(
Expand Down
8 changes: 7 additions & 1 deletion kstreams/prometheus/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,10 @@ async def _metrics_task(self) -> None:
await asyncio.sleep(3)
for stream in self._streams:
if stream.consumer is not None:
await self.generate_consumer_metrics(stream.consumer)
try:
await self.generate_consumer_metrics(stream.consumer)
except RuntimeError:
logger.debug(

Check warning on line 226 in kstreams/prometheus/monitor.py

View check run for this annotation

Codecov / codecov/patch

kstreams/prometheus/monitor.py#L225-L226

Added lines #L225 - L226 were not covered by tests
f"Metrics for stream {stream.name} can not be generated "
"probably because it has been removed"
)
4 changes: 2 additions & 2 deletions tests/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async def my_coroutine(_):
)

consumer_position = await consumer.position(topic_partition)
commited_position = consumer.last_stable_offset(topic_partition)
commited_position = await consumer.committed(topic_partition)

assert met_committed == commited_position
assert met_position == consumer_position
Expand Down Expand Up @@ -189,7 +189,7 @@ async def my_coroutine(_):
)

consumer_position = await consumer.position(topic_partition)
commited_position = consumer.last_stable_offset(topic_partition)
commited_position = await consumer.committed(topic_partition)

assert met_committed == commited_position
assert met_position == consumer_position
Expand Down

0 comments on commit f76be2c

Please sign in to comment.