Skip to content

Commit

Permalink
[horus] Downtime recovery build logic (#446)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfordjody authored Oct 10, 2024
1 parent 06b5e00 commit 296bd23
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 10 deletions.
2 changes: 1 addition & 1 deletion app/horus/base/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type NodeDataInfo struct {
RecoveryMark int64 `json:"recovery_mark" xorm:"recovery_mark"`
RecoveryQL string `json:"recovery_ql" xorm:"recovery_ql"`
DownTimeRecoveryMark int64 `json:"downtime_recovery_mark" xorm:"downtime_recovery_mark"`
DownTimeRecoveryQL string `json:"downtime_recovery_ql" xorm:"downtime_recovery_ql"`
DownTimeRecoveryQL []string `json:"downtime_recovery_ql" xorm:"downtime_recovery_ql"`
}

type PodDataInfo struct {
Expand Down
28 changes: 23 additions & 5 deletions app/horus/core/horuser/node_downtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
"strings"
"sync"
"time"
)
Expand Down Expand Up @@ -171,13 +172,30 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
klog.Infof("clusterName:%v\n nodeName:%v\n threshold:%v count:%v", clusterName, node, aq, count)
continue
}
abnormalRecoveryQL := fmt.Sprintf(strings.Join(h.cc.NodeDownTime.AbnormalRecoveryQL, " "), node)
res, err := h.InstantQuery(addr, abnormalRecoveryQL, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if len(res) == 0 {
klog.Errorf("no results returned for query:%s", abnormalRecoveryQL)
continue
}
if err != nil {
klog.Errorf("downtimeNodes InstantQuery NodeName To IPs empty err:%v", err)
klog.Infof("clusterName:%v\n AbnormalInfoSystemQL:%v, err:%v", clusterName, abnormalRecoveryQL, err)
continue
}
str := ""
for _, v := range res {
str = string(v.Metric["instance"])
}
WithDownNodeIPs[node] = str
}

msg = fmt.Sprintf("\n【%s】\n【集群:%v】\n【已达到宕机恢复临界点:%v】", h.cc.NodeDownTime.DingTalk.Title, clusterName, len(WithDownNodeIPs))
write := db.NodeDataInfo{
NodeName: nodeName,
NodeIP: nodeIP,
ClusterName: clusterName,
ModuleName: NODE_DOWN,
NodeName: nodeName,
NodeIP: nodeIP,
ClusterName: clusterName,
ModuleName: NODE_DOWN,
DownTimeRecoveryQL: h.cc.NodeDownTime.AbnormalRecoveryQL,
}
exist, _ := write.Check()
if exist {
Expand Down
37 changes: 33 additions & 4 deletions app/horus/core/horuser/node_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gammazero/workerpool"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"strings"
"time"
)

Expand All @@ -32,7 +33,13 @@ func (h *Horuser) RecoveryManager(ctx context.Context) error {
return nil
}

func (h *Horuser) recoveryCheck(ctx context.Context) {
// DownTimeRecoveryManager runs the downtime-recovery reconciliation loop until
// ctx is cancelled. It periodically re-queries Prometheus for nodes previously
// marked as down and clears their downtime state once the recovery QL matches.
// Always returns nil after the context is done.
func (h *Horuser) DownTimeRecoveryManager(ctx context.Context) error {
	// BUG FIX: the original scheduled h.recoveryCheck here, which duplicates
	// RecoveryManager's work and leaves downTimeRecoveryCheck (the function
	// this manager was added for) never invoked.
	// NOTE(review): interval reuses NodeRecovery.IntervalSecond — confirm a
	// dedicated NodeDownTime interval isn't intended.
	go wait.UntilWithContext(ctx, h.downTimeRecoveryCheck, time.Duration(h.cc.NodeRecovery.IntervalSecond)*time.Second)
	<-ctx.Done()
	return nil
}

func (h *Horuser) downTimeRecoveryCheck(ctx context.Context) {
data, err := db.GetRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
if err != nil {
klog.Errorf("recovery check GetRecoveryNodeDataInfoDate err:%v", err)
Expand All @@ -47,6 +54,28 @@ func (h *Horuser) recoveryCheck(ctx context.Context) {
d := d
wp.Submit(func() {
h.recoveryNodes(d)
h.downTimeRecoveryNodes(d)
})

}
wp.StopWait()
}

func (h *Horuser) recoveryCheck(ctx context.Context) {
data, err := db.GetRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
if err != nil {
klog.Errorf("recovery check GetRecoveryNodeDataInfoDate err:%v", err)
return
}
if len(data) == 0 {
klog.Info("recovery check GetRecoveryNodeDataInfoDate zero.")
return
}
wp := workerpool.New(50)
for _, d := range data {
d := d
wp.Submit(func() {
h.downTimeRecoveryNodes(d)
})

}
Expand Down Expand Up @@ -94,17 +123,17 @@ func (h *Horuser) recoveryNodes(n db.NodeDataInfo) {
klog.Infof("RecoveryMarker result success:%v", success)
}

func (h *Horuser) DownTimeRecoveryNodes(n db.NodeDataInfo) {
func (h *Horuser) downTimeRecoveryNodes(n db.NodeDataInfo) {
promAddr := h.cc.PromMultiple[n.ClusterName]
if promAddr == "" {
klog.Error("recoveryNodes promAddr by clusterName empty.")
klog.Infof("clusterName:%v nodeName:%v", n.ClusterName, n.NodeName)
return
}
vecs, err := h.InstantQuery(promAddr, n.DownTimeRecoveryQL, n.ClusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
vecs, err := h.InstantQuery(promAddr, strings.Join(n.DownTimeRecoveryQL, " "), n.ClusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if err != nil {
klog.Errorf("recoveryNodes InstantQuery err:%v", err)
klog.Infof("recoveryQL:%v", n.DownTimeRecoveryQL)
klog.Infof("downTimeRecoveryQL:%v", n.DownTimeRecoveryQL)
return
}
if len(vecs) != 1 {
Expand Down
1 change: 1 addition & 0 deletions app/horus/core/horuser/node_restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,5 @@ func (h *Horuser) TryRestart(node db.NodeDataInfo) {
klog.Error("It's been rebooted once.")
return
}

}

0 comments on commit 296bd23

Please sign in to comment.