Skip to content

Commit

Permalink
perf: Instrument auth{n,z} calls and simplify existing tracing code
Browse files Browse the repository at this point in the history
  • Loading branch information
Sonia Melgaço committed Oct 11, 2022
1 parent b028dac commit 9cf9fa6
Show file tree
Hide file tree
Showing 20 changed files with 71 additions and 1,512 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ run-containers: ## run all test containers
@cd test_containers && docker-compose up -d && cd ..

kill-containers: ## kill all test containers
@cd test_containers && docker-compose stop && cd ..
@cd test_containers && docker-compose down && cd ..

setup/mongo:
go run scripts/setup_mongo_messages-index.go
Expand Down
44 changes: 39 additions & 5 deletions app/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (

"github.com/labstack/echo"
newrelic "github.com/newrelic/go-agent"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"github.com/spf13/viper"
"github.com/topfreegames/mqtt-history/mongoclient"
"go.mongodb.org/mongo-driver/bson"
Expand Down Expand Up @@ -61,6 +64,16 @@ func WithSegment(name string, c echo.Context, f func() error) error {
}

func findAuthorizedTopics(ctx context.Context, username string, topics []string) ([]ACL, error) {
collection := "mqtt_acl"
span, ctx := opentracing.StartSpanFromContext(
ctx,
"find_authorized_topics",
opentracing.Tags{
string(ext.DBType): "mongo",
"collection": collection,
},
)
defer span.Finish()
searchResults := make([]ACL, 0)
query := func(c *mongo.Collection) error {
opts := options.Find()
Expand All @@ -69,31 +82,41 @@ func findAuthorizedTopics(ctx context.Context, username string, topics []string)
{"username", 1},
{"pubsub", 1},
}

// add sort to match index
opts.SetSort(defaultACLSort)

query := bson.M{"username": username, "pubsub": bson.M{"$in": topics}}

statement := mongoclient.ExtractStatementForTrace(query, defaultACLSort, -1)
span.SetTag(string(ext.DBStatement), statement)
span.SetTag(string(ext.DBInstance), c.Database().Name())

cursor, err := c.Find(ctx, query)
if err != nil {
ext.LogError(span, err, log.Message("Error finding messages in MongoDB"))
return err
}

return cursor.All(ctx, &searchResults)
}
search := func() error {
mongoCollection, err := mongoclient.GetCollection(ctx, "mqtt_acl")
mongoCollection, err := mongoclient.GetCollection(ctx, collection)
if err != nil {
ext.LogError(span, err, log.Message("Error getting collection from MongoDB"))
return err
}
return query(mongoCollection)
}
err := search()
if err != nil {
ext.LogError(span, err, log.Message("Error decoding messages of a cursor from MongoDB"))
}
return searchResults, err
}

// GetTopics get topics
func GetTopics(ctx context.Context, username string, _topics []string) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "get_topics")
defer span.Finish()
if viper.GetBool("mongo.allow_anonymous") {
return _topics, nil
}
Expand All @@ -114,13 +137,16 @@ func IsAuthorized(ctx context.Context, app *App, userID string, topics ...string
httpAuthEnabled := app.Config.GetBool("httpAuth.enabled")

if httpAuthEnabled {
return httpAuthorize(app, userID, topics)
return httpAuthorize(ctx, app, userID, topics)
}

return mongoAuthorize(ctx, userID, topics)
}

func httpAuthorize(app *App, userID string, topics []string) (bool, []string, error) {
func httpAuthorize(ctx context.Context, app *App, userID string, topics []string) (bool, []string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "http_authorize")
defer span.Finish()

timeout := app.Config.GetDuration("httpAuth.timeout") * time.Second
address := app.Config.GetString("httpAuth.requestURL")

Expand All @@ -147,6 +173,11 @@ func httpAuthorize(app *App, userID string, topics []string) (bool, []string, er
request.SetBasicAuth(username, password)
}

opentracing.GlobalTracer().Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(request.Header))

