Skip to content

Commit

Permalink
Merge branch 'main' into metrics-multipart
Browse files Browse the repository at this point in the history
  • Loading branch information
yhl25 authored Jun 27, 2023
2 parents b2bee19 + cf47315 commit 28ddb14
Show file tree
Hide file tree
Showing 19 changed files with 1,148 additions and 1,263 deletions.
13 changes: 13 additions & 0 deletions pkg/reconciler/vertex/scaling/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type options struct {
taskInterval int
// Threshold of considering there's back pressure, a float value less than 1.
backPressureThreshold float64
// size of the daemon clients cache.
clientsCacheSize int
}

// Option is a function that modifies an options value in place.
// NewScaler applies every non-nil Option it receives on top of the defaults.
type Option func(*options)
Expand All @@ -32,23 +34,34 @@ func defaultOptions() *options {
workers: 20,
taskInterval: 30000,
backPressureThreshold: 0.9,
clientsCacheSize: 100,
}
}

// WithWorkers returns an Option that overrides the number of workers
// working on autoscaling.
func WithWorkers(n int) Option {
	return Option(func(cfg *options) {
		cfg.workers = n
	})
}

// WithTaskInterval returns an Option that overrides the interval of picking
// up a task from the work queue.
func WithTaskInterval(n int) Option {
	setInterval := func(cfg *options) {
		cfg.taskInterval = n
	}
	return setInterval
}

// WithBackPressureThreshold returns an Option that overrides the threshold
// above which back pressure is considered present; a float value less than 1.
func WithBackPressureThreshold(n float64) Option {
	return Option(func(cfg *options) {
		cfg.backPressureThreshold = n
	})
}

// WithClientsCacheSize returns an Option that overrides the size of the
// daemon clients cache.
func WithClientsCacheSize(n int) Option {
	setSize := func(cfg *options) {
		cfg.clientsCacheSize = n
	}
	return setSize
}
131 changes: 76 additions & 55 deletions pkg/reconciler/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,30 @@ type Scaler struct {
options *options
// Cache to store the vertex metrics such as pending message number
vertexMetricsCache *lru.Cache
daemonClientsCache *lru.Cache
}

// NewScaler returns a Scaler instance.
//
// It starts from defaultOptions, applies every non-nil Option in opts exactly
// once, and only then builds the caches, so that WithClientsCacheSize takes
// effect on the daemon clients cache.
func NewScaler(client client.Client, opts ...Option) *Scaler {
	scalerOpts := defaultOptions()
	for _, opt := range opts {
		if opt != nil {
			opt(scalerOpts)
		}
	}
	// The LRU constructor returns an error (and no cache) for a non-positive
	// size; fall back to the default instead of leaving a nil cache behind
	// that would be dereferenced on first use.
	if scalerOpts.clientsCacheSize <= 0 {
		scalerOpts.clientsCacheSize = defaultOptions().clientsCacheSize
	}
	s := &Scaler{
		client:     client,
		options:    scalerOpts,
		vertexMap:  make(map[string]*list.Element),
		vertexList: list.New(),
		lock:       new(sync.RWMutex),
	}
	// Cache daemon clients, bounded by the configured size. Evicted clients
	// are closed; the assertion is on the Close method rather than a concrete
	// type so that an unexpected cached value cannot panic the callback.
	s.daemonClientsCache, _ = lru.NewWithEvict(scalerOpts.clientsCacheSize, func(key, value interface{}) {
		if c, ok := value.(interface{ Close() error }); ok {
			_ = c.Close()
		}
	})
	// The size is a positive constant, so the constructor error is ignored.
	s.vertexMetricsCache, _ = lru.New(10000)
	return s
}

