diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go b/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go index 1be495ed46d9..8eae835b441c 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go @@ -41,7 +41,7 @@ type IndexSet interface { LastUsedAt() time.Time UpdateLastUsedAt() Sync(ctx context.Context) (err error) - AwaitReady(ctx context.Context) error + AwaitReady(ctx context.Context, reason string) error } // indexSet is a collection of multiple files created for a same table by various ingesters. @@ -62,8 +62,7 @@ type indexSet struct { cancelFunc context.CancelFunc // helps with cancellation of initialization if we are asked to stop. } -func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.IndexSet, openIndexFileFunc index.OpenIndexFileFunc, - logger log.Logger) (IndexSet, error) { +func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.IndexSet, openIndexFileFunc index.OpenIndexFileFunc, logger log.Logger) (IndexSet, error) { if baseIndexSet.IsUserBasedIndexSet() && userID == "" { return nil, fmt.Errorf("userID must not be empty") } else if !baseIndexSet.IsUserBasedIndexSet() && userID != "" { @@ -75,10 +74,7 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I return nil, err } - maxConcurrent := runtime.GOMAXPROCS(0) / 2 - if maxConcurrent == 0 { - maxConcurrent = 1 - } + maxConcurrent := max(runtime.GOMAXPROCS(0)/2, 1) is := indexSet{ openIndexFileFunc: openIndexFileFunc, @@ -101,25 +97,25 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I func (t *indexSet) Init(forQuerying bool) (err error) { // Using background context to avoid cancellation of download when request times out. // We would anyways need the files for serving next requests. 
- ctx, cancelFunc := context.WithTimeout(context.Background(), downloadTimeout) - t.cancelFunc = cancelFunc + ctx := context.Background() + ctx, t.cancelFunc = context.WithTimeout(ctx, downloadTimeout) - logger := spanlogger.FromContextWithFallback(ctx, t.logger) + logger, ctx := spanlogger.NewWithLogger(ctx, t.logger, "indexSet.Init") defer func() { if err != nil { - level.Error(t.logger).Log("msg", fmt.Sprintf("failed to initialize table %s, cleaning it up", t.tableName), "err", err) + level.Error(logger).Log("msg", "failed to initialize table, cleaning it up", "table", t.tableName, "err", err) t.err = err // cleaning up files due to error to avoid returning invalid results. for fileName := range t.index { if err := t.cleanupDB(fileName); err != nil { - level.Error(t.logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err) + level.Error(logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err) } } } - t.cancelFunc() t.indexMtx.markReady() + t.cancelFunc() }() dirEntries, err := os.ReadDir(t.cacheLocation) @@ -137,12 +133,12 @@ func (t *indexSet) Init(forQuerying bool) (err error) { // if we fail to open an index file, lets skip it and let sync operation re-download the file from storage. idx, err := t.openIndexFileFunc(fullPath) if err != nil { - level.Error(t.logger).Log("msg", fmt.Sprintf("failed to open existing index file %s, removing the file and continuing without it to let the sync operation catch up", fullPath), "err", err) + level.Error(logger).Log("msg", fmt.Sprintf("failed to open existing index file %s, removing the file and continuing without it to let the sync operation catch up", fullPath), "err", err) // Sometimes files get corrupted when the process gets killed in the middle of a download operation which can cause problems in reading the file. // Implementation of openIndexFileFunc should take care of gracefully handling corrupted files. 
// Let us just remove the file and let the sync operation re-download it. if err := os.Remove(fullPath); err != nil { - level.Error(t.logger).Log("msg", fmt.Sprintf("failed to remove index file %s which failed to open", fullPath)) + level.Error(logger).Log("msg", fmt.Sprintf("failed to remove index file %s which failed to open", fullPath)) } continue } @@ -406,8 +402,18 @@ func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock, bypassListC return } -func (t *indexSet) AwaitReady(ctx context.Context) error { - return t.indexMtx.awaitReady(ctx) +func (t *indexSet) AwaitReady(ctx context.Context, reason string) error { + start := time.Now() + err := t.indexMtx.awaitReady(ctx) + level.Info(t.logger).Log( + "msg", "waited for index set to become ready", + "reason", reason, + "table", t.tableName, + "user", t.userID, + "wait_time", time.Since(start), + "err", err, + ) + return err } func (t *indexSet) downloadFileFromStorage(ctx context.Context, fileName, folderPathForTable string) (string, error) { diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/metrics.go b/pkg/storage/stores/shipper/indexshipper/downloads/metrics.go index d037af15f654..4a9894368dd7 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/metrics.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/metrics.go @@ -14,6 +14,10 @@ type metrics struct { queryTimeTableDownloadDurationSeconds *prometheus.CounterVec tablesSyncOperationTotal *prometheus.CounterVec tablesDownloadOperationDurationSeconds prometheus.Gauge + + // new metrics that will supersede the incorrect old types + queryWaitTime *prometheus.HistogramVec + tableSyncLatency *prometheus.HistogramVec } func newMetrics(r prometheus.Registerer) *metrics { @@ -30,6 +34,15 @@ func newMetrics(r prometheus.Registerer) *metrics { Name: "tables_download_operation_duration_seconds", Help: "Time (in seconds) spent in downloading updated files for all the tables", }), + + queryWaitTime: 
promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Name: "query_wait_time_seconds", + Help: "Time (in seconds) spent waiting for index files to be queryable at query time", + }, []string{"table"}), + tableSyncLatency: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Name: "table_sync_latency_seconds", + Help: "Time (in seconds) spent syncing a table's index files with storage", + }, []string{"table", "status"}), } return m diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/table.go b/pkg/storage/stores/shipper/indexshipper/downloads/table.go index 476786130059..f329c3b41dcd 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/table.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/table.go @@ -24,7 +24,7 @@ import ( // timeout for downloading initial files for a table to avoid leaking resources by allowing it to take all the time. const ( - downloadTimeout = 5 * time.Minute + downloadTimeout = 1 * time.Minute maxDownloadConcurrency = 50 ) @@ -57,12 +57,8 @@ type table struct { // NewTable just creates an instance of table without trying to load files from local storage or object store. // It is used for initializing table at query time. func NewTable(name, cacheLocation string, storageClient storage.Client, openIndexFileFunc index.OpenIndexFileFunc, metrics *metrics) Table { - maxConcurrent := runtime.GOMAXPROCS(0) / 2 - if maxConcurrent == 0 { - maxConcurrent = 1 - } - - table := table{ + maxConcurrent := max(runtime.GOMAXPROCS(0)/2, 1) + return &table{ name: name, cacheLocation: cacheLocation, storageClient: storageClient, @@ -74,8 +70,6 @@ func NewTable(name, cacheLocation string, storageClient storage.Client, openInde maxConcurrent: maxConcurrent, indexSets: map[string]IndexSet{}, } - - return &table } // LoadTable loads a table from local storage(syncs the table too if we have it locally) or downloads it from the shared store. 
@@ -91,10 +85,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd return nil, err } - maxConcurrent := runtime.GOMAXPROCS(0) / 2 - if maxConcurrent == 0 { - maxConcurrent = 1 - } + maxConcurrent := max(runtime.GOMAXPROCS(0)/2, 1) table := table{ name: name, @@ -296,6 +287,8 @@ func (t *table) Sync(ctx context.Context) error { // forQuerying must be set to true only getting the index for querying since // it captures the amount of time it takes to download the index at query time. func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying bool) (IndexSet, error) { + logger := spanlogger.FromContextWithFallback(ctx, log.With(t.logger, "user", id, "table", t.name)) + t.indexSetsMtx.RLock() indexSet, ok := t.indexSets[id] t.indexSetsMtx.RUnlock() @@ -318,28 +311,29 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying } // instantiate the index set, add it to the map - indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc, - loggerWithUserID(t.logger, id)) + indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc, logger) if err != nil { return nil, err } t.indexSets[id] = indexSet - // initialize the index set in async mode, it would be upto the caller to wait for its readiness using IndexSet.AwaitReady() + // initialize the index set in async mode + // it is up to the caller to wait for its readiness using IndexSet.AwaitReady() go func() { + start := time.Now() + err := indexSet.Init(forQuerying) + duration := time.Since(start) + + level.Info(logger).Log("msg", "init index set", "duration", duration, "success", err == nil) + if forQuerying { - start := time.Now() - defer func() { - duration := time.Since(start) - t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration.Seconds()) - logger := spanlogger.FromContextWithFallback(ctx, 
loggerWithUserID(t.logger, id)) - level.Info(logger).Log("msg", "downloaded index set at query time", "duration", duration) - }() + t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration.Seconds()) + t.metrics.queryWaitTime.WithLabelValues(t.name).Observe(duration.Seconds()) + level.Info(logger).Log("msg", "downloaded index set at query time", "duration", duration) } - err := indexSet.Init(forQuerying) if err != nil { - level.Error(t.logger).Log("msg", fmt.Sprintf("failed to init user index set %s", id), "err", err) + level.Error(logger).Log("msg", "failed to init user index set", "err", err) t.cleanupBrokenIndexSet(ctx, id) } }() @@ -372,7 +366,7 @@ func (t *table) EnsureQueryReadiness(ctx context.Context, userIDs []string) erro if err != nil { return err } - err = commonIndexSet.AwaitReady(ctx) + err = commonIndexSet.AwaitReady(ctx, "ensure query readiness") if err != nil { return err } @@ -401,7 +395,7 @@ func (t *table) downloadUserIndexes(ctx context.Context, userIDs []string) error return err } - return indexSet.AwaitReady(ctx) + return indexSet.AwaitReady(ctx, "download user indexes") }) } diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/table_manager.go b/pkg/storage/stores/shipper/indexshipper/downloads/table_manager.go index 612f1d1eaa2a..6b6927259378 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/table_manager.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/table_manager.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -131,7 +132,7 @@ func (tm *tableManager) loop() { case <-syncTicker.C: err := tm.syncTables(tm.ctx) if err != nil { - level.Error(tm.logger).Log("msg", "error syncing local boltdb files with storage", "err", err) + level.Error(tm.logger).Log("msg", "error syncing local index files with storage", "err", err) } // we need to 
keep ensuring query readiness to download every days new table which would otherwise be downloaded only during queries. @@ -180,10 +181,13 @@ func (tm *tableManager) ForEach(ctx context.Context, tableName, userID string, c func (tm *tableManager) getOrCreateTable(tableName string) (Table, error) { // if table is already there, use it. + start := time.Now() tm.tablesMtx.RLock() table, ok := tm.tables[tableName] tm.tablesMtx.RUnlock() + level.Info(tm.logger).Log("msg", "get or create table", "found", ok, "table", tableName, "wait_for_lock", time.Since(start)) + if !ok { tm.tablesMtx.Lock() defer tm.tablesMtx.Unlock() @@ -192,7 +196,7 @@ func (tm *tableManager) getOrCreateTable(tableName string) (Table, error) { table, ok = tm.tables[tableName] if !ok { // table not found, creating one. - level.Info(tm.logger).Log("msg", fmt.Sprintf("downloading all files for table %s", tableName)) + level.Info(tm.logger).Log("msg", "downloading all files for table", "table", tableName) tablePath := filepath.Join(tm.cfg.CacheDir, tableName) err := util.EnsureDirectory(tablePath) @@ -227,11 +231,16 @@ func (tm *tableManager) syncTables(ctx context.Context) error { level.Info(tm.logger).Log("msg", "syncing tables") - for _, table := range tm.tables { + for name, table := range tm.tables { + level.Debug(tm.logger).Log("msg", "syncing table", "table", name) + start := time.Now() err := table.Sync(ctx) + duration := time.Since(start).Seconds() if err != nil { - return err + tm.metrics.tableSyncLatency.WithLabelValues(name, statusFailure).Observe(duration) + return errors.Wrapf(err, "failed to sync table '%s'", name) } + tm.metrics.tableSyncLatency.WithLabelValues(name, statusSuccess).Observe(duration) } return nil @@ -244,10 +253,10 @@ func (tm *tableManager) cleanupCache() error { level.Info(tm.logger).Log("msg", "cleaning tables cache") for name, table := range tm.tables { - level.Info(tm.logger).Log("msg", fmt.Sprintf("cleaning up expired table %s", name)) + 
level.Debug(tm.logger).Log("msg", "cleaning up expired table", "table", name) isEmpty, err := table.DropUnusedIndex(tm.cfg.CacheTTL, time.Now()) if err != nil { - return err + return errors.Wrapf(err, "failed to clean up expired table '%s'", name) } if isEmpty {