From 606f643116e82ea705e8c3938cf63777a003c681 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Mon, 8 Jul 2024 11:43:05 +0800 Subject: [PATCH] add engine params to engine API (#641) * add engine params to engine API * replace go-cache with haxmap * remove cache utility * fix review issue --- engine/docker/docker.go | 18 +++- engine/docker/helper.go | 4 +- engine/engine.go | 1 + engine/factory/factory.go | 126 +++++++++++------------ engine/fake/fake.go | 5 + engine/mocks/API.go | 170 +++++++++++++++++++++++++++++++- engine/mocks/fakeengine/mock.go | 10 +- engine/systemd/systemd.go | 14 +++ engine/types/params.go | 27 +++++ engine/virt/virt.go | 16 ++- go.mod | 1 - go.sum | 2 - store/etcdv3/node.go | 10 +- store/redis/node.go | 10 +- utils/cache.go | 41 -------- utils/cache_test.go | 23 ----- 16 files changed, 331 insertions(+), 147 deletions(-) create mode 100644 engine/types/params.go delete mode 100644 utils/cache.go delete mode 100644 utils/cache_test.go diff --git a/engine/docker/docker.go b/engine/docker/docker.go index 8029a2b70..50259614c 100644 --- a/engine/docker/docker.go +++ b/engine/docker/docker.go @@ -27,6 +27,7 @@ const ( type Engine struct { client dockerapi.APIClient config coretypes.Config + ep *enginetypes.Params } // MakeClient make docker cli @@ -45,7 +46,18 @@ func MakeClient(ctx context.Context, config coretypes.Config, nodename, endpoint } logger.Debugf(ctx, "Create new http.Client for %s, %s", endpoint, config.Docker.APIVersion) - return makeDockerClient(ctx, config, client, endpoint) + e, err := makeDockerClient(ctx, config, client, endpoint) + if err != nil { + return nil, err + } + e.ep = &enginetypes.Params{ + Nodename: nodename, + Endpoint: endpoint, + CA: ca, + Cert: cert, + Key: key, + } + return e, nil } // Info show node info @@ -59,6 +71,10 @@ func (e *Engine) Info(ctx context.Context) (*enginetypes.Info, error) { return &enginetypes.Info{Type: Type, ID: r.ID, NCPU: r.NCPU, MemTotal: r.MemTotal}, nil } +func (e *Engine) GetParams() *enginetypes.Params { + return e.ep +} + // Ping test connection func (e *Engine) Ping(ctx context.Context) error { _, err := e.client.Ping(ctx) diff --git a/engine/docker/helper.go b/engine/docker/helper.go index 77cac73b4..0e33bf43f 100644 --- a/engine/docker/helper.go +++ b/engine/docker/helper.go @@ -299,7 +299,7 @@ func GetIP(ctx context.Context, daemonHost string) string { return u.Hostname() } -func makeDockerClient(_ context.Context, config coretypes.Config, client *http.Client, endpoint string) (engine.API, error) { +func makeDockerClient(_ context.Context, config coretypes.Config, client *http.Client, endpoint string) (*Engine, error) { cli, err := dockerapi.NewClientWithOpts( dockerapi.WithHost(endpoint), dockerapi.WithVersion(config.Docker.APIVersion), @@ -307,7 +307,7 @@ func makeDockerClient(_ context.Context, config coretypes.Config, client *http.C if err != nil { return nil, err } - return &Engine{cli, config}, nil + return &Engine{client: cli, config: config}, nil } func useCNI(labels map[string]string) bool { diff --git a/engine/engine.go b/engine/engine.go index caff9d16a..48ff5657c 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -15,6 +15,7 @@ type API interface { Info(ctx context.Context) (*enginetypes.Info, error) Ping(ctx context.Context) error CloseConn() error + GetParams() *enginetypes.Params Execute(ctx context.Context, ID string, config *enginetypes.ExecConfig) (execID string, stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) ExecResize(ctx context.Context, execID string, height, width uint) (err error) diff --git a/engine/factory/factory.go b/engine/factory/factory.go index 8d9472c41..d9b4dc54d 100644 --- a/engine/factory/factory.go +++ b/engine/factory/factory.go @@ -2,7 +2,6 @@ package factory import ( "context" - "fmt" "strings" "sync" "time" @@ -16,6 +15,7 @@ import ( "github.com/projecteru2/core/engine/fake" "github.com/projecteru2/core/engine/mocks/fakeengine" "github.com/projecteru2/core/engine/systemd" + enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/engine/virt" "github.com/projecteru2/core/log" "github.com/projecteru2/core/store" @@ -38,22 +38,20 @@ var ( // EngineCache . type EngineCache struct { - cache *utils.EngineCache - keysToCheck *haxmap.Map[string, engineParams] - pool *ants.PoolWithFunc - config types.Config - stor store.Store + cache *haxmap.Map[string, engine.API] + pool *ants.PoolWithFunc + config types.Config + stor store.Store } // NewEngineCache . func NewEngineCache(config types.Config, stor store.Store) *EngineCache { pool, _ := utils.NewPool(config.MaxConcurrency) return &EngineCache{ - cache: utils.NewEngineCache(12*time.Hour, 10*time.Minute), - keysToCheck: haxmap.New[string, engineParams](), - pool: pool, - config: config, - stor: stor, + cache: haxmap.New[string, engine.API](), + pool: pool, + config: config, + stor: stor, } } @@ -66,28 +64,28 @@ func InitEngineCache(ctx context.Context, config types.Config, stor store.Store) All: true, }) } - go engineCache.CheckAlive(ctx) - go engineCache.CheckNodeStatus(ctx) + go engineCache.checkAlive(ctx) + go engineCache.checkNodeStatus(ctx) } // Get . func (e *EngineCache) Get(key string) engine.API { - return e.cache.Get(key) + api, _ := e.cache.Get(key) + return api } // Set . -func (e *EngineCache) Set(params engineParams, client engine.API) { - e.cache.Set(params.getCacheKey(), client) - e.keysToCheck.Set(params.getCacheKey(), params) +func (e *EngineCache) Set(key string, client engine.API) { + e.cache.Set(key, client) } // Delete . func (e *EngineCache) Delete(key string) { - e.cache.Delete(key) + e.cache.Del(key) } -// CheckAlive checks if the engine in cache is available -func (e *EngineCache) CheckAlive(ctx context.Context) { +// checkAlive checks if the engine in cache is available +func (e *EngineCache) checkAlive(ctx context.Context) { logger := log.WithFunc("engine.factory.CheckAlive") logger.Info(ctx, "check alive starts") defer logger.Info(ctx, "check alive ends") @@ -99,52 +97,43 @@ func (e *EngineCache) CheckAlive(ctx context.Context) { default: } - paramsChan := make(chan engineParams) - go func() { - e.keysToCheck.ForEach(func(_ string, v engineParams) bool { - paramsChan <- v - return true - }) - close(paramsChan) - }() - wg := &sync.WaitGroup{} - for params := range paramsChan { + e.cache.ForEach(func(_ string, v engine.API) bool { wg.Add(1) - params := params + params := v.GetParams() _ = e.pool.Invoke(func() { defer wg.Done() - cacheKey := params.getCacheKey() - client := e.cache.Get(cacheKey) + cacheKey := params.CacheKey() + client := e.Get(cacheKey) if client == nil { - e.cache.Delete(params.getCacheKey()) - e.keysToCheck.Del(cacheKey) + e.Delete(cacheKey) return } if _, ok := client.(*fake.EngineWithErr); ok { - if newClient, err := newEngine(ctx, e.config, params.nodename, params.endpoint, params.ca, params.key, params.cert); err != nil { + if newClient, err := newEngine(ctx, e.config, params.Nodename, params.Endpoint, params.CA, params.Key, params.Cert); err != nil { logger.Errorf(ctx, err, "engine %+v is still unavailable", cacheKey) - e.cache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err}) + e.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err, EP: params}) // check node status - e.checkOneNodeStatus(ctx, ¶ms) + e.checkOneNodeStatus(ctx, params) } else { - e.cache.Set(cacheKey, newClient) + e.Set(cacheKey, newClient) } return } if err := validateEngine(ctx, client, e.config.ConnectionTimeout); err != nil { logger.Errorf(ctx, err, "engine %+v is unavailable, will be replaced and removed", cacheKey) - e.cache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err}) + e.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err, EP: params}) } logger.Debugf(ctx, "engine %+v is available", cacheKey) }) - } + return true + }) wg.Wait() time.Sleep(e.config.ConnectionTimeout) } } -func (e *EngineCache) CheckNodeStatus(ctx context.Context) { +func (e *EngineCache) checkNodeStatus(ctx context.Context) { logger := log.WithFunc("engine.factory.CheckNodeStatus") logger.Info(ctx, "check NodeStatus starts") defer logger.Info(ctx, "check NodeStatus ends") @@ -169,10 +158,11 @@ func (e *EngineCache) CheckNodeStatus(ctx context.Context) { continue } // a node may have multiple engines, so we need check all key here - e.keysToCheck.ForEach(func(_ string, ep engineParams) bool { - if ep.nodename == ns.Nodename { - logger.Infof(ctx, "remove engine %+v from cache", ep.getCacheKey()) - RemoveEngineFromCache(ctx, ep.endpoint, ep.ca, ep.cert, ep.key) + e.cache.ForEach(func(_ string, v engine.API) bool { + ep := v.GetParams() + if ep.Nodename == ns.Nodename { + logger.Infof(ctx, "remove engine %+v from cache", ep.CacheKey()) + RemoveEngineFromCache(ctx, ep.Endpoint, ep.CA, ep.Cert, ep.Key) } return true }) @@ -200,19 +190,19 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, } defer func() { - params := engineParams{ - nodename: nodename, - endpoint: endpoint, - ca: ca, - cert: cert, - key: key, + params := &enginetypes.Params{ + Nodename: nodename, + Endpoint: endpoint, + CA: ca, + Cert: cert, + Key: key, } - cacheKey := params.getCacheKey() + cacheKey := params.CacheKey() if err == nil { - engineCache.Set(params, client) + engineCache.Set(cacheKey, client) logger.Infof(ctx, "store engine %+v in cache", cacheKey) } else { - engineCache.Set(params, &fake.EngineWithErr{DefaultErr: err}) + engineCache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err, EP: params}) logger.Infof(ctx, "store fake engine %+v in cache", cacheKey) } }() @@ -220,18 +210,6 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, return newEngine(ctx, config, nodename, endpoint, ca, cert, key) } -type engineParams struct { - nodename string - endpoint string - ca string - cert string - key string -} - -func (ep engineParams) getCacheKey() string { - return getEngineCacheKey(ep.endpoint, ep.ca, ep.cert, ep.key) -} - func validateEngine(ctx context.Context, engine engine.API, timeout time.Duration) (err error) { utils.WithTimeout(ctx, timeout, func(ctx context.Context) { err = engine.Ping(ctx) @@ -249,7 +227,13 @@ func getEnginePrefix(endpoint string) (string, error) { } func getEngineCacheKey(endpoint, ca, cert, key string) string { - return fmt.Sprintf("%+v-%+v", endpoint, utils.SHA256(fmt.Sprintf(":%+v:%+v:%+v", ca, cert, key))[:8]) + p := enginetypes.Params{ + Endpoint: endpoint, + CA: ca, + Cert: cert, + Key: key, + } + return p.CacheKey() } // newEngine get engine @@ -275,13 +259,13 @@ func newEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, return client, nil } -func (e *EngineCache) checkOneNodeStatus(ctx context.Context, params *engineParams) { +func (e *EngineCache) checkOneNodeStatus(ctx context.Context, params *enginetypes.Params) { if e.stor == nil { return } logger := log.WithFunc("engine.factory.checkOneNodeStatus") - nodename := params.nodename - cacheKey := params.getCacheKey() + nodename := params.Nodename + cacheKey := params.CacheKey() if ns, err := e.stor.GetNodeStatus(ctx, nodename); (err != nil && errors.Is(err, types.ErrInvaildCount)) || (!ns.Alive) { logger.Warnf(ctx, "node %s is offline, the cache will be removed", nodename) e.Delete(cacheKey) diff --git a/engine/fake/fake.go b/engine/fake/fake.go index 823390707..76cba7878 100644 --- a/engine/fake/fake.go +++ b/engine/fake/fake.go @@ -13,6 +13,7 @@ import ( // EngineWithErr use to mock the nil engine type EngineWithErr struct { DefaultErr error + EP *enginetypes.Params } // Info . @@ -20,6 +21,10 @@ func (f *EngineWithErr) Info(_ context.Context) (*enginetypes.Info, error) { return nil, f.DefaultErr } +func (f *EngineWithErr) GetParams() *enginetypes.Params { + return f.EP +} + // Ping . func (f *EngineWithErr) Ping(_ context.Context) error { return f.DefaultErr diff --git a/engine/mocks/API.go b/engine/mocks/API.go index 959133913..e0d263d3f 100644 --- a/engine/mocks/API.go +++ b/engine/mocks/API.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.33.2. DO NOT EDIT. +// Code generated by mockery v2.42.0. DO NOT EDIT. package mocks @@ -27,6 +27,10 @@ type API struct { func (_m *API) BuildContent(ctx context.Context, scm source.Source, opts *types.BuildContentOptions) (string, io.Reader, error) { ret := _m.Called(ctx, scm, opts) + if len(ret) == 0 { + panic("no return value specified for BuildContent") + } + var r0 string var r1 io.Reader var r2 error @@ -60,6 +64,10 @@ func (_m *API) BuildContent(ctx context.Context, scm source.Source, opts *types. func (_m *API) BuildRefs(ctx context.Context, opts *types.BuildRefOptions) []string { ret := _m.Called(ctx, opts) + if len(ret) == 0 { + panic("no return value specified for BuildRefs") + } + var r0 []string if rf, ok := ret.Get(0).(func(context.Context, *types.BuildRefOptions) []string); ok { r0 = rf(ctx, opts) @@ -76,6 +84,10 @@ func (_m *API) BuildRefs(ctx context.Context, opts *types.BuildRefOptions) []str func (_m *API) CloseConn() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for CloseConn") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -90,6 +102,10 @@ func (_m *API) CloseConn() error { func (_m *API) ExecExitCode(ctx context.Context, ID string, execID string) (int, error) { ret := _m.Called(ctx, ID, execID) + if len(ret) == 0 { + panic("no return value specified for ExecExitCode") + } + var r0 int var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, string) (int, error)); ok { @@ -114,6 +130,10 @@ func (_m *API) ExecExitCode(ctx context.Context, ID string, execID string) (int, func (_m *API) ExecResize(ctx context.Context, execID string, height uint, width uint) error { ret := _m.Called(ctx, execID, height, width) + if len(ret) == 0 { + panic("no return value specified for ExecResize") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, uint, uint) error); ok { r0 = rf(ctx, execID, height, width) @@ -128,6 +148,10 @@ func (_m *API) ExecResize(ctx context.Context, execID string, height uint, width func (_m *API) Execute(ctx context.Context, ID string, config *types.ExecConfig) (string, io.ReadCloser, io.ReadCloser, io.WriteCloser, error) { ret := _m.Called(ctx, ID, config) + if len(ret) == 0 { + panic("no return value specified for Execute") + } + var r0 string var r1 io.ReadCloser var r2 io.ReadCloser @@ -175,10 +199,34 @@ func (_m *API) Execute(ctx context.Context, ID string, config *types.ExecConfig) return r0, r1, r2, r3, r4 } +// GetParams provides a mock function with given fields: +func (_m *API) GetParams() *types.Params { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetParams") + } + + var r0 *types.Params + if rf, ok := ret.Get(0).(func() *types.Params); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Params) + } + } + + return r0 +} + // ImageBuild provides a mock function with given fields: ctx, input, refs, platform func (_m *API) ImageBuild(ctx context.Context, input io.Reader, refs []string, platform string) (io.ReadCloser, error) { ret := _m.Called(ctx, input, refs, platform) + if len(ret) == 0 { + panic("no return value specified for ImageBuild") + } + var r0 io.ReadCloser var r1 error if rf, ok := ret.Get(0).(func(context.Context, io.Reader, []string, string) (io.ReadCloser, error)); ok { @@ -205,6 +253,10 @@ func (_m *API) ImageBuild(ctx context.Context, input io.Reader, refs []string, p func (_m *API) ImageBuildCachePrune(ctx context.Context, all bool) (uint64, error) { ret := _m.Called(ctx, all) + if len(ret) == 0 { + panic("no return value specified for ImageBuildCachePrune") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(context.Context, bool) (uint64, error)); ok { @@ -229,6 +281,10 @@ func (_m *API) ImageBuildCachePrune(ctx context.Context, all bool) (uint64, erro func (_m *API) ImageBuildFromExist(ctx context.Context, ID string, refs []string, user string) (string, error) { ret := _m.Called(ctx, ID, refs, user) + if len(ret) == 0 { + panic("no return value specified for ImageBuildFromExist") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, []string, string) (string, error)); ok { @@ -253,6 +309,10 @@ func (_m *API) ImageBuildFromExist(ctx context.Context, ID string, refs []string func (_m *API) ImageList(ctx context.Context, image string) ([]*types.Image, error) { ret := _m.Called(ctx, image) + if len(ret) == 0 { + panic("no return value specified for ImageList") + } + var r0 []*types.Image var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) ([]*types.Image, error)); ok { @@ -279,6 +339,10 @@ func (_m *API) ImageList(ctx context.Context, image string) ([]*types.Image, err func (_m *API) ImageLocalDigests(ctx context.Context, image string) ([]string, error) { ret := _m.Called(ctx, image) + if len(ret) == 0 { + panic("no return value specified for ImageLocalDigests") + } + var r0 []string var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) ([]string, error)); ok { @@ -305,6 +369,10 @@ func (_m *API) ImageLocalDigests(ctx context.Context, image string) ([]string, e func (_m *API) ImagePull(ctx context.Context, ref string, all bool) (io.ReadCloser, error) { ret := _m.Called(ctx, ref, all) + if len(ret) == 0 { + panic("no return value specified for ImagePull") + } + var r0 io.ReadCloser var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, bool) (io.ReadCloser, error)); ok { @@ -331,6 +399,10 @@ func (_m *API) ImagePull(ctx context.Context, ref string, all bool) (io.ReadClos func (_m *API) ImagePush(ctx context.Context, ref string) (io.ReadCloser, error) { ret := _m.Called(ctx, ref) + if len(ret) == 0 { + panic("no return value specified for ImagePush") + } + var r0 io.ReadCloser var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) (io.ReadCloser, error)); ok { @@ -357,6 +429,10 @@ func (_m *API) ImagePush(ctx context.Context, ref string) (io.ReadCloser, error) func (_m *API) ImageRemoteDigest(ctx context.Context, image string) (string, error) { ret := _m.Called(ctx, image) + if len(ret) == 0 { + panic("no return value specified for ImageRemoteDigest") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { @@ -381,6 +457,10 @@ func (_m *API) ImageRemoteDigest(ctx context.Context, image string) (string, err func (_m *API) ImageRemove(ctx context.Context, image string, force bool, prune bool) ([]string, error) { ret := _m.Called(ctx, image, force, prune) + if len(ret) == 0 { + panic("no return value specified for ImageRemove") + } + var r0 []string var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, bool, bool) ([]string, error)); ok { @@ -407,6 +487,10 @@ func (_m *API) ImageRemove(ctx context.Context, image string, force bool, prune func (_m *API) ImagesPrune(ctx context.Context) error { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for ImagesPrune") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(ctx) @@ -421,6 +505,10 @@ func (_m *API) ImagesPrune(ctx context.Context) error { func (_m *API) Info(ctx context.Context) (*types.Info, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for Info") + } + var r0 *types.Info var r1 error if rf, ok := ret.Get(0).(func(context.Context) (*types.Info, error)); ok { @@ -447,6 +535,10 @@ func (_m *API) Info(ctx context.Context) (*types.Info, error) { func (_m *API) NetworkConnect(ctx context.Context, network string, target string, ipv4 string, ipv6 string) ([]string, error) { ret := _m.Called(ctx, network, target, ipv4, ipv6) + if len(ret) == 0 { + panic("no return value specified for NetworkConnect") + } + var r0 []string var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string) ([]string, error)); ok { @@ -473,6 +565,10 @@ func (_m *API) NetworkConnect(ctx context.Context, network string, target string func (_m *API) NetworkDisconnect(ctx context.Context, network string, target string, force bool) error { ret := _m.Called(ctx, network, target, force) + if len(ret) == 0 { + panic("no return value specified for NetworkDisconnect") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, bool) error); ok { r0 = rf(ctx, network, target, force) @@ -487,6 +583,10 @@ func (_m *API) NetworkDisconnect(ctx context.Context, network string, target str func (_m *API) NetworkList(ctx context.Context, drivers []string) ([]*types.Network, error) { ret := _m.Called(ctx, drivers) + if len(ret) == 0 { + panic("no return value specified for NetworkList") + } + var r0 []*types.Network var r1 error if rf, ok := ret.Get(0).(func(context.Context, []string) ([]*types.Network, error)); ok { @@ -513,6 +613,10 @@ func (_m *API) NetworkList(ctx context.Context, drivers []string) ([]*types.Netw func (_m *API) Ping(ctx context.Context) error { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for Ping") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(ctx) @@ -527,6 +631,10 @@ func (_m *API) Ping(ctx context.Context) error { func (_m *API) RawEngine(ctx context.Context, opts *types.RawEngineOptions) (*types.RawEngineResult, error) { ret := _m.Called(ctx, opts) + if len(ret) == 0 { + panic("no return value specified for RawEngine") + } + var r0 *types.RawEngineResult var r1 error if rf, ok := ret.Get(0).(func(context.Context, *types.RawEngineOptions) (*types.RawEngineResult, error)); ok { @@ -553,6 +661,10 @@ func (_m *API) RawEngine(ctx context.Context, opts *types.RawEngineOptions) (*ty func (_m *API) VirtualizationAttach(ctx context.Context, ID string, stream bool, openStdin bool) (io.ReadCloser, io.ReadCloser, io.WriteCloser, error) { ret := _m.Called(ctx, ID, stream, openStdin) + if len(ret) == 0 { + panic("no return value specified for VirtualizationAttach") + } + var r0 io.ReadCloser var r1 io.ReadCloser var r2 io.WriteCloser @@ -597,6 +709,10 @@ func (_m *API) VirtualizationAttach(ctx context.Context, ID string, stream bool, func (_m *API) VirtualizationCopyChunkTo(ctx context.Context, ID string, target string, size int64, content io.Reader, uid int, gid int, mode int64) error { ret := _m.Called(ctx, ID, target, size, content, uid, gid, mode) + if len(ret) == 0 { + panic("no return value specified for VirtualizationCopyChunkTo") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, int64, io.Reader, int, int, int64) error); ok { r0 = rf(ctx, ID, target, size, content, uid, gid, mode) @@ -611,6 +727,10 @@ func (_m *API) VirtualizationCopyChunkTo(ctx context.Context, ID string, target func (_m *API) VirtualizationCopyFrom(ctx context.Context, ID string, path string) ([]byte, int, int, int64, error) { ret := _m.Called(ctx, ID, path) + if len(ret) == 0 { + panic("no return value specified for VirtualizationCopyFrom") + } + var r0 []byte var r1 int var r2 int @@ -658,6 +778,10 @@ func (_m *API) VirtualizationCopyFrom(ctx context.Context, ID string, path strin func (_m *API) VirtualizationCopyTo(ctx context.Context, ID string, target string, content []byte, uid int, gid int, mode int64) error { ret := _m.Called(ctx, ID, target, content, uid, gid, mode) + if len(ret) == 0 { + panic("no return value specified for VirtualizationCopyTo") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, []byte, int, int, int64) error); ok { r0 = rf(ctx, ID, target, content, uid, gid, mode) @@ -672,6 +796,10 @@ func (_m *API) VirtualizationCopyTo(ctx context.Context, ID string, target strin func (_m *API) VirtualizationCreate(ctx context.Context, opts *types.VirtualizationCreateOptions) (*types.VirtualizationCreated, error) { ret := _m.Called(ctx, opts) + if len(ret) == 0 { + panic("no return value specified for VirtualizationCreate") + } + var r0 *types.VirtualizationCreated var r1 error if rf, ok := ret.Get(0).(func(context.Context, *types.VirtualizationCreateOptions) (*types.VirtualizationCreated, error)); ok { @@ -698,6 +826,10 @@ func (_m *API) VirtualizationCreate(ctx context.Context, opts *types.Virtualizat func (_m *API) VirtualizationInspect(ctx context.Context, ID string) (*types.VirtualizationInfo, error) { ret := _m.Called(ctx, ID) + if len(ret) == 0 { + panic("no return value specified for VirtualizationInspect") + } + var r0 *types.VirtualizationInfo var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) (*types.VirtualizationInfo, error)); ok { @@ -724,6 +856,10 @@ func (_m *API) VirtualizationInspect(ctx context.Context, ID string) (*types.Vir func (_m *API) VirtualizationLogs(ctx context.Context, opts *types.VirtualizationLogStreamOptions) (io.ReadCloser, io.ReadCloser, error) { ret := _m.Called(ctx, opts) + if len(ret) == 0 { + panic("no return value specified for VirtualizationLogs") + } + var r0 io.ReadCloser var r1 io.ReadCloser var r2 error @@ -759,6 +895,10 @@ func (_m *API) VirtualizationLogs(ctx context.Context, opts *types.Virtualizatio func (_m *API) VirtualizationRemove(ctx context.Context, ID string, volumes bool, force bool) error { ret := _m.Called(ctx, ID, volumes, force) + if len(ret) == 0 { + panic("no return value specified for VirtualizationRemove") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, bool, bool) error); ok { r0 = rf(ctx, ID, volumes, force) @@ -773,6 +913,10 @@ func (_m *API) VirtualizationRemove(ctx context.Context, ID string, volumes bool func (_m *API) VirtualizationResize(ctx context.Context, ID string, height uint, width uint) error { ret := _m.Called(ctx, ID, height, width) + if len(ret) == 0 { + panic("no return value specified for VirtualizationResize") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, uint, uint) error); ok { r0 = rf(ctx, ID, height, width) @@ -787,6 +931,10 @@ func (_m *API) VirtualizationResize(ctx context.Context, ID string, height uint, func (_m *API) VirtualizationResume(ctx context.Context, ID string) error { ret := _m.Called(ctx, ID) + if len(ret) == 0 { + panic("no return value specified for VirtualizationResume") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { r0 = rf(ctx, ID) @@ -801,6 +949,10 @@ func (_m *API) VirtualizationResume(ctx context.Context, ID string) error { func (_m *API) VirtualizationStart(ctx context.Context, ID string) error { ret := _m.Called(ctx, ID) + if len(ret) == 0 { + panic("no return value specified for VirtualizationStart") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { r0 = rf(ctx, ID) @@ -815,6 +967,10 @@ func (_m *API) VirtualizationStart(ctx context.Context, ID string) error { func (_m *API) VirtualizationStop(ctx context.Context, ID string, gracefulTimeout time.Duration) error { ret := _m.Called(ctx, ID, gracefulTimeout) + if len(ret) == 0 { + panic("no return value specified for VirtualizationStop") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, time.Duration) error); ok { r0 = rf(ctx, ID, gracefulTimeout) @@ -829,6 +985,10 @@ func (_m *API) VirtualizationStop(ctx context.Context, ID string, gracefulTimeou func (_m *API) VirtualizationSuspend(ctx context.Context, ID string) error { ret := _m.Called(ctx, ID) + if len(ret) == 0 { + panic("no return value specified for VirtualizationSuspend") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { r0 = rf(ctx, ID) @@ -843,6 +1003,10 @@ func (_m *API) VirtualizationSuspend(ctx context.Context, ID string) error { func (_m *API) VirtualizationUpdateResource(ctx context.Context, ID string, params resourcetypes.Resources) error { ret := _m.Called(ctx, ID, params) + if len(ret) == 0 { + panic("no return value specified for VirtualizationUpdateResource") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, resourcetypes.Resources) error); ok { r0 = rf(ctx, ID, params) @@ -857,6 +1021,10 @@ func (_m *API) VirtualizationUpdateResource(ctx context.Context, ID string, para func (_m *API) VirtualizationWait(ctx context.Context, ID string, state string) (*types.VirtualizationWaitResult, error) { ret := _m.Called(ctx, ID, state) + if len(ret) == 0 { + panic("no return value specified for VirtualizationWait") + } + var r0 *types.VirtualizationWaitResult var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, string) (*types.VirtualizationWaitResult, error)); ok { diff --git a/engine/mocks/fakeengine/mock.go b/engine/mocks/fakeengine/mock.go index e5abe2b49..f86db582e 100644 --- a/engine/mocks/fakeengine/mock.go +++ b/engine/mocks/fakeengine/mock.go @@ -36,11 +36,19 @@ func (wc *writeCloser) Close() error { } // MakeClient make a mock client -func MakeClient(_ context.Context, _ coretypes.Config, _, _, _, _, _ string) (engine.API, error) { +func MakeClient(_ context.Context, _ coretypes.Config, nodename, endpoint, ca, cert, key string) (engine.API, error) { e := &enginemocks.API{} + parmas := &enginetypes.Params{ + Nodename: nodename, + Endpoint: endpoint, + CA: ca, + Cert: cert, + Key: key, + } // info e.On("Info", mock.Anything).Return(&enginetypes.Info{NCPU: 100, MemTotal: units.GiB * 100, StorageTotal: units.GiB * 100}, nil) e.On("Ping", mock.Anything).Return(nil) + e.On("GetParams").Return(parmas, nil) // exec var execID string e.On("Execute", mock.Anything, mock.Anything, mock.Anything).Return( diff --git a/engine/systemd/systemd.go b/engine/systemd/systemd.go index cbed5b32f..32b476bd1 100644 --- a/engine/systemd/systemd.go +++ b/engine/systemd/systemd.go @@ -5,6 +5,7 @@ import ( "github.com/projecteru2/core/engine" "github.com/projecteru2/core/engine/docker" + enginetypes "github.com/projecteru2/core/engine/types" coretypes "github.com/projecteru2/core/types" ) @@ -15,6 +16,7 @@ const TCPPrefix = "systemd://" type Engine struct { engine.API config coretypes.Config + ep *enginetypes.Params } // MakeClient make systemd cli @@ -23,8 +25,20 @@ func MakeClient(ctx context.Context, config coretypes.Config, nodename, endpoint if err != nil { return nil, err } + ep := &enginetypes.Params{ + Nodename: nodename, + Endpoint: endpoint, + CA: ca, + Cert: cert, + Key: key, + } return &Engine{ API: api, config: config, + ep: ep, }, nil } + +func (e *Engine) GetParams() *enginetypes.Params { + return e.ep +} diff --git a/engine/types/params.go b/engine/types/params.go new file mode 100644 index 000000000..b5cce8e10 --- /dev/null +++ b/engine/types/params.go @@ -0,0 +1,27 @@ +package types + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" +) + +type Params struct { + Nodename string + Endpoint string + CA string + Cert string + Key string +} + +func (p *Params) CacheKey() string { + return fmt.Sprintf("%+v-%+v", p.Endpoint, sha256String(fmt.Sprintf(":%+v:%+v:%+v", p.CA, p.Cert, p.Key))[:8]) +} + +// to avoid import cycle, don't use utils.SHA256 +func sha256String(input string) string { + c := sha256.New() + c.Write([]byte(input)) + bytes := c.Sum(nil) + return hex.EncodeToString(bytes) +} diff --git a/engine/virt/virt.go b/engine/virt/virt.go index 31a93abff..b22fe3004 100644 --- a/engine/virt/virt.go +++ b/engine/virt/virt.go @@ -39,10 +39,11 @@ const ( type Virt struct { client virtapi.Client config coretypes.Config + ep *enginetypes.Params } // MakeClient makes a virt. client which wraps yavirt API client. -func MakeClient(_ context.Context, config coretypes.Config, nodename, endpoint, ca, _, _ string) (engine.API, error) { +func MakeClient(_ context.Context, config coretypes.Config, nodename, endpoint, ca, cert, key string) (engine.API, error) { var uri string switch { case strings.HasPrefix(endpoint, GRPCPrefixKey): @@ -69,7 +70,14 @@ func MakeClient(_ context.Context, config coretypes.Config, nodename, endpoint, if err != nil { return nil, err } - return &Virt{cli, config}, nil + ep := &enginetypes.Params{ + Nodename: nodename, + Endpoint: endpoint, + CA: ca, + Cert: cert, + Key: key, + } + return &Virt{client: cli, config: config, ep: ep}, nil } // Info shows a connected node's information. @@ -89,6 +97,10 @@ func (v *Virt) Info(ctx context.Context) (*enginetypes.Info, error) { }, nil } +func (v *Virt) GetParams() *enginetypes.Params { + return v.ep +} + // Ping tests connection. func (v *Virt) Ping(ctx context.Context) error { _, err := v.client.Info(ctx) diff --git a/go.mod b/go.mod index 810d0a3c4..de16a2d8e 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,6 @@ require ( github.com/muroq/redislock v0.0.0-20210327061935-5425e33e6f9f github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b github.com/panjf2000/ants/v2 v2.7.3 - github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 github.com/projecteru2/libyavirt v0.0.0-20230921032447-a617cf0c746c github.com/prometheus/client_golang v1.15.0 diff --git a/go.sum b/go.sum index 81987fa87..e06c3567c 100644 --- a/go.sum +++ b/go.sum @@ -354,8 +354,6 @@ github.com/opencontainers/runc v1.1.12/go.mod h1:S+lQwSfncpBha7XTy/5lBwWgm5+y5Ma github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/panjf2000/ants/v2 v2.7.3 h1:rHQ0hH0DQvuNUqqlWIMJtkMcDuL1uQAfpX2mIhQ5/s0= github.com/panjf2000/ants/v2 v2.7.3/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= -github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= -github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go index 4b6db2501..821f20d33 100644 --- a/store/etcdv3/node.go +++ b/store/etcdv3/node.go @@ -16,6 +16,7 @@ import ( enginefactory "github.com/projecteru2/core/engine/factory" "github.com/projecteru2/core/engine/fake" "github.com/projecteru2/core/engine/mocks/fakeengine" + enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/log" "github.com/projecteru2/core/store" "github.com/projecteru2/core/types" @@ -331,7 +332,14 @@ func (m *Mercury) doGetNodes( if err := json.Unmarshal(ev.Value, node); err != nil { return nil, err } - node.Engine = &fake.EngineWithErr{DefaultErr: types.ErrNilEngine} + ep := enginetypes.Params{ + Nodename: node.Name, + Endpoint: node.Endpoint, + CA: node.Ca, + Cert: node.Cert, + Key: node.Key, + } + node.Engine = &fake.EngineWithErr{DefaultErr: types.ErrNilEngine, EP: &ep} if utils.LabelsFilter(node.Labels, labels) { allNodes = append(allNodes, node) } diff --git a/store/redis/node.go b/store/redis/node.go index 575286521..e380d1346 100644 --- a/store/redis/node.go +++ b/store/redis/node.go @@ -13,6 +13,7 @@ import ( enginefactory "github.com/projecteru2/core/engine/factory" "github.com/projecteru2/core/engine/fake" "github.com/projecteru2/core/engine/mocks/fakeengine" + enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/log" "github.com/projecteru2/core/store" "github.com/projecteru2/core/types" @@ -312,7 +313,14 @@ func (r *Rediaron) doGetNodes( if err := json.Unmarshal([]byte(value), node); err != nil { return nil, err } - node.Engine = &fake.EngineWithErr{DefaultErr: types.ErrNilEngine} + ep := enginetypes.Params{ + Nodename: node.Name, + Endpoint: node.Endpoint, + CA: node.Ca, + Cert: node.Cert, + Key: node.Key, + } + node.Engine = &fake.EngineWithErr{DefaultErr: types.ErrNilEngine, EP: &ep} if utils.LabelsFilter(node.Labels, labels) { allNodes = append(allNodes, node) } diff --git a/utils/cache.go b/utils/cache.go deleted file mode 100644 index f9af90f22..000000000 --- a/utils/cache.go +++ /dev/null @@ -1,41 +0,0 @@ -package utils - -import ( - "time" - - "github.com/projecteru2/core/engine" - - "github.com/patrickmn/go-cache" -) - -// EngineCache connections -// otherwise they'll leak -type EngineCache struct { - cache *cache.Cache -} - -// NewEngineCache creates Cache instance -func NewEngineCache(expire time.Duration, cleanupInterval time.Duration) *EngineCache { - return &EngineCache{ - cache: cache.New(expire, cleanupInterval), - } -} - -// Set connection with host -func (c *EngineCache) Set(endpoint string, client engine.API) { - c.cache.Set(endpoint, client, cache.DefaultExpiration) -} - -// Get connection by host -func (c *EngineCache) Get(endpoint string) engine.API { - e, found := c.cache.Get(endpoint) - if found { - return e.(engine.API) - } - return nil -} - -// Delete connection by host -func (c *EngineCache) Delete(host string, _ ...string) { - c.cache.Delete(host) -} diff --git a/utils/cache_test.go b/utils/cache_test.go deleted file mode 100644 index 57e38a9ef..000000000 --- a/utils/cache_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package utils - -import ( - "testing" - "time" - - enginemocks "github.com/projecteru2/core/engine/mocks" - - "github.com/stretchr/testify/assert" -) - -func TestCache(t *testing.T) { - c := NewEngineCache(2*time.Second, time.Second) - - host := "1.1.1.1" - cli := &enginemocks.API{} - c.Set(host, cli) - assert.Equal(t, c.Get(host), cli) - c.Delete(host) - assert.Nil(t, c.Get(host)) - time.Sleep(3 * time.Second) - assert.Nil(t, c.Get(host)) -}