[horus] Building downtime restart recovery logic #447

Merged · 1 commit · Oct 10, 2024
2 changes: 1 addition & 1 deletion app/horus/base/db/db.go
@@ -41,7 +41,7 @@ type NodeDataInfo struct {
RecoveryMark int64 `json:"recovery_mark" xorm:"recovery_mark"`
RecoveryQL string `json:"recovery_ql" xorm:"recovery_ql"`
DownTimeRecoveryMark int64 `json:"downtime_recovery_mark" xorm:"downtime_recovery_mark"`
DownTimeRecoveryQL []string `json:"downtime_recovery_ql" xorm:"downtime_recovery_ql"`
DownTimeRecoveryQL string `json:"downtime_recovery_ql" xorm:"downtime_recovery_ql"`
}

type PodDataInfo struct {
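Note on the field change above: narrowing DownTimeRecoveryQL from []string to string means each downed-node record stores a single, already-rendered recovery query instead of the whole template list; the node_downtime.go hunk below renders AbnormalRecoveryQL[0] with fmt.Sprintf before writing it. A minimal self-contained sketch of that shape, with an illustrative template and node name (not taken from this PR):

```go
package main

import "fmt"

// Minimal stand-in for db.NodeDataInfo after this change:
// one rendered PromQL string per record instead of a []string.
type nodeDataInfo struct {
	NodeName           string
	DownTimeRecoveryQL string
}

func main() {
	// Template in the shape of an abnormalRecoveryQL entry from horus.yaml;
	// the %s placeholder is filled with the node name before the query is stored.
	tmpl := `100 - (avg by (node) (rate(node_cpu_seconds_total{mode="idle",node="%s"}[5m])) * 100) < 20`
	nodeName := "worker-01" // hypothetical node name

	record := nodeDataInfo{
		NodeName:           nodeName,
		DownTimeRecoveryQL: fmt.Sprintf(tmpl, nodeName),
	}
	fmt.Println(record.DownTimeRecoveryQL)
}
```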
11 changes: 11 additions & 0 deletions app/horus/cmd/main.go
@@ -136,6 +136,17 @@ func main() {
}
return nil
})
group.Add(func() error {
if c.NodeDownTime.Enabled {
klog.Info("horus node downtime recovery manager start success.")
err := horus.DownTimeRecoveryManager(ctx)
if err != nil {
klog.Errorf("horus node downtime recovery manager start failed err:%v", err)
return err
}
}
return nil
})
group.Add(func() error {
if c.PodStagnationCleaner.Enabled {
klog.Info("horus pod stagnation clean manager start success.")
64 changes: 11 additions & 53 deletions app/horus/core/horuser/node_downtime.go
@@ -23,7 +23,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
"strings"
"sync"
"time"
)
@@ -73,29 +72,8 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {

nodeDownTimeRes := make(map[string]int)
aq := len(h.cc.NodeDownTime.AbnormalityQL)
rq := len(h.cc.NodeDownTime.AbnormalRecoveryQL)
for _, ql := range h.cc.NodeDownTime.AbnormalityQL {
ql := ql
res, err := h.InstantQuery(addr, ql, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if err != nil {
klog.Errorf("downtimeNodes InstantQuery err:%v", err)
klog.Infof("clusterName:%v\n", clusterName)
continue
}

for _, v := range res {
v := v
nodeName := string(v.Metric["node"])
if nodeName == "" {
klog.Error("downtimeNodes InstantQuery nodeName empty.")
klog.Infof("clusterName:%v\n metric:%v\n", clusterName, v.Metric)
continue
}
nodeDownTimeRes[nodeName]++
}
}

for _, ql := range h.cc.NodeDownTime.AbnormalRecoveryQL {
for _, ql := range h.cc.NodeDownTime.AbnormalityQL {
ql := ql
res, err := h.InstantQuery(addr, ql, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if err != nil {
@@ -118,13 +96,14 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {

WithDownNodeIPs := make(map[string]string)

for node, count := range nodeDownTimeRes {
for nodeName, count := range nodeDownTimeRes {
if count < aq {
klog.Error("downtimeNodes not reach threshold")
klog.Infof("clusterName:%v\n nodeName:%v\n threshold:%v count:%v", clusterName, node, aq, count)
klog.Error("downtimeNodes not reach threshold.")
klog.Infof("clusterName:%v nodeName:%v threshold:%v count:%v", clusterName, nodeName, aq, count)
continue
}
abnormalInfoSystemQL := fmt.Sprintf(h.cc.NodeDownTime.AbnormalInfoSystemQL, node)
abnormalInfoSystemQL := fmt.Sprintf(h.cc.NodeDownTime.AbnormalInfoSystemQL, nodeName)

res, err := h.InstantQuery(addr, abnormalInfoSystemQL, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if len(res) == 0 {
klog.Errorf("no results returned for query:%s", abnormalInfoSystemQL)
@@ -139,7 +118,7 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
for _, v := range res {
str = string(v.Metric["instance"])
}
WithDownNodeIPs[node] = str
WithDownNodeIPs[nodeName] = str
}

msg := fmt.Sprintf("\n【%s】\n【集群:%v】\n【已达到宕机临界点:%v】", h.cc.NodeDownTime.DingTalk.Title, clusterName, len(WithDownNodeIPs))
@@ -154,6 +133,7 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
return
}
klog.Info("Cordon node success.")

klog.Infof("clusterName:%v\n nodeName:%v\n", clusterName, nodeName)

node, err := kubeClient.CoreV1().Nodes().Get(ctxFirst, nodeName, metav1.GetOptions{})
@@ -166,36 +146,14 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
return "", nil
}()

for node, count := range nodeDownTimeRes {
if count < rq {
klog.Error("downtimeNodes not reach recovery threshold")
klog.Infof("clusterName:%v\n nodeName:%v\n threshold:%v count:%v", clusterName, node, aq, count)
continue
}
abnormalRecoveryQL := fmt.Sprintf(strings.Join(h.cc.NodeDownTime.AbnormalRecoveryQL, " "), node)
res, err := h.InstantQuery(addr, abnormalRecoveryQL, clusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if len(res) == 0 {
klog.Errorf("no results returned for query:%s", abnormalRecoveryQL)
continue
}
if err != nil {
klog.Errorf("downtimeNodes InstantQuery NodeName To IPs empty err:%v", err)
klog.Infof("clusterName:%v\n AbnormalInfoSystemQL:%v, err:%v", clusterName, abnormalRecoveryQL, err)
continue
}
str := ""
for _, v := range res {
str = string(v.Metric["instance"])
}
WithDownNodeIPs[node] = str
}
msg = fmt.Sprintf("\n【%s】\n【集群:%v】\n【已达到宕机恢复临界点:%v】", h.cc.NodeDownTime.DingTalk.Title, clusterName, len(WithDownNodeIPs))
moduleName := 0
abnormalRecoveryQL := fmt.Sprintf(h.cc.NodeDownTime.AbnormalRecoveryQL[moduleName], nodeName)
write := db.NodeDataInfo{
NodeName: nodeName,
NodeIP: nodeIP,
ClusterName: clusterName,
ModuleName: NODE_DOWN,
DownTimeRecoveryQL: h.cc.NodeDownTime.AbnormalRecoveryQL,
DownTimeRecoveryQL: abnormalRecoveryQL,
}
exist, _ := write.Check()
if exist {
60 changes: 32 additions & 28 deletions app/horus/core/horuser/node_recovery.go
@@ -23,7 +23,6 @@ import (
"github.com/gammazero/workerpool"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"strings"
"time"
)

@@ -34,12 +33,12 @@ func (h *Horuser) RecoveryManager(ctx context.Context) error {
}

func (h *Horuser) DownTimeRecoveryManager(ctx context.Context) error {
go wait.UntilWithContext(ctx, h.recoveryCheck, time.Duration(h.cc.NodeRecovery.IntervalSecond)*time.Second)
go wait.UntilWithContext(ctx, h.downTimeRecoveryCheck, time.Duration(h.cc.NodeRecovery.IntervalSecond)*time.Second)
<-ctx.Done()
return nil
}

func (h *Horuser) downTimeRecoveryCheck(ctx context.Context) {
func (h *Horuser) recoveryCheck(ctx context.Context) {
data, err := db.GetRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
if err != nil {
klog.Errorf("recovery check GetRecoveryNodeDataInfoDate err:%v", err)
@@ -54,21 +53,21 @@ func (h *Horuser) downTimeRecoveryCheck(ctx context.Context) {
d := d
wp.Submit(func() {
h.recoveryNodes(d)
h.downTimeRecoveryNodes(d)

})

}
wp.StopWait()
}

func (h *Horuser) recoveryCheck(ctx context.Context) {
data, err := db.GetRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
func (h *Horuser) downTimeRecoveryCheck(ctx context.Context) {
data, err := db.GetDownTimeRecoveryNodeDataInfoDate(h.cc.NodeRecovery.DayNumber)
if err != nil {
klog.Errorf("recovery check GetRecoveryNodeDataInfoDate err:%v", err)
klog.Errorf("recovery check GetDownTimeRecoveryNodeDataInfoDate err:%v", err)
return
}
if len(data) == 0 {
klog.Info("recovery check GetRecoveryNodeDataInfoDate zero.")
klog.Info("recovery check GetDownTimeRecoveryNodeDataInfoDate zero.")
return
}
wp := workerpool.New(50)
@@ -77,7 +76,6 @@ func (h *Horuser) recoveryCheck(ctx context.Context) {
wp.Submit(func() {
h.downTimeRecoveryNodes(d)
})

}
wp.StopWait()
}
@@ -126,40 +124,46 @@ func (h *Horuser) recoveryNodes(n db.NodeDataInfo) {
func (h *Horuser) downTimeRecoveryNodes(n db.NodeDataInfo) {
promAddr := h.cc.PromMultiple[n.ClusterName]
if promAddr == "" {
klog.Error("recoveryNodes promAddr by clusterName empty.")
klog.Error("downTimeRecoveryNodes promAddr by clusterName empty.")
klog.Infof("clusterName:%v nodeName:%v", n.ClusterName, n.NodeName)
return
}
vecs, err := h.InstantQuery(promAddr, strings.Join(n.DownTimeRecoveryQL, " "), n.ClusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
rq := len(h.cc.NodeDownTime.AbnormalRecoveryQL)
vecs, err := h.InstantQuery(promAddr, n.DownTimeRecoveryQL, n.ClusterName, h.cc.NodeRecovery.PromQueryTimeSecond)
if err != nil {
klog.Errorf("recoveryNodes InstantQuery err:%v", err)
klog.Infof("downTimeRecoveryQL:%v", n.DownTimeRecoveryQL)
klog.Errorf("downTimeRecoveryNodes InstantQuery err:%v", err)
klog.Infof("DownTimeRecoveryQL:%v", n.DownTimeRecoveryQL)
return
}
if len(vecs) != 1 {
klog.Infof("Expected 1 result, but got:%d", len(vecs))
return
}
if len(vecs) > rq {
klog.Error("downTimeRecoveryNodes not reach threshold")
}
if err != nil {
klog.Errorf("recoveryNodes InstantQuery err:%v", err)
klog.Infof("recoveryQL:%v", n.DownTimeRecoveryQL)
klog.Errorf("downTimeRecoveryNodes InstantQuery err:%v", err)
klog.Infof("DownTimeRecoveryQL:%v", n.DownTimeRecoveryQL)
return
}
klog.Info("recoveryNodes InstantQuery success.")

err = h.UnCordon(n.NodeName, n.ClusterName)
res := "Success"
if err != nil {
res = fmt.Sprintf("result failed:%v", err)
}
msg := fmt.Sprintf("\n【集群: %v】\n【宕机节点已达到恢复临界点】\n【已恢复调度节点: %v】\n【处理结果:%v】\n【日期: %v】\n", n.ClusterName, n.NodeName, res, n.CreateTime)
alerter.DingTalkSend(h.cc.NodeDownTime.DingTalk, msg)
alerter.SlackSend(h.cc.NodeDownTime.Slack, msg)
if len(vecs) == rq {
err = h.UnCordon(n.NodeName, n.ClusterName)
res := "Success"
if err != nil {
res = fmt.Sprintf("result failed:%v", err)
}
msg := fmt.Sprintf("\n【集群: %v】\n【封锁节点恢复调度】\n【已恢复调度节点: %v】\n【处理结果:%v】\n【日期: %v】\n", n.ClusterName, n.NodeName, res, n.CreateTime)
alerter.DingTalkSend(h.cc.NodeDownTime.DingTalk, msg)
alerter.SlackSend(h.cc.NodeDownTime.Slack, msg)

success, err := n.DownTimeRecoveryMarker()
if err != nil {
klog.Errorf("DownTimeRecoveryMarker result failed err:%v", err)
return
success, err := n.DownTimeRecoveryMarker()
if err != nil {
klog.Errorf("DownTimeRecoveryMarker result failed err:%v", err)
return
}
klog.Infof("DownTimeRecoveryMarker result success:%v", success)
}
klog.Infof("DownTimeRecoveryMarker result success:%v", success)
}
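Summary of this file's change: DownTimeRecoveryManager now drives downTimeRecoveryCheck, which reads records via GetDownTimeRecoveryNodeDataInfoDate and submits downTimeRecoveryNodes, while recoveryCheck keeps only the ordinary recoveryNodes path, so the two recovery loops poll independently. A small self-contained sketch of that two-loop pattern, with placeholder check functions and an assumed 2-second interval:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Stand-in for h.recoveryCheck, driven by RecoveryManager.
	go wait.UntilWithContext(ctx, func(ctx context.Context) {
		fmt.Println("recoveryCheck tick")
	}, 2*time.Second)

	// Stand-in for h.downTimeRecoveryCheck, driven by DownTimeRecoveryManager.
	go wait.UntilWithContext(ctx, func(ctx context.Context) {
		fmt.Println("downTimeRecoveryCheck tick")
	}, 2*time.Second)

	<-ctx.Done() // both managers block until the context is cancelled
}
```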
30 changes: 28 additions & 2 deletions app/horus/core/horuser/node_restart.go
@@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"os/exec"
"strings"
"time"
)

@@ -79,13 +80,38 @@ func (h *Horuser) TryRestart(node db.NodeDataInfo) {
return
}
klog.Infof("Successfully restarted node %v.", node.NodeName)

rq := len(h.cc.NodeDownTime.AbnormalRecoveryQL)
query := strings.Join(h.cc.NodeDownTime.AbnormalRecoveryQL, " and ")
vecs, err := h.InstantQuery(h.cc.PromMultiple[node.ClusterName], query, node.ClusterName, h.cc.NodeDownTime.PromQueryTimeSecond)
if err != nil {
klog.Errorf("Failed to query Prometheus for recovery threshold after restart: %v", err)
return
}

if len(vecs) == rq {
klog.Infof("Node %v has reached recovery threshold after restart.", node.NodeName)

err = h.UnCordon(node.NodeName, node.ClusterName)
if err != nil {
klog.Errorf("Uncordon node failed after restart: %v", err)
return
}

msg = fmt.Sprintf("\n【集群: %v】\n【宕机节点已恢复】\n【恢复节点: %v】\n【处理结果:成功】\n【日期: %v】\n", node.ClusterName, node.NodeName, node.FirstDate)
alerter.DingTalkSend(h.cc.NodeDownTime.DingTalk, msg)
alerter.SlackSend(h.cc.NodeDownTime.Slack, msg)

} else {
klog.Infof("Node %v has not reached recovery threshold after restart.", node.NodeName)
}

} else {
klog.Infof("RestartMarker did not success for node %v", node.NodeName)
}

if node.Restart > 2 {
klog.Error("It's been rebooted once.")
klog.Error("The node has already been rebooted more than twice.")
return
}

}
4 changes: 2 additions & 2 deletions manifests/horus/horus.yaml
@@ -69,15 +69,15 @@ nodeDownTime:
promQueryTimeSecond: 60
abnormalityQL:
- 100 - (avg by (node) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 20
- (avg by (node) (node_memory_MemFree_bytes / node_memory_MemTotal_bytes )) * 100 > 25
- (avg by (node) (node_memory_MemFree_bytes / node_memory_MemTotal_bytes )) * 100 > 50
# - node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"} * 100 < 15
abnormalInfoSystemQL:
node_os_info{node="%s"}
allSystemUser: "zxj"
allSystemPassword: "1"
abnormalRecoveryQL:
- 100 - (avg by (node) (rate(node_cpu_seconds_total{mode="idle",node="%s"}[5m])) * 100) < 20
- (avg by (node) (node_memory_MemFree_bytes{node="%s"} / node_memory_MemTotal_bytes{node="%s"} )) * 100 < 25
- (avg by (node) (node_memory_MemFree_bytes{node="%s"} / node_memory_MemTotal_bytes{node="%s"} )) * 100 < 30
# - node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"} * 100 > 15
kubeMultiple:
cluster: config.1
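For context on the yaml above: the abnormalRecoveryQL entries (and abnormalInfoSystemQL) are Printf-style templates whose %s placeholders are filled with a node name before the query is sent to Prometheus, as the node_downtime.go hunk does with AbnormalRecoveryQL[0]. A self-contained sketch of that rendering with a hypothetical node name; the second template carries two placeholders, so every %s is substituted:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Recovery templates copied from manifests/horus/horus.yaml.
	templates := []string{
		`100 - (avg by (node) (rate(node_cpu_seconds_total{mode="idle",node="%s"}[5m])) * 100) < 20`,
		`(avg by (node) (node_memory_MemFree_bytes{node="%s"} / node_memory_MemTotal_bytes{node="%s"} )) * 100 < 30`,
	}
	nodeName := "worker-01" // hypothetical node name

	for _, t := range templates {
		// Fill every %s occurrence with the node name before querying Prometheus.
		args := make([]interface{}, strings.Count(t, "%s"))
		for i := range args {
			args[i] = nodeName
		}
		fmt.Println(fmt.Sprintf(t, args...))
	}
}
```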