Expand Down Expand Up @@ -202,50 +208,66 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
return nil
}
}
// TODO: cache it.
dClient, err := daemonclient.NewDaemonServiceClient(pl.GetDaemonServiceURL())
if err != nil {
return fmt.Errorf("failed to get daemon service client for pipeline %s, %w", pl.Name, err)
var err error
dClient, _ := s.daemonClientsCache.Get(pl.GetDaemonServiceURL())
if dClient == nil {
dClient, err = daemonclient.NewDaemonServiceClient(pl.GetDaemonServiceURL())
if err != nil {
return fmt.Errorf("failed to get daemon service client for pipeline %s, %w", pl.Name, err)
}
s.daemonClientsCache.Add(pl.GetDaemonServiceURL(), dClient)
}
defer func() {
_ = dClient.Close()
}()
vMetrics, err := dClient.GetVertexMetrics(ctx, pl.Name, vertex.Spec.Name)

daemonClient := dClient.(daemonclient.DaemonClient)
vMetrics, err := daemonClient.GetVertexMetrics(ctx, pl.Name, vertex.Spec.Name)
if err != nil {
return fmt.Errorf("failed to get metrics of vertex key %q, %w", key, err)
}
// Avg rate and pending for autoscaling are both in the map with key "default", see "pkg/metrics/metrics.go".
rate, existing := vMetrics[0].ProcessingRates["default"]
if !existing || rate < 0 { // Rate not available
log.Debugf("Vertex %s has no rate information, skip scaling", vertex.Name)
return nil
}
pending, existing := vMetrics[0].Pendings["default"]
if !existing || pending < 0 || pending == isb.PendingNotAvailable {
// Pending not available, we don't do anything
log.Debugf("Vertex %s has no pending messages information, skip scaling", vertex.Name)
return nil
// vMetrics holds the metrics of every partition of the vertex.
// We aggregate them to get the total rate and total pending of the vertex.
// If any partition lacks rate or pending information, we skip scaling.
totalRate := float64(0)
totalPending := int64(0)
for _, m := range vMetrics {
rate, existing := m.ProcessingRates["default"]
if !existing || rate < 0 { // Rate not available
log.Debugf("Vertex %s has no rate information, skip scaling", vertex.Name)
return nil
}
totalRate += rate

pending, existing := m.Pendings["default"]
if !existing || pending < 0 || pending == isb.PendingNotAvailable {
// Pending not available, we don't do anything
log.Debugf("Vertex %s has no pending messages information, skip scaling", vertex.Name)
return nil
}
totalPending += pending
}
// Add to cache for back pressure calculation
_ = s.vertexMetricsCache.Add(key+"/pending", pending)

// Add pending information to cache for back pressure calculation
_ = s.vertexMetricsCache.Add(key+"/pending", totalPending)
totalBufferLength := int64(0)
targetAvailableBufferLength := int64(0)
if !vertex.IsASource() { // Only non-source vertex has buffer to read
bufferName := vertex.OwnedBuffers()[0]
if bInfo, err := dClient.GetPipelineBuffer(ctx, pl.Name, bufferName); err != nil {
return fmt.Errorf("failed to get the read buffer information of vertex %q, %w", vertex.Name, err)
} else {
if bInfo.BufferLength == nil || bInfo.BufferUsageLimit == nil {
return fmt.Errorf("invalid read buffer information of vertex %q, length or usage limit is missing", vertex.Name)
for _, bufferName := range vertex.OwnedBuffers() {
if bInfo, err := daemonClient.GetPipelineBuffer(ctx, pl.Name, bufferName); err != nil {
return fmt.Errorf("failed to get the read buffer information of vertex %q, %w", vertex.Name, err)
} else {
if bInfo.BufferLength == nil || bInfo.BufferUsageLimit == nil {
return fmt.Errorf("invalid read buffer information of vertex %q, length or usage limit is missing", vertex.Name)
}
totalBufferLength += int64(float64(*bInfo.BufferLength) * *bInfo.BufferUsageLimit)
targetAvailableBufferLength += int64(float64(*bInfo.BufferLength) * float64(vertex.Spec.Scale.GetTargetBufferAvailability()) / 100)
// Add to cache for back pressure calculation
}
totalBufferLength = int64(float64(*bInfo.BufferLength) * *bInfo.BufferUsageLimit)
targetAvailableBufferLength = int64(float64(*bInfo.BufferLength) * float64(vertex.Spec.Scale.GetTargetBufferAvailability()) / 100)
// Add to cache for back pressure calculation
_ = s.vertexMetricsCache.Add(*bInfo.BufferName+"/length", totalBufferLength)
}
// Add total buffer length to cache for back pressure calculation
_ = s.vertexMetricsCache.Add(key+"/length", totalBufferLength)
}
current := int32(vertex.GetReplicas())
desired := s.desiredReplicas(ctx, vertex, rate, pending, totalBufferLength, targetAvailableBufferLength)
desired := s.desiredReplicas(ctx, vertex, totalRate, totalPending, totalBufferLength, targetAvailableBufferLength)
log.Debugf("Calculated desired replica number of vertex %q is: %t", vertex.Name, desired)
max := vertex.Spec.Scale.GetMaxReplicas()
min := vertex.Spec.Scale.GetMinReplicas()
Expand Down Expand Up @@ -362,6 +384,8 @@ func (s *Scaler) Start(ctx context.Context) error {
select {
case <-ctx.Done():
log.Info("Shutting down scaling job assigner")
// clear the daemon clients cache
s.daemonClientsCache.Purge()
return nil
default:
assign()
Expand Down Expand Up @@ -391,24 +415,21 @@ func (s *Scaler) hasBackPressure(pl dfv1.Pipeline, vertex dfv1.Vertex) (bool, bo
loop:
for _, e := range downstreamEdges {
vertexKey := pl.Namespace + "/" + pl.Name + "-" + e.To
bufferNames := dfv1.GenerateBufferNames(pl.Namespace, pl.Name, e.To, pl.NumOfPartitions(e.To))
for _, bufferName := range bufferNames {
pendingVal, ok := s.vertexMetricsCache.Get(vertexKey + "/pending")
if !ok { // Vertex key has not been cached, skip it.
continue
}
pending := pendingVal.(int64)
bufferLengthVal, ok := s.vertexMetricsCache.Get(bufferName + "/length")
if !ok { // Buffer length has not been cached, skip it.
continue
}
length := bufferLengthVal.(int64)
if float64(pending)/float64(length) >= s.options.backPressureThreshold {
downstreamPressure = true
if e.From == vertex.Spec.Name {
directPressure = true
break loop
}
pendingVal, ok := s.vertexMetricsCache.Get(vertexKey + "/pending")
if !ok { // Vertex key has not been cached, skip it.
continue
}
pending := pendingVal.(int64)
bufferLengthVal, ok := s.vertexMetricsCache.Get(vertexKey + "/length")
if !ok { // Buffer length has not been cached, skip it.
continue
}
length := bufferLengthVal.(int64)
if float64(pending)/float64(length) >= s.options.backPressureThreshold {
downstreamPressure = true
if e.From == vertex.Spec.Name {
directPressure = true
break loop
}
}
}
Expand Down
1 change: 1 addition & 0 deletions ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"@types/d3-selection": "^3.0.2",
"@types/dagre": "^0.7.47",
"@types/jest": "^27.0.1",
"@types/lodash": "^4.14.195",
"@types/node": "^16.7.13",
"@types/react": "^18.0.0",
"@types/react-dom": "^18.0.0",
Expand Down
2 changes: 1 addition & 1 deletion ui/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import ListItemIcon from "@mui/material/ListItemIcon";
import ListItemText from "@mui/material/ListItemText";
import { Breadcrumbs } from "./components/common/Breadcrumbs";
import { Namespaces } from "./components/pages/Namespace";
import { Pipeline } from "./components/pipeline/Pipeline";
import { Pipeline } from "./components/pages/Pipeline";
import logo from "./images/icon-on-blue-bg.png";
import "./App.css";
import { Slide, ToastContainer } from "react-toastify";
Expand Down
Loading

0 comments on commit 28ddb14

Please sign in to comment.