diff --git a/pkg/config/config.go b/pkg/config/config.go index f0cf445..eafc36c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,8 +1,9 @@ package config import ( - "gopkg.in/yaml.v2" "io/ioutil" + + "gopkg.in/yaml.v2" "github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log" ) @@ -44,5 +45,6 @@ func LoadFile(filename string, logger log.Logger) (*Config, error) { if err != nil { level.Error(logger).Log("msg", "parsing YAML file errr...", "error", err) } + return cfg, nil } diff --git a/pkg/main.go b/pkg/main.go index 4dac40e..68a3968 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -1,21 +1,23 @@ package main import ( - "gopkg.in/alecthomas/kingpin.v2" - "github.com/prometheus/common/promlog" - "github.com/prometheus/common/promlog/flag" - "github.com/prometheus/common/version" - "context" - "github.com/oklog/run" + "fmt" "os" "os/signal" + "context" "syscall" - "github.com/go-kit/kit/log/level" - "dynamic-sharding/pkg/web" + + "gopkg.in/alecthomas/kingpin.v2" "github.com/gin-gonic/gin" + "github.com/oklog/run" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/promlog" + "github.com/prometheus/common/promlog/flag" + "github.com/prometheus/common/version" + "dynamic-sharding/pkg/config" + "dynamic-sharding/pkg/web" "dynamic-sharding/pkg/sd" - "fmt" ) func main() { @@ -34,8 +36,11 @@ func main() { // new grpc manager ctxAll, cancelAll := context.WithCancel(context.Background()) - sc, _ := config.LoadFile(*configFile, logger) - + sc, err := config.LoadFile(*configFile, logger) + if err != nil { + level.Error(logger).Log("msg", "config.LoadFil Error, exiting ...", "error", err) + return + } // init consul client client, err := sd.NewConsulClient(sc.ConsulServer.Addr, logger) @@ -44,11 +49,6 @@ func main() { return } - // register service - errors := sd.RegisterFromFile(client, sc.PGW.Servers, sc.ConsulServer.RegisterServiceName, sc.PGW.Port) - if len(errors) > 0 { - level.Error(logger).Log("msg", "RegisterFromFile Error", "error", errors) - } // init node hash ring var ss []string for _, i := range sc.PGW.Servers { @@ -56,6 +56,13 @@ func main() { } sd.NewConsistentHashNodesRing(ss) + + // register service + errors := sd.RegisterFromFile(client, sc.PGW.Servers, sc.ConsulServer.RegisterServiceName, sc.PGW.Port) + if len(errors) > 0 { + level.Error(logger).Log("msg", "RegisterFromFile Error", "error", errors) + } + var g run.Group { // Termination handler. @@ -89,7 +96,7 @@ func main() { level.Info(logger).Log("msg", "start web service Listening on address", "address", sc.HttpListenAddr) gin.SetMode(gin.ReleaseMode) routes := gin.Default() - errchan := make(chan error) + errchan := make(chan error, 1) go func() { errchan <- web.StartGin(sc.HttpListenAddr, routes) @@ -110,11 +117,11 @@ func main() { } { - // service discovery manager. + // WatchService manager. g.Add(func() error { - err := client.RunRefreshServiceNode(ctxAll, sc.ConsulServer.RegisterServiceName) + err := client.RunRefreshServiceNode(ctxAll, sc.ConsulServer.RegisterServiceName, sc.ConsulServer.Addr) if err != nil { - level.Error(logger).Log("msg", "service discovery error", "error", err) + level.Error(logger).Log("msg", "watchService_error", "error", err) } return err }, func(err error) { diff --git a/pkg/sd/rings.go b/pkg/sd/rings.go index ee365d2..7b256e2 100644 --- a/pkg/sd/rings.go +++ b/pkg/sd/rings.go @@ -1,13 +1,15 @@ package sd import ( - "dynamic-sharding/pkg/consistent" "sync" "sort" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "context" "strings" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + + "dynamic-sharding/pkg/consistent" ) const numberOfReplicas = 500 @@ -81,6 +83,8 @@ func StringSliceEqualBCE(a, b []string) bool { } func RunReshardHashRing(ctx context.Context, logger log.Logger) { + + level.Info(logger).Log("msg", "RunRefreshServiceNode start....") for { select { case nodes := <-NodeUpdateChan: @@ -91,10 +95,10 @@ func RunReshardHashRing(ctx context.Context, logger log.Logger) { sort.Strings(oldNodes) isEq := StringSliceEqualBCE(nodes, oldNodes) if isEq == false { - level.Info(logger).Log("msg", "RunReshardHashRing_node_update_reshard", "oldnodes", strings.Join(oldNodes, ","), "newnodes", strings.Join(nodes, ","), ) + level.Info(logger).Log("msg", "RunReshardHashRing_node_update_reshard", "old_num", len(oldNodes), "new_num", len(nodes), "oldnodes", strings.Join(oldNodes, ","), "newnodes", strings.Join(nodes, ","), ) PgwNodeRing.ReShardRing(nodes) } else { - level.Debug(logger).Log("msg", "RunReshardHashRing_node_same", "nodes", strings.Join(nodes, ",")) + level.Info(logger).Log("msg", "RunReshardHashRing_node_same", "nodes", strings.Join(nodes, ",")) } case <-ctx.Done(): diff --git a/pkg/sd/sd.go b/pkg/sd/sd.go index 7250114..4d575f6 100644 --- a/pkg/sd/sd.go +++ b/pkg/sd/sd.go @@ -2,15 +2,15 @@ package sd import ( "fmt" + "context" + "strings" + consul "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api/watch" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - time "time" - "context" ) -const GetServiceInterval = time.Second * 5 - type client struct { consul *consul.Client logger log.Logger @@ -18,7 +18,7 @@ type client struct { type Client interface { // Get a Service from consul - GetService(string, string) ([]string, error) + //GetService(string, string) ([]string, error) // register a service with local agent ServiceRegister(string, string, int) error // Deregister a service with local agent @@ -36,16 +36,15 @@ func NewConsulClient(addr string, logger log.Logger) (*client, error) { } // Register a service with consul local agent -func (c *client) ServiceRegister(srv_name, srv_host string, srv_port int) error { +func (c *client) ServiceRegister(srvName, srvHost string, srvPort int) error { reg := new(consul.AgentServiceRegistration) - reg.Name = srv_name + reg.Name = srvName - thisId := fmt.Sprintf("%s_%d", srv_host, srv_port) + thisId := fmt.Sprintf("%s_%d", srvHost, srvPort) reg.ID = thisId - reg.Port = srv_port - //registration.Tags = []string{"user-tomcat"} - reg.Address = srv_host + reg.Port = srvPort + reg.Address = srvHost level.Info(c.logger).Log("msg", "ServiceRegisterStart", "id", thisId) //增加check check := new(consul.AgentServiceCheck) @@ -66,35 +65,35 @@ func (c *client) DeRegister(id string) error { return c.consul.Agent().ServiceDeregister(id) } -// Service return a service -func (c *client) GetService(service, tag string) ([]string, error) { - passingOnly := true - addrs, _, err := c.consul.Health().Service(service, tag, passingOnly, nil) - if len(addrs) == 0 && err == nil { - return nil, fmt.Errorf("service ( %s ) was not found", service) - } - - if err != nil { - return nil, err - } - var hs []string - - for _, a := range addrs { - - hs = append(hs, fmt.Sprintf("%s:%d", a.Service.Address, a.Service.Port)) - } - if len(hs) > 0 { - NodeUpdateChan <- hs - } - - return hs, nil -} - -func RegisterFromFile(c *client, servers []string, srv_name string, srv_port int) (errors []error) { +//// Service return a service +//func (c *client) GetService(service, tag string) ([]string, error) { +// passingOnly := true +// addrs, _, err := c.consul.Health().Service(service, tag, passingOnly, nil) +// if len(addrs) == 0 && err == nil { +// return nil, fmt.Errorf("service ( %s ) was not found", service) +// } +// +// if err != nil { +// return nil, err +// } +// var hs []string +// +// for _, a := range addrs { +// +// hs = append(hs, fmt.Sprintf("%s:%d", a.Service.Address, a.Service.Port)) +// } +// if len(hs) > 0 { +// NodeUpdateChan <- hs +// } +// +// return hs, nil +//} + +func RegisterFromFile(c *client, servers []string, srvName string, srvPort int) (errors []error) { for _, addr := range servers { - e := c.ServiceRegister(srv_name, addr, srv_port) + e := c.ServiceRegister(srvName, addr, srvPort) if e != nil { errors = append(errors, e) } @@ -102,21 +101,61 @@ func RegisterFromFile(c *client, servers []string, srv_name string, srv_port int } return } -func (c *client) RunRefreshServiceNode(ctx context.Context, srv_name string) error { - ticker := time.NewTicker(GetServiceInterval) +func (c *client) RunRefreshServiceNode(ctx context.Context, srvName string, consulServerAddr string) error { level.Info(c.logger).Log("msg", "RunRefreshServiceNode start....") go RunReshardHashRing(ctx, c.logger) - c.GetService(srv_name, "") - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - level.Info(c.logger).Log("msg", "receive_quit_signal_and_quit") - return nil - case <-ticker.C: - c.GetService(srv_name, "") + + errchan := make(chan error, 1) + go func() { + errchan <- c.WatchService(ctx, srvName, consulServerAddr) + + }() + select { + case <-ctx.Done(): + level.Info(c.logger).Log("msg", "RunRefreshServiceNode_receive_quit_signal_and_quit") + return nil + case err := <-errchan: + level.Error(c.logger).Log("msg", "WatchService_get_error", "err", err) + return err + } + return nil +} + +func (c *client) WatchService(ctx context.Context, srvName string, consulServerAddr string) error { + + watchConfig := make(map[string]interface{}) + + watchConfig["type"] = "service" + watchConfig["service"] = srvName + watchConfig["handler_type"] = "script" + watchConfig["passingonly"] = true + watchPlan, err := watch.Parse(watchConfig) + if err != nil { + level.Error(c.logger).Log("msg", "create_Watch_by_watch_config_error", "srv_name", srvName, "error", err) + return err + + } + + watchPlan.Handler = func(lastIndex uint64, result interface{}) { + if entries, ok := result.([]*consul.ServiceEntry); ok { + var hs []string + + for _, a := range entries { + + hs = append(hs, fmt.Sprintf("%s:%d", a.Service.Address, a.Service.Port)) + } + if len(hs) > 0 { + level.Info(c.logger).Log("msg", "service_node_change_by_healthy_check", "srv_name", srvName, "num", len(hs), "detail", strings.Join(hs, " ")) + NodeUpdateChan <- hs + } + } } + if err := watchPlan.Run(consulServerAddr); err != nil { + level.Error(c.logger).Log("msg", "watchPlan_run_error", "srv_name", srvName, "error", err) + return err + } return nil + } diff --git a/pkg/web/controller/pushgateway/pgw_controller.go b/pkg/web/controller/pushgateway/pgw_controller.go index 6ae2acf..ff41e31 100644 --- a/pkg/web/controller/pushgateway/pgw_controller.go +++ b/pkg/web/controller/pushgateway/pgw_controller.go @@ -1,11 +1,12 @@ package pushgateway import ( + "log" + "net/http" + "github.com/gin-gonic/gin" - "net/http" "dynamic-sharding/pkg/sd" - "log" ) func PushMetricsGetHash(c *gin.Context) { diff --git a/pkg/web/controller/pushgateway/pgw_route.go b/pkg/web/controller/pushgateway/pgw_route.go index a764688..c903b98 100644 --- a/pkg/web/controller/pushgateway/pgw_route.go +++ b/pkg/web/controller/pushgateway/pgw_route.go @@ -1,8 +1,9 @@ package pushgateway import ( - "github.com/gin-gonic/gin" "net/http" + + "github.com/gin-gonic/gin" ) func Routes(r *gin.Engine) { diff --git a/pkg/web/gin.go b/pkg/web/gin.go index 33e7699..681568f 100644 --- a/pkg/web/gin.go +++ b/pkg/web/gin.go @@ -1,10 +1,12 @@ package web import ( - "dynamic-sharding/pkg/web/controller/pushgateway" - "github.com/gin-gonic/gin" - "net/http" "time" + "net/http" + + "github.com/gin-gonic/gin" + + "dynamic-sharding/pkg/web/controller/pushgateway" ) func StartGin(port string, r *gin.Engine) error {