From d6f600a745b32e34925ec032a3dc00ae912587e2 Mon Sep 17 00:00:00 2001 From: vdaas-ci <57942646+vdaas-ci@users.noreply.github.com> Date: Wed, 11 Sep 2024 16:21:07 +0900 Subject: [PATCH] Bugfix NGT flush logic (#2598) (#2606) * fix: bugfix flush logic * fix: nil check for flushing * fix: add flush check logic * fix: nil check bug * fix: add nil check * fix: return err when the flush process is executing * fix: add error check for flushing * fix: error message * fix: disable kvs and vqueue initialization * fix: disable commentout * fix: disable kvs and vq * fix: nil set to kvs and vq * fix: copy ngt service object for flushing * fix: deleted unnecessary nil check * fix: variable name --------- Signed-off-by: hlts2 Co-authored-by: Hiroto Funakoshi Co-authored-by: Yusuke Kato Co-authored-by: Kiichiro YUKAWA --- pkg/agent/core/ngt/handler/grpc/index.go | 6 ++ pkg/agent/core/ngt/service/ngt.go | 73 +++++++++++++++--------- 2 files changed, 51 insertions(+), 28 deletions(-) diff --git a/pkg/agent/core/ngt/handler/grpc/index.go b/pkg/agent/core/ngt/handler/grpc/index.go index 62df71e180..8e7f24d9f6 100644 --- a/pkg/agent/core/ngt/handler/grpc/index.go +++ b/pkg/agent/core/ngt/handler/grpc/index.go @@ -64,6 +64,9 @@ func (s *server) CreateIndex( }, }, info.Get())...) code = codes.FailedPrecondition + case errors.Is(err, errors.ErrFlushingIsInProgress): + err = status.WrapWithAborted("CreateIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...) + code = codes.Aborted case errors.Is(err, context.Canceled): err = status.WrapWithCanceled(fmt.Sprintf("CreateIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...) code = codes.Canceled @@ -149,6 +152,9 @@ func (s *server) CreateAndSaveIndex( }, }, info.Get())...) code = codes.FailedPrecondition + case errors.Is(err, errors.ErrFlushingIsInProgress): + err = status.WrapWithAborted("CreateAndSaveIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...) + code = codes.Aborted case errors.Is(err, context.Canceled): err = status.WrapWithCanceled(fmt.Sprintf("CreateAndSaveIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...) code = codes.Canceled diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 095f0b6f0a..09bf8577f5 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -260,6 +260,26 @@ func newNGT(cfg *config.NGT, opts ...Option) (n *ngt, err error) { return n, nil } +func (n *ngt) copyNGT(src *ngt) { + // instances + n.core = src.core + n.kvs = src.kvs + n.fmap = src.fmap + n.vq = src.vq + + // counters + n.wfci = src.wfci + n.nobic = src.nobic + n.nopvq = atomic.Uint64{} + + // paths + n.path = src.path + n.tmpPath = src.tmpPath + n.oldPath = src.oldPath + n.basePath = src.basePath + n.brokenPath = src.brokenPath +} + // migrate migrates the index directory from old to new under the input path if necessary. // Migration happens when the path is not empty and there is no `path/origin` directory, // which indicates that the user has NOT been using CoW mode and the index directory is not migrated yet. @@ -908,7 +928,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error { } return ctx.Err() case <-tick.C: - if n.vq.IVQLen() >= n.alen { + if n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen { err = n.CreateIndex(ctx, n.poolSize) } case <-limit.C: @@ -1242,14 +1262,12 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { if err != nil { log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err) } - n.kvs = nil - n.vq = nil // gc runtime.GC() atomic.AddUint64(&n.nogce, 1) - if n.inMem { + if !n.inMem { // delete file err = file.DeleteDir(ctx, n.path) if err != nil { @@ -1265,30 +1283,14 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { } } - nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency)) - - nvq, err := vqueue.New() - if err != nil { - log.Errorf("failed to create new vector vector queue. error: %v", err) - } - // renew instance nn, err := newNGT(n.cfg, n.opts...) if err != nil { return err } - nn.kvs = nkvs - nn.vq = nvq - - // Regenerate with flags set - nn.flushing.Store(true) - nn.indexing.Store(true) - defer nn.flushing.Store(false) - defer nn.indexing.Store(false) + n.copyNGT(nn) - n = nn - - return nil + return n.loadStatistics() } func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { @@ -1299,8 +1301,11 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { } }() - if n.isReadReplica { + switch { + case n.isReadReplica: return errors.ErrWriteOperationToReadReplica + case n.IsFlushing(): + return errors.ErrFlushingIsInProgress } ic := n.vq.IVQLen() + n.vq.DVQLen() @@ -1428,6 +1433,10 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { return err } } + return n.loadStatistics() +} + +func (n *ngt) loadStatistics() error { if n.IsStatisticsEnabled() { log.Info("loading index statistics to cache") stats, err := n.core.GetGraphStatistics(core.AdditionalStatistics) @@ -1471,8 +1480,7 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { IndegreeHistogram: stats.IndegreeHistogram, }) } - - return err + return nil } func (n *ngt) removeInvalidIndex(ctx context.Context) { @@ -1941,15 +1949,24 @@ func (n *ngt) gc() { } func (n *ngt) Len() uint64 { - return n.kvs.Len() + if n.kvs != nil && !n.IsFlushing() { + return n.kvs.Len() + } + return 0 } func (n *ngt) InsertVQueueBufferLen() uint64 { - return uint64(n.vq.IVQLen()) + if n.vq != nil && !n.IsFlushing() { + return uint64(n.vq.IVQLen()) + } + return 0 } func (n *ngt) DeleteVQueueBufferLen() uint64 { - return uint64(n.vq.DVQLen()) + if n.vq != nil && !n.IsFlushing() { + return uint64(n.vq.DVQLen()) + } + return 0 } func (n *ngt) GetDimensionSize() int {