Skip to content

Commit

Permalink
chore: [k207] chore(index-gateway): Improve instrumentation of index …
Browse files Browse the repository at this point in the history
…download/sync (#13280)

Co-authored-by: Christian Haudum <[email protected]>
  • Loading branch information
grafanabot and chaudum authored Jun 21, 2024
1 parent af567fb commit 1892816
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 50 deletions.
40 changes: 23 additions & 17 deletions pkg/storage/stores/shipper/indexshipper/downloads/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type IndexSet interface {
LastUsedAt() time.Time
UpdateLastUsedAt()
Sync(ctx context.Context) (err error)
AwaitReady(ctx context.Context) error
AwaitReady(ctx context.Context, reason string) error
}

// indexSet is a collection of multiple files created for a same table by various ingesters.
Expand All @@ -62,8 +62,7 @@ type indexSet struct {
cancelFunc context.CancelFunc // helps with cancellation of initialization if we are asked to stop.
}

func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.IndexSet, openIndexFileFunc index.OpenIndexFileFunc,
logger log.Logger) (IndexSet, error) {
func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.IndexSet, openIndexFileFunc index.OpenIndexFileFunc, logger log.Logger) (IndexSet, error) {
if baseIndexSet.IsUserBasedIndexSet() && userID == "" {
return nil, fmt.Errorf("userID must not be empty")
} else if !baseIndexSet.IsUserBasedIndexSet() && userID != "" {
Expand All @@ -75,10 +74,7 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
return nil, err
}

maxConcurrent := runtime.GOMAXPROCS(0) / 2
if maxConcurrent == 0 {
maxConcurrent = 1
}
maxConcurrent := max(runtime.GOMAXPROCS(0)/2, 1)

is := indexSet{
openIndexFileFunc: openIndexFileFunc,
Expand All @@ -101,25 +97,25 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
func (t *indexSet) Init(forQuerying bool) (err error) {
// Using background context to avoid cancellation of download when request times out.
// We would anyways need the files for serving next requests.
ctx, cancelFunc := context.WithTimeout(context.Background(), downloadTimeout)
t.cancelFunc = cancelFunc
ctx := context.Background()
ctx, t.cancelFunc = context.WithTimeout(ctx, downloadTimeout)

logger := spanlogger.FromContextWithFallback(ctx, t.logger)
logger, ctx := spanlogger.NewWithLogger(ctx, t.logger, "indexSet.Init")

defer func() {
if err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to initialize table %s, cleaning it up", t.tableName), "err", err)
level.Error(logger).Log("msg", "failed to initialize table, cleaning it up", "table", t.tableName, "err", err)
t.err = err

// cleaning up files due to error to avoid returning invalid results.
for fileName := range t.index {
if err := t.cleanupDB(fileName); err != nil {
level.Error(t.logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err)
level.Error(logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err)
}
}
}
t.cancelFunc()
t.indexMtx.markReady()
t.cancelFunc()
}()

dirEntries, err := os.ReadDir(t.cacheLocation)
Expand All @@ -137,12 +133,12 @@ func (t *indexSet) Init(forQuerying bool) (err error) {
// if we fail to open an index file, lets skip it and let sync operation re-download the file from storage.
idx, err := t.openIndexFileFunc(fullPath)
if err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to open existing index file %s, removing the file and continuing without it to let the sync operation catch up", fullPath), "err", err)
level.Error(logger).Log("msg", fmt.Sprintf("failed to open existing index file %s, removing the file and continuing without it to let the sync operation catch up", fullPath), "err", err)
// Sometimes files get corrupted when the process gets killed in the middle of a download operation which can cause problems in reading the file.
// Implementation of openIndexFileFunc should take care of gracefully handling corrupted files.
// Let us just remove the file and let the sync operation re-download it.
if err := os.Remove(fullPath); err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to remove index file %s which failed to open", fullPath))
level.Error(logger).Log("msg", fmt.Sprintf("failed to remove index file %s which failed to open", fullPath))
}
continue
}
Expand Down Expand Up @@ -406,8 +402,18 @@ func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock, bypassListC
return
}