response, err := client.Do(request)
// discard response body
if response != nil && response.Body != nil {
Expand All @@ -155,6 +186,7 @@ func httpAuthorize(app *App, userID string, topics []string) (bool, []string, er
}

if err != nil {
ext.LogError(span, err, log.Message("Error authorizing user"))
return false, nil, err
}

Expand All @@ -168,6 +200,8 @@ func httpAuthorize(app *App, userID string, topics []string) (bool, []string, er
}

func mongoAuthorize(ctx context.Context, userID string, topics []string) (bool, []string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongo_authorize")
defer span.Finish()
for _, topic := range topics {
pieces := strings.Split(topic, "/")
pieces[len(pieces)-1] = "+"
Expand Down
3 changes: 1 addition & 2 deletions app/histories.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func HistoriesHandler(app *App) func(c echo.Context) error {
topics[i] = topicPrefix + "/" + topicSuffix
}

authenticated, authorizedTopics, err := IsAuthorized(c.StdContext(), app, userID, topics...)
authenticated, authorizedTopics, err := IsAuthorized(c, app, userID, topics...)
if err != nil {
return err
}
Expand Down Expand Up @@ -69,6 +69,5 @@ func HistoriesHandler(app *App) func(c echo.Context) error {
}
}
return c.JSON(http.StatusOK, messages)

}
}
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ services:
image: mongo:3.6.23
ports:
- "27017:27017"
volumes:
- mongo:/data/db
jaeger:
image: jaegertracing/all-in-one:1.6
ports:
- 6831:6831/udp
- 16686:16686
volumes:
mongo:
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.18
require (
github.com/franela/goblin v0.0.0-20180407132755-cd5d08fb4ede
github.com/getsentry/raven-go v0.0.0-20160805001729-c9d3cc542ad1
github.com/gocql/gocql v0.0.0-20210128155005-14aa133462a3
github.com/labstack/echo v2.0.3-0.20160926051323-04e6901d05b5+incompatible
github.com/newrelic/go-agent v1.4.0
github.com/onsi/gomega v1.10.1
Expand Down Expand Up @@ -35,7 +34,6 @@ require (
github.com/google/go-cmp v0.5.4 // indirect
github.com/gorilla/context v0.0.0-20160226214623-1ea25387ff6f // indirect
github.com/gorilla/mux v1.6.1 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/hcl v0.0.0-20160822214145-baeb59c71071 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jinzhu/inflection v0.0.0-20180308033659-04140366298a // indirect
Expand Down Expand Up @@ -76,6 +74,5 @@ require (
google.golang.org/appengine v1.0.0 // indirect
google.golang.org/protobuf v1.23.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
)
11 changes: 0 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ github.com/asaskevich/govalidator v0.0.0-20180315120708-ccb8e960c48f h1:y2hSFdXe
github.com/asaskevich/govalidator v0.0.0-20180315120708-ccb8e960c48f/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.34.28 h1:sscPpn/Ns3i0F4HPEWAVcwdIRaZZCuL7llJ2/60yPIk=
github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -59,8 +55,6 @@ github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWe
github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ=
github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0=
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
github.com/gocql/gocql v0.0.0-20210128155005-14aa133462a3 h1:/Qqe4XeOmswIOMaQBrTASDuxH59oWHDG2H+7ugClNRc=
github.com/gocql/gocql v0.0.0-20210128155005-14aa133462a3/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/mock v1.0.0 h1:HzcpUG60pfl43n9d2qbdi/3l1uKpAmxlfWEPWtV/QxM=
github.com/golang/mock v1.0.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand All @@ -72,7 +66,6 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand All @@ -85,8 +78,6 @@ github.com/gorilla/context v0.0.0-20160226214623-1ea25387ff6f h1:9oNbS1z4rVpbnkH
github.com/gorilla/context v0.0.0-20160226214623-1ea25387ff6f/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.1 h1:KOwqsTYZdeuMacU7CxjMNYEKeBvLbxW+psodrbcEa3A=
github.com/gorilla/mux v1.6.1/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/hcl v0.0.0-20160822214145-baeb59c71071 h1:CBvdmllHJsBQXv6fT+XtxbakibDgKfa0gTttWjakRVw=
github.com/hashicorp/hcl v0.0.0-20160822214145-baeb59c71071/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down Expand Up @@ -295,8 +286,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
57 changes: 18 additions & 39 deletions mongoclient/get_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,10 @@ func GetMessages(ctx context.Context, queryParameters QueryParameters) []*models
}

func ConvertMessageV2ToMessage(messagev2 *models.MessageV2) *models.Message {
payload := messagev2.Payload
bytes, _ := json.Marshal(payload)

finalStr := string(bytes)
pBytes, _ := json.Marshal(messagev2.Payload)
return &models.Message{
Timestamp: time.Unix(messagev2.Timestamp, 0),
Payload: finalStr,
Payload: string(pBytes),
Topic: messagev2.Topic,
}
}
Expand Down Expand Up @@ -136,24 +133,15 @@ func GetMessagesPlayerSupportV2WithParameter(ctx context.Context, queryParameter

mongoCollection, err := GetCollection(ctx, queryParameters.Collection)
if err != nil {
span.SetTag("error", true)
span.LogFields(
log.Event("error"),
log.Message("Error getting collection from MongoDB"),
log.Error(err),
)
span.SetTag("collection", queryParameters.Collection)
ext.LogError(span, err, log.Message("Error getting collection from MongoDB"))
logger.Logger.Warningf("Error getting collection from MongoDB: %s", err.Error())
return []*models.MessageV2{}
}

rawResults, err := getMessagesPlayerSupportFromCollection(ctx, queryParameters, mongoCollection)
if err != nil {
span.SetTag("error", true)
span.LogFields(
log.Event("error"),
log.Message("Error getting messages from MongoDB"),
log.Error(err),
)
ext.LogError(span, err, log.Message("Error getting messages from MongoDB"))
logger.Logger.Warningf("Error getting messages from MongoDB: %s", err.Error())
return []*models.MessageV2{}
}
Expand All @@ -164,12 +152,7 @@ func GetMessagesPlayerSupportV2WithParameter(ctx context.Context, queryParameter
searchResults[i], err = convertRawMessageToModelMessage(rawResults[i])

if err != nil {
span.SetTag("error", true)
span.LogFields(
log.Event("error"),
log.Message("Error converting messages from MongoDB"),
log.Error(err),
)
ext.LogError(span, err, log.Message("Error converting messages from MongoDB"))
logger.Logger.Warningf("Error converting messages from MongoDB: %s", err.Error())
return []*models.MessageV2{}
}
Expand All @@ -189,15 +172,16 @@ func getMessagesPlayerSupportFromCollection(
{"timestamp", -1},
}

statement := extractStatementForTrace(query, sort, queryParameters.Limit)
statement := ExtractStatementForTrace(query, sort, queryParameters.Limit)
span, ctx := opentracing.StartSpanFromContext(
ctx,
"get_messages_player_support_from_collection",
opentracing.Tags{
string(ext.DBStatement): statement,
string(ext.DBType): "mongo",
string(ext.DBInstance): database,
string(ext.DBInstance): mongoCollection.Database().Name(),
string(ext.DBUser): user,
"collection": mongoCollection.Name(),
},
)
defer span.Finish()
Expand All @@ -208,13 +192,13 @@ func getMessagesPlayerSupportFromCollection(

cursor, err := mongoCollection.Find(ctx, query, opts)
if err != nil {
span.SetTag("error", true)
ext.LogError(span, err, log.Message("Error finding messages in MongoDB"))
return nil, err
}

rawResults := make([]MongoMessage, 0)
if err = cursor.All(ctx, &rawResults); err != nil {
span.SetTag("error", true)
ext.LogError(span, err, log.Message("Error decoding messages of a cursor from MongoDB"))
return nil, err
}

Expand All @@ -241,18 +225,13 @@ func resolveQuery(queryParameters QueryParameters) bson.M {
return query
}

func extractStatementForTrace(query bson.M, sort bson.D, limit int64) string {
statementByteArray, err := bson.MarshalExtJSON(query, true, true)
if err == nil {
statementByteArray, _ = bson.MarshalExtJSONAppend(
statementByteArray,
bson.D{
{"sort", sort},
{"limit", limit},
},
true,
true,
)
func ExtractStatementForTrace(query bson.M, sort bson.D, limit int64) string {
queryCopy := make(map[string]interface{}, len(query))
for k, v := range query {
queryCopy[k] = v
}
queryCopy["sort"] = sort
queryCopy["limit"] = limit
statementByteArray, _ := bson.MarshalExtJSON(queryCopy, true, true)
return string(statementByteArray)
}
Loading

0 comments on commit 9cf9fa6

Please sign in to comment.