Skip to content

Commit

Permalink
chore: API cluster summary improvement (#1169)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
Co-authored-by: jyu6 <[email protected]>
  • Loading branch information
jy4096 and jyu6 committed Oct 6, 2023
1 parent d4b5f1b commit 3a42390
Showing 1 changed file with 43 additions and 37 deletions.
80 changes: 43 additions & 37 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,57 +94,63 @@ func (h *handler) ListNamespaces(c *gin.Context) {

// GetClusterSummary summarizes information of all the namespaces in a cluster and wrapped the result in a list.
func (h *handler) GetClusterSummary(c *gin.Context) {
namespaces, err := getAllNamespaces(h)
type namespaceSummary struct {
pipelineSummary PipelineSummary
isbsvcSummary IsbServiceSummary
}
var namespaceSummaryMap = make(map[string]namespaceSummary)

// get pipeline summary
pipelineList, err := h.numaflowClient.Pipelines("").List(context.Background(), metav1.ListOptions{})
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error()))
return
}
var clusterSummary ClusterSummaryResponse
// TODO(API): need a more efficient solution
// Loop over the namespaces to get status
for _, ns := range namespaces {
// Fetch pipeline summary
pipelines, err := getPipelines(h, ns)
for _, pipeline := range pipelineList.Items {
var summary namespaceSummary
if value, ok := namespaceSummaryMap[pipeline.Namespace]; ok {
summary = value
}
status, err := getPipelineStatus(&pipeline)
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error()))
return
}

var pipeSummary PipelineSummary
var pipeActiveSummary ActiveStatus
// Loop over the pipelines and get the status
for _, pl := range pipelines {
if pl.Status == PipelineStatusInactive {
pipeSummary.Inactive++
} else {
pipeActiveSummary.increment(pl.Status)

}
if status == PipelineStatusInactive {
summary.pipelineSummary.Inactive++
} else {
summary.pipelineSummary.Active.increment(status)
}
pipeSummary.Active = pipeActiveSummary
namespaceSummaryMap[pipeline.Namespace] = summary
}

// Fetch ISB service summary
isbSvcs, err := getIsbServices(h, ns)
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error()))
return
// get isbsvc summary
isbsvcList, err := h.numaflowClient.InterStepBufferServices("").List(context.Background(), metav1.ListOptions{})
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error()))
return
}
for _, isbsvc := range isbsvcList.Items {
var summary namespaceSummary
if value, ok := namespaceSummaryMap[isbsvc.Namespace]; ok {
summary = value
}

var isbSummary IsbServiceSummary
var isbActiveSummary ActiveStatus
// loop over the ISB services and get the status
for _, isb := range isbSvcs {
if isb.Status == ISBServiceStatusInactive {
isbSummary.Inactive++
} else {
isbActiveSummary.increment(isb.Status)
}
// TODO(API) : Get the current status of the ISB service
status := ISBServiceStatusHealthy
if status == ISBServiceStatusInactive {
summary.isbsvcSummary.Inactive++
} else {
summary.isbsvcSummary.Active.increment(status)
}
isbSummary.Active = isbActiveSummary
clusterSummary = append(clusterSummary, NewClusterSummary(ns, pipeSummary, isbSummary))
namespaceSummaryMap[isbsvc.Namespace] = summary
}
c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, clusterSummary))

// get cluster summary
var clusterSummary ClusterSummaryResponse
for name, summary := range namespaceSummaryMap {
clusterSummary = append(clusterSummary, NewClusterSummary(name, summary.pipelineSummary, summary.isbsvcSummary))
}
c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, clusterSummary))
}

// CreatePipeline is used to create a given pipeline
Expand Down

0 comments on commit 3a42390

Please sign in to comment.