Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add engine params to engine API #641

Merged
merged 4 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion engine/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
type Engine struct {
client dockerapi.APIClient
config coretypes.Config
ep *enginetypes.Params
}

// MakeClient make docker cli
Expand All @@ -45,7 +46,18 @@ func MakeClient(ctx context.Context, config coretypes.Config, nodename, endpoint
}

logger.Debugf(ctx, "Create new http.Client for %s, %s", endpoint, config.Docker.APIVersion)
return makeDockerClient(ctx, config, client, endpoint)
e, err := makeDockerClient(ctx, config, client, endpoint)
if err != nil {
return nil, err
}
e.ep = &enginetypes.Params{
Nodename: nodename,
Endpoint: endpoint,
CA: ca,
Cert: cert,
Key: key,
}
return e, nil
}

// Info show node info
Expand All @@ -59,6 +71,10 @@ func (e *Engine) Info(ctx context.Context) (*enginetypes.Info, error) {
return &enginetypes.Info{Type: Type, ID: r.ID, NCPU: r.NCPU, MemTotal: r.MemTotal}, nil
}

func (e *Engine) GetParams() *enginetypes.Params {
return e.ep
}

// Ping test connection
func (e *Engine) Ping(ctx context.Context) error {
_, err := e.client.Ping(ctx)
Expand Down
4 changes: 2 additions & 2 deletions engine/docker/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,15 @@ func GetIP(ctx context.Context, daemonHost string) string {
return u.Hostname()
}

func makeDockerClient(_ context.Context, config coretypes.Config, client *http.Client, endpoint string) (engine.API, error) {
func makeDockerClient(_ context.Context, config coretypes.Config, client *http.Client, endpoint string) (*Engine, error) {
CMGS marked this conversation as resolved.
Show resolved Hide resolved
cli, err := dockerapi.NewClientWithOpts(
dockerapi.WithHost(endpoint),
dockerapi.WithVersion(config.Docker.APIVersion),
dockerapi.WithHTTPClient(client))
if err != nil {
return nil, err
}
return &Engine{cli, config}, nil
return &Engine{client: cli, config: config}, nil
}

func useCNI(labels map[string]string) bool {
Expand Down
1 change: 1 addition & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type API interface {
Info(ctx context.Context) (*enginetypes.Info, error)
Ping(ctx context.Context) error
CloseConn() error
GetParams() *enginetypes.Params

Execute(ctx context.Context, ID string, config *enginetypes.ExecConfig) (execID string, stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error)
ExecResize(ctx context.Context, execID string, height, width uint) (err error)
Expand Down
126 changes: 55 additions & 71 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package factory

import (
"context"
"fmt"
"strings"
"sync"
"time"
Expand All @@ -16,6 +15,7 @@ import (
"github.com/projecteru2/core/engine/fake"
"github.com/projecteru2/core/engine/mocks/fakeengine"
"github.com/projecteru2/core/engine/systemd"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/engine/virt"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
Expand All @@ -38,22 +38,20 @@ var (

// EngineCache .
type EngineCache struct {
cache *utils.EngineCache
keysToCheck *haxmap.Map[string, engineParams]
pool *ants.PoolWithFunc
config types.Config
stor store.Store
cache *haxmap.Map[string, engine.API]
pool *ants.PoolWithFunc
config types.Config
stor store.Store
}

// NewEngineCache .
func NewEngineCache(config types.Config, stor store.Store) *EngineCache {
pool, _ := utils.NewPool(config.MaxConcurrency)
return &EngineCache{
cache: utils.NewEngineCache(12*time.Hour, 10*time.Minute),
keysToCheck: haxmap.New[string, engineParams](),
pool: pool,
config: config,
stor: stor,
cache: haxmap.New[string, engine.API](),
pool: pool,
config: config,
stor: stor,
}
}

Expand All @@ -66,28 +64,28 @@ func InitEngineCache(ctx context.Context, config types.Config, stor store.Store)
All: true,
})
}
go engineCache.CheckAlive(ctx)
go engineCache.CheckNodeStatus(ctx)
go engineCache.checkAlive(ctx)
go engineCache.checkNodeStatus(ctx)
}

// Get .
func (e *EngineCache) Get(key string) engine.API {
return e.cache.Get(key)
api, _ := e.cache.Get(key)
return api
}

// Set .
func (e *EngineCache) Set(params engineParams, client engine.API) {
e.cache.Set(params.getCacheKey(), client)
e.keysToCheck.Set(params.getCacheKey(), params)
func (e *EngineCache) Set(key string, client engine.API) {
e.cache.Set(key, client)
}

// Delete .
func (e *EngineCache) Delete(key string) {
e.cache.Delete(key)
e.cache.Del(key)
}

// CheckAlive checks if the engine in cache is available
func (e *EngineCache) CheckAlive(ctx context.Context) {
// checkAlive checks if the engine in cache is available
func (e *EngineCache) checkAlive(ctx context.Context) {
logger := log.WithFunc("engine.factory.CheckAlive")
logger.Info(ctx, "check alive starts")
defer logger.Info(ctx, "check alive ends")
Expand All @@ -99,52 +97,43 @@ func (e *EngineCache) CheckAlive(ctx context.Context) {
default:
}

paramsChan := make(chan engineParams)
go func() {
e.keysToCheck.ForEach(func(_ string, v engineParams) bool {
paramsChan <- v
return true
})
close(paramsChan)
}()

wg := &sync.WaitGroup{}
for params := range paramsChan {
e.cache.ForEach(func(_ string, v engine.API) bool {
wg.Add(1)
params := params
params := v.GetParams()
_ = e.pool.Invoke(func() {
defer wg.Done()
cacheKey := params.getCacheKey()
client := e.cache.Get(cacheKey)
cacheKey := params.CacheKey()
client := e.Get(cacheKey)
if client == nil {
e.cache.Delete(params.getCacheKey())
e.keysToCheck.Del(cacheKey)
e.Delete(cacheKey)
return
}
if _, ok := client.(*fake.EngineWithErr); ok {
if newClient, err := newEngine(ctx, e.config, params.nodename, params.endpoint, params.ca, params.key, params.cert); err != nil {
if newClient, err := newEngine(ctx, e.config, params.Nodename, params.Endpoint, params.CA, params.Key, params.Cert); err != nil {
logger.Errorf(ctx, err, "engine %+v is still unavailable", cacheKey)
e.cache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err})
e.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err, EP: params})
// check node status
e.checkOneNodeStatus(ctx, &params)
e.checkOneNodeStatus(ctx, params)
} else {
e.cache.Set(cacheKey, newClient)
e.Set(cacheKey, newClient)
}
return
}
if err := validateEngine(ctx, client, e.config.ConnectionTimeout); err != nil {
logger.Errorf(ctx, err, "engine %+v is unavailable, will be replaced and removed", cacheKey)
e.cache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err})
e.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err, EP: params})
}
logger.Debugf(ctx, "engine %+v is available", cacheKey)
})
}
return true
})
wg.Wait()
time.Sleep(e.config.ConnectionTimeout)
}
}

