diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index 7628ba7ad6..a5b8938350 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -94,57 +94,63 @@ func (h *handler) ListNamespaces(c *gin.Context) { // GetClusterSummary summarizes information of all the namespaces in a cluster and wrapped the result in a list. func (h *handler) GetClusterSummary(c *gin.Context) { - namespaces, err := getAllNamespaces(h) + type namespaceSummary struct { + pipelineSummary PipelineSummary + isbsvcSummary IsbServiceSummary + } + var namespaceSummaryMap = make(map[string]namespaceSummary) + + // get pipeline summary + pipelineList, err := h.numaflowClient.Pipelines("").List(context.Background(), metav1.ListOptions{}) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error())) return } - var clusterSummary ClusterSummaryResponse - // TODO(API): need a more efficient solution - // Loop over the namespaces to get status - for _, ns := range namespaces { - // Fetch pipeline summary - pipelines, err := getPipelines(h, ns) + for _, pipeline := range pipelineList.Items { + var summary namespaceSummary + if value, ok := namespaceSummaryMap[pipeline.Namespace]; ok { + summary = value + } + status, err := getPipelineStatus(&pipeline) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error())) return } - - var pipeSummary PipelineSummary - var pipeActiveSummary ActiveStatus - // Loop over the pipelines and get the status - for _, pl := range pipelines { - if pl.Status == PipelineStatusInactive { - pipeSummary.Inactive++ - } else { - pipeActiveSummary.increment(pl.Status) - - } + if status == PipelineStatusInactive { + summary.pipelineSummary.Inactive++ + } else { + summary.pipelineSummary.Active.increment(status) } - pipeSummary.Active = pipeActiveSummary + namespaceSummaryMap[pipeline.Namespace] = summary + } - // Fetch ISB service summary - isbSvcs, err := getIsbServices(h, ns) - if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error())) - return + // get isbsvc summary + isbsvcList, err := h.numaflowClient.InterStepBufferServices("").List(context.Background(), metav1.ListOptions{}) + if err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error())) + return + } + for _, isbsvc := range isbsvcList.Items { + var summary namespaceSummary + if value, ok := namespaceSummaryMap[isbsvc.Namespace]; ok { + summary = value } - - var isbSummary IsbServiceSummary - var isbActiveSummary ActiveStatus - // loop over the ISB services and get the status - for _, isb := range isbSvcs { - if isb.Status == ISBServiceStatusInactive { - isbSummary.Inactive++ - } else { - isbActiveSummary.increment(isb.Status) - } + // TODO(API) : Get the current status of the ISB service + status := ISBServiceStatusHealthy + if status == ISBServiceStatusInactive { + summary.isbsvcSummary.Inactive++ + } else { + summary.isbsvcSummary.Active.increment(status) } - isbSummary.Active = isbActiveSummary - clusterSummary = append(clusterSummary, NewClusterSummary(ns, pipeSummary, isbSummary)) + namespaceSummaryMap[isbsvc.Namespace] = summary } - c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, clusterSummary)) + // get cluster summary + var clusterSummary ClusterSummaryResponse + for name, summary := range namespaceSummaryMap { + clusterSummary = append(clusterSummary, NewClusterSummary(name, summary.pipelineSummary, summary.isbsvcSummary)) + } + c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, clusterSummary)) } // CreatePipeline is used to create a given pipeline