diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go index 19a3274cef..a1a8e518fa 100644 --- a/pkg/apis/numaflow/v1alpha1/const.go +++ b/pkg/apis/numaflow/v1alpha1/const.go @@ -228,6 +228,10 @@ const ( PipelineStatusDeleting = "deleting" PipelineStatusUnhealthy = "unhealthy" + // MonoVertex health status + // TODO - more statuses to be added + MonoVertexStatusHealthy = "healthy" + // Callback annotation keys CallbackEnabledKey = "numaflow.numaproj.io/callback" CallbackURLKey = "numaflow.numaproj.io/callback-url" diff --git a/pkg/reconciler/metrics.go b/pkg/reconciler/metrics.go index 37d4cc5148..5f92049f2d 100644 --- a/pkg/reconciler/metrics.go +++ b/pkg/reconciler/metrics.go @@ -59,8 +59,8 @@ var ( Help: "A metric indicates the replicas of a Redis ISB Service", }, []string{metrics.LabelNamespace, metrics.LabelISBService}) - // VertexDisiredReplicas indicates the desired replicas of a Vertex. - VertexDisiredReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + // VertexDesiredReplicas indicates the desired replicas of a Vertex. + VertexDesiredReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: "controller", Name: "vertex_desired_replicas", Help: "A metric indicates the desired replicas of a Vertex", @@ -75,5 +75,5 @@ var ( ) func init() { - ctrlmetrics.Registry.MustRegister(BuildInfo, ISBSvcHealth, PipelineHealth, JetStreamISBSvcReplicas, RedisISBSvcReplicas, VertexDisiredReplicas, VertexCurrentReplicas) + ctrlmetrics.Registry.MustRegister(BuildInfo, ISBSvcHealth, PipelineHealth, JetStreamISBSvcReplicas, RedisISBSvcReplicas, VertexDesiredReplicas, VertexCurrentReplicas) } diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index c9d824e941..0be76a8d0b 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -129,7 +129,7 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) ( // Clean up metrics _ = reconciler.PipelineHealth.DeleteLabelValues(pl.Namespace, pl.Name) // Delete corresponding vertex metrics - _ = reconciler.VertexDisiredReplicas.DeletePartialMatch(map[string]string{metrics.LabelNamespace: pl.Namespace, metrics.LabelPipeline: pl.Name}) + _ = reconciler.VertexDesiredReplicas.DeletePartialMatch(map[string]string{metrics.LabelNamespace: pl.Namespace, metrics.LabelPipeline: pl.Name}) _ = reconciler.VertexCurrentReplicas.DeletePartialMatch(map[string]string{metrics.LabelNamespace: pl.Namespace, metrics.LabelPipeline: pl.Name}) } return ctrl.Result{}, nil @@ -292,7 +292,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p log.Infow("Deleted stale vertex successfully", zap.String("vertex", v.Name)) r.recorder.Eventf(pl, corev1.EventTypeNormal, "DeleteStaleVertexSuccess", "Deleted stale vertex %s successfully", v.Name) // Clean up vertex replica metrics - reconciler.VertexDisiredReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name) + reconciler.VertexDesiredReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name) reconciler.VertexCurrentReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name) } diff --git a/pkg/reconciler/vertex/controller.go b/pkg/reconciler/vertex/controller.go index b88b3b94f8..f9e82436d1 100644 --- a/pkg/reconciler/vertex/controller.go +++ b/pkg/reconciler/vertex/controller.go @@ -126,7 +126,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) ( desiredReplicas := vertex.GetReplicas() // Set metrics defer func() { - reconciler.VertexDisiredReplicas.WithLabelValues(vertex.Namespace, vertex.Spec.PipelineName, vertex.Spec.Name).Set(float64(desiredReplicas)) + reconciler.VertexDesiredReplicas.WithLabelValues(vertex.Namespace, vertex.Spec.PipelineName, vertex.Spec.Name).Set(float64(desiredReplicas)) reconciler.VertexCurrentReplicas.WithLabelValues(vertex.Namespace, vertex.Spec.PipelineName, vertex.Spec.Name).Set(float64(vertex.Status.Replicas)) }() diff --git a/server/apis/interface.go b/server/apis/interface.go index b618063637..ff41eb3ebe 100644 --- a/server/apis/interface.go +++ b/server/apis/interface.go @@ -42,4 +42,8 @@ type Handler interface { PodLogs(c *gin.Context) GetNamespaceEvents(c *gin.Context) GetPipelineStatus(c *gin.Context) + ListMonoVertices(c *gin.Context) + GetMonoVertex(c *gin.Context) + ListMonoVertexPods(c *gin.Context) + CreateMonoVertex(c *gin.Context) } diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index 12fe7403b1..59dc00c810 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -241,13 +241,14 @@ func (h *handler) GetClusterSummary(c *gin.Context) { } type namespaceSummary struct { - pipelineSummary PipelineSummary - isbsvcSummary IsbServiceSummary + pipelineSummary PipelineSummary + isbsvcSummary IsbServiceSummary + monoVertexSummary MonoVertexSummary } var namespaceSummaryMap = make(map[string]namespaceSummary) // get pipeline summary - pipelineList, err := h.numaflowClient.Pipelines("").List(context.Background(), metav1.ListOptions{}) + pipelineList, err := h.numaflowClient.Pipelines("").List(c, metav1.ListOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error())) return @@ -271,7 +272,7 @@ func (h *handler) GetClusterSummary(c *gin.Context) { } // get isbsvc summary - isbsvcList, err := h.numaflowClient.InterStepBufferServices("").List(context.Background(), metav1.ListOptions{}) + isbsvcList, err := h.numaflowClient.InterStepBufferServices("").List(c, metav1.ListOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error())) return @@ -294,19 +295,45 @@ func (h *handler) GetClusterSummary(c *gin.Context) { namespaceSummaryMap[isbsvc.Namespace] = summary } + // get mono vertex summary + mvtList, err := h.numaflowClient.MonoVertices("").List(c, metav1.ListOptions{}) + if err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, failed to fetch mono vertex list, %s", err.Error())) + return + } + for _, monoVertex := range mvtList.Items { + var summary namespaceSummary + if value, ok := namespaceSummaryMap[monoVertex.Namespace]; ok { + summary = value + } + status, err := getMonoVertexStatus(&monoVertex) + if err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, failed to get the status of the mono vertex %s, %s", monoVertex.Name, err.Error())) + return + } + // if the mono vertex is healthy, increment the active count, otherwise increment the inactive count + // TODO - add more status types for mono vertex and update the logic here + if status == dfv1.MonoVertexStatusHealthy { + summary.monoVertexSummary.Active.increment(status) + } else { + summary.monoVertexSummary.Inactive++ + } + namespaceSummaryMap[monoVertex.Namespace] = summary + } + // get cluster summary var clusterSummary ClusterSummaryResponse // at this moment, if a namespace has neither pipeline nor isbsvc, it will not be included in the namespacedSummaryMap. // since we still want to pass these empty namespaces to the frontend, we add them here. for _, ns := range namespaces { if _, ok := namespaceSummaryMap[ns]; !ok { - // if the namespace is not in the namespaceSummaryMap, it means it has neither pipeline nor isbsvc + // if the namespace is not in the namespaceSummaryMap, it means it has none of the pipelines, isbsvc, or mono vertex // taking advantage of golang by default initializing the struct with zero value namespaceSummaryMap[ns] = namespaceSummary{} } } for name, summary := range namespaceSummaryMap { - clusterSummary = append(clusterSummary, NewNamespaceSummary(name, summary.pipelineSummary, summary.isbsvcSummary)) + clusterSummary = append(clusterSummary, NewNamespaceSummary(name, summary.pipelineSummary, summary.isbsvcSummary, summary.monoVertexSummary)) } // sort the cluster summary by namespace in alphabetical order, @@ -351,7 +378,7 @@ func (h *handler) CreatePipeline(c *gin.Context) { c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil)) return } - if _, err := h.numaflowClient.Pipelines(ns).Create(context.Background(), &pipelineSpec, metav1.CreateOptions{}); err != nil { + if _, err := h.numaflowClient.Pipelines(ns).Create(c, &pipelineSpec, metav1.CreateOptions{}); err != nil { h.respondWithError(c, fmt.Sprintf("Failed to create pipeline %q, %s", pipelineSpec.Name, err.Error())) return } @@ -377,7 +404,7 @@ func (h *handler) GetPipeline(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") // get general pipeline info - pl, err := h.numaflowClient.Pipelines(ns).Get(context.Background(), pipeline, metav1.GetOptions{}) + pl, err := h.numaflowClient.Pipelines(ns).Get(c, pipeline, metav1.GetOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline %q namespace %q, %s", pipeline, ns, err.Error())) return @@ -417,7 +444,7 @@ func (h *handler) GetPipeline(c *gin.Context) { minWM int64 = math.MaxInt64 maxWM int64 = math.MinInt64 ) - watermarks, err := client.GetPipelineWatermarks(context.Background(), pipeline) + watermarks, err := client.GetPipelineWatermarks(c, pipeline) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error())) return @@ -464,7 +491,7 @@ func (h *handler) UpdatePipeline(c *gin.Context) { // dryRun is used to check if the operation is just a validation or an actual update dryRun := strings.EqualFold("true", c.DefaultQuery("dry-run", "false")) - oldSpec, err := h.numaflowClient.Pipelines(ns).Get(context.Background(), pipeline, metav1.GetOptions{}) + oldSpec, err := h.numaflowClient.Pipelines(ns).Get(c, pipeline, metav1.GetOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline %q namespace %q, %s", pipeline, ns, err.Error())) return @@ -500,7 +527,7 @@ func (h *handler) UpdatePipeline(c *gin.Context) { } oldSpec.Spec = updatedSpec.Spec - if _, err := h.numaflowClient.Pipelines(ns).Update(context.Background(), oldSpec, metav1.UpdateOptions{}); err != nil { + if _, err := h.numaflowClient.Pipelines(ns).Update(c, oldSpec, metav1.UpdateOptions{}); err != nil { h.respondWithError(c, fmt.Sprintf("Failed to update pipeline %q, %s", pipeline, err.Error())) return } @@ -518,7 +545,7 @@ func (h *handler) DeletePipeline(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") - if err := h.numaflowClient.Pipelines(ns).Delete(context.Background(), pipeline, metav1.DeleteOptions{}); err != nil { + if err := h.numaflowClient.Pipelines(ns).Delete(c, pipeline, metav1.DeleteOptions{}); err != nil { h.respondWithError(c, fmt.Sprintf("Failed to delete pipeline %q, %s", pipeline, err.Error())) return } @@ -551,7 +578,7 @@ func (h *handler) PatchPipeline(c *gin.Context) { return } - if _, err := h.numaflowClient.Pipelines(ns).Patch(context.Background(), pipeline, types.MergePatchType, patchSpec, metav1.PatchOptions{}); err != nil { + if _, err := h.numaflowClient.Pipelines(ns).Patch(c, pipeline, types.MergePatchType, patchSpec, metav1.PatchOptions{}); err != nil { h.respondWithError(c, fmt.Sprintf("Failed to patch pipeline %q, %s", pipeline, err.Error())) return } @@ -593,7 +620,7 @@ func (h *handler) CreateInterStepBufferService(c *gin.Context) { return } - if _, err := h.numaflowClient.InterStepBufferServices(ns).Create(context.Background(), &isbsvcSpec, metav1.CreateOptions{}); err != nil { + if _, err := h.numaflowClient.InterStepBufferServices(ns).Create(c, &isbsvcSpec, metav1.CreateOptions{}); err != nil { h.respondWithError(c, fmt.Sprintf("Failed to create interstepbuffer service %q, %s", isbsvcSpec.Name, err.Error())) return } @@ -616,7 +643,7 @@ func (h *handler) ListInterStepBufferServices(c *gin.Context) { func (h *handler) GetInterStepBufferService(c *gin.Context) { ns, isbsvcName := c.Param("namespace"), c.Param("isb-service") - isbsvc, err := h.numaflowClient.InterStepBufferServices(ns).Get(context.Background(), isbsvcName, metav1.GetOptions{}) + isbsvc, err := h.numaflowClient.InterStepBufferServices(ns).Get(c, isbsvcName, metav1.GetOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to fetch interstepbuffer service %q namespace %q, %s", isbsvcName, ns, err.Error())) return @@ -645,7 +672,7 @@ func (h *handler) UpdateInterStepBufferService(c *gin.Context) { // dryRun is used to check if the operation is just a validation or an actual update dryRun := strings.EqualFold("true", c.DefaultQuery("dry-run", "false")) - isbSVC, err := h.numaflowClient.InterStepBufferServices(ns).Get(context.Background(), isbsvcName, metav1.GetOptions{}) + isbSVC, err := h.numaflowClient.InterStepBufferServices(ns).Get(c, isbsvcName, metav1.GetOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to get the interstep buffer service: namespace %q isb-services %q: %s", ns, isbsvcName, err.Error())) return @@ -668,7 +695,7 @@ func (h *handler) UpdateInterStepBufferService(c *gin.Context) { return } isbSVC.Spec = updatedSpec.Spec - updatedISBSvc, err := h.numaflowClient.InterStepBufferServices(ns).Update(context.Background(), isbSVC, metav1.UpdateOptions{}) + updatedISBSvc, err := h.numaflowClient.InterStepBufferServices(ns).Update(c, isbSVC, metav1.UpdateOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to update the interstep buffer service: namespace %q isb-services %q: %s", ns, isbsvcName, err.Error())) return @@ -685,7 +712,7 @@ func (h *handler) DeleteInterStepBufferService(c *gin.Context) { ns, isbsvcName := c.Param("namespace"), c.Param("isb-service") - pipelines, err := h.numaflowClient.Pipelines(ns).List(context.Background(), metav1.ListOptions{}) + pipelines, err := h.numaflowClient.Pipelines(ns).List(c, metav1.ListOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to get pipelines in namespace %q, %s", ns, err.Error())) return @@ -698,7 +725,7 @@ func (h *handler) DeleteInterStepBufferService(c *gin.Context) { } } - err = h.numaflowClient.InterStepBufferServices(ns).Delete(context.Background(), isbsvcName, metav1.DeleteOptions{}) + err = h.numaflowClient.InterStepBufferServices(ns).Delete(c, isbsvcName, metav1.DeleteOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to delete the interstep buffer service: namespace %q isb-service %q: %s", ns, isbsvcName, err.Error())) @@ -718,7 +745,7 @@ func (h *handler) ListPipelineBuffers(c *gin.Context) { return } - buffers, err := client.ListPipelineBuffers(context.Background(), pipeline) + buffers, err := client.ListPipelineBuffers(c, pipeline) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to get the Inter-Step buffers for pipeline %q: %s", pipeline, err.Error())) return @@ -737,7 +764,7 @@ func (h *handler) GetPipelineWatermarks(c *gin.Context) { return } - watermarks, err := client.GetPipelineWatermarks(context.Background(), pipeline) + watermarks, err := client.GetPipelineWatermarks(c, pipeline) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to get the watermarks for pipeline %q: %s", pipeline, err.Error())) return @@ -766,7 +793,7 @@ func (h *handler) UpdateVertex(c *gin.Context) { dryRun = strings.EqualFold("true", c.DefaultQuery("dry-run", "false")) ) - oldPipelineSpec, err := h.numaflowClient.Pipelines(ns).Get(context.Background(), pipeline, metav1.GetOptions{}) + oldPipelineSpec, err := h.numaflowClient.Pipelines(ns).Get(c, pipeline, metav1.GetOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to update the vertex: namespace %q pipeline %q vertex %q: %s", ns, pipeline, inputVertexName, err.Error())) @@ -804,7 +831,7 @@ func (h *handler) UpdateVertex(c *gin.Context) { } oldPipelineSpec.Spec = newPipelineSpec.Spec - if _, err := h.numaflowClient.Pipelines(ns).Update(context.Background(), oldPipelineSpec, metav1.UpdateOptions{}); err != nil { + if _, err := h.numaflowClient.Pipelines(ns).Update(c, oldPipelineSpec, metav1.UpdateOptions{}); err != nil { h.respondWithError(c, fmt.Sprintf("Failed to update the vertex: namespace %q pipeline %q vertex %q: %s", ns, pipeline, inputVertexName, err.Error())) return @@ -817,7 +844,7 @@ func (h *handler) UpdateVertex(c *gin.Context) { func (h *handler) GetVerticesMetrics(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") - pl, err := h.numaflowClient.Pipelines(ns).Get(context.Background(), pipeline, metav1.GetOptions{}) + pl, err := h.numaflowClient.Pipelines(ns).Get(c, pipeline, metav1.GetOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to get the vertices metrics: namespace %q pipeline %q: %s", ns, pipeline, err.Error())) return @@ -831,7 +858,7 @@ func (h *handler) GetVerticesMetrics(c *gin.Context) { var results = make(map[string][]*daemon.VertexMetrics) for _, vertex := range pl.Spec.Vertices { - metrics, err := client.GetVertexMetrics(context.Background(), pipeline, vertex.Name) + metrics, err := client.GetVertexMetrics(c, pipeline, vertex.Name) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to get the vertices metrics: namespace %q pipeline %q vertex %q: %s", ns, pipeline, vertex.Name, err.Error())) return @@ -847,7 +874,7 @@ func (h *handler) ListVertexPods(c *gin.Context) { ns, pipeline, vertex := c.Param("namespace"), c.Param("pipeline"), c.Param("vertex") limit, _ := strconv.ParseInt(c.Query("limit"), 10, 64) - pods, err := h.kubeClient.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{ + pods, err := h.kubeClient.CoreV1().Pods(ns).List(c, metav1.ListOptions{ LabelSelector: fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyPipelineName, pipeline, dfv1.KeyVertexName, vertex), Limit: limit, Continue: c.Query("continue"), @@ -866,7 +893,7 @@ func (h *handler) ListPodsMetrics(c *gin.Context) { ns := c.Param("namespace") limit, _ := strconv.ParseInt(c.Query("limit"), 10, 64) - metrics, err := h.metricsClient.MetricsV1beta1().PodMetricses(ns).List(context.Background(), metav1.ListOptions{ + metrics, err := h.metricsClient.MetricsV1beta1().PodMetricses(ns).List(c, metav1.ListOptions{ Limit: limit, Continue: c.Query("continue"), }) @@ -936,7 +963,7 @@ func (h *handler) GetNamespaceEvents(c *gin.Context) { limit, _ := strconv.ParseInt(c.Query("limit"), 10, 64) var err error var events *corev1.EventList - if events, err = h.kubeClient.CoreV1().Events(ns).List(context.Background(), metav1.ListOptions{ + if events, err = h.kubeClient.CoreV1().Events(ns).List(c, metav1.ListOptions{ Limit: limit, Continue: c.Query("continue"), }); err != nil { @@ -992,7 +1019,7 @@ func (h *handler) GetPipelineStatus(c *gin.Context) { return } // Get the data criticality for the given pipeline - dataStatus, err := client.GetPipelineStatus(context.Background(), pipeline) + dataStatus, err := client.GetPipelineStatus(c, pipeline) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to get the dataStatus for pipeline %q: %s", pipeline, err.Error())) return @@ -1006,6 +1033,91 @@ func (h *handler) GetPipelineStatus(c *gin.Context) { c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, response)) } +// ListMonoVertices is used to provide all the mono vertices in a namespace. +func (h *handler) ListMonoVertices(c *gin.Context) { + ns := c.Param("namespace") + mvtList, err := getMonoVertices(h, ns) + if err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to fetch all mono vertices for namespace %q, %s", ns, err.Error())) + return + } + c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, mvtList)) +} + +// GetMonoVertex is used to provide the spec of a given mono vertex +func (h *handler) GetMonoVertex(c *gin.Context) { + ns, monoVertex := c.Param("namespace"), c.Param("mono-vertex") + // get general mono vertex info + mvt, err := h.numaflowClient.MonoVertices(ns).Get(c, monoVertex, metav1.GetOptions{}) + if err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to fetch mono vertex %q in namespace %q, %s", mvt, ns, err.Error())) + return + } + // set mono vertex kind and apiVersion + mvt.Kind = dfv1.MonoVertexGroupVersionKind.Kind + mvt.APIVersion = dfv1.SchemeGroupVersion.String() + // get mono vertex status + status, err := getMonoVertexStatus(mvt) + if err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to fetch mono vertex %q from namespace %q, %s", monoVertex, ns, err.Error())) + return + } + monoVertexResp := NewMonoVertexInfo(status, mvt) + c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, monoVertexResp)) +} + +// CreateMonoVertex is used to create a mono vertex +func (h *handler) CreateMonoVertex(c *gin.Context) { + if h.opts.readonly { + errMsg := "Failed to perform this operation in read only mode" + c.JSON(http.StatusForbidden, NewNumaflowAPIResponse(&errMsg, nil)) + return + } + + ns := c.Param("namespace") + // dryRun is used to check if the operation is just a validation or an actual creation + dryRun := strings.EqualFold("true", c.DefaultQuery("dry-run", "false")) + + var monoVertexSpec dfv1.MonoVertex + if err := bindJson(c, &monoVertexSpec); err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to decode JSON request body to mono vertex spec, %s", err.Error())) + return + } + + if requestedNs := monoVertexSpec.Namespace; !isValidNamespaceSpec(requestedNs, ns) { + h.respondWithError(c, fmt.Sprintf("namespace mismatch, expected %s, got %s", ns, requestedNs)) + return + } + monoVertexSpec.Namespace = ns + // if the validation flag "dryRun" is set to true, return without creating the pipeline + if dryRun { + c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil)) + return + } + if _, err := h.numaflowClient.MonoVertices(ns).Create(c, &monoVertexSpec, metav1.CreateOptions{}); err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to create mono vertex %q, %s", monoVertexSpec.Name, err.Error())) + return + } + c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil)) +} + +// ListMonoVertexPods is used to provide all the pods of a mono vertex +func (h *handler) ListMonoVertexPods(c *gin.Context) { + ns, monoVertex := c.Param("namespace"), c.Param("mono-vertex") + limit, _ := strconv.ParseInt(c.Query("limit"), 10, 64) + pods, err := h.kubeClient.CoreV1().Pods(ns).List(c, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", dfv1.KeyMonoVertexName, monoVertex), + Limit: limit, + Continue: c.Query("continue"), + }) + if err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to get a list of pods: namespace %q mono vertex %q: %s", + ns, monoVertex, err.Error())) + return + } + c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, pods.Items)) +} + // getAllNamespaces is a utility used to fetch all the namespaces in the cluster // except the kube system namespaces func getAllNamespaces(h *handler) ([]string, error) { @@ -1062,6 +1174,24 @@ func getIsbServices(h *handler, namespace string) (ISBServices, error) { return isbList, nil } +// getMonoVertices is a utility used to fetch all the mono vertices in a given namespace +func getMonoVertices(h *handler, namespace string) (MonoVertices, error) { + mvtList, err := h.numaflowClient.MonoVertices(namespace).List(context.Background(), metav1.ListOptions{}) + if err != nil { + return nil, err + } + var resList MonoVertices + for _, mvt := range mvtList.Items { + status, err := getMonoVertexStatus(&mvt) + if err != nil { + return nil, err + } + resp := NewMonoVertexInfo(status, &mvt) + resList = append(resList, resp) + } + return resList, nil +} + // GetPipelineStatus is used to provide the status of a given pipeline // TODO(API): Change the Daemon service to return the consolidated status of the pipeline // to save on multiple calls to the daemon service @@ -1092,6 +1222,11 @@ func getIsbServiceStatus(isbsvc *dfv1.InterStepBufferService) (string, error) { return retStatus, nil } +func getMonoVertexStatus(mvt *dfv1.MonoVertex) (string, error) { + // TODO - add more logic to determine the status of a mono vertex + return dfv1.MonoVertexStatusHealthy, nil +} + // validatePipelineSpec is used to validate the pipeline spec during create and update func validatePipelineSpec(h *handler, oldPipeline *dfv1.Pipeline, newPipeline *dfv1.Pipeline, validType string) error { ns := newPipeline.Namespace diff --git a/server/apis/v1/response_cluster_summary.go b/server/apis/v1/response_cluster_summary.go index fd7b033330..4d89c47e45 100644 --- a/server/apis/v1/response_cluster_summary.go +++ b/server/apis/v1/response_cluster_summary.go @@ -59,6 +59,16 @@ func (is *IsbServiceSummary) hasIsbService() bool { return is.Inactive > 0 || !is.Active.isEmpty() } +// MonoVertexSummary summarizes the number of active and inactive mono vertices. +type MonoVertexSummary struct { + Active ActiveStatus `json:"active"` + Inactive int `json:"inactive"` +} + +func (mvs *MonoVertexSummary) hasMonoVertex() bool { + return mvs.Inactive > 0 || !mvs.Active.isEmpty() +} + // ClusterSummaryResponse is a list of NamespaceSummary // of all the namespaces in a cluster wrapped in a list. type ClusterSummaryResponse []NamespaceSummary @@ -72,15 +82,20 @@ type NamespaceSummary struct { Namespace string `json:"namespace"` PipelineSummary PipelineSummary `json:"pipelineSummary"` IsbServiceSummary IsbServiceSummary `json:"isbServiceSummary"` + MonoVertexSummary MonoVertexSummary `json:"monoVertexSummary"` } // NewNamespaceSummary creates a new NamespaceSummary object with the given specifications. -func NewNamespaceSummary(namespace string, pipelineSummary PipelineSummary, - isbSummary IsbServiceSummary) NamespaceSummary { +func NewNamespaceSummary( + namespace string, + pipelineSummary PipelineSummary, + isbSummary IsbServiceSummary, + monoVertexSummary MonoVertexSummary) NamespaceSummary { return NamespaceSummary{ - IsEmpty: !(pipelineSummary.hasPipeline() || isbSummary.hasIsbService()), + IsEmpty: !(pipelineSummary.hasPipeline() || isbSummary.hasIsbService() || monoVertexSummary.hasMonoVertex()), Namespace: namespace, PipelineSummary: pipelineSummary, IsbServiceSummary: isbSummary, + MonoVertexSummary: monoVertexSummary, } } diff --git a/server/apis/v1/response_mono_vertex.go b/server/apis/v1/response_mono_vertex.go new file mode 100644 index 0000000000..290db26917 --- /dev/null +++ b/server/apis/v1/response_mono_vertex.go @@ -0,0 +1,39 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + +// MonoVertices is a list of mono vertices +type MonoVertices []MonoVertexInfo + +type MonoVertexInfo struct { + Name string `json:"name"` + // Status shows whether the mono vertex is healthy, warning, critical or inactive. + Status string `json:"status"` + // MonoVertex contains the detailed mono vertex spec. + MonoVertex v1alpha1.MonoVertex `json:"monoVertex"` +} + +// NewMonoVertexInfo creates a new MonoVertexInfo object with the given status and lag +func NewMonoVertexInfo(status string, mvt *v1alpha1.MonoVertex) MonoVertexInfo { + return MonoVertexInfo{ + Name: mvt.Name, + Status: status, + MonoVertex: *mvt, + } +} diff --git a/server/apis/v1/response_pipeline.go b/server/apis/v1/response_pipeline.go index 0a091c4cd7..d7c0cb0902 100644 --- a/server/apis/v1/response_pipeline.go +++ b/server/apis/v1/response_pipeline.go @@ -31,7 +31,7 @@ type PipelineInfo struct { Pipeline v1alpha1.Pipeline `json:"pipeline"` } -// NewPipelineInfo creates a new PipelineInfo object with the given status +// NewPipelineInfo creates a new PipelineInfo object with the given status and lag func NewPipelineInfo(status string, lag *int64, pl *v1alpha1.Pipeline) PipelineInfo { return PipelineInfo{ Name: pl.Name, diff --git a/server/authz/consts.go b/server/authz/consts.go index d141a5b064..4f43fb1129 100644 --- a/server/authz/consts.go +++ b/server/authz/consts.go @@ -20,15 +20,16 @@ const ( // PolicyMapPath is the path to the policy map. policyMapPath = "/etc/numaflow/rbac-policy.csv" - // rbacPropertiesPath is the path to the rbac properties file. It includes configuraion for authorization like + // rbacPropertiesPath is the path to the rbac properties file. It includes configuration for authorization like // scope, default policy etc. rbacPropertiesPath = "/etc/numaflow/rbac-conf.yaml" // Objects for the RBAC policy - ObjectAll = "*" - ObjectPipeline = "pipeline" - ObjectISBSvc = "isbsvc" - ObjectEvents = "events" + ObjectAll = "*" + ObjectPipeline = "pipeline" + ObjectMonoVertex = "mono-vertex" + ObjectISBSvc = "isbsvc" + ObjectEvents = "events" // Resouces for the RBAC policy ResourceAll = "*" diff --git a/server/cmd/server/start.go b/server/cmd/server/start.go index 24f8e25179..eccfca45a3 100644 --- a/server/cmd/server/start.go +++ b/server/cmd/server/start.go @@ -200,5 +200,9 @@ func CreateAuthRouteMap(baseHref string) authz.RouteMap { "GET:" + baseHref + "api/v1/metrics/namespaces/:namespace/pods": authz.NewRouteInfo(authz.ObjectPipeline, true), "GET:" + baseHref + "api/v1/namespaces/:namespace/pods/:pod/logs": authz.NewRouteInfo(authz.ObjectPipeline, true), "GET:" + baseHref + "api/v1/namespaces/:namespace/events": authz.NewRouteInfo(authz.ObjectEvents, true), + "GET:" + baseHref + "api/v1/namespaces/:namespace/mono-vertices": authz.NewRouteInfo(authz.ObjectMonoVertex, true), + "GET:" + baseHref + "api/v1/namespaces/:namespace/mono-vertices/:mono-vertex": authz.NewRouteInfo(authz.ObjectMonoVertex, true), + "GET:" + baseHref + "api/v1/namespaces/:namespace/mono-vertices/:mono-vertex/pods": authz.NewRouteInfo(authz.ObjectMonoVertex, true), + "POST:" + baseHref + "api/v1/namespaces/:namespace/mono-vertices": authz.NewRouteInfo(authz.ObjectMonoVertex, true), } } diff --git a/server/cmd/server/start_test.go b/server/cmd/server/start_test.go index 645e53a3ff..1df0d5b8f9 100644 --- a/server/cmd/server/start_test.go +++ b/server/cmd/server/start_test.go @@ -25,12 +25,12 @@ import ( func TestCreateAuthRouteMap(t *testing.T) { t.Run("empty base", func(t *testing.T) { got := CreateAuthRouteMap("") - assert.Equal(t, 24, len(got)) + assert.Equal(t, 28, len(got)) }) t.Run("customize base", func(t *testing.T) { got := CreateAuthRouteMap("abcdefg") - assert.Equal(t, 24, len(got)) + assert.Equal(t, 28, len(got)) for k := range got { assert.Contains(t, k, "abcdefg") } diff --git a/server/routes/routes.go b/server/routes/routes.go index 580a819079..deb0cbcb6e 100644 --- a/server/routes/routes.go +++ b/server/routes/routes.go @@ -115,9 +115,9 @@ func v1Routes(ctx context.Context, r gin.IRouter, dexObj *v1.DexObject, localUse r.GET("/cluster-summary", handler.GetClusterSummary) // Create a Pipeline. r.POST("/namespaces/:namespace/pipelines", handler.CreatePipeline) - // All pipelines for a given namespace. + // List all pipelines for a given namespace. r.GET("/namespaces/:namespace/pipelines", handler.ListPipelines) - // Get a Pipeline information. + // Get the pipeline information. r.GET("/namespaces/:namespace/pipelines/:pipeline", handler.GetPipeline) // Get a Pipeline health information. r.GET("/namespaces/:namespace/pipelines/:pipeline/health", handler.GetPipelineStatus) @@ -153,6 +153,14 @@ func v1Routes(ctx context.Context, r gin.IRouter, dexObj *v1.DexObject, localUse r.GET("/namespaces/:namespace/pods/:pod/logs", handler.PodLogs) // List of the Kubernetes events of a namespace. r.GET("/namespaces/:namespace/events", handler.GetNamespaceEvents) + // List all mono vertices for a given namespace. + r.GET("/namespaces/:namespace/mono-vertices", handler.ListMonoVertices) + // Get the mono vertex information. + r.GET("/namespaces/:namespace/mono-vertices/:mono-vertex", handler.GetMonoVertex) + // Get all the pods of a mono vertex. + r.GET("/namespaces/:namespace/mono-vertices/:mono-vertex/pods", handler.ListMonoVertexPods) + // Create a mono vertex. + r.POST("/namespaces/:namespace/mono-vertices", handler.CreateMonoVertex) } // authMiddleware is the middleware for AuthN/AuthZ. diff --git a/test/api-e2e/api_test.go b/test/api-e2e/api_test.go index 819dd39261..ae927e51d4 100644 --- a/test/api-e2e/api_test.go +++ b/test/api-e2e/api_test.go @@ -122,7 +122,7 @@ func (s *APISuite) TestISBSVCReplica1() { assert.Contains(s.T(), deleteISBSVC, deleteISBSVCSuccessExpect) } -func (s *APISuite) TestPipeline0() { +func (s *APISuite) TestAPIsForIsbAndPipelineAndMonoVertex() { defer s.Given().When().UXServerPodPortForward(8145, 8443).TerminateAllPodPortForwards() namespaceBody := HTTPExpect(s.T(), "https://localhost:8145").GET("/api/v1/namespaces"). @@ -158,10 +158,20 @@ func (s *APISuite) TestPipeline0() { Status(200).Body().Raw() assert.Contains(s.T(), resumePipeline1, patchPipelineSuccessExpect) + // create a mono vertex + var mv1 v1alpha1.MonoVertex + err = json.Unmarshal(testMonoVertex1, &mv1) + assert.NoError(s.T(), err) + createMonoVertex := HTTPExpect(s.T(), "https://localhost:8145").POST(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices", Namespace)).WithJSON(mv1). + Expect(). + Status(200).Body().Raw() + var createMonoVertexSuccessExpect = `"data":null` + assert.Contains(s.T(), createMonoVertex, createMonoVertexSuccessExpect) + clusterSummaryBody := HTTPExpect(s.T(), "https://localhost:8145").GET("/api/v1/cluster-summary"). Expect(). Status(200).Body().Raw() - var clusterSummaryExpect = `{"isEmpty":false,"namespace":"numaflow-system","pipelineSummary":{"active":{"Healthy":2,"Warning":0,"Critical":0},"inactive":0},"isbServiceSummary":{"active":{"Healthy":1,"Warning":0,"Critical":0},"inactive":0}}` + var clusterSummaryExpect = `{"isEmpty":false,"namespace":"numaflow-system","pipelineSummary":{"active":{"Healthy":2,"Warning":0,"Critical":0},"inactive":0},"isbServiceSummary":{"active":{"Healthy":1,"Warning":0,"Critical":0},"inactive":0},"monoVertexSummary":{"active":{"Healthy":1,"Warning":0,"Critical":0},"inactive":0}}` assert.Contains(s.T(), clusterSummaryBody, clusterSummaryExpect) listPipelineBody := HTTPExpect(s.T(), "https://localhost:8145").GET(fmt.Sprintf("/api/v1/namespaces/%s/pipelines", Namespace)). @@ -179,9 +189,14 @@ func (s *APISuite) TestPipeline0() { var deletePipelineSuccessExpect = `"data":null` assert.Contains(s.T(), deletePipeline1, deletePipelineSuccessExpect) assert.Contains(s.T(), deletePipeline2, deletePipelineSuccessExpect) + + listMonoVertexBody := HTTPExpect(s.T(), "https://localhost:8145").GET(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices", Namespace)). + Expect(). + Status(200).Body().Raw() + assert.Contains(s.T(), listMonoVertexBody, testMonoVertex1Name) } -func (s *APISuite) TestPipeline1() { +func (s *APISuite) TestAPIsForMetricsAndWatermarkAndPods() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() diff --git a/test/api-e2e/testdata.go b/test/api-e2e/testdata.go index 24623c54eb..5322b38ce8 100644 --- a/test/api-e2e/testdata.go +++ b/test/api-e2e/testdata.go @@ -141,4 +141,34 @@ var ( } } `) + testMonoVertex1Name = "test-mono-vertex-1" + testMonoVertex1 = []byte(` +{ + "apiVersion": "numaflow.numaproj.io/v1alpha1", + "kind": "MonoVertex", + "metadata": { + "name": "test-mono-vertex-1" + }, + "spec": { + "source": { + "udsource": { + "container": { + "image": "quay.io/numaio/numaflow-java/source-simple-source:stable" + } + }, + "transformer": { + "container": { + "image": "quay.io/numaio/numaflow-rs/source-transformer-now:stable" + } + } + }, + "sink": { + "udsink": { + "container": { + "image": "quay.io/numaio/numaflow-java/simple-sink:stable" + } + } + } + } +}`) )