diff --git a/backend/cmd/analogdb/main.go b/backend/cmd/analogdb/main.go index 9c468f0..09cd1eb 100644 --- a/backend/cmd/analogdb/main.go +++ b/backend/cmd/analogdb/main.go @@ -39,20 +39,25 @@ func main() { } // create logger instance - logger, err := logger.New(cfg.Log.Level, cfg.App.Env) + logger, err := logger.New(cfg.Log.Level, cfg.App.Env, cfg.App.Name) if err != nil { err = fmt.Errorf("Failed to create logger: %w", err) fatal(nil, err) } - logger.Info().Str("app", cfg.App.Name).Str("version", cfg.App.Version).Str("env", cfg.App.Env).Str("loglevel", cfg.Log.Level).Msg("Initializing application") + logger.Info().Str("version", cfg.App.Version).Str("env", cfg.App.Env).Str("loglevel", cfg.Log.Level).Msg("Initializing application") // add slack webhook to logger to notify on error if webhookURL := cfg.Log.WebhookURL; webhookURL != "" && cfg.App.Env != "debug" { logger = logger.WithSlackNotifier(webhookURL) } + // add otel tracing to logger if enabled + if cfg.Tracing.Enabled { + logger = logger.WithTracer(cfg.App.Name) + } + // initialize otlp tracing - tracingLogger := logger.WithService("tracer") + tracingLogger := logger.WithSubsystem("tracer") tracer, err := tracer.New(tracingLogger, cfg) if err != nil { err = fmt.Errorf("Failed to initialize otlp tracing: %w", err) @@ -64,7 +69,7 @@ func main() { } // initialize prometheus metrics - metricsLogger := logger.WithService("metrics") + metricsLogger := logger.WithSubsystem("metrics") metrics, err := metrics.New(metricsLogger) if err != nil { err = fmt.Errorf("Failed to initialize prometheus metrics: %w", err) @@ -76,7 +81,7 @@ func main() { } // open connection to postgres - dbLogger := logger.WithService("database") + dbLogger := logger.WithSubsystem("database") db := postgres.NewDB(cfg.DB.URL, dbLogger, cfg.Tracing.Enabled) if err := db.Open(); err != nil { err = fmt.Errorf("Failed to startup database: %w", err) @@ -84,7 +89,7 @@ func main() { } // open connection to weaviate - dbVecLogger := logger.WithService("vector-database") + dbVecLogger := logger.WithSubsystem("vector-database") dbVec := weaviate.NewDB(cfg.VectorDB.Host, cfg.VectorDB.Scheme, dbVecLogger, tracer) if err := dbVec.Open(); err != nil { err = fmt.Errorf("Failed to startup vector database: %w", err) @@ -99,7 +104,7 @@ func main() { // open connection to redis if cache enabled var rdb *redis.RDB if cfg.App.CacheEnabled { - redisLogger := logger.WithService("redis") + redisLogger := logger.WithSubsystem("redis") rdb, err = redis.NewRDB(cfg.Redis.URL, redisLogger, metrics, cfg.Tracing.Enabled) if err != nil { err = fmt.Errorf("Failed to startup redis: %w", err) @@ -112,7 +117,7 @@ func main() { } // initialize http server - httpLogger := logger.WithService("http") + httpLogger := logger.WithSubsystem("http") server := server.New(cfg.HTTP.Port, httpLogger, metrics, cfg) // need to clean up this dependency injection diff --git a/backend/docker-compose.yaml b/backend/docker-compose.yaml index e66020c..b743c9c 100644 --- a/backend/docker-compose.yaml +++ b/backend/docker-compose.yaml @@ -76,6 +76,8 @@ services: # - redis:/bitnami/redis/data weaviate: + image: semitechnologies/weaviate:1.18.3 + container_name: weaviate command: - --host - 0.0.0.0 @@ -83,7 +85,6 @@ services: - "8080" - --scheme - http - image: semitechnologies/weaviate:1.18.3 ports: - 8081:8080 - 9092:2112 @@ -109,6 +110,7 @@ services: i2v-neural: image: semitechnologies/img2vec-pytorch:resnet50 + container_name: weaviate-i2v ports: - 8082:8080 restart: unless-stopped diff --git a/backend/go.mod 
b/backend/go.mod index ef039ad..87de2b0 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -18,7 +18,7 @@ require ( github.com/redis/go-redis/extra/redisprometheus/v9 v9.0.5 github.com/redis/go-redis/v9 v9.0.5 github.com/riandyrn/otelchi v0.5.1 - github.com/rs/zerolog v1.29.1 + github.com/rs/zerolog v1.30.0 github.com/weaviate/weaviate v1.18.3 github.com/weaviate/weaviate-go-client/v4 v4.7.0 go.nhat.io/otelsql v0.11.0 @@ -79,7 +79,7 @@ require ( golang.org/x/net v0.7.0 // indirect golang.org/x/oauth2 v0.5.0 // indirect golang.org/x/sync v0.2.0 // indirect - golang.org/x/sys v0.8.0 // indirect + golang.org/x/sys v0.10.0 // indirect golang.org/x/text v0.7.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect diff --git a/backend/go.sum b/backend/go.sum index 2308dd7..262b3ec 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -1013,8 +1013,11 @@ github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4 github.com/rs/cors v1.5.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc= github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU= +github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c= +github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w= github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= @@ -1564,6 +1567,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/backend/logger/logger.go b/backend/logger/logger.go index 51caa70..af04e5e 100644 --- a/backend/logger/logger.go +++ b/backend/logger/logger.go @@ -15,7 +15,7 @@ type Logger struct { zerolog.Logger } -func New(level string, env string) (*Logger, error) { +func New(level , env , app string) (*Logger, error) { switch level { case "debug": @@ -57,16 +57,18 @@ func New(level string, env string) (*Logger, error) { With(). Caller(). Timestamp(). + Str("app", app). 
Logger() + logger := Logger{zerologger} logger.Debug().Msg("Created new base logger") return &logger, nil } -func (l Logger) WithService(name string) *Logger { - serviceLogger := l.Logger.With().Str("service", name).Logger() - serviceLogger.Debug().Msg("Created new service logger") +func (l Logger) WithSubsystem(name string) *Logger { + serviceLogger := l.Logger.With().Str("subsystem", name).Logger() + serviceLogger.Debug().Msg("Created new subsystem logger") return &Logger{ serviceLogger, } @@ -75,8 +77,18 @@ func (l Logger) WithService(name string) *Logger { func (l Logger) WithSlackNotifier(url string) *Logger { notifier := newSlackNotifier(url) slackLogger := l.Hook(notifier) - slackLogger.Debug().Msg("Added slack notifier to logger") + slackLogger.Info().Msg("Added slack notifier to logger") return &Logger{ slackLogger, } } + +func (l Logger) WithTracer(serviceName string) *Logger { + + tracer := newTracerHook(serviceName) + tracerLogger := l.Hook(tracer) + tracerLogger.Info().Msg("Added tracer to logger") + return &Logger{ + tracerLogger, + } +} diff --git a/backend/logger/middleware.go b/backend/logger/middleware.go index d7644b9..2e5fb15 100644 --- a/backend/logger/middleware.go +++ b/backend/logger/middleware.go @@ -15,21 +15,24 @@ func Middleware(logger *Logger) func(next http.Handler) http.Handler { start := time.Now() log := logger.With().Logger() ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor) + ctx := r.Context() defer func() { // Recover and record stack traces in case of a panic if rec := recover(); rec != nil { log.Error(). + Ctx(ctx). Str("type", "error"). Timestamp(). Interface("recover_info", rec). Bytes("debug_stack", debug.Stack()). - Msg("log system error") + Msg("Log system error") http.Error(ww, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) } // log end request log.Info(). + Ctx(ctx). Str("type", "access"). Timestamp(). Fields(map[string]interface{}{ @@ -43,7 +46,7 @@ func Middleware(logger *Logger) func(next http.Handler) http.Handler { "bytes_in": r.Header.Get("Content-Length"), "bytes_out": ww.BytesWritten(), }). - Msg("incoming_request") + Msg("Incoming request") }() next.ServeHTTP(ww, r) diff --git a/backend/logger/slack.go b/backend/logger/slack.go index 4e67142..9bf9b9b 100644 --- a/backend/logger/slack.go +++ b/backend/logger/slack.go @@ -66,11 +66,8 @@ func (slackhook *SlackHook) shouldNotify(message string) bool { return false } -func (slackhook *SlackHook) Run( - e *zerolog.Event, - level zerolog.Level, - message string, -) { +func (slackhook *SlackHook) Run(e *zerolog.Event, level zerolog.Level, message string) { + // if the level is less than our notify threshold, don't notify if level <= zerolog.WarnLevel { return diff --git a/backend/logger/tracer.go b/backend/logger/tracer.go new file mode 100644 index 0000000..e665a91 --- /dev/null +++ b/backend/logger/tracer.go @@ -0,0 +1,31 @@ +package logger + +import ( + "github.com/rs/zerolog" + "go.opentelemetry.io/otel/trace" +) + +type tracingHook struct { + serviceName string +} + +func newTracerHook(serviceName string) *tracingHook { + return &tracingHook{ + serviceName: serviceName, + } +} + +func (t tracingHook) Run(e *zerolog.Event, level zerolog.Level, message string) { + + // grab the context from the logger. + // the logger caller passes in its current context.
+ ctx := e.GetCtx() + + // grab the span and trace id from the context + spanCtx := trace.SpanContextFromContext(ctx) + if spanCtx.IsValid() { + e.Str("span_id", spanCtx.SpanID().String()) + e.Str("trace_id", spanCtx.TraceID().String()) + e.Str("service.name", t.serviceName) + } +} diff --git a/backend/postgres/author.go b/backend/postgres/author.go index a096927..6ccb6eb 100644 --- a/backend/postgres/author.go +++ b/backend/postgres/author.go @@ -20,7 +20,9 @@ func NewAuthorService(db *DB) *AuthorService { func (s *AuthorService) FindAuthors(ctx context.Context) ([]string, error) { - s.db.logger.Debug().Msg("Starting find authors") + s.db.logger.Debug().Ctx(ctx).Msg("Starting find authors") + defer s.db.logger.Debug().Ctx(ctx).Msg("Finished find authors") + tx, err := s.db.db.BeginTx(ctx, nil) if err != nil { return nil, err @@ -32,8 +34,6 @@ func (s *AuthorService) FindAuthors(ctx context.Context) ([]string, error) { return nil, err } - s.db.logger.Debug().Msg("Finished find authors") - return authors, nil } diff --git a/backend/postgres/post.go b/backend/postgres/post.go index 3ce575b..5414917 100644 --- a/backend/postgres/post.go +++ b/backend/postgres/post.go @@ -160,11 +160,11 @@ func (s *PostService) AllPostIDs(ctx context.Context) ([]int, error) { // insertPost inserts a post into the DB and returns the post's ID func (db *DB) insertPost(ctx context.Context, tx *sql.Tx, post *analogdb.CreatePost) (*int64, error) { - db.logger.Debug().Msg("Starting insert post") + db.logger.Debug().Ctx(ctx).Msg("Starting insert post") create, err := createPostToRawPostCreate(post) if err != nil { - db.logger.Error().Err(err).Msg("Failed to insert post") + db.logger.Error().Ctx(ctx).Err(err).Msg("Failed to insert post") return nil, err } @@ -182,7 +182,7 @@ func (db *DB) insertPost(ctx context.Context, tx *sql.Tx, post *analogdb.CreateP stmt, err := tx.PrepareContext(ctx, query) if err != nil { - db.logger.Error().Err(err).Int64("postID", id).Msg("Failed to insert post") + db.logger.Error().Ctx(ctx).Err(err).Int64("postID", id).Msg("Failed to insert post") return nil, err } @@ -227,11 +227,11 @@ func (db *DB) insertPost(ctx context.Context, tx *sql.Tx, post *analogdb.CreateP create.c5_percent).Scan(&id) if err != nil { - db.logger.Error().Err(err).Int64("postID", id).Msg("Failed to insert post") + db.logger.Error().Err(err).Ctx(ctx).Int64("postID", id).Msg("Failed to insert post") return nil, err } - db.logger.Info().Int64("postID", id).Msg("Finished inserting post") + db.logger.Info().Ctx(ctx).Int64("postID", id).Msg("Finished inserting post") return &id, nil } @@ -239,7 +239,7 @@ func (db *DB) insertPost(ctx context.Context, tx *sql.Tx, post *analogdb.CreateP // insertKeywords inserts a post's keywords into the DB func (db *DB) insertKeywords(ctx context.Context, tx *sql.Tx, keywords []analogdb.Keyword, postID int64) error { - db.logger.Debug().Int64("postID", postID).Msg("Starting insert keywords") + db.logger.Debug().Ctx(ctx).Int64("postID", postID).Msg("Starting insert keywords") first := 1 second := 2 @@ -266,7 +266,7 @@ func (db *DB) insertKeywords(ctx context.Context, tx *sql.Tx, keywords []analogd stmt, err := tx.PrepareContext(ctx, query) if err != nil { - db.logger.Error().Err(err).Int64("postID", postID).Msg("Failed to insert keywords") + db.logger.Error().Err(err).Ctx(ctx).Int64("postID", postID).Msg("Failed to insert keywords") return err } @@ -275,11 +275,11 @@ func (db *DB) insertKeywords(ctx context.Context, tx *sql.Tx, keywords []analogd _, err = stmt.ExecContext(ctx, vals...) 
if err != nil { - db.logger.Error().Err(err).Int64("postID", postID).Msg("Failed to insert keywords") + db.logger.Error().Err(err).Ctx(ctx).Int64("postID", postID).Msg("Failed to insert keywords") return err } - db.logger.Info().Int64("postID", postID).Msg("Finished inserting keywords") + db.logger.Info().Ctx(ctx).Int64("postID", postID).Msg("Finished inserting keywords") return nil } @@ -287,7 +287,7 @@ func (db *DB) insertKeywords(ctx context.Context, tx *sql.Tx, keywords []analogd // deleteKeywords deletes all keywords for a given post func (db *DB) deleteKeywords(ctx context.Context, tx *sql.Tx, postID int64) error { - db.logger.Debug().Int64("postID", postID).Msg("Starting delete keywords") + db.logger.Debug().Ctx(ctx).Int64("postID", postID).Msg("Starting delete keywords") query := "DELETE FROM keywords WHERE post_id = $1" @@ -295,17 +295,17 @@ func (db *DB) deleteKeywords(ctx context.Context, tx *sql.Tx, postID int64) erro rows, err := tx.QueryContext(ctx, query, postID) defer rows.Close() if err != nil { - db.logger.Error().Err(err).Int64("postID", postID).Msg("Failed to delete keywords") + db.logger.Error().Err(err).Ctx(ctx).Int64("postID", postID).Msg("Failed to delete keywords") return err } - db.logger.Info().Int64("postID", postID).Msg("Finished deleting keywords") + db.logger.Info().Ctx(ctx).Int64("postID", postID).Msg("Finished deleting keywords") return nil } func (db *DB) createPost(ctx context.Context, tx *sql.Tx, post *analogdb.CreatePost) (*analogdb.Post, error) { - db.logger.Debug().Msg("Starting create post") + db.logger.Debug().Ctx(ctx).Msg("Starting create post") id, err := db.insertPost(ctx, tx, post) if err != nil { @@ -323,7 +323,7 @@ func (db *DB) createPost(ctx context.Context, tx *sql.Tx, post *analogdb.CreateP // commit transaction if both inserts are ok err = tx.Commit() if err != nil { - db.logger.Error().Err(err).Int64("postID", *id).Msg("Failed to create post") + db.logger.Error().Err(err).Ctx(ctx).Int64("postID", *id).Msg("Failed to create post") return nil, err } @@ -347,7 +347,7 @@ func (db *DB) createPost(ctx context.Context, tx *sql.Tx, post *analogdb.CreateP DisplayPost: displayPost, } - db.logger.Info().Int64("postID", *id).Msg("Finished creating post") + db.logger.Info().Ctx(ctx).Int64("postID", *id).Msg("Finished creating post") return createdPost, nil } @@ -355,7 +355,7 @@ func (db *DB) createPost(ctx context.Context, tx *sql.Tx, post *analogdb.CreateP // findPosts is the general function responsible for handling all queries func (db *DB) findPosts(ctx context.Context, tx *sql.Tx, filter *analogdb.PostFilter) ([]*analogdb.Post, int, error) { - db.logger.Debug().Msg("Starting find posts") + db.logger.Debug().Ctx(ctx).Msg("Starting find posts") if err := validateFilter(filter); err != nil { db.logger.Error().Err(err).Msg("Failed to find posts") @@ -413,7 +413,7 @@ func (db *DB) findPosts(ctx context.Context, tx *sql.Tx, filter *analogdb.PostFi rows, err := tx.QueryContext(ctx, query, args...) 
if err != nil { - db.logger.Error().Err(err).Msg("Failed to find posts") + db.logger.Error().Err(err).Ctx(ctx).Msg("Failed to find posts") return nil, 0, err } defer rows.Close() @@ -424,7 +424,7 @@ func (db *DB) findPosts(ctx context.Context, tx *sql.Tx, filter *analogdb.PostFi for rows.Next() { p, count, err = scanRowToRawPostCount(rows) if err != nil { - db.logger.Error().Err(err).Msg("Failed to find posts") + db.logger.Error().Err(err).Ctx(ctx).Msg("Failed to find posts") return nil, 0, err } post, err := rawPostToPost(*p) @@ -433,7 +433,7 @@ func (db *DB) findPosts(ctx context.Context, tx *sql.Tx, filter *analogdb.PostFi stripAuthorPrefix(post) if err != nil { - db.logger.Error().Err(err).Msg("Failed to find posts") + db.logger.Error().Err(err).Ctx(ctx).Msg("Failed to find posts") return nil, 0, err } posts = append(posts, post) @@ -442,18 +442,18 @@ func (db *DB) findPosts(ctx context.Context, tx *sql.Tx, filter *analogdb.PostFi err = tx.Commit() if err != nil { - db.logger.Error().Err(err).Msg("Failed to find posts") + db.logger.Error().Err(err).Ctx(ctx).Msg("Failed to find posts") return nil, 0, err } - db.logger.Info().Msg("Finished finding posts") + db.logger.Info().Ctx(ctx).Msg("Finished finding posts") return posts, count, nil } func (db *DB) patchPost(ctx context.Context, tx *sql.Tx, patch *analogdb.PatchPost, id int) error { - db.logger.Debug().Int("postID", id).Msg("Starting patch post") + db.logger.Debug().Ctx(ctx).Int("postID", id).Msg("Starting patch post") hasPatchFields := false @@ -461,7 +461,7 @@ func (db *DB) patchPost(ctx context.Context, tx *sql.Tx, patch *analogdb.PatchPo if patch.Nsfw != nil || patch.Sprocket != nil || patch.Grayscale != nil || patch.Score != nil || patch.Colors != nil { hasPatchFields = true if err := db.updatePost(ctx, tx, patch, id); err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to patch post") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to patch post") return err } } @@ -469,67 +469,67 @@ func (db *DB) patchPost(ctx context.Context, tx *sql.Tx, patch *analogdb.PatchPo if patch.Keywords != nil { hasPatchFields = true if err := db.updateKeywords(ctx, tx, *patch.Keywords, id); err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to patch post") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to patch post") return err } } if !hasPatchFields { err := errors.New("must include patch parameters") - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to patch post") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to patch post") return err } // always insert the updated timestamp if err := db.insertPostUpdateTimes(ctx, tx, patch, id); err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to patch post") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to patch post") return err } err := tx.Commit() if err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to patch post") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to patch post") return err } - db.logger.Info().Int("postID", id).Msg("Finished patching post") + db.logger.Info().Ctx(ctx).Int("postID", id).Msg("Finished patching post") return nil } func (db *DB) updateKeywords(ctx context.Context, tx *sql.Tx, keywords []analogdb.Keyword, id int) error { - db.logger.Debug().Int("postID", id).Msg("Starting update keywords") + db.logger.Debug().Ctx(ctx).Int("postID", id).Msg("Starting update keywords") // first delete 
all keywords associated with post if err := db.deleteKeywords(ctx, tx, int64(id)); err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to update keywords") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to update keywords") return err } // if we have no keywords to insert, just return if len(keywords) == 0 { - db.logger.Info().Int("postID", id).Msg("Finished updating keywords (dropped all keywords)") + db.logger.Info().Ctx(ctx).Int("postID", id).Msg("Finished updating keywords (dropped all keywords)") return nil } // then insert all new keywords if err := db.insertKeywords(ctx, tx, keywords, int64(id)); err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to update keywords") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to update keywords") return err } - db.logger.Info().Int("postID", id).Msg("Finished updating keywords") + db.logger.Info().Ctx(ctx).Int("postID", id).Msg("Finished updating keywords") return nil } func (db *DB) updatePost(ctx context.Context, tx *sql.Tx, patch *analogdb.PatchPost, id int) error { - db.logger.Debug().Int("postID", id).Msg("Starting update post") + db.logger.Debug().Ctx(ctx).Int("postID", id).Msg("Starting update post") set, args, err := patchToSet(patch) if err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to update post") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to update post") return err } @@ -541,19 +541,18 @@ func (db *DB) updatePost(ctx context.Context, tx *sql.Tx, patch *analogdb.PatchP rows, err := tx.QueryContext(ctx, query, args...) if err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to update post") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to update post") return err } defer rows.Close() - - db.logger.Info().Int("postID", id).Msg("Finished updating post") + db.logger.Info().Ctx(ctx).Int("postID", id).Msg("Finished updating post") return nil } func (db *DB) insertPostUpdateTimes(ctx context.Context, tx *sql.Tx, patch *analogdb.PatchPost, id int) error { - db.logger.Debug().Int("postID", id).Msg("Starting post update times") + db.logger.Debug().Ctx(ctx).Int("postID", id).Msg("Starting post update times") query := ` @@ -565,7 +564,7 @@ func (db *DB) insertPostUpdateTimes(ctx context.Context, tx *sql.Tx, patch *anal stmt, err := tx.PrepareContext(ctx, query) if err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to post update times") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to post update times") return err } @@ -601,11 +600,11 @@ func (db *DB) insertPostUpdateTimes(ctx context.Context, tx *sql.Tx, patch *anal rows, err := stmt.QueryContext(ctx, values...) 
defer rows.Close() if err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to post update times") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to post update times") return err } - db.logger.Info().Int("postID", id).Msg("Finished post update times") + db.logger.Info().Ctx(ctx).Int("postID", id).Msg("Finished post update times") return nil @@ -613,7 +612,7 @@ func (db *DB) insertPostUpdateTimes(ctx context.Context, tx *sql.Tx, patch *anal func (db *DB) deletePost(ctx context.Context, tx *sql.Tx, id int) error { - db.logger.Debug().Int("postID", id).Msg("Starting delete post") + db.logger.Debug().Ctx(ctx).Int("postID", id).Msg("Starting delete post") query := ` DELETE FROM pictures @@ -625,36 +624,36 @@ func (db *DB) deletePost(ctx context.Context, tx *sql.Tx, id int) error { var returnedID int err := row.Scan(&returnedID) if err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to delete post") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to delete post") return err } if id != returnedID { err := fmt.Errorf("error deleting post with id %d", id) - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to delete post") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to delete post") return err } err = tx.Commit() if err != nil { - db.logger.Error().Err(err).Int("postID", id).Msg("Failed to delete post") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", id).Msg("Failed to delete post") return err } - db.logger.Info().Int("postID", id).Msg("Finished deleting post") + db.logger.Info().Ctx(ctx).Int("postID", id).Msg("Finished deleting post") return nil } func (db *DB) allPostIDs(ctx context.Context, tx *sql.Tx) ([]int, error) { - db.logger.Debug().Msg("Starting get all post IDs") + db.logger.Debug().Ctx(ctx).Msg("Starting get all post IDs") query := ` SELECT id FROM pictures ORDER BY id ASC` rows, err := tx.QueryContext(ctx, query) if err != nil { - db.logger.Error().Err(err).Msg("Failed to get all post IDs") + db.logger.Error().Err(err).Ctx(ctx).Msg("Failed to get all post IDs") return nil, err } defer rows.Close() @@ -663,18 +662,18 @@ func (db *DB) allPostIDs(ctx context.Context, tx *sql.Tx) ([]int, error) { var id int for rows.Next() { if err := rows.Scan(&id); err != nil { - db.logger.Error().Err(err).Msg("Failed to get all post IDs") + db.logger.Error().Err(err).Ctx(ctx).Msg("Failed to get all post IDs") return nil, err } ids = append(ids, id) } err = tx.Commit() if err != nil { - db.logger.Error().Err(err).Msg("Failed to get all post IDs") + db.logger.Error().Err(err).Ctx(ctx).Msg("Failed to get all post IDs") return nil, err } - db.logger.Info().Msg("Finished getting all post IDs") + db.logger.Info().Ctx(ctx).Msg("Finished getting all post IDs") return ids, nil } diff --git a/backend/postgres/scrape.go b/backend/postgres/scrape.go index 3d75e0b..93350ea 100644 --- a/backend/postgres/scrape.go +++ b/backend/postgres/scrape.go @@ -5,13 +5,15 @@ import ( "database/sql" "github.com/evanofslack/analogdb" + "github.com/evanofslack/analogdb/logger" ) // ensure interface is implemented var _ analogdb.ScrapeService = (*ScrapeService)(nil) type ScrapeService struct { - db *DB + db *DB + logger *logger.Logger } func NewScrapeService(db *DB) *ScrapeService { @@ -19,6 +21,10 @@ func NewScrapeService(db *DB) *ScrapeService { } func (s *ScrapeService) KeywordUpdatedPostIDs(ctx context.Context) ([]int, error) { + + s.logger.Debug().Ctx(ctx).Msg("Starting get keyword updated post ids") + defer 
s.logger.Debug().Ctx(ctx).Msg("Finished keyword updated post ids") + tx, err := s.db.db.BeginTx(ctx, nil) if err != nil { return nil, err } diff --git a/backend/redis/author.go b/backend/redis/author.go index b88bd63..ca14c92 100644 --- a/backend/redis/author.go +++ b/backend/redis/author.go @@ -37,9 +37,9 @@ func NewCacheAuthorService(rdb *RDB, dbService analogdb.AuthorService) *AuthorSe func (s *AuthorService) FindAuthors(ctx context.Context) ([]string, error) { - s.rdb.logger.Debug().Str("instance", s.cache.instance).Msg("Starting find authors with cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.cache.instance).Msg("Starting find authors with cache") defer func() { - s.rdb.logger.Debug().Str("instance", s.cache.instance).Msg("Finished find authors with cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.cache.instance).Msg("Finished find authors with cache") }() var authors []string @@ -62,13 +62,13 @@ func (s *AuthorService) FindAuthors(ctx context.Context) ([]string, error) { // do this async so response is returned quicker go func() { - s.rdb.logger.Debug().Str("instance", s.cache.instance).Msg("Adding authors to cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.cache.instance).Msg("Adding authors to cache") // create a new context; orignal one will be canceled when request is closed ctx, cancel := context.WithTimeout(context.Background(), cacheOpTimeout) defer cancel() - s.cache.set(&cache.Item{ + s.cache.set(ctx, &cache.Item{ Ctx: ctx, Key: authorsKey, Value: &authors, diff --git a/backend/redis/post.go b/backend/redis/post.go index 2ef8fde..027d00b 100644 --- a/backend/redis/post.go +++ b/backend/redis/post.go @@ -59,15 +59,13 @@ func (s *PostService) CreatePost(ctx context.Context, post *analogdb.CreatePost) func (s *PostService) FindPosts(ctx context.Context, filter *analogdb.PostFilter) ([]*analogdb.Post, int, error) { - s.rdb.logger.Debug().Str("instance", s.postsCache.instance).Msg("Starting find posts with cache") - defer func() { - s.rdb.logger.Debug().Str("instance", s.postsCache.instance).Msg("Finished find posts with cache") - }() + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.postsCache.instance).Msg("Starting find posts with cache") + defer s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.postsCache.instance).Msg("Finished find posts with cache") // generate a unique hash from the filter struct hash, err := hashstructure.Hash(filter, hashstructure.FormatV2, nil) if err != nil { - s.rdb.logger.Err(err).Str("instance", s.postsCache.instance).Msg("Failed to hash post filter") + s.rdb.logger.Error().Err(err).Ctx(ctx).Str("instance", s.postsCache.instance).Msg("Failed to hash post filter") // if we failed, fallback to db return s.dbService.FindPosts(ctx, filter) @@ -100,20 +98,20 @@ func (s *PostService) FindPosts(ctx context.Context, filter *analogdb.PostFilter // do this async so response is returned quicker go func() { - s.rdb.logger.Debug().Str("instance", s.postsCache.instance).Msg("Adding posts and posts counts to cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.postsCache.instance).Msg("Adding posts and posts counts to cache") // create a new context; orignal one will be canceled when request is closed ctx, cancel := context.WithTimeout(context.Background(), cacheOpTimeout) defer cancel() // add posts to cache - s.postsCache.set(&cache.Item{ + s.postsCache.set(ctx, &cache.Item{ Ctx: ctx, Key: postsHash, Value: &posts, TTL: postsTTL, }) // add posts count to cache - s.postsCache.set(&cache.Item{ + s.postsCache.set(ctx,
&cache.Item{ Ctx: ctx, Key: postsCountHash, Value: &count, @@ -126,9 +124,9 @@ func (s *PostService) FindPosts(ctx context.Context, filter *analogdb.PostFilter func (s *PostService) FindPostByID(ctx context.Context, id int) (*analogdb.Post, error) { - s.rdb.logger.Debug().Str("instance", s.postCache.instance).Int("postID", id).Msg("Starting find post by id with cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.postCache.instance).Int("postID", id).Msg("Starting find post by id with cache") defer func() { - s.rdb.logger.Debug().Str("instance", s.postCache.instance).Int("postID", id).Msg("Finished find post by id with cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.postCache.instance).Int("postID", id).Msg("Finished find post by id with cache") }() var post *analogdb.Post @@ -152,13 +150,13 @@ func (s *PostService) FindPostByID(ctx context.Context, id int) (*analogdb.Post, // do this async so response is returned quicker go func() { - s.rdb.logger.Debug().Str("instance", s.postCache.instance).Int("postID", id).Msg("Adding post to cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.postCache.instance).Int("postID", id).Msg("Adding post to cache") // create a new context; orignal one will be canceled when request is closed ctx, cancel := context.WithTimeout(context.Background(), cacheOpTimeout) defer cancel() // add to cache - s.postCache.set(&cache.Item{ + s.postCache.set(ctx, &cache.Item{ Ctx: ctx, Key: postKey, Value: &post, @@ -170,27 +168,27 @@ func (s *PostService) FindPostByID(ctx context.Context, id int) (*analogdb.Post, func (s *PostService) PatchPost(ctx context.Context, patch *analogdb.PatchPost, id int) error { - s.rdb.logger.Debug().Str("instance", s.postCache.instance).Int("postID", id).Msg("Starting patch post with cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.postCache.instance).Int("postID", id).Msg("Starting patch post with cache") defer func() { - s.rdb.logger.Debug().Str("instance", s.postCache.instance).Int("postID", id).Msg("Finished patch post with cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.postCache.instance).Int("postID", id).Msg("Finished patch post with cache") }() // remove post from the cache - go s.removePostFromCache(id) + go s.removePostFromCache(ctx, id) return s.dbService.PatchPost(ctx, patch, id) } func (s *PostService) DeletePost(ctx context.Context, id int) error { - s.rdb.logger.Debug().Str("instance", s.postCache.instance).Int("postID", id).Msg("Starting delete post with cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.postCache.instance).Int("postID", id).Msg("Starting delete post with cache") defer func() { - s.rdb.logger.Debug().Str("instance", s.postCache.instance).Int("postID", id).Msg("Finished delete post with cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.postCache.instance).Int("postID", id).Msg("Finished delete post with cache") }() // cache is now stale, delete old entries go func() { - s.removePostFromCache(id) + s.removePostFromCache(ctx, id) }() return s.dbService.DeletePost(ctx, id) @@ -200,9 +198,9 @@ func (s *PostService) AllPostIDs(ctx context.Context) ([]int, error) { return s.dbService.AllPostIDs(ctx) } -func (s *PostService) removePostFromCache(id int) { +func (s *PostService) removePostFromCache(ctx context.Context, id int) { - s.rdb.logger.Debug().Str("instance", s.postCache.instance).Int("postID", id).Msg("Removing post from cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.postCache.instance).Int("postID", id).Msg("Removing post from cache") postKey := 
fmt.Sprint(id) diff --git a/backend/redis/redis.go b/backend/redis/redis.go index 9132790..b36dc23 100644 --- a/backend/redis/redis.go +++ b/backend/redis/redis.go @@ -64,7 +64,7 @@ func NewRDB(url string, logger *logger.Logger, metrics *metrics.Metrics, tracing // otel instrumentation of redis if tracingEnabled { if err := redisotel.InstrumentTracing(db); err != nil { - rdb.logger.Err(err).Msg("Failed to instrument redis with tracing") + rdb.logger.Error().Err(err).Msg("Failed to instrument redis with tracing") } else { rdb.logger.Info().Msg("Instrumented redis with tracing") } @@ -134,7 +134,7 @@ func (rdb *RDB) NewCache(instance string, size int, ttl time.Duration) *Cache { func (cache *Cache) get(ctx context.Context, key string, item interface{}) error { - cache.logger.Debug().Str("instance", cache.instance).Msg("Getting item from cache") + cache.logger.Debug().Ctx(ctx).Str("instance", cache.instance).Msg("Getting item from cache") // do the lookup on the inner cache err := cache.cache.Get(ctx, key, item) @@ -144,50 +144,50 @@ func (cache *Cache) get(ctx context.Context, key string, item interface{}) error // was it a cache miss? if strings.Contains(err.Error(), cacheMissErr) { - cache.logger.Debug().Str("instance", cache.instance).Msg("Cache miss") + cache.logger.Debug().Ctx(ctx).Str("instance", cache.instance).Msg("Cache miss") cache.stats.incMisses() // or error decoding an empty array? this is fine and not an error - } else if strings.Contains(err.Error(), decodeArrayErr) { - cache.logger.Debug().Str("instance", cache.instance).Msg("Error decoding array on cache get, proceeding") - cache.stats.incMisses() + // } else if strings.Contains(err.Error(), decodeArrayErr) { + // cache.logger.Debug().Ctx(ctx).Str("instance", cache.instance).Msg("Error decoding array on cache get, proceeding") + // cache.stats.incMisses() // or an actual error } else { - cache.logger.Err(err).Str("instance", cache.instance).Msg("Error getting item from cache") + cache.logger.Error().Err(err).Ctx(ctx).Str("instance", cache.instance).Msg("Error getting item from cache") cache.stats.incErrors() } return err } // no error means cache hit - cache.logger.Debug().Str("instance", cache.instance).Msg("Cache hit") + cache.logger.Debug().Ctx(ctx).Str("instance", cache.instance).Msg("Cache hit") cache.stats.incHits() return nil } -func (cache *Cache) set(item *cache.Item) error { +func (cache *Cache) set(ctx context.Context, item *cache.Item) error { - cache.logger.Debug().Str("instance", cache.instance).Msg("Setting item in cache") + cache.logger.Debug().Ctx(ctx).Str("instance", cache.instance).Msg("Setting item in cache") err := cache.cache.Set(item) if err != nil { - cache.logger.Err(err).Str("instance", cache.instance).Msg("Failed to set item") + cache.logger.Error().Err(err).Ctx(ctx).Str("instance", cache.instance).Msg("Failed to set item") } - cache.logger.Debug().Str("instance", cache.instance).Msg("Added item cache") + cache.logger.Debug().Ctx(ctx).Str("instance", cache.instance).Msg("Added item cache") return err } func (cache *Cache) delete(ctx context.Context, key string) error { - cache.logger.Debug().Str("instance", cache.instance).Msg("Deleting item from cache") + cache.logger.Debug().Ctx(ctx).Str("instance", cache.instance).Msg("Deleting item from cache") err := cache.cache.Delete(ctx, key) if err != nil { - cache.logger.Err(err).Str("instance", cache.instance).Msg("Failed to delete item") + cache.logger.Error().Err(err).Ctx(ctx).Str("instance", cache.instance).Msg("Failed to delete item") } - 
cache.logger.Debug().Str("instance", cache.instance).Msg("Deleted item from cache") + cache.logger.Debug().Ctx(ctx).Str("instance", cache.instance).Msg("Deleted item from cache") return err } diff --git a/backend/redis/similarity.go b/backend/redis/similarity.go index 2d53455..99cd583 100644 --- a/backend/redis/similarity.go +++ b/backend/redis/similarity.go @@ -3,6 +3,7 @@ package redis import ( "context" "fmt" + "strings" "time" "github.com/go-redis/cache/v9" @@ -19,6 +20,8 @@ const ( idKeysInstance = "idkeys" idKeysLocalSize = 1000 idKeysTTL = time.Hour * 24 + + delimiter = ";" ) // ensure interface is implemented @@ -70,15 +73,15 @@ func (s *SimilarityService) FindSimilarPosts(ctx context.Context, filter *analog id := *filter.ID - s.rdb.logger.Debug().Str("instance", s.similarCache.instance).Int("postID", id).Msg("Starting find similar posts by image with cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.similarCache.instance).Int("postID", id).Msg("Starting find similar posts by image with cache") defer func() { - s.rdb.logger.Debug().Str("instance", s.similarCache.instance).Int("postID", id).Msg("Finished find similar posts by image with cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.similarCache.instance).Int("postID", id).Msg("Finished find similar posts by image with cache") }() // generate a unique hash from the filter struct hash, err := hashstructure.Hash(filter, hashstructure.FormatV2, nil) if err != nil { - s.rdb.logger.Err(err).Str("instance", s.similarCache.instance).Int("postID", id).Msg("Failed to hash post similarity filter") + s.rdb.logger.Error().Err(err).Ctx(ctx).Str("instance", s.similarCache.instance).Int("postID", id).Msg("Failed to hash post similarity filter") // if we failed, fallback to db return s.dbService.FindSimilarPosts(ctx, filter) @@ -87,7 +90,7 @@ func (s *SimilarityService) FindSimilarPosts(ctx context.Context, filter *analog postKey := fmt.Sprint(hash) idKey := fmt.Sprint(id) - s.rdb.logger.Debug().Str("instance", s.similarCache.instance).Int("postID", id).Str("hash", postKey).Msg("Generated post key hash from similarity filter") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.similarCache.instance).Int("postID", id).Str("hash", postKey).Msg("Generated post key hash from similarity filter") var posts []*analogdb.Post @@ -109,12 +112,12 @@ func (s *SimilarityService) FindSimilarPosts(ctx context.Context, filter *analog // do this async so response is returned quicker go func() { - s.rdb.logger.Debug().Str("instance", s.similarCache.instance).Msg("Adding similar posts to cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.similarCache.instance).Msg("Adding similar posts to cache") // create a new context; orignal one will be canceled when request is closed ctx, cancel := context.WithTimeout(context.Background(), cacheOpTimeout) defer cancel() - s.similarCache.set(&cache.Item{ + s.similarCache.set(ctx, &cache.Item{ Ctx: ctx, Key: postKey, Value: &posts, @@ -126,37 +129,47 @@ func (s *SimilarityService) FindSimilarPosts(ctx context.Context, filter *analog // do this async so response is returned quicker go func() { - s.rdb.logger.Debug().Str("instance", s.idKeysCache.instance).Msg("Adding id hashes to cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.idKeysCache.instance).Msg("Adding id hashes to cache") // create a new context; orignal one will be canceled when request is closed ctx, cancel := context.WithTimeout(context.Background(), cacheOpTimeout) defer cancel() - // slice of all hashes this id maps to - // would 
rather use a map/set but it doesn't serialize correctly - var idKeyHashes []string + // list of hashes separated with delimiter + var idKeyHashesString string // try to get id key's hashes from cache - err = s.idKeysCache.get(ctx, idKey, &idKeyHashes) + err = s.idKeysCache.get(ctx, idKey, &idKeyHashesString) + + // split string to list + var idKeyHashes []string - // check if key already in slice; if not, add it. + if idKeyHashesString != "" { + idKeyHashes = strings.Split(idKeyHashesString, delimiter) + } + + // if key already exists in slice, no more to do for _, h := range idKeyHashes { if postKey == h { - s.rdb.logger.Debug().Str("instance", s.idKeysCache.instance).Int("postID", id).Str("hash", postKey).Msg("Post key hash already exists in cache, skipping") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.idKeysCache.instance).Int("postID", id).Str("hash", postKey).Msg("Post key hash already exists in cache, skipping") return } } + // otherwise add it and save back to cache idKeyHashes = append(idKeyHashes, postKey) - s.rdb.logger.Debug().Int("postID", id).Str("hash", postKey).Msg(fmt.Sprintf("Added hash for postID, now has %d hashes", len(idKeyHashes))) + // serialize as string + idKeyHashesString = strings.Join(idKeyHashes, delimiter) // save this back to the cache - s.idKeysCache.set(&cache.Item{ + s.idKeysCache.set(ctx, &cache.Item{ Ctx: ctx, Key: idKey, Value: &idKeyHashes, TTL: similarTTL, }) + + s.rdb.logger.Debug().Ctx(ctx).Int("postID", id).Str("hash", postKey).Msg(fmt.Sprintf("Added hash for postID, now has %d hashes", len(idKeyHashes))) }() return posts, nil @@ -164,9 +177,9 @@ func (s *SimilarityService) FindSimilarPosts(ctx context.Context, filter *analog func (s *SimilarityService) DeletePost(ctx context.Context, id int) error { - s.rdb.logger.Debug().Int("postID", id).Msg("Starting delete vector post with cache") + s.rdb.logger.Debug().Ctx(ctx).Int("postID", id).Msg("Starting delete vector post with cache") defer func() { - s.rdb.logger.Debug().Int("postID", id).Msg("Finished delete vector post with cache") + s.rdb.logger.Debug().Ctx(ctx).Int("postID", id).Msg("Finished delete vector post with cache") }() // remove from cache in background @@ -174,23 +187,29 @@ func (s *SimilarityService) DeletePost(ctx context.Context, id int) error { idKey := fmt.Sprint(id) // set of all hashes this id maps to - var idKeyHashes []string + var idKeyHashesString string // try to get id key's hashes from cache - err := s.idKeysCache.get(ctx, idKey, &idKeyHashes) - + err := s.idKeysCache.get(ctx, idKey, &idKeyHashesString) if err != nil { return } + // split string to list + var idKeyHashes []string + + if idKeyHashesString != "" { + idKeyHashes = strings.Split(idKeyHashesString, delimiter) + } + // for all hashes, remove from posts cache for _, hash := range idKeyHashes { - s.rdb.logger.Debug().Str("instance", s.similarCache.instance).Int("postID", id).Str("hash", hash).Msg("Deleting hash from similar posts cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.similarCache.instance).Int("postID", id).Str("hash", hash).Msg("Deleting hash from similar posts cache") s.similarCache.delete(ctx, hash) } // and remove from key ids cache - s.rdb.logger.Debug().Str("instance", s.idKeysCache.instance).Int("postID", id).Msg("Deleting key from post ids cache") + s.rdb.logger.Debug().Ctx(ctx).Str("instance", s.idKeysCache.instance).Int("postID", id).Msg("Deleting key from post ids cache") s.idKeysCache.delete(ctx, idKey) }() diff --git a/backend/server/auth.go b/backend/server/auth.go index
ff4428d..42e05fb 100644 --- a/backend/server/auth.go +++ b/backend/server/auth.go @@ -9,16 +9,18 @@ import ( func (s *Server) auth(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + username := s.config.Auth.Username password := s.config.Auth.Password authenticated := s.passBasicAuth(username, password, r) if authenticated { - s.logger.Debug().Bool("authenticated", authenticated).Msg("Authorized with basic auth") + s.logger.Debug().Ctx(ctx).Bool("authenticated", authenticated).Msg("Authorized with basic auth") next.ServeHTTP(w, r) return } - s.logger.Debug().Bool("authenticated", authenticated).Msg("Unauthorized with basic auth") + s.logger.Debug().Ctx(ctx).Bool("authenticated", authenticated).Msg("Unauthorized with basic auth") w.Header().Set("WWW-Authenticate", `Basic realm="restricted", charset="UTF-8"`) http.Error(w, "Unauthorized", http.StatusUnauthorized) }) diff --git a/backend/server/error.go b/backend/server/error.go index 526041d..c2ded32 100644 --- a/backend/server/error.go +++ b/backend/server/error.go @@ -24,15 +24,17 @@ func errorStatusCode(code string) int { func (s *Server) writeError(w http.ResponseWriter, r *http.Request, err error) { + ctx := r.Context() + code, message := analogdb.ErrorCode(err), analogdb.ErrorMessage(err) - s.logger.Error().Err(err).Str("method", r.Method).Str("path", r.URL.Path).Str("code", code).Msg(message) + s.logger.Error().Err(err).Ctx(ctx).Str("method", r.Method).Str("path", r.URL.Path).Str("code", code).Msg(message) w.Header().Set("Content-type", "application/json") w.WriteHeader(errorStatusCode(code)) marshallErr := json.NewEncoder(w).Encode(&ErrorResponse{Error: message}) if marshallErr != nil { - s.logger.Error().Err(err).Msg("Failed to marshall json") + s.logger.Error().Err(err).Ctx(ctx).Msg("Failed to marshall json") } } diff --git a/backend/server/middleware.go b/backend/server/middleware.go index 1b7a2de..d23d0e1 100644 --- a/backend/server/middleware.go +++ b/backend/server/middleware.go @@ -18,16 +18,22 @@ const ( func (s *Server) mountMiddleware() { + // add recoverer first s.router.Use(middleware.Recoverer) - s.router.Use(logger.Middleware(s.logger)) + + // collect prom metrics s.router.Use(s.collectStats) // is tracing enabled? + // attach before logger so span id is logged if s.config.Tracing.Enabled { s.router.Use(otelchi.Middleware("http", otelchi.WithChiRoutes(s.router))) s.logger.Info().Msg("Added tracing middleware") } + // log all requests + s.router.Use(logger.Middleware(s.logger)) + // is rate limiting enabled? 
if s.config.App.RateLimitEnabled { @@ -53,21 +59,25 @@ func (s *Server) mountMiddleware() { AllowCredentials: true, MaxAge: 500, }) + + // CORS s.router.Use(corsHandler) } // apply rate limit only if user is not authenticated func (s *Server) applyRateLimit(r *http.Request) bool { + ctx := r.Context() + rl_username := s.config.Auth.RateLimitUsername rl_password := s.config.Auth.RateLimitPassword authenticated := s.passBasicAuth(rl_username, rl_password, r) if authenticated { - s.logger.Debug().Bool("authenticated", authenticated).Msg("Bypassing rate limit") + s.logger.Debug().Ctx(ctx).Bool("authenticated", authenticated).Msg("Bypassing rate limit") return false } - s.logger.Debug().Bool("authenticated", authenticated).Msg("Applying rate limit") + s.logger.Debug().Ctx(ctx).Bool("authenticated", authenticated).Msg("Applying rate limit") return true } diff --git a/backend/tracer/otel.go b/backend/tracer/otel.go index 90220f0..dd99514 100644 --- a/backend/tracer/otel.go +++ b/backend/tracer/otel.go @@ -6,6 +6,7 @@ import ( "github.com/evanofslack/analogdb/config" "github.com/evanofslack/analogdb/logger" + "go.opentelemetry.io/contrib" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" @@ -107,7 +108,7 @@ func (tracer *Tracer) StartExporter() error { // create named tracer to be used by this library tracer.logger.Debug().Str("name", tracerName).Msg("Creating new internal tracer") - tracer.Tracer = tracerProvider.Tracer(tracerName) + tracer.Tracer = tracerProvider.Tracer(tracerName, trace.WithInstrumentationVersion(contrib.SemVersion())) tracer.logger.Info().Str("endpoint", endpoint).Msg("Started tracing exporter") diff --git a/backend/weaviate/batch.go b/backend/weaviate/batch.go index 0e4e218..13a515a 100644 --- a/backend/weaviate/batch.go +++ b/backend/weaviate/batch.go @@ -33,7 +33,7 @@ func (ss SimilarityService) BatchEncodePosts(ctx context.Context, ids []int, bat func (db *DB) batchUploadObjects(ctx context.Context, objects []*models.Object) error { - db.logger.Debug().Msg("Starting batch upload to vector DB") + db.logger.Debug().Ctx(ctx).Msg("Starting batch upload to vector DB") batcher := db.db.Batch().ObjectsBatcher() for _, obj := range objects { @@ -41,7 +41,7 @@ func (db *DB) batchUploadObjects(ctx context.Context, objects []*models.Object) } _, err := batcher.Do(ctx) if err != nil { - db.logger.Error().Err(err).Msg("Failed to do batch upload to vector DB") + db.logger.Error().Err(err).Ctx(ctx).Msg("Failed to do batch upload to vector DB") return err } return nil diff --git a/backend/weaviate/encode.go b/backend/weaviate/encode.go index 7c58926..289359a 100644 --- a/backend/weaviate/encode.go +++ b/backend/weaviate/encode.go @@ -14,7 +14,7 @@ import ( func (ss SimilarityService) EncodePost(ctx context.Context, id int) error { - ss.db.logger.Debug().Int("postID", id).Msg("Starting encode post") + ss.db.logger.Debug().Ctx(ctx).Int("postID", id).Msg("Starting encode post") ctx, span := ss.db.tracer.Tracer.Start(ctx, "vector:encode_post") defer span.End() @@ -29,7 +29,7 @@ func (ss SimilarityService) EncodePost(ctx context.Context, id int) error { err = fmt.Errorf("failed to convert post to picture object: %w", err) return err } - err = ss.db.uploadObject(ctx, obj) + err = ss.db.uploadObject(ctx, obj) if err != nil { err = fmt.Errorf("failed to upload picture object: %w", err) return err @@ -37,10 +37,9 @@ func (ss SimilarityService) EncodePost(ctx context.Context, id int) error { return nil } - func (db *DB) 
downloadPostImage(ctx context.Context, post *analogdb.Post) (string, error) { - db.logger.Debug().Msg("Starting download post") + db.logger.Debug().Ctx(ctx).Msg("Starting download post") ctx, span := db.tracer.Tracer.Start(ctx, "vector:download_post_image") defer span.End() @@ -81,7 +80,7 @@ func (db *DB) downloadPostImage(ctx context.Context, post *analogdb.Post) (strin func (db *DB) postToPictureObject(ctx context.Context, post *analogdb.Post) (*models.Object, error) { - db.logger.Debug().Msg("Starting convert post to picture object") + db.logger.Debug().Ctx(ctx).Msg("Starting convert post to picture object") image, err := db.downloadPostImage(ctx, post) if err != nil { @@ -94,16 +93,16 @@ func (db *DB) postToPictureObject(ctx context.Context, post *analogdb.Post) (*mo func (db *DB) uploadObject(ctx context.Context, obj *models.Object) error { - db.logger.Debug().Msg("Starting upload object") + db.logger.Debug().Ctx(ctx).Msg("Starting upload object") - ctx, span := db.tracer.Tracer.Start(ctx, "vector:upload_object") + ctx, span := db.startTrace(ctx, "vector:upload_object") defer span.End() batcher := db.db.Batch().ObjectsBatcher() _, err := batcher.WithObject(obj).Do(ctx) if err != nil { err = fmt.Errorf("failed to upload to vector DB: %w", err) - db.logger.Error().Err(err).Msg("Failed upload to vector DB") + db.logger.Error().Err(err).Ctx(ctx).Msg("Failed upload to vector DB") span.SetStatus(codes.Error, "Upload object failed") span.RecordError(err) return err diff --git a/backend/weaviate/schema.go b/backend/weaviate/schema.go index f7bff95..fb21a8f 100644 --- a/backend/weaviate/schema.go +++ b/backend/weaviate/schema.go @@ -28,7 +28,7 @@ func (db *DB) getSchema(ctx context.Context) (*schema.Dump, error) { func (db *DB) createPictureSchema(ctx context.Context) error { - db.logger.Debug().Msg("Starting to create picture schema in vector DB") + db.logger.Debug().Ctx(ctx).Msg("Starting to create picture schema in vector DB") classObj := &models.Class{ Class: "Picture", @@ -78,10 +78,10 @@ func (db *DB) createPictureSchema(ctx context.Context) error { err := db.db.Schema().ClassCreator().WithClass(classObj).Do(context.Background()) if err != nil { err = fmt.Errorf("Failed to create picture schema, %w", err) - db.logger.Error().Err(err).Msg("Failed to create picture schema in vector DB") + db.logger.Error().Err(err).Ctx(ctx).Msg("Failed to create picture schema in vector DB") return err } - db.logger.Info().Msg("Created picture schema in vector DB") + db.logger.Info().Ctx(ctx).Msg("Created picture schema in vector DB") return nil } diff --git a/backend/weaviate/similarity.go b/backend/weaviate/similarity.go index 84e8b93..eb6f294 100644 --- a/backend/weaviate/similarity.go +++ b/backend/weaviate/similarity.go @@ -53,9 +53,10 @@ func (ss SimilarityService) FindSimilarPosts(ctx context.Context, similarityFilt func (db *DB) deletePost(ctx context.Context, postID int) error { - db.logger.Debug().Int("postID", postID).Msg("Starting delete post from vector DB") + db.logger.Debug().Ctx(ctx).Int("postID", postID).Msg("Starting delete post from vector DB") - ctx, span := db.tracer.Tracer.Start(ctx, "vector:delete_post", trace.WithAttributes(attribute.Int("postID", postID))) + + ctx, span := db.startTrace(ctx, "vector:delete_post", trace.WithAttributes(attribute.Int("postID", postID))) defer span.End() fields := []graphql.Field{ @@ -80,7 +81,7 @@ func (db *DB) deletePost(ctx context.Context, postID int) error { if err != nil || result == nil { err = fmt.Errorf("Failed to find postID in vector DB, 
err=%w", err) - db.logger.Error().Err(err).Int("postID", postID).Msg("Failed to delete post from vectorDB") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", postID).Msg("Failed to delete post from vectorDB") span.SetStatus(codes.Error, "Get embedding by postID failed") span.RecordError(err) return &analogdb.Error{Code: analogdb.ERRNOTFOUND, Message: fmt.Sprintf("Post %d not found", postID)} @@ -89,7 +90,7 @@ func (db *DB) deletePost(ctx context.Context, postID int) error { pics, err := unmarshallPicturesResp(result) if err != nil { - db.logger.Error().Err(err).Int("postID", postID).Msg("Failed to delete post from vector DB") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", postID).Msg("Failed to delete post from vector DB") span.SetStatus(codes.Error, "Unmarshall embedding failed") span.RecordError(err) return &analogdb.Error{Code: analogdb.ERRNOTFOUND, Message: fmt.Sprintf("Post %d not found", postID)} @@ -104,14 +105,14 @@ func (db *DB) deletePost(ctx context.Context, postID int) error { Do(ctx) if err != nil { - db.logger.Error().Err(err).Int("postID", postID).Msg("Failed to delete post from vector DB") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", postID).Msg("Failed to delete post from vector DB") span.SetStatus(codes.Error, "Delete picture failed") span.RecordError(err) return &analogdb.Error{Code: analogdb.ERRINTERNAL, Message: fmt.Sprintf("Post %d could not be deleted from vector DB", postID)} } span.AddEvent("Deleted picture", trace.WithAttributes(attribute.Int("postID", postID), attribute.String("uuid", uuid))) - db.logger.Info().Int("postID", postID).Msg("Deleted post from vector DB") + db.logger.Info().Ctx(ctx).Int("postID", postID).Msg("Deleted post from vector DB") return err } @@ -132,9 +133,9 @@ func (db *DB) getSimilarPostIDs(ctx context.Context, filter *analogdb.PostSimila postID := *filter.ID - db.logger.Debug().Int("postID", postID).Msg("Starting get similar posts from vector DB") + db.logger.Debug().Ctx(ctx).Int("postID", postID).Msg("Starting get similar posts from vector DB") - ctx, span := db.tracer.Tracer.Start(ctx, "vector:get_similar_post_ids", trace.WithAttributes(attribute.Int("postID", postID))) + ctx, span := db.startTrace(ctx, "vector:get_similar_post_ids", trace.WithAttributes(attribute.Int("postID", postID))) defer span.End() // first make the query to lookup UUID associated with post's embedding @@ -160,7 +161,7 @@ func (db *DB) getSimilarPostIDs(ctx context.Context, filter *analogdb.PostSimila Do(ctx) if err != nil { - db.logger.Error().Err(err).Int("postID", postID).Msg("Failed to find post in vector DB") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", postID).Msg("Failed to find post in vector DB") span.SetStatus(codes.Error, "Get embedding by postID failed") span.RecordError(err) return ids, err @@ -169,7 +170,7 @@ func (db *DB) getSimilarPostIDs(ctx context.Context, filter *analogdb.PostSimila pics, err := unmarshallPicturesResp(result) if err != nil { - db.logger.Error().Err(err).Int("postID", postID).Msg("Failed to unmarshall post from vector DB") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", postID).Msg("Failed to unmarshall post from vector DB") span.SetStatus(codes.Error, "Unmarshall embedding failed") span.RecordError(err) return ids, &analogdb.Error{Code: analogdb.ERRNOTFOUND, Message: fmt.Sprintf("Post %d not found", postID)} @@ -182,7 +183,7 @@ func (db *DB) getSimilarPostIDs(ctx context.Context, filter *analogdb.PostSimila // this is where we narrow down the results where, err = filterToWhere(filter) if err != nil { - 
db.logger.Error().Err(err).Int("postID", postID).Msg("Failed to convert similarity filter to where clause") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", postID).Msg("Failed to convert similarity filter to where clause") span.SetStatus(codes.Error, "Similarity filter to where clause failed") span.RecordError(err) return ids, err @@ -204,7 +205,7 @@ func (db *DB) getSimilarPostIDs(ctx context.Context, filter *analogdb.PostSimila Do(ctx) if err != nil { - db.logger.Error().Err(err).Int("postID", postID).Msg("Failed to find near embeddings in vector DB") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", postID).Msg("Failed to find near embeddings in vector DB") span.SetStatus(codes.Error, "Failed to find similar embeddings in vector DB") span.RecordError(err) return ids, err @@ -213,7 +214,7 @@ func (db *DB) getSimilarPostIDs(ctx context.Context, filter *analogdb.PostSimila pics, err = unmarshallPicturesResp(result) if err != nil { - db.logger.Error().Err(err).Int("postID", postID).Msg("Failed to unmarshall post from vector DB") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", postID).Msg("Failed to unmarshall post from vector DB") span.SetStatus(codes.Error, "Unmarshall embedding failed") span.RecordError(err) return ids, err @@ -225,7 +226,7 @@ func (db *DB) getSimilarPostIDs(ctx context.Context, filter *analogdb.PostSimila } if len(ids) == 0 { - db.logger.Error().Err(err).Int("postID", postID).Msg("Found zero similar posts") + db.logger.Error().Err(err).Ctx(ctx).Int("postID", postID).Msg("Found zero similar posts") span.SetStatus(codes.Error, "Found zero similar posts") span.RecordError(err) return ids, &analogdb.Error{Code: analogdb.ERRNOTFOUND, Message: "No similar posts found"} diff --git a/backend/weaviate/weaviate.go b/backend/weaviate/weaviate.go index 9638789..a6cb0ad 100644 --- a/backend/weaviate/weaviate.go +++ b/backend/weaviate/weaviate.go @@ -8,6 +8,8 @@ import ( "github.com/evanofslack/analogdb/logger" "github.com/evanofslack/analogdb/tracer" "github.com/weaviate/weaviate-go-client/v4/weaviate" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) const weaviateClientTimeout = 30 * time.Second @@ -92,3 +94,19 @@ func (db *DB) Close() error { db.logger.Info().Msg("Closed vector DB connection") return nil } + +// start a trace targeting the weaviate server +func (db *DB) startTrace(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + + dbSystem := attribute.String("db.system", "weaviate") + dbName := attribute.String("db.name", "pictures") + serverAddress := attribute.String("server.address", db.host) + + attributes := trace.WithAttributes(dbSystem, dbName, serverAddress) + spanKind := trace.WithSpanKind(trace.SpanKindClient) + + opts = append(opts, attributes, spanKind) + + return db.tracer.Tracer.Start(ctx, spanName, opts...) + +}
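
Usage sketch (illustrative only, not part of the commit): the hook added in backend/logger/tracer.go reads the span context off the zerolog event, so any call site that attaches its request context with Ctx(ctx) gets trace_id and span_id stamped onto the log line. The standalone snippet below shows the same pattern under these assumptions: zerolog v1.30+ (for Event.GetCtx), the spanHook name is a placeholder for the repo's tracingHook, and a real setup would register an OTel SDK TracerProvider so the span context is valid.

package main

import (
	"context"
	"os"

	"github.com/rs/zerolog"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

// spanHook is a hypothetical stand-in for the tracingHook in this diff.
type spanHook struct{}

func (spanHook) Run(e *zerolog.Event, _ zerolog.Level, _ string) {
	// read the context the caller attached via Event.Ctx(ctx)
	if sc := trace.SpanContextFromContext(e.GetCtx()); sc.IsValid() {
		e.Str("trace_id", sc.TraceID().String())
		e.Str("span_id", sc.SpanID().String())
	}
}

func main() {
	logger := zerolog.New(os.Stdout).With().Timestamp().Logger().Hook(spanHook{})

	// with only the default no-op tracer provider this span context is invalid
	// and no ids are added; a real deployment configures an SDK TracerProvider
	ctx, span := otel.Tracer("example").Start(context.Background(), "demo")
	defer span.End()

	// because the event carries ctx, the hook can stamp trace/span ids
	logger.Info().Ctx(ctx).Msg("request handled")
}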