Skip to content

Commit

Permalink
chore: implement APIs for mono vertex UI server (#1931)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Co-authored-by: Yashash H L <[email protected]>
  • Loading branch information
KeranYang and yhl25 committed Aug 13, 2024
1 parent 97f9428 commit e1fba01
Show file tree
Hide file tree
Showing 15 changed files with 306 additions and 51 deletions.
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ const (
PipelineStatusDeleting = "deleting"
PipelineStatusUnhealthy = "unhealthy"

// MonoVertex health status
// TODO - more statuses to be added
MonoVertexStatusHealthy = "healthy"

// Callback annotation keys
CallbackEnabledKey = "numaflow.numaproj.io/callback"
CallbackURLKey = "numaflow.numaproj.io/callback-url"
Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ var (
Help: "A metric indicates the replicas of a Redis ISB Service",
}, []string{metrics.LabelNamespace, metrics.LabelISBService})

// VertexDisiredReplicas indicates the desired replicas of a Vertex.
VertexDisiredReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
// VertexDesiredReplicas indicates the desired replicas of a Vertex.
VertexDesiredReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "controller",
Name: "vertex_desired_replicas",
Help: "A metric indicates the desired replicas of a Vertex",
Expand All @@ -75,5 +75,5 @@ var (
)

func init() {
ctrlmetrics.Registry.MustRegister(BuildInfo, ISBSvcHealth, PipelineHealth, JetStreamISBSvcReplicas, RedisISBSvcReplicas, VertexDisiredReplicas, VertexCurrentReplicas)
ctrlmetrics.Registry.MustRegister(BuildInfo, ISBSvcHealth, PipelineHealth, JetStreamISBSvcReplicas, RedisISBSvcReplicas, VertexDesiredReplicas, VertexCurrentReplicas)
}
4 changes: 2 additions & 2 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
// Clean up metrics
_ = reconciler.PipelineHealth.DeleteLabelValues(pl.Namespace, pl.Name)
// Delete corresponding vertex metrics
_ = reconciler.VertexDisiredReplicas.DeletePartialMatch(map[string]string{metrics.LabelNamespace: pl.Namespace, metrics.LabelPipeline: pl.Name})
_ = reconciler.VertexDesiredReplicas.DeletePartialMatch(map[string]string{metrics.LabelNamespace: pl.Namespace, metrics.LabelPipeline: pl.Name})
_ = reconciler.VertexCurrentReplicas.DeletePartialMatch(map[string]string{metrics.LabelNamespace: pl.Namespace, metrics.LabelPipeline: pl.Name})
}
return ctrl.Result{}, nil
Expand Down Expand Up @@ -292,7 +292,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
log.Infow("Deleted stale vertex successfully", zap.String("vertex", v.Name))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "DeleteStaleVertexSuccess", "Deleted stale vertex %s successfully", v.Name)
// Clean up vertex replica metrics
reconciler.VertexDisiredReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name)
reconciler.VertexDesiredReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name)
reconciler.VertexCurrentReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
desiredReplicas := vertex.GetReplicas()
// Set metrics
defer func() {
reconciler.VertexDisiredReplicas.WithLabelValues(vertex.Namespace, vertex.Spec.PipelineName, vertex.Spec.Name).Set(float64(desiredReplicas))
reconciler.VertexDesiredReplicas.WithLabelValues(vertex.Namespace, vertex.Spec.PipelineName, vertex.Spec.Name).Set(float64(desiredReplicas))
reconciler.VertexCurrentReplicas.WithLabelValues(vertex.Namespace, vertex.Spec.PipelineName, vertex.Spec.Name).Set(float64(vertex.Status.Replicas))
}()

Expand Down
4 changes: 4 additions & 0 deletions server/apis/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ type Handler interface {
PodLogs(c *gin.Context)
GetNamespaceEvents(c *gin.Context)
GetPipelineStatus(c *gin.Context)
ListMonoVertices(c *gin.Context)
GetMonoVertex(c *gin.Context)
ListMonoVertexPods(c *gin.Context)
CreateMonoVertex(c *gin.Context)
}
193 changes: 164 additions & 29 deletions server/apis/v1/handler.go

