Skip to content

Commit

Permalink
1.检测pgw存活由轮询改为Watch
Browse files Browse the repository at this point in the history
2.修复加载配置文件的bug
3.调整节点注册和初始化哈希环顺序
4.标准化import包顺序
  • Loading branch information
ningyangyang committed Aug 27, 2020
1 parent 3f2a23e commit 66da0bb
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 80 deletions.
4 changes: 3 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package config

import (
"gopkg.in/yaml.v2"
"io/ioutil"

"gopkg.in/yaml.v2"
"github.com/go-kit/kit/log/level"
"github.com/go-kit/kit/log"
)
Expand Down Expand Up @@ -44,5 +45,6 @@ func LoadFile(filename string, logger log.Logger) (*Config, error) {
if err != nil {
level.Error(logger).Log("msg", "parsing YAML file errr...", "error", err)
}

return cfg, nil
}
47 changes: 27 additions & 20 deletions pkg/main.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package main

import (
"gopkg.in/alecthomas/kingpin.v2"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"
"github.com/prometheus/common/version"
"context"
"github.com/oklog/run"
"fmt"
"os"
"os/signal"
"context"
"syscall"
"github.com/go-kit/kit/log/level"
"dynamic-sharding/pkg/web"

"gopkg.in/alecthomas/kingpin.v2"
"github.com/gin-gonic/gin"
"github.com/oklog/run"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"
"github.com/prometheus/common/version"

"dynamic-sharding/pkg/config"
"dynamic-sharding/pkg/web"
"dynamic-sharding/pkg/sd"
"fmt"
)

