diff --git a/app/horus/base/db/db.go b/app/horus/base/db/db.go
index b1afe246..0408b0ac 100644
--- a/app/horus/base/db/db.go
+++ b/app/horus/base/db/db.go
@@ -41,7 +41,7 @@ type NodeDataInfo struct {
 	RecoveryMark         int64  `json:"recovery_mark" xorm:"recovery_mark"`
 	RecoveryQL           string `json:"recovery_ql" xorm:"recovery_ql"`
 	DownTimeRecoveryMark int64  `json:"downtime_recovery_mark" xorm:"downtime_recovery_mark"`
-	DownTimeRecoveryQL   string `json:"downtime_recovery_ql" xorm:"downtime_recovery_ql"`
+	DownTimeRecoveryQL   []string `json:"downtime_recovery_ql" xorm:"downtime_recovery_ql"`
 }
 
 type PodDataInfo struct {
diff --git a/app/horus/core/horuser/node_downtime.go b/app/horus/core/horuser/node_downtime.go
index c1ce6bd1..8a4a7ef8 100644
--- a/app/horus/core/horuser/node_downtime.go
+++ b/app/horus/core/horuser/node_downtime.go
@@ -23,6 +23,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/klog"
+	"strings"
 	"sync"
 	"time"
 )
@@ -171,13 +172,32 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
 			klog.Infof("clusterName:%v\n nodeName:%v\n threshold:%v count:%v", clusterName, node, aq, count)
 			continue
 		}
+		// join the configured QL fragments into one template and substitute the node name
+		abnormalRecoveryQL := fmt.Sprintf(strings.Join(h.cc.NodeDownTime.AbnormalRecoveryQL, " "), node)
+		res, err := h.InstantQuery(addr, abnormalRecoveryQL, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
+		if err != nil {
+			klog.Errorf("DownTimeNodes InstantQuery err:%v", err)
+			klog.Infof("clusterName:%v\n abnormalRecoveryQL:%v", clusterName, abnormalRecoveryQL)
+			continue
+		}
+		if len(res) == 0 {
+			klog.Errorf("DownTimeNodes InstantQuery no results for query:%s", abnormalRecoveryQL)
+			continue
+		}
+		// keep the instance label of the returned samples (last one wins)
+		str := ""
+		for _, v := range res {
+			str = string(v.Metric["instance"])
+		}
+		WithDownNodeIPs[node] = str
 	}
-
+	msg = fmt.Sprintf("\n【%s】\n【Cluster: %v】\n【Downtime recovery threshold reached: %v】", h.cc.NodeDownTime.DingTalk.Title, clusterName, len(WithDownNodeIPs))
 	write := db.NodeDataInfo{
-		NodeName:    nodeName,
-		NodeIP:      nodeIP,
-		ClusterName: clusterName,
-		ModuleName:  NODE_DOWN,
+		NodeName:           nodeName,
+		NodeIP:             nodeIP,
+		ClusterName:        clusterName,
+		ModuleName:         NODE_DOWN,
+		DownTimeRecoveryQL: h.cc.NodeDownTime.AbnormalRecoveryQL,
 	}
 	exist, _ := write.Check()
 	if exist {
diff --git a/app/horus/core/horuser/node_recovery.go b/app/horus/core/horuser/node_recovery.go
index 91650618..5f588ec2 100644
--- a/app/horus/core/horuser/node_recovery.go
+++ b/app/horus/core/horuser/node_recovery.go
@@ -23,6 +23,7 @@ import (
 	"github.com/gammazero/workerpool"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/klog/v2"
+	"strings"
 	"time"
 )
 
@@ -32,7 +33,14 @@ func (h *Horuser) RecoveryManager(ctx context.Context) error {
 	return nil
 }
 
-func (h *Horuser) recoveryCheck(ctx context.Context) {
+// DownTimeRecoveryManager schedules the downtime recovery check until ctx is cancelled.
+func (h *Horuser) DownTimeRecoveryManager(ctx context.Context) error {
+	go wait.UntilWithContext(ctx, h.downTimeRecoveryCheck, time.Duration(h.cc.NodeRecovery.IntervalSecond)*time.Second)
+	<-ctx.Done()
+	return nil
+}
+
+func (h *Horuser) downTimeRecoveryCheck(ctx context.Context) {
 	data, err := db.GetRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
 	if err != nil {
 		klog.Errorf("recovery check GetRecoveryNodeDataInfoDate err:%v", err)
@@ -47,6 +55,27 @@ func (h *Horuser) recoveryCheck(ctx context.Context) {
 	for _, d := range data {
 		d := d
 		wp.Submit(func() {
-			h.recoveryNodes(d)
+			h.downTimeRecoveryNodes(d)
 		})
+	}
+	wp.StopWait()
+}
+
+func (h *Horuser) recoveryCheck(ctx context.Context) {
+	data, err := db.GetRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
+	if err != nil {
+		klog.Errorf("recovery check GetRecoveryNodeDataInfoDate err:%v", err)
+		return
+	}
+	if len(data) == 0 {
+		klog.Info("recovery check GetRecoveryNodeDataInfoDate zero.")
+		return
+	}
+	wp := workerpool.New(50)
+	for _, d := range data {
+		d := d
+		wp.Submit(func() {
+			h.recoveryNodes(d)
+		})
 	}
@@ -94,17 +123,19 @@
 	klog.Infof("RecoveryMarker result success:%v", success)
 }
 
-func (h *Horuser) DownTimeRecoveryNodes(n db.NodeDataInfo) {
+func (h *Horuser) downTimeRecoveryNodes(n db.NodeDataInfo) {
 	promAddr := h.cc.PromMultiple[n.ClusterName]
 	if promAddr == "" {
 		klog.Error("recoveryNodes promAddr by clusterName empty.")
 		klog.Infof("clusterName:%v nodeName:%v", n.ClusterName, n.NodeName)
 		return
 	}
-	vecs, err := h.InstantQuery(promAddr, n.DownTimeRecoveryQL, n.ClusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
+	// rebuild the stored QL template and substitute the node name, mirroring DownTimeNodes
+	downTimeRecoveryQL := fmt.Sprintf(strings.Join(n.DownTimeRecoveryQL, " "), n.NodeName)
+	vecs, err := h.InstantQuery(promAddr, downTimeRecoveryQL, n.ClusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
 	if err != nil {
 		klog.Errorf("recoveryNodes InstantQuery err:%v", err)
-		klog.Infof("recoveryQL:%v", n.DownTimeRecoveryQL)
+		klog.Infof("downTimeRecoveryQL:%v", n.DownTimeRecoveryQL)
 		return
 	}
 	if len(vecs) != 1 {
diff --git a/app/horus/core/horuser/node_restart.go b/app/horus/core/horuser/node_restart.go
index b5b08faa..e48ab8e4 100644
--- a/app/horus/core/horuser/node_restart.go
+++ b/app/horus/core/horuser/node_restart.go
@@ -87,4 +87,5 @@ func (h *Horuser) TryRestart(node db.NodeDataInfo) {
 		klog.Error("It's been rebooted once.")
 		return
 	}
+
 }
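
Note on the db.go change: DownTimeRecoveryQL moves from string to []string while keeping the same downtime_recovery_ql column. xorm serializes slice fields to JSON in a TEXT column by default, so the existing column must be (or be migrated to) a TEXT type. A minimal round-trip sketch under that assumption; the sqlite driver, the Id field, and the trimmed-down struct are illustrative, not part of this PR:

	package main

	import (
		"fmt"

		_ "github.com/mattn/go-sqlite3"
		"xorm.io/xorm"
	)

	// trimmed to the changed field; the real struct lives in app/horus/base/db/db.go
	type NodeDataInfo struct {
		Id                 int64    `xorm:"pk autoincr"`
		DownTimeRecoveryQL []string `xorm:"downtime_recovery_ql"`
	}

	func main() {
		engine, err := xorm.NewEngine("sqlite3", ":memory:")
		if err != nil {
			panic(err)
		}
		// creates the table with a TEXT column for the slice field
		if err := engine.Sync2(new(NodeDataInfo)); err != nil {
			panic(err)
		}
		in := NodeDataInfo{DownTimeRecoveryQL: []string{`up{node="%s"}`, `== 0`}}
		if _, err := engine.Insert(&in); err != nil {
			panic(err)
		}
		var out NodeDataInfo
		if _, err := engine.Get(&out); err != nil {
			panic(err)
		}
		fmt.Println(out.DownTimeRecoveryQL) // the slice survives the JSON round-trip
	}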
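
Note on the query construction: both DownTimeNodes and downTimeRecoveryNodes join the configured AbnormalRecoveryQL fragments into one PromQL template and substitute the node name with fmt.Sprintf. This assumes the joined template carries exactly one string verb such as %s, and that "fmt" is already imported in node_recovery.go. A standalone sketch of the pattern with a hypothetical template:

	package main

	import (
		"fmt"
		"strings"
	)

	func main() {
		// hypothetical fragments; the real ones come from h.cc.NodeDownTime.AbnormalRecoveryQL
		abnormalRecoveryQL := []string{`up{node="%s"}`, `== 0`}
		node := "worker-1"

		// join the fragments into one template, then substitute the node name
		ql := fmt.Sprintf(strings.Join(abnormalRecoveryQL, " "), node)
		fmt.Println(ql) // up{node="worker-1"} == 0

		// a template without a verb would corrupt the query instead:
		fmt.Println(fmt.Sprintf("up == 0", node)) // up == 0%!(EXTRA string=worker-1)
	}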
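
Note on the manager functions: DownTimeRecoveryManager follows the same scheduling shape as RecoveryManager, starting the periodic check with wait.UntilWithContext and then blocking on the context. A self-contained sketch of that shape; the interval and check body are placeholders:

	package main

	import (
		"context"
		"fmt"
		"time"

		"k8s.io/apimachinery/pkg/util/wait"
	)

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()

		// runs the check immediately, then once per second, until ctx is done;
		// the managers in node_recovery.go block on <-ctx.Done() the same way
		go wait.UntilWithContext(ctx, func(ctx context.Context) {
			fmt.Println("downtime recovery check tick")
		}, time.Second)

		<-ctx.Done()
	}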