func (t *indexSet) AwaitReady(ctx context.Context) error {
return t.indexMtx.awaitReady(ctx)
// AwaitReady blocks until the index set has finished initializing (or ctx is
// done) and logs how long the caller waited. The reason string identifies the
// caller's purpose (e.g. "ensure query readiness") so slow waits can be
// attributed in the logs.
func (t *indexSet) AwaitReady(ctx context.Context, reason string) error {
start := time.Now()
err := t.indexMtx.awaitReady(ctx)
// Log unconditionally — including on error — so both slow and failed
// readiness waits are visible, tagged with table, user and wait duration.
level.Info(t.logger).Log(
"msg", "waited for index set to become ready",
"reason", reason,
"table", t.tableName,
"user", t.userID,
"wait_time", time.Since(start),
"err", err,
)
return err
}

func (t *indexSet) downloadFileFromStorage(ctx context.Context, fileName, folderPathForTable string) (string, error) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/storage/stores/shipper/indexshipper/downloads/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ type metrics struct {
queryTimeTableDownloadDurationSeconds *prometheus.CounterVec
tablesSyncOperationTotal *prometheus.CounterVec
tablesDownloadOperationDurationSeconds prometheus.Gauge

// new metrics that will supersede the incorrect old types
queryWaitTime *prometheus.HistogramVec
tableSyncLatency *prometheus.HistogramVec
}

func newMetrics(r prometheus.Registerer) *metrics {
Expand All @@ -30,6 +34,15 @@ func newMetrics(r prometheus.Registerer) *metrics {
Name: "tables_download_operation_duration_seconds",
Help: "Time (in seconds) spent in downloading updated files for all the tables",
}),

queryWaitTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "query_wait_time_seconds",
Help: "Time (in seconds) spent waiting for index files to be queryable at query time",
}, []string{"table"}),
tableSyncLatency: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "table_sync_latency_seconds",
Help: "Time (in seconds) spent in downloading updated files for all the tables",
}, []string{"table", "status"}),
}

return m
Expand Down
48 changes: 21 additions & 27 deletions pkg/storage/stores/shipper/indexshipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

// timeout for downloading initial files for a table to avoid leaking resources by allowing it to take all the time.
const (
downloadTimeout = 5 * time.Minute
downloadTimeout = 1 * time.Minute
maxDownloadConcurrency = 50
)

Expand Down Expand Up @@ -57,12 +57,8 @@ type table struct {
// NewTable just creates an instance of table without trying to load files from local storage or object store.
// It is used for initializing table at query time.
func NewTable(name, cacheLocation string, storageClient storage.Client, openIndexFileFunc index.OpenIndexFileFunc, metrics *metrics) Table {
maxConcurrent := runtime.GOMAXPROCS(0) / 2
if maxConcurrent == 0 {
maxConcurrent = 1
}

table := table{
maxConcurrent := max(runtime.GOMAXPROCS(0)/2, 1)
return &table{
name: name,
cacheLocation: cacheLocation,
storageClient: storageClient,
Expand All @@ -74,8 +70,6 @@ func NewTable(name, cacheLocation string, storageClient storage.Client, openInde
maxConcurrent: maxConcurrent,
indexSets: map[string]IndexSet{},
}

return &table
}

// LoadTable loads a table from local storage(syncs the table too if we have it locally) or downloads it from the shared store.
Expand All @@ -91,10 +85,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
return nil, err
}

maxConcurrent := runtime.GOMAXPROCS(0) / 2
if maxConcurrent == 0 {
maxConcurrent = 1
}
maxConcurrent := max(runtime.GOMAXPROCS(0)/2, 1)

table := table{
name: name,
Expand Down Expand Up @@ -296,6 +287,8 @@ func (t *table) Sync(ctx context.Context) error {
// forQuerying must be set to true only getting the index for querying since
// it captures the amount of time it takes to download the index at query time.
func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying bool) (IndexSet, error) {
logger := spanlogger.FromContextWithFallback(ctx, log.With(t.logger, "user", id, "table", t.name))

t.indexSetsMtx.RLock()
indexSet, ok := t.indexSets[id]
t.indexSetsMtx.RUnlock()
Expand All @@ -318,28 +311,29 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying
}

// instantiate the index set, add it to the map
indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc,
loggerWithUserID(t.logger, id))
indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc, logger)
if err != nil {
return nil, err
}
t.indexSets[id] = indexSet

// initialize the index set in async mode, it would be upto the caller to wait for its readiness using IndexSet.AwaitReady()
// initialize the index set in async mode
// it is up to the caller to wait for its readiness using IndexSet.AwaitReady()
go func() {
start := time.Now()
err := indexSet.Init(forQuerying)
duration := time.Since(start)

level.Info(logger).Log("msg", "init index set", "duration", duration, "success", err == nil)

if forQuerying {
start := time.Now()
defer func() {
duration := time.Since(start)
t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration.Seconds())
logger := spanlogger.FromContextWithFallback(ctx, loggerWithUserID(t.logger, id))
level.Info(logger).Log("msg", "downloaded index set at query time", "duration", duration)
}()
t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration.Seconds())
t.metrics.queryWaitTime.WithLabelValues(t.name).Observe(duration.Seconds())
level.Info(logger).Log("msg", "downloaded index set at query time", "duration", duration)
}

