Skip to content

Commit

Permalink
fix: copy ngt service object for flushing
Browse files Browse the repository at this point in the history
Signed-off-by: hlts2 <[email protected]>
  • Loading branch information
hlts2 committed Sep 10, 2024
1 parent b3151dc commit d4c79e2
Showing 1 changed file with 27 additions and 22 deletions.
49 changes: 27 additions & 22 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,26 @@ func newNGT(cfg *config.NGT, opts ...Option) (n *ngt, err error) {
return n, nil
}

func (n *ngt) copyNGT(obj *ngt) {
// instances
n.core = obj.core
n.kvs = obj.kvs
n.fmap = obj.fmap
n.vq = obj.vq

// counters
n.wfci = obj.wfci
n.nobic = obj.nobic
n.nopvq = atomic.Uint64{}

// paths
n.path = obj.path
n.tmpPath = obj.tmpPath
n.oldPath = obj.oldPath
n.basePath = obj.basePath
n.brokenPath = obj.brokenPath
}

// migrate migrates the index directory from old to new under the input path if necessary.
// Migration happens when the path is not empty and there is no `path/origin` directory,
// which indicates that the user has NOT been using CoW mode and the index directory is not migrated yet.
Expand Down Expand Up @@ -1242,8 +1262,6 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) {
if err != nil {
log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err)
}
n.kvs = nil
n.vq = nil

// gc
runtime.GC()
Expand All @@ -1265,30 +1283,14 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) {
}
}

// nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency))
//
// nvq, err := vqueue.New()
// if err != nil {
// log.Errorf("failed to create new vector vector queue. error: %v", err)
// }

// renew instance
nn, err := newNGT(n.cfg, n.opts...)
if err != nil {
return err
}
// nn.kvs = nkvs
// nn.vq = nvq

// Regenerate with flags set
nn.flushing.Store(true)
nn.indexing.Store(true)
defer nn.flushing.Store(false)
defer nn.indexing.Store(false)
n.copyNGT(nn)

n = nn

return nil
return n.loadStatistics()
}

func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
Expand Down Expand Up @@ -1431,6 +1433,10 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
return err
}
}
return n.loadStatistics()
}

func (n *ngt) loadStatistics() error {
if n.IsStatisticsEnabled() {
log.Info("loading index statistics to cache")
stats, err := n.core.GetGraphStatistics(core.AdditionalStatistics)
Expand Down Expand Up @@ -1474,8 +1480,7 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
IndegreeHistogram: stats.IndegreeHistogram,
})
}

return err
return nil
}

func (n *ngt) removeInvalidIndex(ctx context.Context) {
Expand Down

0 comments on commit d4c79e2

Please sign in to comment.