diff --git a/kstreams/engine.py b/kstreams/engine.py index 09fe958..2c23d20 100644 --- a/kstreams/engine.py +++ b/kstreams/engine.py @@ -202,8 +202,8 @@ def add_stream(self, stream: Stream) -> None: stream.rebalance_listener.engine = self # type: ignore async def remove_stream(self, stream: Stream) -> None: - await stream.stop() self._streams.remove(stream) + await stream.stop() self.monitor.clean_stream_consumer_metrics(stream) def stream( diff --git a/kstreams/prometheus/monitor.py b/kstreams/prometheus/monitor.py index 522dfe3..8f1f4ea 100644 --- a/kstreams/prometheus/monitor.py +++ b/kstreams/prometheus/monitor.py @@ -220,4 +220,10 @@ async def _metrics_task(self) -> None: await asyncio.sleep(3) for stream in self._streams: if stream.consumer is not None: - await self.generate_consumer_metrics(stream.consumer) + try: + await self.generate_consumer_metrics(stream.consumer) + except RuntimeError: + logger.debug( + f"Metrics for stream {stream.name} can not be generated " + "probably because it has been removed" + ) diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 5c5ea9b..f286225 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -82,7 +82,7 @@ async def my_coroutine(_): ) consumer_position = await consumer.position(topic_partition) - commited_position = consumer.last_stable_offset(topic_partition) + commited_position = await consumer.committed(topic_partition) assert met_committed == commited_position assert met_position == consumer_position @@ -189,7 +189,7 @@ async def my_coroutine(_): ) consumer_position = await consumer.position(topic_partition) - commited_position = consumer.last_stable_offset(topic_partition) + commited_position = await consumer.committed(topic_partition) assert met_committed == commited_position assert met_position == consumer_position