Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: API cluster summary improvement #1169

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 43 additions & 37 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,57 +94,63 @@ func (h *handler) ListNamespaces(c *gin.Context) {

// GetClusterSummary summarizes information of all the namespaces in a cluster and wrapped the result in a list.
func (h *handler) GetClusterSummary(c *gin.Context) {
namespaces, err := getAllNamespaces(h)
type namespaceSummary struct {
pipelineSummary PipelineSummary
isbsvcSummary IsbServiceSummary
}
var namespaceSummaryMap = make(map[string]namespaceSummary)

// get pipeline summary
pipelineList, err := h.numaflowClient.Pipelines("").List(context.Background(), metav1.ListOptions{})
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error()))
return
}
var clusterSummary ClusterSummaryResponse
// TODO(API): need a more efficient solution
// Loop over the namespaces to get status
for _, ns := range namespaces {
// Fetch pipeline summary
pipelines, err := getPipelines(h, ns)
for _, pipeline := range pipelineList.Items {
var summary namespaceSummary
if value, ok := namespaceSummaryMap[pipeline.Namespace]; ok {
summary = value
}
status, err := getPipelineStatus(&pipeline)
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error()))
return
}

var pipeSummary PipelineSummary
var pipeActiveSummary ActiveStatus
// Loop over the pipelines and get the status
for _, pl := range pipelines {
if pl.Status == PipelineStatusInactive {
pipeSummary.Inactive++
} else {
pipeActiveSummary.increment(pl.Status)

}
if status == PipelineStatusInactive {
summary.pipelineSummary.Inactive++
} else {
summary.pipelineSummary.Active.increment(status)
}
pipeSummary.Active = pipeActiveSummary
namespaceSummaryMap[pipeline.Namespace] = summary
}

// Fetch ISB service summary
isbSvcs, err := getIsbServices(h, ns)
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error()))
return
// get isbsvc summary
isbsvcList, err := h.numaflowClient.InterStepBufferServices("").List(context.Background(), metav1.ListOptions{})
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch cluster summary, %s", err.Error()))
return
}
for _, isbsvc := range isbsvcList.Items {
var summary namespaceSummary
if value, ok := namespaceSummaryMap[isbsvc.Namespace]; ok {
summary = value
}

var isbSummary IsbServiceSummary
var isbActiveSummary ActiveStatus
// loop over the ISB services and get the status
for _, isb := range isbSvcs {
if isb.Status == ISBServiceStatusInactive {
isbSummary.Inactive++
} else {
isbActiveSummary.increment(isb.Status)
}
// TODO(API) : Get the current status of the ISB service
status := ISBServiceStatusHealthy
if status == ISBServiceStatusInactive {
summary.isbsvcSummary.Inactive++
} else {
summary.isbsvcSummary.Active.increment(status)
}
isbSummary.Active = isbActiveSummary
clusterSummary = append(clusterSummary, NewClusterSummary(ns, pipeSummary, isbSummary))
namespaceSummaryMap[isbsvc.Namespace] = summary
}
c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, clusterSummary))

// get cluster summary
var clusterSummary ClusterSummaryResponse
for name, summary := range namespaceSummaryMap {
clusterSummary = append(clusterSummary, NewClusterSummary(name, summary.pipelineSummary, summary.isbsvcSummary))
}
c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, clusterSummary))
}

// CreatePipeline is used to create a given pipeline
Expand Down
Loading