Skip to content

Commit

Permalink
[horus] Node critical point recovery test completed
Browse files Browse the repository at this point in the history
  • Loading branch information
mfordjody committed Oct 11, 2024
1 parent 8225b73 commit 48e9e0d
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 39 deletions.
2 changes: 1 addition & 1 deletion app/horus/core/horuser/horuser.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewHoruser(c *config.Config) *Horuser {
km := clientset.NewForConfigOrDie(kcfg)
hr.kubeClientMap[clusterName] = km
klog.Info("NewHoruser k8sBuildConfig success.")
klog.Infof("Count:[%d/%d] KubeMultipleCluster:%v", n, i, clusterName)
klog.Infof("[KubeMultipleCluster:%v\n] Count:[%d/%d]", clusterName, n, i)
i++
}
return hr
Expand Down
15 changes: 11 additions & 4 deletions app/horus/core/horuser/node_downtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
"strings"
"sync"
"time"
)
Expand Down Expand Up @@ -98,7 +99,6 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
WithDownNodeIPs := make(map[string]string)

for node, count := range nodeDownTimeRes {
klog.Infof("Count:%v", count)
if count < aq {
klog.Error("downtimeNodes not reach threshold.")
klog.Infof("clusterName:%v nodeName:%v threshold:%v count:%v", clusterName, node, aq, count)
Expand Down Expand Up @@ -148,10 +148,17 @@ func (h *Horuser) DownTimeNodes(clusterName, addr string) {
return "", nil
}()

moduleName := 0
abnormalRecoveryQL := []string{}
for _, ql := range h.cc.NodeDownTime.AbnormalRecoveryQL {
count := strings.Count(ql, "%s")

abnormalRecoveryQL := []string{
fmt.Sprintf(h.cc.NodeDownTime.AbnormalRecoveryQL[moduleName], nodeName),
p := make([]interface{}, count)
for i := 0; i < count; i++ {
p[i] = nodeName
}

query := fmt.Sprintf(ql, p...)
abnormalRecoveryQL = append(abnormalRecoveryQL, query)
}

write := db.NodeDataInfo{
Expand Down
3 changes: 1 addition & 2 deletions app/horus/core/horuser/node_drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ func (h *Horuser) Drain(nodeName, clusterName string) (err error) {
ctxFirst, cancelFirst := h.GetK8sContext()
defer cancelFirst()
listOpts := v1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName)}
var podNamespace string
pod, err := kubeClient.CoreV1().Pods(podNamespace).List(ctxFirst, listOpts)
pod, err := kubeClient.CoreV1().Pods("").List(ctxFirst, listOpts)
if err != nil {
klog.Errorf("node Drain err:%v", err)
klog.Infof("nodeName:%v\n clusterName:%v\n", nodeName, clusterName)
Expand Down
64 changes: 34 additions & 30 deletions app/horus/core/horuser/node_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,51 +129,55 @@ func (h *Horuser) downTimeRecoveryNodes(n db.NodeDataInfo) {
return
}
rq := len(h.cc.NodeDownTime.AbnormalRecoveryQL)
for _, ql := range n.DownTimeRecoveryQL { // 遍历每个 PromQL 查询
counter := 0
for _, ql := range n.DownTimeRecoveryQL {
vecs, err := h.InstantQuery(promAddr, ql, n.ClusterName, h.cc.NodeRecovery.PromQueryTimeSecond)
if err != nil {
klog.Errorf("downTimeRecoveryNodes InstantQuery err: %v", err)
klog.Infof("DownTimeRecoveryQL: %v", ql) // 记录出错的查询
continue // 继续处理下一个查询
klog.Infof("DownTimeRecoveryQL: %v", ql)
continue
}

// 处理查询结果
if len(vecs) == 0 {
klog.Infof("No results for query: %v", ql)
continue
} else {
klog.Infof("Query successful for: %v", ql)
counter++
klog.Infof("Counter: %v", counter)
}

// 在这里处理查询成功的逻辑,比如检查结果并采取相应的操作
klog.Infof("Query successful for: %v", ql)

if len(vecs) != rq {
klog.Infof("Expected %d results, but got: %d", rq, len(vecs))
if len(vecs) > rq {
klog.Error("downTimeRecoveryNodes did not reach threshold")
}
return
if counter != rq {
klog.Infof("Expected %d results, but got: %d", rq, counter)
continue
}

err = h.UnCordon(n.NodeName, n.ClusterName)
res := "Success"
if err != nil {
res = fmt.Sprintf("result failed: %v", err)
if counter > rq {
klog.Error("downTimeRecoveryNodes did not reach threshold.")
continue
}

msg := fmt.Sprintf("\n【集群: %v】\n【封锁节点恢复调度】\n【已恢复调度节点: %v】\n【处理结果:%v】\n【日期: %v】\n",
n.ClusterName, n.NodeName, res, n.CreateTime)
if counter == rq {
klog.Info("Reaching the downtime recovery threshold.")
err = h.UnCordon(n.NodeName, n.ClusterName)
res := "Success"
if err != nil {
res = fmt.Sprintf("result failed: %v", err)
}

alerter.DingTalkSend(h.cc.NodeDownTime.DingTalk, msg)
alerter.SlackSend(h.cc.NodeDownTime.Slack, msg)
msg := fmt.Sprintf("\n【集群: %v】\n【封锁宕机节点恢复调度】\n【已恢复调度节点: %v】\n【处理结果:%v】\n【日期: %v】\n",
n.ClusterName, n.NodeName, res, n.CreateTime)

success, err := n.DownTimeRecoveryMarker()
if err != nil {
klog.Errorf("DownTimeRecoveryMarker result failed: %v", err)
return
}
klog.Infof("DownTimeRecoveryMarker result success: %v", success)
alerter.DingTalkSend(h.cc.NodeDownTime.DingTalk, msg)
alerter.SlackSend(h.cc.NodeDownTime.Slack, msg)

// 查询操作成功日志
klog.Info("recoveryNodes InstantQuery success.")
success, err := n.DownTimeRecoveryMarker()
if err != nil {
klog.Errorf("DownTimeRecoveryMarker result failed: %v", err)
return
}
klog.Infof("DownTimeRecoveryMarker result success: %v", success)

klog.Info("recoveryNodes InstantQuery success.")
}
}
}
4 changes: 2 additions & 2 deletions manifests/horus/horus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,16 @@ nodeDownTime:
promQueryTimeSecond: 60
abnormalityQL:
- 100 - (avg by (node) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 15
# - avg(node_memory_MemFree_bytes) by (node) < 20
- node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"} * 100 < 16
# - avg(node_memory_MemFree_bytes) by (node) < 20
abnormalInfoSystemQL:
node_os_info{node="%s"}
allSystemUser: "zxj"
allSystemPassword: "1"
abnormalRecoveryQL:
- 100 - (avg by (node) (rate(node_cpu_seconds_total{mode="idle",node="%s"}[5m])) * 100) < 15
# - (avg by (node) (node_memory_MemFree_bytes{node="%s"} / node_memory_MemTotal_bytes{node="%s"} )) * 100 > 50
- node_filesystem_avail_bytes{mountpoint="/",node="%s"} / node_filesystem_size_bytes{mountpoint="/",node="%s"} * 100 > 15
# - (avg by (node) (node_memory_MemFree_bytes{node="%s"} / node_memory_MemTotal_bytes{node="%s"} )) * 100 > 50
kubeMultiple:
cluster: config.1
dingTalk:
Expand Down

0 comments on commit 48e9e0d

Please sign in to comment.