Large diffs are not rendered by default.

21 changes: 18 additions & 3 deletions server/apis/v1/response_cluster_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ func (is *IsbServiceSummary) hasIsbService() bool {
return is.Inactive > 0 || !is.Active.isEmpty()
}

// MonoVertexSummary summarizes the number of active and inactive mono vertices.
type MonoVertexSummary struct {
Active ActiveStatus `json:"active"`
Inactive int `json:"inactive"`
}

func (mvs *MonoVertexSummary) hasMonoVertex() bool {
return mvs.Inactive > 0 || !mvs.Active.isEmpty()
}

// ClusterSummaryResponse is a list of NamespaceSummary
// of all the namespaces in a cluster wrapped in a list.
type ClusterSummaryResponse []NamespaceSummary
Expand All @@ -72,15 +82,20 @@ type NamespaceSummary struct {
Namespace string `json:"namespace"`
PipelineSummary PipelineSummary `json:"pipelineSummary"`
IsbServiceSummary IsbServiceSummary `json:"isbServiceSummary"`
MonoVertexSummary MonoVertexSummary `json:"monoVertexSummary"`
}

// NewNamespaceSummary creates a new NamespaceSummary object with the given specifications.
func NewNamespaceSummary(namespace string, pipelineSummary PipelineSummary,
isbSummary IsbServiceSummary) NamespaceSummary {
func NewNamespaceSummary(
namespace string,
pipelineSummary PipelineSummary,
isbSummary IsbServiceSummary,
monoVertexSummary MonoVertexSummary) NamespaceSummary {
return NamespaceSummary{
IsEmpty: !(pipelineSummary.hasPipeline() || isbSummary.hasIsbService()),
IsEmpty: !(pipelineSummary.hasPipeline() || isbSummary.hasIsbService() || monoVertexSummary.hasMonoVertex()),
Namespace: namespace,
PipelineSummary: pipelineSummary,
IsbServiceSummary: isbSummary,
MonoVertexSummary: monoVertexSummary,
}
}
39 changes: 39 additions & 0 deletions server/apis/v1/response_mono_vertex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1

import "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"

// MonoVertices is a list of mono vertices
type MonoVertices []MonoVertexInfo

type MonoVertexInfo struct {
Name string `json:"name"`
// Status shows whether the mono vertex is healthy, warning, critical or inactive.
Status string `json:"status"`
// MonoVertex contains the detailed mono vertex spec.
MonoVertex v1alpha1.MonoVertex `json:"monoVertex"`
}

// NewMonoVertexInfo creates a new MonoVertexInfo object with the given status and lag
func NewMonoVertexInfo(status string, mvt *v1alpha1.MonoVertex) MonoVertexInfo {
return MonoVertexInfo{
Name: mvt.Name,
Status: status,
MonoVertex: *mvt,
}
}
2 changes: 1 addition & 1 deletion server/apis/v1/response_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type PipelineInfo struct {
Pipeline v1alpha1.Pipeline `json:"pipeline"`
}

// NewPipelineInfo creates a new PipelineInfo object with the given status
// NewPipelineInfo creates a new PipelineInfo object with the given status and lag
func NewPipelineInfo(status string, lag *int64, pl *v1alpha1.Pipeline) PipelineInfo {
return PipelineInfo{
Name: pl.Name,
Expand Down
11 changes: 6 additions & 5 deletions server/authz/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ const (
// PolicyMapPath is the path to the policy map.
policyMapPath = "/etc/numaflow/rbac-policy.csv"

// rbacPropertiesPath is the path to the rbac properties file. It includes configuraion for authorization like
// rbacPropertiesPath is the path to the rbac properties file. It includes configuration for authorization like
// scope, default policy etc.
rbacPropertiesPath = "/etc/numaflow/rbac-conf.yaml"

// Objects for the RBAC policy
ObjectAll = "*"
ObjectPipeline = "pipeline"
ObjectISBSvc = "isbsvc"
ObjectEvents = "events"
ObjectAll = "*"
ObjectPipeline = "pipeline"
ObjectMonoVertex = "mono-vertex"
ObjectISBSvc = "isbsvc"
ObjectEvents = "events"

// Resouces for the RBAC policy
ResourceAll = "*"
Expand Down
4 changes: 4 additions & 0 deletions server/cmd/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,9 @@ func CreateAuthRouteMap(baseHref string) authz.RouteMap {
"GET:" + baseHref + "api/v1/metrics/namespaces/:namespace/pods": authz.NewRouteInfo(authz.ObjectPipeline, true),
"GET:" + baseHref + "api/v1/namespaces/:namespace/pods/:pod/logs": authz.NewRouteInfo(authz.ObjectPipeline, true),
"GET:" + baseHref + "api/v1/namespaces/:namespace/events": authz.NewRouteInfo(authz.ObjectEvents, true),
"GET:" + baseHref + "api/v1/namespaces/:namespace/mono-vertices": authz.NewRouteInfo(authz.ObjectMonoVertex, true),
"GET:" + baseHref + "api/v1/namespaces/:namespace/mono-vertices/:mono-vertex": authz.NewRouteInfo(authz.ObjectMonoVertex, true),
"GET:" + baseHref + "api/v1/namespaces/:namespace/mono-vertices/:mono-vertex/pods": authz.NewRouteInfo(authz.ObjectMonoVertex, true),
"POST:" + baseHref + "api/v1/namespaces/:namespace/mono-vertices": authz.NewRouteInfo(authz.ObjectMonoVertex, true),
}
}
4 changes: 2 additions & 2 deletions server/cmd/server/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
func TestCreateAuthRouteMap(t *testing.T) {
t.Run("empty base", func(t *testing.T) {
got := CreateAuthRouteMap("")
assert.Equal(t, 24, len(got))
assert.Equal(t, 28, len(got))
})

t.Run("customize base", func(t *testing.T) {
got := CreateAuthRouteMap("abcdefg")
assert.Equal(t, 24, len(got))
assert.Equal(t, 28, len(got))
for k := range got {
assert.Contains(t, k, "abcdefg")
}
Expand Down
12 changes: 10 additions & 2 deletions server/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func v1Routes(ctx context.Context, r gin.IRouter, dexObj *v1.DexObject, localUse
r.GET("/cluster-summary", handler.GetClusterSummary)
// Create a Pipeline.
r.POST("/namespaces/:namespace/pipelines", handler.CreatePipeline)
// All pipelines for a given namespace.
// List all pipelines for a given namespace.
r.GET("/namespaces/:namespace/pipelines", handler.ListPipelines)
// Get a Pipeline information.
// Get the pipeline information.
r.GET("/namespaces/:namespace/pipelines/:pipeline", handler.GetPipeline)
// Get a Pipeline health information.
r.GET("/namespaces/:namespace/pipelines/:pipeline/health", handler.GetPipelineStatus)
Expand Down Expand Up @@ -153,6 +153,14 @@ func v1Routes(ctx context.Context, r gin.IRouter, dexObj *v1.DexObject, localUse
r.GET("/namespaces/:namespace/pods/:pod/logs", handler.PodLogs)
// List of the Kubernetes events of a namespace.
r.GET("/namespaces/:namespace/events", handler.GetNamespaceEvents)
// List all mono vertices for a given namespace.
r.GET("/namespaces/:namespace/mono-vertices", handler.ListMonoVertices)
// Get the mono vertex information.
r.GET("/namespaces/:namespace/mono-vertices/:mono-vertex", handler.GetMonoVertex)
// Get all the pods of a mono vertex.
r.GET("/namespaces/:namespace/mono-vertices/:mono-vertex/pods", handler.ListMonoVertexPods)
// Create a mono vertex.
r.POST("/namespaces/:namespace/mono-vertices", handler.CreateMonoVertex)
}

// authMiddleware is the middleware for AuthN/AuthZ.
Expand Down
21 changes: 18 additions & 3 deletions test/api-e2e/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *APISuite) TestISBSVCReplica1() {
assert.Contains(s.T(), deleteISBSVC, deleteISBSVCSuccessExpect)
}

func (s *APISuite) TestPipeline0() {
func (s *APISuite) TestAPIsForIsbAndPipelineAndMonoVertex() {
defer s.Given().When().UXServerPodPortForward(8145, 8443).TerminateAllPodPortForwards()

namespaceBody := HTTPExpect(s.T(), "https://localhost:8145").GET("/api/v1/namespaces").
Expand Down Expand Up @@ -158,10 +158,20 @@ func (s *APISuite) TestPipeline0() {
Status(200).Body().Raw()
assert.Contains(s.T(), resumePipeline1, patchPipelineSuccessExpect)

// create a mono vertex
var mv1 v1alpha1.MonoVertex
err = json.Unmarshal(testMonoVertex1, &mv1)
assert.NoError(s.T(), err)
createMonoVertex := HTTPExpect(s.T(), "https://localhost:8145").POST(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices", Namespace)).WithJSON(mv1).
Expect().
Status(200).Body().Raw()
var createMonoVertexSuccessExpect = `"data":null`
assert.Contains(s.T(), createMonoVertex, createMonoVertexSuccessExpect)

clusterSummaryBody := HTTPExpect(s.T(), "https://localhost:8145").GET("/api/v1/cluster-summary").
Expect().
Status(200).Body().Raw()
var clusterSummaryExpect = `{"isEmpty":false,"namespace":"numaflow-system","pipelineSummary":{"active":{"Healthy":2,"Warning":0,"Critical":0},"inactive":0},"isbServiceSummary":{"active":{"Healthy":1,"Warning":0,"Critical":0},"inactive":0}}`
var clusterSummaryExpect = `{"isEmpty":false,"namespace":"numaflow-system","pipelineSummary":{"active":{"Healthy":2,"Warning":0,"Critical":0},"inactive":0},"isbServiceSummary":{"active":{"Healthy":1,"Warning":0,"Critical":0},"inactive":0},"monoVertexSummary":{"active":{"Healthy":1,"Warning":0,"Critical":0},"inactive":0}}`
assert.Contains(s.T(), clusterSummaryBody, clusterSummaryExpect)

listPipelineBody := HTTPExpect(s.T(), "https://localhost:8145").GET(fmt.Sprintf("/api/v1/namespaces/%s/pipelines", Namespace)).
Expand All @@ -179,9 +189,14 @@ func (s *APISuite) TestPipeline0() {
var deletePipelineSuccessExpect = `"data":null`
assert.Contains(s.T(), deletePipeline1, deletePipelineSuccessExpect)
assert.Contains(s.T(), deletePipeline2, deletePipelineSuccessExpect)

listMonoVertexBody := HTTPExpect(s.T(), "https://localhost:8145").GET(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices", Namespace)).
Expect().
Status(200).Body().Raw()
assert.Contains(s.T(), listMonoVertexBody, testMonoVertex1Name)
}

func (s *APISuite) TestPipeline1() {
func (s *APISuite) TestAPIsForMetricsAndWatermarkAndPods() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

Expand Down
30 changes: 30 additions & 0 deletions test/api-e2e/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,34 @@ var (
}
}
`)
testMonoVertex1Name = "test-mono-vertex-1"
testMonoVertex1 = []byte(`
{
"apiVersion": "numaflow.numaproj.io/v1alpha1",
"kind": "MonoVertex",
"metadata": {
"name": "test-mono-vertex-1"
},
"spec": {
"source": {
"udsource": {
"container": {
"image": "quay.io/numaio/numaflow-java/source-simple-source:stable"
}
},
"transformer": {
"container": {
"image": "quay.io/numaio/numaflow-rs/source-transformer-now:stable"
}
}
},
"sink": {
"udsink": {
"container": {
"image": "quay.io/numaio/numaflow-java/simple-sink:stable"
}
}
}
}
}`)
)

0 comments on commit e1fba01

Please sign in to comment.