err := indexSet.Init(forQuerying)
if err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to init user index set %s", id), "err", err)
level.Error(logger).Log("msg", "failed to init user index set", "err", err)
t.cleanupBrokenIndexSet(ctx, id)
}
}()
Expand Down Expand Up @@ -372,7 +366,7 @@ func (t *table) EnsureQueryReadiness(ctx context.Context, userIDs []string) erro
if err != nil {
return err
}
err = commonIndexSet.AwaitReady(ctx)
err = commonIndexSet.AwaitReady(ctx, "ensure query readiness")
if err != nil {
return err
}
Expand Down Expand Up @@ -401,7 +395,7 @@ func (t *table) downloadUserIndexes(ctx context.Context, userIDs []string) error
return err
}

return indexSet.AwaitReady(ctx)
return indexSet.AwaitReady(ctx, "download user indexes")
})
}

Expand Down
21 changes: 15 additions & 6 deletions pkg/storage/stores/shipper/indexshipper/downloads/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

Expand Down Expand Up @@ -131,7 +132,7 @@ func (tm *tableManager) loop() {
case <-syncTicker.C:
err := tm.syncTables(tm.ctx)
if err != nil {
level.Error(tm.logger).Log("msg", "error syncing local boltdb files with storage", "err", err)
level.Error(tm.logger).Log("msg", "error syncing local index files with storage", "err", err)
}

// we need to keep ensuring query readiness to download every days new table which would otherwise be downloaded only during queries.
Expand Down Expand Up @@ -180,10 +181,13 @@ func (tm *tableManager) ForEach(ctx context.Context, tableName, userID string, c

func (tm *tableManager) getOrCreateTable(tableName string) (Table, error) {
// if table is already there, use it.
start := time.Now()
tm.tablesMtx.RLock()
table, ok := tm.tables[tableName]
tm.tablesMtx.RUnlock()

level.Info(tm.logger).Log("msg", "get or create table", "found", ok, "table", tableName, "wait_for_lock", time.Since(start))

if !ok {
tm.tablesMtx.Lock()
defer tm.tablesMtx.Unlock()
Expand All @@ -192,7 +196,7 @@ func (tm *tableManager) getOrCreateTable(tableName string) (Table, error) {
table, ok = tm.tables[tableName]
if !ok {
// table not found, creating one.
level.Info(tm.logger).Log("msg", fmt.Sprintf("downloading all files for table %s", tableName))
level.Info(tm.logger).Log("msg", "downloading all files for table", "table", tableName)

tablePath := filepath.Join(tm.cfg.CacheDir, tableName)
err := util.EnsureDirectory(tablePath)
Expand Down Expand Up @@ -227,11 +231,16 @@ func (tm *tableManager) syncTables(ctx context.Context) error {

level.Info(tm.logger).Log("msg", "syncing tables")

for _, table := range tm.tables {
for name, table := range tm.tables {
level.Debug(tm.logger).Log("msg", "syncing table", "table", name)
start := time.Now()
err := table.Sync(ctx)
duration := float64(time.Since(start))
if err != nil {
return err
tm.metrics.tableSyncLatency.WithLabelValues(name, statusFailure).Observe(duration)
return errors.Wrapf(err, "failed to sync table '%s'", name)
}
tm.metrics.tableSyncLatency.WithLabelValues(name, statusSuccess).Observe(duration)
}

return nil
Expand All @@ -244,10 +253,10 @@ func (tm *tableManager) cleanupCache() error {
level.Info(tm.logger).Log("msg", "cleaning tables cache")

for name, table := range tm.tables {
level.Info(tm.logger).Log("msg", fmt.Sprintf("cleaning up expired table %s", name))
level.Debug(tm.logger).Log("msg", "cleaning up expired table", "table", name)
isEmpty, err := table.DropUnusedIndex(tm.cfg.CacheTTL, time.Now())
if err != nil {
return err
return errors.Wrapf(err, "failed to clean up expired table '%s'", name)
}

if isEmpty {
Expand Down

0 comments on commit 1892816

Please sign in to comment.