diff --git a/app/horus/base/db/db.go b/app/horus/base/db/db.go
index 0408b0ac1..b1afe2469 100644
--- a/app/horus/base/db/db.go
+++ b/app/horus/base/db/db.go
@@ -41,7 +41,7 @@ type NodeDataInfo struct {
 	RecoveryMark         int64    `json:"recovery_mark" xorm:"recovery_mark"`
 	RecoveryQL           string   `json:"recovery_ql" xorm:"recovery_ql"`
 	DownTimeRecoveryMark int64    `json:"downtime_recovery_mark" xorm:"downtime_recovery_mark"`
-	DownTimeRecoveryQL   []string `json:"downtime_recovery_ql" xorm:"downtime_recovery_ql"`
+	DownTimeRecoveryQL   string   `json:"downtime_recovery_ql" xorm:"downtime_recovery_ql"`
 }
 
 type PodDataInfo struct {
diff --git a/app/horus/cmd/main.go b/app/horus/cmd/main.go
index ef6f8a380..997044c56 100644
--- a/app/horus/cmd/main.go
+++ b/app/horus/cmd/main.go
@@ -136,6 +136,17 @@ func main() {
 		}
 		return nil
 	})
+	group.Add(func() error {
+		if c.NodeDownTime.Enabled {
+			klog.Info("horus node downtime recovery manager start success.")
+			err := horus.DownTimeRecoveryManager(ctx)
+			if err != nil {
+				klog.Errorf("horus node downtime recovery manager start failed err:%v", err)
+				return err
+			}
+		}
+		return nil
+	})
 	group.Add(func() error {
 		if c.PodStagnationCleaner.Enabled {
 			klog.Info("horus pod stagnation clean manager start success.")
diff --git a/app/horus/core/horuser/node_downtime.go b/app/horus/core/horuser/node_downtime.go
index 8a4a7ef82..9e523bc49 100644
--- a/app/horus/core/horuser/node_downtime.go
+++ b/app/horus/core/horuser/node_downtime.go
@@ -23,7 +23,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/klog"
-	"strings"
 	"sync"
 	"time"
 )
@@ -73,29 +72,8 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
 	nodeDownTimeRes := make(map[string]int)
 	aq := len(h.cc.NodeDownTime.AbnormalityQL)
-	rq := len(h.cc.NodeDownTime.AbnormalRecoveryQL)
-	for _, ql := range h.cc.NodeDownTime.AbnormalityQL {
-		ql := ql
-		res, err := h.InstantQuery(addr, ql, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
-		if err != nil {
-			klog.Errorf("downtimeNodes InstantQuery err:%v", err)
-			klog.Infof("clusterName:%v\n", clusterName)
-			continue
-		}
-		for _, v := range res {
-			v := v
-			nodeName := string(v.Metric["node"])
-			if nodeName == "" {
-				klog.Error("downtimeNodes InstantQuery nodeName empty.")
-				klog.Infof("clusterName:%v\n metric:%v\n", clusterName, v.Metric)
-				continue
-			}
-			nodeDownTimeRes[nodeName]++
-		}
-	}
-
-	for _, ql := range h.cc.NodeDownTime.AbnormalRecoveryQL {
+	for _, ql := range h.cc.NodeDownTime.AbnormalityQL {
 		ql := ql
 		res, err := h.InstantQuery(addr, ql, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
 		if err != nil {
@@ -118,13 +96,14 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
 
 	WithDownNodeIPs := make(map[string]string)
 
-	for node, count := range nodeDownTimeRes {
+	for nodeName, count := range nodeDownTimeRes {
 		if count < aq {
-			klog.Error("downtimeNodes not reach threshold")
-			klog.Infof("clusterName:%v\n nodeName:%v\n threshold:%v count:%v", clusterName, node, aq, count)
+			klog.Error("downtimeNodes did not reach threshold.")
+			klog.Infof("clusterName:%v nodeName:%v threshold:%v count:%v", clusterName, nodeName, aq, count)
 			continue
 		}
-		abnormalInfoSystemQL := fmt.Sprintf(h.cc.NodeDownTime.AbnormalInfoSystemQL, node)
+		abnormalInfoSystemQL := fmt.Sprintf(h.cc.NodeDownTime.AbnormalInfoSystemQL, nodeName)
+
 		res, err := h.InstantQuery(addr, abnormalInfoSystemQL, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
 		if len(res) == 0 {
 			klog.Errorf("no results returned for query:%s", abnormalInfoSystemQL)
@@ -139,7 +118,7 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
 		for _, v := range res {
 			str = string(v.Metric["instance"])
 		}
-		WithDownNodeIPs[node] = str
+		WithDownNodeIPs[nodeName] = str
 	}
 
 	msg := fmt.Sprintf("\n【%s】\n【集群:%v】\n【已达到宕机临界点:%v】", h.cc.NodeDownTime.DingTalk.Title, clusterName, len(WithDownNodeIPs))
@@ -154,6 +133,7 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
 				return
 			}
 			klog.Info("Cordon node success.")
+			klog.Infof("clusterName:%v\n nodeName:%v\n", clusterName, nodeName)
 
 			node, err := kubeClient.CoreV1().Nodes().Get(ctxFirst, nodeName, metav1.GetOptions{})
@@ -166,36 +146,14 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
 				return "", nil
 			}()
 
-			for node, count := range nodeDownTimeRes {
-				if count < rq {
-					klog.Error("downtimeNodes not reach recovery threshold")
-					klog.Infof("clusterName:%v\n nodeName:%v\n threshold:%v count:%v", clusterName, node, aq, count)
-					continue
-				}
-				abnormalRecoveryQL := fmt.Sprintf(strings.Join(h.cc.NodeDownTime.AbnormalRecoveryQL, " "), node)
-				res, err := h.InstantQuery(addr, abnormalRecoveryQL, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
-				if len(res) == 0 {
-					klog.Errorf("no results returned for query:%s", abnormalRecoveryQL)
-					continue
-				}
-				if err != nil {
-					klog.Errorf("downtimeNodes InstantQuery NodeName To IPs empty err:%v", err)
-					klog.Infof("clusterName:%v\n AbnormalInfoSystemQL:%v, err:%v", clusterName, abnormalRecoveryQL, err)
-					continue
-				}
-				str := ""
-				for _, v := range res {
-					str = string(v.Metric["instance"])
-				}
-				WithDownNodeIPs[node] = str
-			}
-			msg = fmt.Sprintf("\n【%s】\n【集群:%v】\n【已达到宕机恢复临界点:%v】", h.cc.NodeDownTime.DingTalk.Title, clusterName, len(WithDownNodeIPs))
+			moduleName := 0
+			abnormalRecoveryQL := fmt.Sprintf(h.cc.NodeDownTime.AbnormalRecoveryQL[moduleName], nodeName)
 
 			write := db.NodeDataInfo{
 				NodeName:    nodeName,
 				NodeIP:      nodeIP,
 				ClusterName: clusterName,
 				ModuleName:  NODE_DOWN,
-				DownTimeRecoveryQL: h.cc.NodeDownTime.AbnormalRecoveryQL,
+				DownTimeRecoveryQL: abnormalRecoveryQL,
 			}
 			exist, _ := write.Check()
 			if exist {
diff --git a/app/horus/core/horuser/node_recovery.go b/app/horus/core/horuser/node_recovery.go
index 5f588ec2d..9c82dd022 100644
--- a/app/horus/core/horuser/node_recovery.go
+++ b/app/horus/core/horuser/node_recovery.go
@@ -23,7 +23,6 @@ import (
 	"github.com/gammazero/workerpool"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/klog/v2"
-	"strings"
 	"time"
 )
 
@@ -34,12 +33,12 @@ func (h *Horuser) RecoveryManager(ctx context.Context) error {
 }
 
 func (h *Horuser) DownTimeRecoveryManager(ctx context.Context) error {
-	go wait.UntilWithContext(ctx, h.recoveryCheck, time.Duration(h.cc.NodeRecovery.IntervalSecond)*time.Second)
+	go wait.UntilWithContext(ctx, h.downTimeRecoveryCheck, time.Duration(h.cc.NodeRecovery.IntervalSecond)*time.Second)
 	<-ctx.Done()
 	return nil
 }
 
-func (h *Horuser) downTimeRecoveryCheck(ctx context.Context) {
+func (h *Horuser) recoveryCheck(ctx context.Context) {
 	data, err := db.GetRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
 	if err != nil {
 		klog.Errorf("recovery check GetRecoveryNodeDataInfoDate err:%v", err)
@@ -54,21 +53,21 @@ func (h *Horuser) downTimeRecoveryCheck(ctx context.Context) {
 		d := d
 		wp.Submit(func() {
 			h.recoveryNodes(d)
-			h.downTimeRecoveryNodes(d)
+
 		})
 	}
 	wp.StopWait()
 }
 
-func (h *Horuser) recoveryCheck(ctx context.Context) {
-	data, err := db.GetRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
+func (h *Horuser) downTimeRecoveryCheck(ctx context.Context) {
+	data, err := db.GetDownTimeRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
 	if err != nil {
-		klog.Errorf("recovery check GetRecoveryNodeDataInfoDate err:%v", err)
+		klog.Errorf("recovery check GetDownTimeRecoveryNodeDataInfoDate err:%v", err)
 		return
 	}
 	if len(data) == 0 {
-		klog.Info("recovery check GetRecoveryNodeDataInfoDate zero.")
+		klog.Info("recovery check GetDownTimeRecoveryNodeDataInfoDate zero.")
 		return
 	}
 	wp := workerpool.New(50)
@@ -77,7 +76,6 @@ func (h *Horuser) recoveryCheck(ctx context.Context) {
 		wp.Submit(func() {
 			h.downTimeRecoveryNodes(d)
 		})
-
 	}
 	wp.StopWait()
 }
@@ -126,40 +124,46 @@ func (h *Horuser) recoveryNodes(n db.NodeDataInfo) {
 func (h *Horuser) downTimeRecoveryNodes(n db.NodeDataInfo) {
 	promAddr := h.cc.PromMultiple[n.ClusterName]
 	if promAddr == "" {
-		klog.Error("recoveryNodes promAddr by clusterName empty.")
+		klog.Error("downTimeRecoveryNodes promAddr by clusterName empty.")
 		klog.Infof("clusterName:%v nodeName:%v", n.ClusterName, n.NodeName)
 		return
 	}
-	vecs, err := h.InstantQuery(promAddr, strings.Join(n.DownTimeRecoveryQL, " "), n.ClusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
+	rq := len(h.cc.NodeDownTime.AbnormalRecoveryQL)
+	vecs, err := h.InstantQuery(promAddr, n.DownTimeRecoveryQL, n.ClusterName, h.cc.NodeRecovery.PromQueryTimeSecond)
 	if err != nil {
-		klog.Errorf("recoveryNodes InstantQuery err:%v", err)
-		klog.Infof("downTimeRecoveryQL:%v", n.DownTimeRecoveryQL)
+		klog.Errorf("downTimeRecoveryNodes InstantQuery err:%v", err)
+		klog.Infof("DownTimeRecoveryQL:%v", n.DownTimeRecoveryQL)
 		return
 	}
 	if len(vecs) != 1 {
 		klog.Infof("Expected 1 result, but got:%d", len(vecs))
 		return
 	}
+	if len(vecs) > rq {
+		klog.Error("downTimeRecoveryNodes did not reach threshold.")
+	}
 	if err != nil {
-		klog.Errorf("recoveryNodes InstantQuery err:%v", err)
-		klog.Infof("recoveryQL:%v", n.DownTimeRecoveryQL)
+		klog.Errorf("downTimeRecoveryNodes InstantQuery err:%v", err)
+		klog.Infof("DownTimeRecoveryQL:%v", n.DownTimeRecoveryQL)
 		return
 	}
 	klog.Info("recoveryNodes InstantQuery success.")
-	err = h.UnCordon(n.NodeName, n.ClusterName)
-	res := "Success"
-	if err != nil {
-		res = fmt.Sprintf("result failed:%v", err)
-	}
-	msg := fmt.Sprintf("\n【集群: %v】\n【宕机节点已达到恢复临界点】\n【已恢复调度节点: %v】\n【处理结果:%v】\n【日期: %v】\n", n.ClusterName, n.NodeName, res, n.CreateTime)
-	alerter.DingTalkSend(h.cc.NodeDownTime.DingTalk, msg)
-	alerter.SlackSend(h.cc.NodeDownTime.Slack, msg)
+	if len(vecs) == rq {
+		err = h.UnCordon(n.NodeName, n.ClusterName)
+		res := "Success"
+		if err != nil {
+			res = fmt.Sprintf("result failed:%v", err)
+		}
+		msg := fmt.Sprintf("\n【集群: %v】\n【封锁节点恢复调度】\n【已恢复调度节点: %v】\n【处理结果:%v】\n【日期: %v】\n", n.ClusterName, n.NodeName, res, n.CreateTime)
+		alerter.DingTalkSend(h.cc.NodeDownTime.DingTalk, msg)
+		alerter.SlackSend(h.cc.NodeDownTime.Slack, msg)
 
-	success, err := n.DownTimeRecoveryMarker()
-	if err != nil {
-		klog.Errorf("DownTimeRecoveryMarker result failed err:%v", err)
-		return
+		success, err := n.DownTimeRecoveryMarker()
+		if err != nil {
+			klog.Errorf("DownTimeRecoveryMarker result failed err:%v", err)
+			return
+		}
+		klog.Infof("DownTimeRecoveryMarker result success:%v", success)
 	}
-	klog.Infof("DownTimeRecoveryMarker result success:%v", success)
 }
diff --git a/app/horus/core/horuser/node_restart.go b/app/horus/core/horuser/node_restart.go
index e48ab8e4b..db6adcd93 100644
--- a/app/horus/core/horuser/node_restart.go
+++ b/app/horus/core/horuser/node_restart.go
@@ -24,6 +24,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/klog/v2"
 	"os/exec"
+	"strings"
 	"time"
 )
 
@@ -79,13 +80,38 @@ func (h *Horuser) TryRestart(node db.NodeDataInfo) {
 			return
 		}
 		klog.Infof("Successfully restarted node %v.", node.NodeName)
+
+		rq := len(h.cc.NodeDownTime.AbnormalRecoveryQL)
+		query := strings.Join(h.cc.NodeDownTime.AbnormalRecoveryQL, " and ")
+		vecs, err := h.InstantQuery(h.cc.PromMultiple[node.ClusterName], query, node.ClusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
+		if err != nil {
+			klog.Errorf("Failed to query Prometheus for recovery threshold after restart: %v", err)
+			return
+		}
+
+		if len(vecs) == rq {
+			klog.Infof("Node %v has reached recovery threshold after restart.", node.NodeName)
+
+			err = h.UnCordon(node.NodeName, node.ClusterName)
+			if err != nil {
+				klog.Errorf("Uncordon node failed after restart: %v", err)
+				return
+			}
+
+			msg = fmt.Sprintf("\n【集群: %v】\n【宕机节点已恢复】\n【恢复节点: %v】\n【处理结果:成功】\n【日期: %v】\n", node.ClusterName, node.NodeName, node.FirstDate)
+			alerter.DingTalkSend(h.cc.NodeDownTime.DingTalk, msg)
+			alerter.SlackSend(h.cc.NodeDownTime.Slack, msg)
+
+		} else {
+			klog.Infof("Node %v has not reached recovery threshold after restart.", node.NodeName)
+		}
+
 	} else {
 		klog.Infof("RestartMarker did not success for node %v", node.NodeName)
 	}
 	if node.Restart > 2 {
-		klog.Error("It's been rebooted once.")
+		klog.Error("The node has already been rebooted more than twice.")
 		return
 	}
-
 }
diff --git a/manifests/horus/horus.yaml b/manifests/horus/horus.yaml
index 02ce3bc02..8c3625406 100644
--- a/manifests/horus/horus.yaml
+++ b/manifests/horus/horus.yaml
@@ -69,7 +69,7 @@ nodeDownTime:
   promQueryTimeSecond: 60
   abnormalityQL:
     - 100 - (avg by (node) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 20
-    - (avg by (node) (node_memory_MemFree_bytes / node_memory_MemTotal_bytes )) * 100 > 25
+    - (avg by (node) (node_memory_MemFree_bytes / node_memory_MemTotal_bytes )) * 100 > 50
 #    - node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"} * 100 < 15
   abnormalInfoSystemQL: node_os_info{node="%s"}
@@ -77,7 +77,7 @@ nodeDownTime:
   allSystemPassword: "1"
   abnormalRecoveryQL:
     - 100 - (avg by (node) (rate(node_cpu_seconds_total{mode="idle",node="%s"}[5m])) * 100) < 20
-    - (avg by (node) (node_memory_MemFree_bytes{node="%s"} / node_memory_MemTotal_bytes{node="%s"} )) * 100 < 25
+    - (avg by (node) (node_memory_MemFree_bytes{node="%s"} / node_memory_MemTotal_bytes{node="%s"} )) * 100 < 30
 #    - node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"} * 100 > 15
   kubeMultiple:
     cluster: config.1
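
Note on the threshold logic (a minimal sketch, not part of the patch): the consolidated loop in node_downtime.go implements a per-node vote count. Each abnormality query contributes one hit for every node it returns, and a node is treated as down only once its count reaches aq = len(AbnormalityQL). The query stub below is hypothetical and stands in for h.InstantQuery.

package main

import "fmt"

// downNodes counts how many abnormality queries matched each node and
// keeps only nodes matched by every query (count >= threshold).
func downNodes(abnormalityQL []string, query func(ql string) []string) []string {
	votes := make(map[string]int)
	for _, ql := range abnormalityQL {
		for _, node := range query(ql) {
			votes[node]++
		}
	}
	var down []string
	for node, count := range votes {
		if count >= len(abnormalityQL) {
			down = append(down, node)
		}
	}
	return down
}

func main() {
	qls := []string{"cpuQL", "memQL"}
	// Hypothetical results: node1 trips both queries, node2 only one.
	stub := func(ql string) []string {
		if ql == "cpuQL" {
			return []string{"node1", "node2"}
		}
		return []string{"node1"}
	}
	fmt.Println(downNodes(qls, stub)) // [node1]
}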
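Why DownTimeRecoveryQL becomes a plain string: node_downtime.go now renders exactly one recovery template for the cordoned node and persists the result, so the recovery loop can execute it later without re-templating. A sketch under that assumption, reusing the first template from manifests/horus/horus.yaml with a hypothetical node name:

package main

import "fmt"

func main() {
	// First recovery template from the manifest; %s is filled with the
	// node label so the stored query is scoped to a single node.
	abnormalRecoveryQL := []string{
		`100 - (avg by (node) (rate(node_cpu_seconds_total{mode="idle",node="%s"}[5m])) * 100) < 20`,
	}
	nodeName := "node1" // hypothetical
	// The rendered string is what lands in NodeDataInfo.DownTimeRecoveryQL.
	downTimeRecoveryQL := fmt.Sprintf(abnormalRecoveryQL[0], nodeName)
	fmt.Println(downTimeRecoveryQL)
}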
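On the recovery side, downTimeRecoveryNodes runs the stored query as-is and returns early unless it yields exactly one sample; the manifest's recovery expressions (e.g. CPU usage below 20%) only produce a sample once the node is healthy again, so a single node-scoped result is the signal that gates UnCordon and DownTimeRecoveryMarker. A reduced sketch of that gate, with the sample count standing in for len(vecs):

package main

import "fmt"

// shouldUncordon mirrors the gate in downTimeRecoveryNodes: a node-scoped
// recovery query yields at most one sample, and that sample exists only
// while the recovery condition holds, so exactly one sample means recovered.
func shouldUncordon(samples int) bool {
	return samples == 1
}

func main() {
	fmt.Println(shouldUncordon(0)) // false: node still abnormal, stay cordoned
	fmt.Println(shouldUncordon(1)) // true: uncordon and mark recovery in DB
}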