func (e *EngineCache) CheckNodeStatus(ctx context.Context) {
func (e *EngineCache) checkNodeStatus(ctx context.Context) {
logger := log.WithFunc("engine.factory.CheckNodeStatus")
logger.Info(ctx, "check NodeStatus starts")
defer logger.Info(ctx, "check NodeStatus ends")
Expand All @@ -169,10 +158,11 @@ func (e *EngineCache) CheckNodeStatus(ctx context.Context) {
continue
}
// a node may have multiple engines, so we need check all key here
e.keysToCheck.ForEach(func(_ string, ep engineParams) bool {
if ep.nodename == ns.Nodename {
logger.Infof(ctx, "remove engine %+v from cache", ep.getCacheKey())
RemoveEngineFromCache(ctx, ep.endpoint, ep.ca, ep.cert, ep.key)
e.cache.ForEach(func(_ string, v engine.API) bool {
ep := v.GetParams()
if ep.Nodename == ns.Nodename {
logger.Infof(ctx, "remove engine %+v from cache", ep.CacheKey())
RemoveEngineFromCache(ctx, ep.Endpoint, ep.CA, ep.Cert, ep.Key)
}
return true
})
Expand Down Expand Up @@ -200,38 +190,26 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
}

defer func() {
params := engineParams{
nodename: nodename,
endpoint: endpoint,
ca: ca,
cert: cert,
key: key,
params := &enginetypes.Params{
Nodename: nodename,
Endpoint: endpoint,
CA: ca,
Cert: cert,
Key: key,
}
cacheKey := params.getCacheKey()
cacheKey := params.CacheKey()
if err == nil {
engineCache.Set(params, client)
engineCache.Set(cacheKey, client)
logger.Infof(ctx, "store engine %+v in cache", cacheKey)
} else {
engineCache.Set(params, &fake.EngineWithErr{DefaultErr: err})
engineCache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err, EP: params})
logger.Infof(ctx, "store fake engine %+v in cache", cacheKey)
}
}()

return newEngine(ctx, config, nodename, endpoint, ca, cert, key)
}

type engineParams struct {
nodename string
endpoint string
ca string
cert string
key string
}

func (ep engineParams) getCacheKey() string {
return getEngineCacheKey(ep.endpoint, ep.ca, ep.cert, ep.key)
}

func validateEngine(ctx context.Context, engine engine.API, timeout time.Duration) (err error) {
utils.WithTimeout(ctx, timeout, func(ctx context.Context) {
err = engine.Ping(ctx)
Expand All @@ -249,7 +227,13 @@ func getEnginePrefix(endpoint string) (string, error) {
}

func getEngineCacheKey(endpoint, ca, cert, key string) string {
return fmt.Sprintf("%+v-%+v", endpoint, utils.SHA256(fmt.Sprintf(":%+v:%+v:%+v", ca, cert, key))[:8])
p := enginetypes.Params{
Endpoint: endpoint,
CA: ca,
Cert: cert,
Key: key,
}
return p.CacheKey()
}

// newEngine get engine
Expand All @@ -275,13 +259,13 @@ func newEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
return client, nil
}

func (e *EngineCache) checkOneNodeStatus(ctx context.Context, params *engineParams) {
func (e *EngineCache) checkOneNodeStatus(ctx context.Context, params *enginetypes.Params) {
if e.stor == nil {
return
}
logger := log.WithFunc("engine.factory.checkOneNodeStatus")
nodename := params.nodename
cacheKey := params.getCacheKey()
nodename := params.Nodename
cacheKey := params.CacheKey()
if ns, err := e.stor.GetNodeStatus(ctx, nodename); (err != nil && errors.Is(err, types.ErrInvaildCount)) || (!ns.Alive) {
logger.Warnf(ctx, "node %s is offline, the cache will be removed", nodename)
e.Delete(cacheKey)
Expand Down
5 changes: 5 additions & 0 deletions engine/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ import (
// EngineWithErr use to mock the nil engine
type EngineWithErr struct {
DefaultErr error
EP *enginetypes.Params
}

// Info .
func (f *EngineWithErr) Info(_ context.Context) (*enginetypes.Info, error) {
return nil, f.DefaultErr
}

func (f *EngineWithErr) GetParams() *enginetypes.Params {
return f.EP
}

// Ping .
func (f *EngineWithErr) Ping(_ context.Context) error {
return f.DefaultErr
Expand Down
Loading
Loading