func main() {
Expand All @@ -34,8 +36,11 @@ func main() {

// new grpc manager
ctxAll, cancelAll := context.WithCancel(context.Background())
sc, _ := config.LoadFile(*configFile, logger)

sc, err := config.LoadFile(*configFile, logger)
if err != nil {
level.Error(logger).Log("msg", "config.LoadFil Error, exiting ...", "error", err)
return
}
// init consul client
client, err := sd.NewConsulClient(sc.ConsulServer.Addr, logger)

Expand All @@ -44,18 +49,20 @@ func main() {
return
}

// register service
errors := sd.RegisterFromFile(client, sc.PGW.Servers, sc.ConsulServer.RegisterServiceName, sc.PGW.Port)
if len(errors) > 0 {
level.Error(logger).Log("msg", "RegisterFromFile Error", "error", errors)
}
// init node hash ring
var ss []string
for _, i := range sc.PGW.Servers {
ss = append(ss, fmt.Sprintf("%s:%d", i, sc.PGW.Port))
}

sd.NewConsistentHashNodesRing(ss)

// register service
errors := sd.RegisterFromFile(client, sc.PGW.Servers, sc.ConsulServer.RegisterServiceName, sc.PGW.Port)
if len(errors) > 0 {
level.Error(logger).Log("msg", "RegisterFromFile Error", "error", errors)
}

var g run.Group
{
// Termination handler.
Expand Down Expand Up @@ -89,7 +96,7 @@ func main() {
level.Info(logger).Log("msg", "start web service Listening on address", "address", sc.HttpListenAddr)
gin.SetMode(gin.ReleaseMode)
routes := gin.Default()
errchan := make(chan error)
errchan := make(chan error, 1)

go func() {
errchan <- web.StartGin(sc.HttpListenAddr, routes)
Expand All @@ -110,11 +117,11 @@ func main() {
}

{
// service discovery manager.
// WatchService manager.
g.Add(func() error {
err := client.RunRefreshServiceNode(ctxAll, sc.ConsulServer.RegisterServiceName)
err := client.RunRefreshServiceNode(ctxAll, sc.ConsulServer.RegisterServiceName, sc.ConsulServer.Addr)
if err != nil {
level.Error(logger).Log("msg", "service discovery error", "error", err)
level.Error(logger).Log("msg", "watchService_error", "error", err)
}
return err
}, func(err error) {
Expand Down
14 changes: 9 additions & 5 deletions pkg/sd/rings.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package sd

import (
"dynamic-sharding/pkg/consistent"
"sync"
"sort"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"context"
"strings"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"

"dynamic-sharding/pkg/consistent"
)

const numberOfReplicas = 500
Expand Down Expand Up @@ -81,6 +83,8 @@ func StringSliceEqualBCE(a, b []string) bool {
}

func RunReshardHashRing(ctx context.Context, logger log.Logger) {

level.Info(logger).Log("msg", "RunRefreshServiceNode start....")
for {
select {
case nodes := <-NodeUpdateChan:
Expand All @@ -91,10 +95,10 @@ func RunReshardHashRing(ctx context.Context, logger log.Logger) {
sort.Strings(oldNodes)
isEq := StringSliceEqualBCE(nodes, oldNodes)
if isEq == false {
level.Info(logger).Log("msg", "RunReshardHashRing_node_update_reshard", "oldnodes", strings.Join(oldNodes, ","), "newnodes", strings.Join(nodes, ","), )
level.Info(logger).Log("msg", "RunReshardHashRing_node_update_reshard", "old_num", len(oldNodes), "new_num", len(nodes), "oldnodes", strings.Join(oldNodes, ","), "newnodes", strings.Join(nodes, ","), )
PgwNodeRing.ReShardRing(nodes)
} else {
level.Debug(logger).Log("msg", "RunReshardHashRing_node_same", "nodes", strings.Join(nodes, ","))
level.Info(logger).Log("msg", "RunReshardHashRing_node_same", "nodes", strings.Join(nodes, ","))

}
case <-ctx.Done():
Expand Down
135 changes: 87 additions & 48 deletions pkg/sd/sd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@ package sd

import (
"fmt"
"context"
"strings"

consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
time "time"
"context"
)

const GetServiceInterval = time.Second * 5

type client struct {
consul *consul.Client
logger log.Logger
}

type Client interface {
// Get a Service from consul
GetService(string, string) ([]string, error)
//GetService(string, string) ([]string, error)
// register a service with local agent
ServiceRegister(string, string, int) error
// Deregister a service with local agent
Expand All @@ -36,16 +36,15 @@ func NewConsulClient(addr string, logger log.Logger) (*client, error) {
}

// Register a service with consul local agent
func (c *client) ServiceRegister(srv_name, srv_host string, srv_port int) error {
func (c *client) ServiceRegister(srvName, srvHost string, srvPort int) error {

reg := new(consul.AgentServiceRegistration)
reg.Name = srv_name
reg.Name = srvName

thisId := fmt.Sprintf("%s_%d", srv_host, srv_port)
thisId := fmt.Sprintf("%s_%d", srvHost, srvPort)
reg.ID = thisId
reg.Port = srv_port
//registration.Tags = []string{"user-tomcat"}
reg.Address = srv_host
reg.Port = srvPort
reg.Address = srvHost
level.Info(c.logger).Log("msg", "ServiceRegisterStart", "id", thisId)
//增加check
check := new(consul.AgentServiceCheck)
Expand All @@ -66,57 +65,97 @@ func (c *client) DeRegister(id string) error {
return c.consul.Agent().ServiceDeregister(id)
}

// Service return a service
func (c *client) GetService(service, tag string) ([]string, error) {
passingOnly := true
addrs, _, err := c.consul.Health().Service(service, tag, passingOnly, nil)
if len(addrs) == 0 && err == nil {
return nil, fmt.Errorf("service ( %s ) was not found", service)
}

if err != nil {
return nil, err
}
var hs []string

for _, a := range addrs {

hs = append(hs, fmt.Sprintf("%s:%d", a.Service.Address, a.Service.Port))
}
if len(hs) > 0 {
NodeUpdateChan <- hs
}

return hs, nil
}

func RegisterFromFile(c *client, servers []string, srv_name string, srv_port int) (errors []error) {
//// Service return a service
//func (c *client) GetService(service, tag string) ([]string, error) {
// passingOnly := true
// addrs, _, err := c.consul.Health().Service(service, tag, passingOnly, nil)
// if len(addrs) == 0 && err == nil {
// return nil, fmt.Errorf("service ( %s ) was not found", service)
// }
//
// if err != nil {
// return nil, err
// }
// var hs []string
//
// for _, a := range addrs {
//
// hs = append(hs, fmt.Sprintf("%s:%d", a.Service.Address, a.Service.Port))
// }
// if len(hs) > 0 {
// NodeUpdateChan <- hs
// }
//
// return hs, nil
//}

func RegisterFromFile(c *client, servers []string, srvName string, srvPort int) (errors []error) {

for _, addr := range servers {

e := c.ServiceRegister(srv_name, addr, srv_port)
e := c.ServiceRegister(srvName, addr, srvPort)
if e != nil {
errors = append(errors, e)
}

}
return
}
func (c *client) RunRefreshServiceNode(ctx context.Context, srv_name string) error {
ticker := time.NewTicker(GetServiceInterval)
func (c *client) RunRefreshServiceNode(ctx context.Context, srvName string, consulServerAddr string) error {
level.Info(c.logger).Log("msg", "RunRefreshServiceNode start....")
go RunReshardHashRing(ctx, c.logger)
c.GetService(srv_name, "")
defer ticker.Stop()
for {
select {
case <-ctx.Done():
level.Info(c.logger).Log("msg", "receive_quit_signal_and_quit")
return nil
case <-ticker.C:
c.GetService(srv_name, "")

errchan := make(chan error, 1)
go func() {
errchan <- c.WatchService(ctx, srvName, consulServerAddr)

}()
select {
case <-ctx.Done():
level.Info(c.logger).Log("msg", "RunRefreshServiceNode_receive_quit_signal_and_quit")
return nil
case err := <-errchan:
level.Error(c.logger).Log("msg", "WatchService_get_error", "err", err)
return err
}
return nil
}

func (c *client) WatchService(ctx context.Context, srvName string, consulServerAddr string) error {

watchConfig := make(map[string]interface{})

watchConfig["type"] = "service"
watchConfig["service"] = srvName
watchConfig["handler_type"] = "script"
watchConfig["passingonly"] = true
watchPlan, err := watch.Parse(watchConfig)
if err != nil {
level.Error(c.logger).Log("msg", "create_Watch_by_watch_config_error", "srv_name", srvName, "error", err)
return err

}

watchPlan.Handler = func(lastIndex uint64, result interface{}) {
if entries, ok := result.([]*consul.ServiceEntry); ok {
var hs []string

for _, a := range entries {

hs = append(hs, fmt.Sprintf("%s:%d", a.Service.Address, a.Service.Port))
}
if len(hs) > 0 {
level.Info(c.logger).Log("msg", "service_node_change_by_healthy_check", "srv_name", srvName, "num", len(hs), "detail", strings.Join(hs, " "))
NodeUpdateChan <- hs
}

}

}
if err := watchPlan.Run(consulServerAddr); err != nil {
level.Error(c.logger).Log("msg", "watchPlan_run_error", "srv_name", srvName, "error", err)
return err
}
return nil

}
5 changes: 3 additions & 2 deletions pkg/web/controller/pushgateway/pgw_controller.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package pushgateway

import (
"log"
"net/http"

"github.com/gin-gonic/gin"

"net/http"
"dynamic-sharding/pkg/sd"
"log"
)

func PushMetricsGetHash(c *gin.Context) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/web/controller/pushgateway/pgw_route.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package pushgateway

import (
"github.com/gin-gonic/gin"
"net/http"

"github.com/gin-gonic/gin"
)

func Routes(r *gin.Engine) {
Expand Down
8 changes: 5 additions & 3 deletions pkg/web/gin.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package web

import (
"dynamic-sharding/pkg/web/controller/pushgateway"
"github.com/gin-gonic/gin"
"net/http"
"time"
"net/http"

"github.com/gin-gonic/gin"

"dynamic-sharding/pkg/web/controller/pushgateway"
)

func StartGin(port string, r *gin.Engine) error {
Expand Down

0 comments on commit 66da0bb

Please sign in to comment.