Skip to content

Commit

Permalink
more changes
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Jun 23, 2023
1 parent b0265ac commit 6f7d536
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 75 deletions.
6 changes: 3 additions & 3 deletions pkg/daemon/server/service/pipeline_metrics_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
//}

metricsArr := make([]*daemon.VertexMetrics, metricsCount)
for i := 0; i < metricsCount; i++ {
for i, partitionName := range abstractVertex.OwnedBufferNames(ps.pipeline.Namespace, req.GetPipeline()) {
// We can query the metrics endpoint of the (i)th pod to obtain this value.
// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc.cluster.local:2469/metrics
//url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, i, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
Expand Down Expand Up @@ -218,11 +218,11 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
// Get the processing rate for this partition
if abstractVertex.IsReduceUDF() {
// the processing rate of this ith partition is the rate of the corresponding ith pod.
vm.ProcessingRates = ps.rater.GetPodRates(req.GetVertex(), i)
vm.ProcessingRates = ps.rater.GetRates(req.GetVertex(), partitionName)
} else {
// if the vertex is not a reduce udf, then the processing rate is the sum of all pods in this vertex.
// TODO (multi-partition) - change this to display the processing rate of each partition when we finish multi-partition support for non-reduce vertices.
vm.ProcessingRates = ps.rater.GetRates(req.GetVertex(), i)
vm.ProcessingRates = ps.rater.GetRates(req.GetVertex(), partitionName)
}

metricsArr[i] = vm
Expand Down
24 changes: 11 additions & 13 deletions pkg/daemon/server/service/rater/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,25 @@ import (
const IndexNotFound = -1

// UpdateCount updates the count of processed messages for a pod at a given time
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, count []float64) {
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, partitionReadCounts []PartitionReadCount) {
items := q.Items()

// find the element matching the input timestamp and update it
for _, i := range items {
if i.timestamp == time {
i.Update(podName, count)
i.Update(podName, partitionReadCounts)
return
}
}

// if we cannot find a matching element, it means we need to add a new timestamped count to the queue
// if we cannot find a matching element, it means we need to add a new timestamped partitionReadCounts to the queue
tc := NewTimestampedCounts(time)
tc.Update(podName, count)
tc.Update(podName, partitionReadCounts)

// close the window for the most recent timestamped count
// close the window for the most recent timestamped partitionReadCounts
switch n := len(items); n {
case 0:
// if the queue is empty, we just append the new timestamped count
// if the queue is empty, we just append the new timestamped partitionReadCounts
case 1:
// if the queue has only one element, we close the window for this element
items[0].CloseWindow(nil)
Expand All @@ -55,7 +55,7 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p
}

// CalculateRate calculates the rate of the vertex in the last lookback seconds
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionID int) float64 {
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64 {
counts := q.Items()
if len(counts) <= 1 {
return 0
Expand All @@ -75,18 +75,16 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
return 0
}
for i := startIndex; i < endIndex; i++ {
if c1, c2 := counts[i], counts[i+1]; c1 != nil && c2 != nil && c1.IsWindowClosed() && c2.IsWindowClosed() {
delta += calculatePartitionDelta(c1, c2, partitionID)
}
delta += calculatePartitionDelta(counts[i], counts[i+1], partitionName)
}
return delta / float64(timeDiff)
}

func calculatePartitionDelta(c1, c2 *TimestampedCounts, partitionID int) float64 {
func calculatePartitionDelta(c1, c2 *TimestampedCounts, partitionName string) float64 {
tc1 := c1.SnapshotCopy()
tc2 := c2.SnapshotCopy()
count1, exist1 := tc1[partitionID]
count2, exist2 := tc2[partitionID]
count1, exist1 := tc1[partitionName]
count2, exist2 := tc2[partitionName]
if !exist2 {
return 0
} else if !exist1 {
Expand Down
80 changes: 47 additions & 33 deletions pkg/daemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"crypto/tls"
"fmt"
"net/http"
"strconv"
"strings"
"time"

Expand All @@ -35,8 +34,7 @@ import (

type Ratable interface {
Start(ctx context.Context) error
GetRates(vertexName string, partitionID int) map[string]float64
GetPodRates(vertexName string, podIndex int) map[string]float64
GetRates(vertexName string, partitionName string) map[string]float64
}

// CountWindow is the time window for which we maintain the timestamped counts, currently 10 seconds
Expand Down Expand Up @@ -65,6 +63,8 @@ type Rater struct {
options *options
}

// vertex -> [timestamp(podCounts{podName: count}, partitionCounts{partitionIdx: count}, isWindowClosed, delta(across all the pods))]

func NewRater(ctx context.Context, p *v1alpha1.Pipeline, opts ...Option) *Rater {
rater := Rater{
pipeline: p,
Expand Down Expand Up @@ -120,19 +120,19 @@ func (r *Rater) monitorOnePod(ctx context.Context, key string, worker int) error
vertexName := podInfo[1]
vertexType := podInfo[3]
podName := strings.Join([]string{podInfo[0], podInfo[1], podInfo[2]}, "-")
var count []float64
var partitionReadCounts []PartitionReadCount
activePods := r.podTracker.GetActivePods()
if activePods.Contains(key) {
count = r.getTotalCount(vertexName, vertexType, podName)
if count[0] == CountNotAvailable {
log.Debugf("Failed retrieving total count for pod %s", podName)
partitionReadCounts = r.getPartitionReadCounts(vertexName, vertexType, podName)
if partitionReadCounts == nil {
log.Debugf("Failed retrieving total partitionReadCounts for pod %s", podName)
}
} else {
log.Debugf("Pod %s does not exist, updating it with CountNotAvailable...", podName)
count = []float64{CountNotAvailable}
partitionReadCounts = nil
}
now := time.Now().Add(CountWindow).Truncate(CountWindow).Unix()
UpdateCount(r.timestampedPodCounts[vertexName], now, podName, count)
UpdateCount(r.timestampedPodCounts[vertexName], now, podName, partitionReadCounts)
return nil
}

Expand Down Expand Up @@ -199,19 +199,38 @@ func sleep(ctx context.Context, duration time.Duration) {
}
}

// getTotalCount returns the total number of messages read by the pod
func (r *Rater) getTotalCount(vertexName, vertexType, podName string) []float64 {
type PartitionReadCount struct {
name string
value float64
}

func (p *PartitionReadCount) Name() string {
return p.name
}

func (p *PartitionReadCount) Value() float64 {
return p.value
}

// pod1
// forwarder_read_total p1 100
// forwarder_read_total p2 110
// forwarder_read_total p3 110

// [100, 110, 110]
// getPartitionReadCounts returns the total number of messages read by the pod
func (r *Rater) getPartitionReadCounts(vertexName, vertexType, podName string) []PartitionReadCount {
// scrape the read total metric from pod metric port
url := fmt.Sprintf("https://%s.%s.%s.svc.cluster.local:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if res, err := r.httpClient.Get(url); err != nil {
r.log.Errorf("failed reading the metrics endpoint, %v", err.Error())
return []float64{CountNotAvailable}
return nil
} else {
textParser := expfmt.TextParser{}
result, err := textParser.TextToMetricFamilies(res.Body)
if err != nil {
r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error())
return []float64{CountNotAvailable}
return nil
}
var readTotalMetricName string
if vertexType == "reduce" {
Expand All @@ -221,38 +240,33 @@ func (r *Rater) getTotalCount(vertexName, vertexType, podName string) []float64
}
if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 {
metricsList := value.GetMetric()
//fmt.Println(metricsList)
//fmt.Println(metricsList[0].Counter.GetValue(), "- ", metricsList[0].Label[0].GetValue())
var partitionCount []float64
var partitionReadCounts []PartitionReadCount
for _, ele := range metricsList {
partitionCount = append(partitionCount, ele.Counter.GetValue())
partitionName := ""
for _, label := range ele.Label {
if label.GetName() == "partition_name" {
partitionName = label.GetValue()
}
}
partitionReadCounts = append(partitionReadCounts, PartitionReadCount{
name: partitionName,
value: ele.Counter.GetValue(),
})
}
return partitionCount
return partitionReadCounts
} else {
r.log.Errorf("failed getting the read total metric, the metric is not available.")
return []float64{CountNotAvailable}
return nil
}
}
}

// GetRates returns the processing rates of the vertex in the format of lookback second to rate mappings
func (r *Rater) GetRates(vertexName string, partitionID int) map[string]float64 {
var result = make(map[string]float64)
// calculate rates for each lookback seconds
for n, i := range r.buildLookbackSecondsMap(vertexName) {
r := CalculateRate(r.timestampedPodCounts[vertexName], i, partitionID)
result[n] = r
}
return result
}

// GetPodRates returns the processing rates of the pod in the format of lookback second to rate mappings
func (r *Rater) GetPodRates(vertexName string, podIndex int) map[string]float64 {
podName := r.pipeline.Name + "-" + vertexName + "-" + strconv.Itoa(podIndex)
func (r *Rater) GetRates(vertexName string, partitionName string) map[string]float64 {
var result = make(map[string]float64)
// calculate rates for each lookback seconds
for n, i := range r.buildLookbackSecondsMap(vertexName) {
r := CalculatePodRate(r.timestampedPodCounts[vertexName], i, podName)
r := CalculateRate(r.timestampedPodCounts[vertexName], i, partitionName)
result[n] = r
}
return result
Expand Down
36 changes: 21 additions & 15 deletions pkg/daemon/server/service/rater/timestamped_counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type TimestampedCounts struct {
// podName to count mapping
podCounts map[string]float64
// partition to count mapping
partitionCounts map[int]float64
partitionCounts map[string]float64
// isWindowClosed indicates whether we have finished collecting pod counts for this timestamp
isWindowClosed bool
// delta is the total count change from the previous window, it's valid only when isWindowClosed is true
Expand All @@ -44,36 +44,42 @@ func NewTimestampedCounts(t int64) *TimestampedCounts {
return &TimestampedCounts{
timestamp: t,
podCounts: make(map[string]float64),
partitionCounts: make(map[int]float64),
partitionCounts: make(map[string]float64),
isWindowClosed: false,
delta: 0,
lock: new(sync.RWMutex),
}
}

// Update updates the count for a pod if the current window is not closed
func (tc *TimestampedCounts) Update(podName string, count []float64) {
func (tc *TimestampedCounts) Update(podName string, partitionReadCounts []PartitionReadCount) {
tc.lock.Lock()
defer tc.lock.Unlock()
if count[0] == CountNotAvailable {
// we choose to skip updating when count is not available for the pod, instead of removing the pod from the map.
// imagine if the getTotalCount call fails to scrape the count metric, and it's NOT because the pod is down.
// in this case getTotalCount returns CountNotAvailable.
// if we remove the pod from the map and then the next scrape successfully gets the count, we can reach a state that in the timestamped counts,
// for this single pod, at t1, count is 123456, at t2, the map doesn't contain this pod and t3, count is 123457.
if partitionReadCounts == nil {
// we choose to skip updating when partitionReadCounts is nil, instead of removing the pod from the map.
// imagine if the getPartitionReadCounts call fails to scrape the partitionReadCounts metric, and it's NOT because the pod is down.
// in this case getPartitionReadCounts returns CountNotAvailable.
// if we remove the pod from the map and then the next scrape successfully gets the partitionReadCounts, we can reach a state that in the timestamped counts,
// for this single pod, at t1, partitionReadCounts is 123456, at t2, the map doesn't contain this pod and t3, partitionReadCounts is 123457.
// when calculating the rate, as we sum up deltas among timestamps, we will get 123457 total delta instead of the real delta 1.
// one occurrence of such case can lead to extremely high rate and mess up the autoscaling.
// hence we'd rather keep the count as it is to avoid wrong rate calculation.
// hence we'd rather keep the partitionReadCounts as it is to avoid wrong rate calculation.
return
}
if tc.isWindowClosed {
// we skip updating if the window is already closed.
return
}
for idx, val := range count {
tc.partitionCounts[idx] = val
// since a pod can read from multiple partitions we should consider
// the sum of partitionReadCounts as the total count of processed messages for this pod
// for reduce we will always have one partitionReadCount (since reduce pod will read
// from single partition)
totalPartitionReadCount := 0.0
for _, ele := range partitionReadCounts {
tc.partitionCounts[ele.Name()] = ele.Value()
totalPartitionReadCount += ele.Value()
}
tc.podCounts[podName] = count[0]
tc.podCounts[podName] = totalPartitionReadCount
}

// Snapshot returns a copy of the podName to count mapping
Expand All @@ -88,10 +94,10 @@ func (tc *TimestampedCounts) Snapshot() map[string]float64 {
return counts
}

func (tc *TimestampedCounts) SnapshotCopy() map[int]float64 {
func (tc *TimestampedCounts) SnapshotCopy() map[string]float64 {
tc.lock.RLock()
defer tc.lock.RUnlock()
counts := make(map[int]float64)
counts := make(map[string]float64)
for k, v := range tc.partitionCounts {
counts[k] = v
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/daemon/server/service/rater/timestamped_counts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ func TestNewTimestampedCounts(t *testing.T) {

func TestTimestampedCounts_Update(t *testing.T) {
tc := NewTimestampedCounts(1620000000)
tc.Update("pod1", 10.0)
tc.Update("pod1", []float64{10.0})
assert.Equal(t, 10.0, tc.podCounts["pod1"])
tc.Update("pod1", 20.0)
tc.Update("pod1", []float64{20.0})
assert.Equal(t, 20.0, tc.podCounts["pod1"])
tc.Update("pod2", 30.0)
tc.Update("pod2", []float64{30.0})
assert.Equal(t, 30.0, tc.podCounts["pod2"])
assert.Equal(t, 2, len(tc.podCounts))
tc.Update("pod1", CountNotAvailable)
tc.Update("pod1", nil)
assert.Equal(t, 2, len(tc.podCounts))
assert.Equal(t, 20, int(tc.podCounts["pod1"]))
assert.Equal(t, 30, int(tc.podCounts["pod2"]))
Expand All @@ -51,15 +51,15 @@ func TestTimestampedCounts_Update(t *testing.T) {
// (20-0) + (30-0) = 50
assert.Equal(t, 50.0, tc.delta)
// verify that updating pod counts doesn't take effect if the window is already closed
tc.Update("pod1", 10.0)
tc.Update("pod1", []float64{10.0})
assert.Equal(t, 20, int(tc.podCounts["pod1"]))
tc.Update("pod2", 20.0)
tc.Update("pod2", []float64{20.0})
assert.Equal(t, 30, int(tc.podCounts["pod2"]))

tc2 := NewTimestampedCounts(1620000001)
tc2.Update("pod1", 40.0)
tc2.Update("pod1", []float64{40.0})
assert.Equal(t, 40.0, tc2.podCounts["pod1"])
tc2.Update("pod2", 10.0)
tc2.Update("pod2", []float64{10.0})
assert.Equal(t, 10.0, tc2.podCounts["pod2"])
tc2.CloseWindow(tc)
assert.Equal(t, true, tc2.isWindowClosed)
Expand All @@ -69,8 +69,8 @@ func TestTimestampedCounts_Update(t *testing.T) {

func TestTimestampedCounts_Snapshot(t *testing.T) {
tc := NewTimestampedCounts(1620000000)
tc.Update("pod1", 10.0)
tc.Update("pod2", 20.0)
tc.Update("pod3", 30.0)
tc.Update("pod1", []float64{10.0})
tc.Update("pod2", []float64{20.0})
tc.Update("pod3", []float64{30.0})
assert.Equal(t, map[string]float64{"pod1": 10.0, "pod2": 20.0, "pod3": 30.0}, tc.Snapshot())
}

0 comments on commit 6f7d536

Please sign in to comment.