Skip to content

Commit

Permalink
[horus] Building downtime restart recovery logic
Browse files · Browse the repository at this point in the history
  • Loading branch information
mfordjody committed Oct 10, 2024
1 parent 296bd23 commit a3dda4a
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 86 deletions.
2 changes: 1 addition & 1 deletion app/horus/base/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type NodeDataInfo struct {
RecoveryMark int64 `json:"recovery_mark" xorm:"recovery_mark"`
RecoveryQL string `json:"recovery_ql" xorm:"recovery_ql"`
DownTimeRecoveryMark int64 `json:"downtime_recovery_mark" xorm:"downtime_recovery_mark"`
DownTimeRecoveryQL []string `json:"downtime_recovery_ql" xorm:"downtime_recovery_ql"`
DownTimeRecoveryQL string `json:"downtime_recovery_ql" xorm:"downtime_recovery_ql"`
}

type PodDataInfo struct {
Expand Down
11 changes: 11 additions & 0 deletions app/horus/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,17 @@ func main() {
}
return nil
})
group.Add(func() error {
if c.NodeDownTime.Enabled {
klog.Info("horus node downtime recovery manager start success.")
err := horus.DownTimeRecoveryManager(ctx)
if err != nil {
klog.Errorf("horus node downtime recovery manager start failed err:%v", err)
return err
}
}
return nil
})
group.Add(func() error {
if c.PodStagnationCleaner.Enabled {
klog.Info("horus pod stagnation clean manager start success.")
Expand Down
64 changes: 11 additions & 53 deletions app/horus/core/horuser/node_downtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
"strings"
"sync"
"time"
)
Expand Down Expand Up @@ -73,29 +72,8 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {

nodeDownTimeRes := make(map[string]int)
aq := len(h.cc.NodeDownTime.AbnormalityQL)
rq := len(h.cc.NodeDownTime.AbnormalRecoveryQL)
for _, ql := range h.cc.NodeDownTime.AbnormalityQL {
ql := ql
res, err := h.InstantQuery(addr, ql, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if err != nil {
klog.Errorf("downtimeNodes InstantQuery err:%v", err)
klog.Infof("clusterName:%v\n", clusterName)
continue
}

for _, v := range res {
v := v
nodeName := string(v.Metric["node"])
if nodeName == "" {
klog.Error("downtimeNodes InstantQuery nodeName empty.")
klog.Infof("clusterName:%v\n metric:%v\n", clusterName, v.Metric)
continue
}
nodeDownTimeRes[nodeName]++
}
}

for _, ql := range h.cc.NodeDownTime.AbnormalRecoveryQL {
for _, ql := range h.cc.NodeDownTime.AbnormalityQL {
ql := ql
res, err := h.InstantQuery(addr, ql, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if err != nil {
Expand All @@ -118,13 +96,14 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {

WithDownNodeIPs := make(map[string]string)

for node, count := range nodeDownTimeRes {
for nodeName, count := range nodeDownTimeRes {
if count < aq {
klog.Error("downtimeNodes not reach threshold")
klog.Infof("clusterName:%v\n nodeName:%v\n threshold:%v count:%v", clusterName, node, aq, count)
klog.Error("downtimeNodes not reach threshold.")
klog.Infof("clusterName:%v nodeName:%v threshold:%v count:%v", clusterName, nodeName, aq, count)
continue
}
abnormalInfoSystemQL := fmt.Sprintf(h.cc.NodeDownTime.AbnormalInfoSystemQL, node)
abnormalInfoSystemQL := fmt.Sprintf(h.cc.NodeDownTime.AbnormalInfoSystemQL, nodeName)

res, err := h.InstantQuery(addr, abnormalInfoSystemQL, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if len(res) == 0 {
klog.Errorf("no results returned for query:%s", abnormalInfoSystemQL)
Expand All @@ -139,7 +118,7 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
for _, v := range res {
str = string(v.Metric["instance"])
}
WithDownNodeIPs[node] = str
WithDownNodeIPs[nodeName] = str
}

msg := fmt.Sprintf("\n【%s】\n【集群:%v】\n【已达到宕机临界点:%v】", h.cc.NodeDownTime.DingTalk.Title, clusterName, len(WithDownNodeIPs))
Expand All @@ -154,6 +133,7 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
return
}
klog.Info("Cordon node success.")

klog.Infof("clusterName:%v\n nodeName:%v\n", clusterName, nodeName)

node, err := kubeClient.CoreV1().Nodes().Get(ctxFirst, nodeName, metav1.GetOptions{})
Expand All @@ -166,36 +146,14 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
return "", nil
}()

for node, count := range nodeDownTimeRes {
if count < rq {
klog.Error("downtimeNodes not reach recovery threshold")
klog.Infof("clusterName:%v\n nodeName:%v\n threshold:%v count:%v", clusterName, node, aq, count)
continue
}
abnormalRecoveryQL := fmt.Sprintf(strings.Join(h.cc.NodeDownTime.AbnormalRecoveryQL, " "), node)
res, err := h.InstantQuery(addr, abnormalRecoveryQL, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if len(res) == 0 {
klog.Errorf("no results returned for query:%s", abnormalRecoveryQL)
continue
}
if err != nil {
klog.Errorf("downtimeNodes InstantQuery NodeName To IPs empty err:%v", err)
klog.Infof("clusterName:%v\n AbnormalInfoSystemQL:%v, err:%v", clusterName, abnormalRecoveryQL, err)
continue
}
str := ""
for _, v := range res {
str = string(v.Metric["instance"])
}
WithDownNodeIPs[node] = str
}
msg = fmt.Sprintf("\n【%s】\n【集群:%v】\n【已达到宕机恢复临界点:%v】", h.cc.NodeDownTime.DingTalk.Title, clusterName, len(WithDownNodeIPs))
moduleName := 0
abnormalRecoveryQL := fmt.Sprintf(h.cc.NodeDownTime.AbnormalRecoveryQL[moduleName], nodeName)
write := db.NodeDataInfo{
NodeName: nodeName,
NodeIP: nodeIP,
ClusterName: clusterName,
ModuleName: NODE_DOWN,
DownTimeRecoveryQL: h.cc.NodeDownTime.AbnormalRecoveryQL,
DownTimeRecoveryQL: abnormalRecoveryQL,
}
exist, _ := write.Check()
if exist {
Expand Down
60 changes: 32 additions & 28 deletions app/horus/core/horuser/node_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/gammazero/workerpool"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"strings"
"time"
)

Expand All @@ -34,12 +33,12 @@ func (h *Horuser) RecoveryManager(ctx context.Context) error {
}

func (h *Horuser) DownTimeRecoveryManager(ctx context.Context) error {
go wait.UntilWithContext(ctx, h.recoveryCheck, time.Duration(h.cc.NodeRecovery.IntervalSecond)*time.Second)
go wait.UntilWithContext(ctx, h.downTimeRecoveryCheck, time.Duration(h.cc.NodeRecovery.IntervalSecond)*time.Second)
<-ctx.Done()
return nil
}

func (h *Horuser) downTimeRecoveryCheck(ctx context.Context) {
func (h *Horuser) recoveryCheck(ctx context.Context) {
data, err := db.GetRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
if err != nil {
klog.Errorf("recovery check GetRecoveryNodeDataInfoDate err:%v", err)
Expand All @@ -54,21 +53,21 @@ func (h *Horuser) downTimeRecoveryCheck(ctx context.Context) {
d := d
wp.Submit(func() {
h.recoveryNodes(d)
h.downTimeRecoveryNodes(d)

})

}
wp.StopWait()
}

func (h *Horuser) recoveryCheck(ctx context.Context) {
data, err := db.GetRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
func (h *Horuser) downTimeRecoveryCheck(ctx context.Context) {
data, err := db.GetDownTimeRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
if err != nil {
klog.Errorf("recovery check GetRecoveryNodeDataInfoDate err:%v", err)
klog.Errorf("recovery check GetDownTimeRecoveryNodeDataInfoDate err:%v", err)
return
}
if len(data) == 0 {
klog.Info("recovery check GetRecoveryNodeDataInfoDate zero.")
klog.Info("recovery check GetDownTimeRecoveryNodeDataInfoDate zero.")
return
}
wp := workerpool.New(50)
Expand All @@ -77,7 +76,6 @@ func (h *Horuser) recoveryCheck(ctx context.Context) {
wp.Submit(func() {
h.downTimeRecoveryNodes(d)
})

}
wp.StopWait()
}
Expand Down Expand Up @@ -126,40 +124,46 @@ func (h *Horuser) recoveryNodes(n db.NodeDataInfo) {
func (h *Horuser) downTimeRecoveryNodes(n db.NodeDataInfo) {
promAddr := h.cc.PromMultiple[n.ClusterName]
if promAddr == "" {
klog.Error("recoveryNodes promAddr by clusterName empty.")
klog.Error("downTimeRecoveryNodes promAddr by clusterName empty.")
klog.Infof("clusterName:%v nodeName:%v", n.ClusterName, n.NodeName)
return
}
vecs, err := h.InstantQuery(promAddr, strings.Join(n.DownTimeRecoveryQL, " "), n.ClusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
rq := len(h.cc.NodeDownTime.AbnormalRecoveryQL)
vecs, err := h.InstantQuery(promAddr, n.DownTimeRecoveryQL, n.ClusterName, h.cc.NodeRecovery.PromQueryTimeSecond)
if err != nil {
klog.Errorf("recoveryNodes InstantQuery err:%v", err)
klog.Infof("downTimeRecoveryQL:%v", n.DownTimeRecoveryQL)
klog.Errorf("downTimeRecoveryNodes InstantQuery err:%v", err)
klog.Infof("DownTimeRecoveryQL:%v", n.DownTimeRecoveryQL)
return
}
if len(vecs) != 1 {
klog.Infof("Expected 1 result, but got:%d", len(vecs))
return
}
if len(vecs) > rq {
klog.Error("downTimeRecoveryNodes not reach threshold")
}
if err != nil {
klog.Errorf("recoveryNodes InstantQuery err:%v", err)
klog.Infof("recoveryQL:%v", n.DownTimeRecoveryQL)
klog.Errorf("downTimeRecoveryNodes InstantQuery err:%v", err)
klog.Infof("DownTimeRecoveryQL:%v", n.DownTimeRecoveryQL)
return
}
klog.Info("recoveryNodes InstantQuery success.")

err = h.UnCordon(n.NodeName, n.ClusterName)
res := "Success"
if err != nil {
res = fmt.Sprintf("result failed:%v", err)
}
msg := fmt.Sprintf("\n【集群: %v】\n【宕机节点已达到恢复临界点】\n【已恢复调度节点: %v】\n【处理结果:%v】\n【日期: %v】\n", n.ClusterName, n.NodeName, res, n.CreateTime)
alerter.DingTalkSend(h.cc.NodeDownTime.DingTalk, msg)
alerter.SlackSend(h.cc.NodeDownTime.Slack, msg)
if len(vecs) == rq {
err = h.UnCordon(n.NodeName, n.ClusterName)
res := "Success"
if err != nil {
res = fmt.Sprintf("result failed:%v", err)
}
msg := fmt.Sprintf("\n【集群: %v】\n【封锁节点恢复调度】\n【已恢复调度节点: %v】\n【处理结果:%v】\n【日期: %v】\n", n.ClusterName, n.NodeName, res, n.CreateTime)
alerter.DingTalkSend(h.cc.NodeDownTime.DingTalk, msg)
alerter.SlackSend(h.cc.NodeDownTime.Slack, msg)

success, err := n.DownTimeRecoveryMarker()
if err != nil {
klog.Errorf("DownTimeRecoveryMarker result failed err:%v", err)
return
success, err := n.DownTimeRecoveryMarker()
if err != nil {
klog.Errorf("DownTimeRecoveryMarker result failed err:%v", err)
return
}
klog.Infof("DownTimeRecoveryMarker result success:%v", success)
}
klog.Infof("DownTimeRecoveryMarker result success:%v", success)
}
30 changes: 28 additions & 2 deletions app/horus/core/horuser/node_restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"os/exec"
"strings"
"time"
)

Expand Down Expand Up @@ -79,13 +80,38 @@ func (h *Horuser) TryRestart(node db.NodeDataInfo) {
return
}
klog.Infof("Successfully restarted node %v.", node.NodeName)

rq := len(h.cc.NodeDownTime.AbnormalRecoveryQL)
query := strings.Join(h.cc.NodeDownTime.AbnormalRecoveryQL, " and ")
vecs, err := h.InstantQuery(h.cc.PromMultiple[node.ClusterName], query, node.ClusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if err != nil {
klog.Errorf("Failed to query Prometheus for recovery threshold after restart: %v", err)
return
}

if len(vecs) == rq {
klog.Infof("Node %v has reached recovery threshold after restart.", node.NodeName)

err = h.UnCordon(node.NodeName, node.ClusterName)
if err != nil {
klog.Errorf("Uncordon node failed after restart: %v", err)
return
}

msg = fmt.Sprintf("\n【集群: %v】\n【宕机节点已恢复】\n【恢复节点: %v】\n【处理结果:成功】\n【日期: %v】\n", node.ClusterName, node.NodeName, node.FirstDate)
alerter.DingTalkSend(h.cc.NodeDownTime.DingTalk, msg)
alerter.SlackSend(h.cc.NodeDownTime.Slack, msg)

} else {
klog.Infof("Node %v has not reached recovery threshold after restart.", node.NodeName)
}

} else {
klog.Infof("RestartMarker did not success for node %v", node.NodeName)
}

if node.Restart > 2 {
klog.Error("It's been rebooted once.")
klog.Error("The node has already been rebooted more than twice.")
return
}

}
4 changes: 2 additions & 2 deletions manifests/horus/horus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ nodeDownTime:
promQueryTimeSecond: 60
abnormalityQL:
- 100 - (avg by (node) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 20
- (avg by (node) (node_memory_MemFree_bytes / node_memory_MemTotal_bytes )) * 100 > 25
- (avg by (node) (node_memory_MemFree_bytes / node_memory_MemTotal_bytes )) * 100 > 50
# - node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"} * 100 < 15
abnormalInfoSystemQL:
node_os_info{node="%s"}
allSystemUser: "zxj"
allSystemPassword: "1"
abnormalRecoveryQL:
- 100 - (avg by (node) (rate(node_cpu_seconds_total{mode="idle",node="%s"}[5m])) * 100) < 20
- (avg by (node) (node_memory_MemFree_bytes{node="%s"} / node_memory_MemTotal_bytes{node="%s"} )) * 100 < 25
- (avg by (node) (node_memory_MemFree_bytes{node="%s"} / node_memory_MemTotal_bytes{node="%s"} )) * 100 < 30
# - node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"} * 100 > 15
kubeMultiple:
cluster: config.1
Expand Down

0 comments on commit a3dda4a

Please sign in to comment.