FIX: remove stop from remove stream
Reid Meyer committed Aug 10, 2023
1 parent 6aaf016 commit 9a8f65a
Showing 13 changed files with 14 additions and 1,546 deletions.
5 changes: 0 additions & 5 deletions kstreams/__init__.py
@@ -2,15 +2,13 @@
 
 from .clients import Consumer, ConsumerType, Producer, ProducerType
 from .create import StreamEngine, create_engine
-from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType
 from .rebalance_listener import (
     ManualCommitRebalanceListener,
     MetricsRebalanceListener,
     RebalanceListener,
 )
 from .streams import Stream, stream
 from .structs import TopicPartitionOffset
-from .test_utils import TestStreamClient
 
 __all__ = [
     "Consumer",
@@ -19,15 +17,12 @@
     "ProducerType",
     "StreamEngine",
     "create_engine",
-    "PrometheusMonitor",
-    "PrometheusMonitorType",
     "MetricsRebalanceListener",
     "ManualCommitRebalanceListener",
     "RebalanceListener",
     "Stream",
     "stream",
     "ConsumerRecord",
-    "TestStreamClient",
     "TopicPartition",
     "TopicPartitionOffset",
 ]
5 changes: 0 additions & 5 deletions kstreams/create.py
@@ -3,7 +3,6 @@
 from .backends.kafka import Kafka
 from .clients import Consumer, ConsumerType, Producer, ProducerType
 from .engine import StreamEngine
-from .prometheus.monitor import PrometheusMonitor
 from .serializers import Deserializer, Serializer
 
 
@@ -14,11 +13,8 @@ def create_engine(
     producer_class: Type[ProducerType] = Producer,
     serializer: Optional[Serializer] = None,
     deserializer: Optional[Deserializer] = None,
-    monitor: Optional[PrometheusMonitor] = None,
 ) -> StreamEngine:
 
-    if monitor is None:
-        monitor = PrometheusMonitor()
 
     if backend is None:
         backend = Kafka()
@@ -30,5 +26,4 @@
         producer_class=producer_class,
         serializer=serializer,
         deserializer=deserializer,
-        monitor=monitor,
     )
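
With the `monitor` parameter gone, `create_engine` takes only the remaining arguments shown above. A minimal sketch of the new call; everything here besides `create_engine` and the `Kafka` default is assumption, not part of this commit:

    from kstreams import create_engine
    from kstreams.backends.kafka import Kafka

    # No `monitor` keyword anymore, and no PrometheusMonitor default is built.
    # Passing backend is optional: create_engine substitutes Kafka() when it is None.
    engine = create_engine(backend=Kafka())
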
23 changes: 10 additions & 13 deletions kstreams/engine.py
@@ -9,7 +9,6 @@
 from .backends.kafka import Kafka
 from .clients import ConsumerType, ProducerType
 from .exceptions import DuplicateStreamException, EngineNotStartedException
-from .prometheus.monitor import PrometheusMonitor
 from .rebalance_listener import MetricsRebalanceListener, RebalanceListener
 from .serializers import Deserializer, Serializer
 from .streams import Stream, StreamFunc, stream
@@ -61,7 +60,6 @@ def __init__(
         backend: Kafka,
         consumer_class: Type[ConsumerType],
         producer_class: Type[ProducerType],
-        monitor: PrometheusMonitor,
         title: Optional[str] = None,
         deserializer: Optional[Deserializer] = None,
         serializer: Optional[Serializer] = None,
@@ -72,7 +70,6 @@
         self.producer_class = producer_class
         self.deserializer = deserializer
         self.serializer = serializer
-        self.monitor = monitor
         self._producer: Optional[Type[ProducerType]] = None
         self._streams: List[Stream] = []
 
@@ -123,23 +120,23 @@ async def send(
             headers=encoded_headers,
         )
         metadata: RecordMetadata = await fut
-        self.monitor.add_topic_partition_offset(
-            topic, metadata.partition, metadata.offset
-        )
+        # self.monitor.add_topic_partition_offset(
+        #     topic, metadata.partition, metadata.offset
+        # )
 
         return metadata
 
     async def start(self) -> None:
         await self.start_producer()
         await self.start_streams()
 
-        # add the producer and streams to the Monitor
-        self.monitor.add_producer(self._producer)
-        self.monitor.add_streams(self._streams)
-        self.monitor.start()
+        # # add the producer and streams to the Monitor
+        # self.monitor.add_producer(self._producer)
+        # self.monitor.add_streams(self._streams)
+        # self.monitor.start()
 
     async def stop(self) -> None:
-        await self.monitor.stop()
+        # await self.monitor.stop()
         await self.stop_producer()
         await self.stop_streams()
 
@@ -203,8 +200,8 @@ def add_stream(self, stream: Stream) -> None:
 
     async def remove_stream(self, stream: Stream) -> None:
         self._streams.remove(stream)
-        await stream.stop()
-        self.monitor.clean_stream_consumer_metrics(stream)
+        del stream
+        # self.monitor.clean_stream_consumer_metrics(stream)
 
     def stream(
         self,
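Per the commit message, `remove_stream` no longer stops the stream it removes; it only drops the engine's reference. A hedged sketch of what a caller presumably does now when it also wants the consumer shut down (the `retire_stream` helper is hypothetical):

    from kstreams import Stream, StreamEngine

    async def retire_stream(engine: StreamEngine, stream: Stream) -> None:
        # Stopping is now the caller's responsibility;
        # remove_stream only unregisters the stream.
        await stream.stop()
        await engine.remove_stream(stream)
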
239 changes: 0 additions & 239 deletions kstreams/prometheus/monitor.py
@@ -1,239 +0,0 @@
import asyncio
import logging
from typing import DefaultDict, Dict, List, Optional, TypeVar

from prometheus_client import Gauge

from kstreams import TopicPartition
from kstreams.clients import ConsumerType
from kstreams.streams import Stream

logger = logging.getLogger(__name__)

PrometheusMonitorType = TypeVar("PrometheusMonitorType", bound="PrometheusMonitor")
MetricsType = Dict[TopicPartition, Dict[str, Optional[int]]]


class PrometheusMonitor:
"""
Metrics monitor to keep track of Producers and Consumers.
Attributes:
metrics_scrape_time float: Amount of seconds that the monitor
will wait until next scrape iteration
"""

# Producer metrics
MET_OFFSETS = Gauge(
"topic_partition_offsets", "help producer offsets", ["topic", "partition"]
)

# Consumer metrics
MET_COMMITTED = Gauge(
"consumer_committed",
"help consumer committed",
["topic", "partition", "consumer_group"],
)
MET_POSITION = Gauge(
"consumer_position",
"help consumer position",
["topic", "partition", "consumer_group"],
)
MET_HIGHWATER = Gauge(
"consumer_highwater",
"help consumer highwater",
["topic", "partition", "consumer_group"],
)
MET_LAG = Gauge(
"consumer_lag",
"help consumer lag calculated using the last commited offset",
["topic", "partition", "consumer_group"],
)
MET_POSITION_LAG = Gauge(
"position_lag",
"help consumer position lag calculated using the consumer position",
["topic", "partition", "consumer_group"],
)

def __init__(self, metrics_scrape_time: float = 3):
self.metrics_scrape_time = metrics_scrape_time
self.running = False
self._producer = None
self._streams: List[Stream] = []
self._task: Optional[asyncio.Task] = None

def start(self) -> None:
logger.info("Starting Prometheus metrics...")
self.running = True
self._task = asyncio.create_task(self._metrics_task())

async def stop(self) -> None:
logger.info("Stoping Prometheus metrics...")
self.running = False

if self._task is not None:
# we need to make sure that the task is `done`
# to clean up properly
while not self._task.done():
await asyncio.sleep(0.1)

self._clean_consumer_metrics()

def add_topic_partition_offset(
self, topic: str, partition: int, offset: int
) -> None:
self.MET_OFFSETS.labels(topic=topic, partition=partition).set(offset)

def _add_consumer_metrics(self, metrics_dict: MetricsType):
for topic_partition, partitions_metadata in metrics_dict.items():
group_id = partitions_metadata["group_id"]
position = partitions_metadata["position"]
committed = partitions_metadata["committed"]
highwater = partitions_metadata["highwater"]
lag = partitions_metadata["lag"]
position_lag = partitions_metadata["position_lag"]

self.MET_COMMITTED.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=group_id,
).set(committed or 0)
self.MET_POSITION.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=group_id,
).set(position or -1)
self.MET_HIGHWATER.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=group_id,
).set(highwater or 0)
self.MET_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=group_id,
).set(lag or 0)
self.MET_POSITION_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=group_id,
).set(position_lag or 0)

def _clean_consumer_metrics(self) -> None:
"""
This method should be called when a rebalance takes place
to clean all consumers metrics. When the rebalance finishes
new metrics will be generated per consumer based on the
consumer assigments
"""
self.MET_LAG.clear()
self.MET_POSITION_LAG.clear()
self.MET_COMMITTED.clear()
self.MET_POSITION.clear()
self.MET_HIGHWATER.clear()

def clean_stream_consumer_metrics(self, stream: Stream) -> None:
if stream.consumer is not None:
topic_partitions = stream.consumer.assignment()
group_id = stream.consumer._group_id
for topic_partition in topic_partitions:
topic = topic_partition.topic
partition = topic_partition.partition

metrics_found = False
for sample in self.MET_LAG.collect()[0].samples:
if {
"topic": topic,
"partition": str(partition),
"consumer_group": group_id,
} == sample.labels:
metrics_found = True

if metrics_found:
self.MET_LAG.remove(topic, partition, group_id)
self.MET_POSITION_LAG.remove(topic, partition, group_id)
self.MET_COMMITTED.remove(topic, partition, group_id)
self.MET_POSITION.remove(topic, partition, group_id)
self.MET_HIGHWATER.remove(topic, partition, group_id)
else:
logger.debug(f"Metrics for stream: {stream.name} not found")

def add_producer(self, producer):
self._producer = producer

def add_streams(self, streams):
self._streams = streams

async def generate_consumer_metrics(self, consumer: ConsumerType):
"""
Generate Consumer Metrics for Prometheus
Format:
{
"topic-1": {
"1": (
[topic-1, partition-number, 'group-id-1'],
committed, position, highwater, lag, position_lag
)
"2": (
[topic-1, partition-number, 'group-id-1'],
committed, position, highwater, lag, position_lag
)
},
...
"topic-n": {
"1": (
[topic-n, partition-number, 'group-id-n'],
committed, position, highwater, lag, position_lag
)
"2": (
[topic-n, partition-number, 'group-id-n'],
committed, position, highwater, lag, position_lag
)
}
}
"""
metrics: MetricsType = DefaultDict(dict)

topic_partitions = consumer.assignment()

for topic_partition in topic_partitions:
committed = await consumer.committed(topic_partition) or 0
position = await consumer.position(topic_partition)
highwater = consumer.highwater(topic_partition)

lag = position_lag = None
if highwater:
lag = highwater - committed
position_lag = highwater - position

metrics[topic_partition] = {
"group_id": consumer._group_id,
"committed": committed,
"position": position,
"highwater": highwater,
"lag": lag,
"position_lag": position_lag,
}

self._add_consumer_metrics(metrics)

async def _metrics_task(self) -> None:
"""
Asyncio Task that runs in `backgroud` to generate
consumer metrics.
When self.running is False the task will finish and it
will be safe to stop consumers and producers.
"""
while self.running:
await asyncio.sleep(self.metrics_scrape_time)
for stream in self._streams:
if stream.consumer is not None:
try:
await self.generate_consumer_metrics(stream.consumer)
except RuntimeError:
logger.debug(
f"Metrics for stream {stream.name} can not be generated "
"probably because it has been removed"
)
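
For reference, the lag values the deleted monitor exported were plain offset differences, as computed in `generate_consumer_metrics` above. A standalone restatement with invented sample numbers:

    # Offsets for one TopicPartition (sample values, not from the commit):
    highwater = 120  # broker end offset
    committed = 100  # last committed offset for the consumer group
    position = 115   # consumer's current fetch position

    lag = highwater - committed          # 20, fed the consumer_lag gauge
    position_lag = highwater - position  # 5, fed the position_lag gauge
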
8 changes: 4 additions & 4 deletions kstreams/rebalance_listener.py
@@ -110,9 +110,9 @@ async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
         to the consumer on the last rebalance
         """
         # lock all asyncio Tasks so no new metrics will be added to the Monitor
-        if revoked and self.engine is not None:
-            async with asyncio.Lock():
-                await self.engine.monitor.stop()
+        # if revoked and self.engine is not None:
+        #     async with asyncio.Lock():
+        #         await self.engine.monitor.stop()
 
     async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
         """
@@ -128,7 +128,7 @@ async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
         # lock all asyncio Tasks so no new metrics will be added to the Monitor
         if assigned and self.engine is not None:
             async with asyncio.Lock():
-                self.engine.monitor.start()
+                # self.engine.monitor.start()
 
         stream = self.stream
         if stream is not None:
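The `RebalanceListener` hooks themselves are untouched by this commit; only the monitor calls inside `MetricsRebalanceListener` are commented out. A minimal custom listener against the same interface, as a sketch; the class name and logging bodies are illustrative:

    import logging
    from typing import Set

    from kstreams import RebalanceListener, TopicPartition

    logger = logging.getLogger(__name__)

    class LoggingRebalanceListener(RebalanceListener):
        # Hypothetical listener mirroring the two hooks in the diff above.
        async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
            logger.info("partitions revoked: %s", revoked)

        async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
            logger.info("partitions assigned: %s", assigned)
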
2 changes: 0 additions & 2 deletions kstreams/test_utils/__init__.py

This file was deleted.

8 changes: 0 additions & 8 deletions kstreams/test_utils/structs.py

This file was deleted.
