From 1f18c113ec3cbef3604caf158c50a1017c99ae57 Mon Sep 17 00:00:00 2001 From: Adi Ben-Dahan Date: Tue, 10 May 2016 23:27:41 +0300 Subject: [PATCH 1/4] Setup cleanup * added const for defaults * removed unnecessary nesting * reorganize code --- beater/mysqlbeat.go | 208 +++++++++++++++++++++++--------------------- config/config.go | 2 +- 2 files changed, 112 insertions(+), 98 deletions(-) diff --git a/beater/mysqlbeat.go b/beater/mysqlbeat.go index ecb2fa3..553c30d 100644 --- a/beater/mysqlbeat.go +++ b/beater/mysqlbeat.go @@ -33,11 +33,11 @@ type Mysqlbeat struct { password string passwordAES string queries []string - querytypes []string - deltawildcard string + queryTypes []string + deltaWildcard string - oldvalues common.MapStr - oldvaluesage common.MapStr + oldValues common.MapStr + oldValuesAge common.MapStr } var ( @@ -50,6 +50,13 @@ const ( // you can encrypt your password with github.com/adibendahan/mysqlbeat-password-encrypter just update your secret // (and commonIV if you choose to change it) and compile. secret = "github.com/adibendahan/mysqlbeat" + + defaultPeriod = "10s" + defaultHostname = "127.0.0.1" + defaultPort = "3306" + defaultUsername = "mysqlbeat_user" + defaultPassword = "mysqlbeat_pass" + defaultDeltaWildcard = "__DELTA" ) // New Creates beater @@ -66,6 +73,7 @@ func (bt *Mysqlbeat) Config(b *beat.Beat) error { // Load beater beatConfig err := cfgfile.Read(&bt.beatConfig, "") + if err != nil { return fmt.Errorf("Error reading config file: %v", err) } @@ -91,87 +99,93 @@ func roundF2I(val float64, roundOn float64) (newVal int64) { // Setup is a function to setup all beat config & info into the beat struct func (bt *Mysqlbeat) Setup(b *beat.Beat) error { - var err error - - if len(bt.beatConfig.Mysqlbeat.Queries) > 0 { - - bt.oldvalues = common.MapStr{"mysqlbeat": "init"} - bt.oldvaluesage = common.MapStr{"mysqlbeat": "init"} + if len(bt.beatConfig.Mysqlbeat.Queries) < 1 { + err := fmt.Errorf("there are no queries to execute") + return err + } - // Setting default period if not set - if bt.beatConfig.Mysqlbeat.Period == "" { - bt.beatConfig.Mysqlbeat.Period = "10s" - } + // init the oldValues and oldValuesAge array + bt.oldValues = common.MapStr{"mysqlbeat": "init"} + bt.oldValuesAge = common.MapStr{"mysqlbeat": "init"} - if bt.beatConfig.Mysqlbeat.DeltaWildCard == "" { - bt.beatConfig.Mysqlbeat.DeltaWildCard = "__DELTA" - } + if len(bt.beatConfig.Mysqlbeat.Queries) != len(bt.beatConfig.Mysqlbeat.QueryTypes) { + err := fmt.Errorf("error on config file, queries array length != queryTypes array length (each query should have a corresponding type on the same index)") + return err + } - if len(bt.beatConfig.Mysqlbeat.Queries) != len(bt.beatConfig.Mysqlbeat.QueryTypes) { - err := fmt.Errorf("error on config file, queries array length != querytypes array length (each query should have a corresponding type on the same index)") - return err - } + // Setting defaults for missing config + if bt.beatConfig.Mysqlbeat.Period == "" { + logp.Info("Period not selected, proceeding with '%v' as default", defaultPeriod) + bt.beatConfig.Mysqlbeat.Period = defaultPeriod + } - bt.queries = bt.beatConfig.Mysqlbeat.Queries - bt.querytypes = bt.beatConfig.Mysqlbeat.QueryTypes + if bt.beatConfig.Mysqlbeat.Hostname == "" { + logp.Info("Hostname not selected, proceeding with '%v' as default", defaultHostname) + bt.beatConfig.Mysqlbeat.Hostname = defaultHostname + } - logp.Info("Total # of queries to execute: %d", len(bt.queries)) + if bt.beatConfig.Mysqlbeat.Port == "" { + logp.Info("Port not selected, proceeding with '%v' as default", defaultPort) + bt.beatConfig.Mysqlbeat.Port = defaultPort + } - for index, queryStr := range bt.queries { - logp.Info("Query #%d (type: %s): %s", index+1, bt.querytypes[index], queryStr) - } + if bt.beatConfig.Mysqlbeat.Username == "" { + logp.Info("Username not selected, proceeding with '%v' as default", defaultUsername) + bt.beatConfig.Mysqlbeat.Username = defaultUsername + } - bt.period, err = time.ParseDuration(bt.beatConfig.Mysqlbeat.Period) - if err != nil { - return err - } + if bt.beatConfig.Mysqlbeat.Password == "" && bt.beatConfig.Mysqlbeat.EncryptedPassword == "" { + logp.Info("Password not selected, proceeding with default password") + bt.beatConfig.Mysqlbeat.Password = defaultPassword + } - if bt.beatConfig.Mysqlbeat.Hostname == "" { - logp.Info("Hostname not selected, proceeding with '127.0.0.1' as default") - bt.beatConfig.Mysqlbeat.Hostname = "127.0.0.1" - } + if bt.beatConfig.Mysqlbeat.DeltaWildcard == "" { + logp.Info("DeltaWildcard not selected, proceeding with '%v' as default", defaultDeltaWildcard) + bt.beatConfig.Mysqlbeat.DeltaWildcard = defaultDeltaWildcard + } - if bt.beatConfig.Mysqlbeat.Port == "" { - logp.Info("Port not selected, proceeding with '3306' as default") - bt.beatConfig.Mysqlbeat.Port = "3306" - } + // Parse the Period string + var durationParseError error + bt.period, durationParseError = time.ParseDuration(bt.beatConfig.Mysqlbeat.Period) + if durationParseError != nil { + return durationParseError + } - if bt.beatConfig.Mysqlbeat.Username == "" { - logp.Info("Username not selected, proceeding with 'mysqlbeat_user' as default") - bt.beatConfig.Mysqlbeat.Username = "mysqlbeat_user" - } + if bt.beatConfig.Mysqlbeat.Password != "" { + bt.password = bt.beatConfig.Mysqlbeat.Password + } else if bt.beatConfig.Mysqlbeat.EncryptedPassword != "" { - if bt.beatConfig.Mysqlbeat.Password == "" && bt.beatConfig.Mysqlbeat.EncryptedPassword == "" { - logp.Info("Password not selected, proceeding with 'mysqlbeat_pass' as default") - bt.beatConfig.Mysqlbeat.Password = "mysqlbeat_pass" + aesCipher, err := aes.NewCipher([]byte(secret)) + if err != nil { + return err } - if bt.beatConfig.Mysqlbeat.Password != "" { - bt.password = bt.beatConfig.Mysqlbeat.Password - } else if bt.beatConfig.Mysqlbeat.EncryptedPassword != "" { + cfbDecrypter := cipher.NewCFBDecrypter(aesCipher, commonIV) + chiperText, err := hex.DecodeString(bt.beatConfig.Mysqlbeat.EncryptedPassword) - c, err := aes.NewCipher([]byte(secret)) - if err != nil { - return err - } - cfbdec := cipher.NewCFBDecrypter(c, commonIV) - chipertext, _ := hex.DecodeString(bt.beatConfig.Mysqlbeat.EncryptedPassword) - plaintextCopy := make([]byte, len(chipertext)) - cfbdec.XORKeyStream(plaintextCopy, chipertext) - bt.password = string(plaintextCopy) + if err != nil { + return err } - bt.hostname = bt.beatConfig.Mysqlbeat.Hostname - bt.port = bt.beatConfig.Mysqlbeat.Port - bt.username = bt.beatConfig.Mysqlbeat.Username - bt.deltawildcard = bt.beatConfig.Mysqlbeat.DeltaWildCard + plaintextCopy := make([]byte, len(chiperText)) + cfbDecrypter.XORKeyStream(plaintextCopy, chiperText) + bt.password = string(plaintextCopy) + } - } else { + // Save config values to the bt + bt.hostname = bt.beatConfig.Mysqlbeat.Hostname + bt.port = bt.beatConfig.Mysqlbeat.Port + bt.username = bt.beatConfig.Mysqlbeat.Username + bt.queries = bt.beatConfig.Mysqlbeat.Queries + bt.queryTypes = bt.beatConfig.Mysqlbeat.QueryTypes + bt.deltaWildcard = bt.beatConfig.Mysqlbeat.DeltaWildcard - err := fmt.Errorf("there are no queries to execute") - return err + logp.Info("Total # of queries to execute: %d", len(bt.queries)) + for index, queryStr := range bt.queries { + logp.Info("Query #%d (type: %s): %s", index+1, bt.queryTypes[index], queryStr) } + return nil } @@ -195,7 +209,7 @@ func (bt *Mysqlbeat) Run(b *beat.Beat) error { } } -// readData is a function that connects to the mysql, runs the query and returns the data +// beat is a function that connects to the mysql, runs the query and returns the data func (bt *Mysqlbeat) beat(b *beat.Beat) error { connString := bt.username + ":" + bt.password + "@tcp(" + bt.hostname + ":" + bt.port + ")/" @@ -239,7 +253,7 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { currentRow++ - if bt.querytypes[index] == "single-row" && currentRow == 1 { + if bt.queryTypes[index] == "single-row" && currentRow == 1 { err = rows.Scan(scanArgs...) if err != nil { @@ -266,32 +280,32 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { } } - if strings.HasSuffix(strColName, bt.deltawildcard) { + if strings.HasSuffix(strColName, bt.deltaWildcard) { var exists bool - _, exists = bt.oldvalues[strColName] + _, exists = bt.oldValues[strColName] if !exists { - bt.oldvaluesage[strColName] = dtNow + bt.oldValuesAge[strColName] = dtNow if strColType == "string" { - bt.oldvalues[strColName] = strColValue + bt.oldValues[strColName] = strColValue } else if strColType == "int" { - bt.oldvalues[strColName] = nColValue + bt.oldValues[strColName] = nColValue } else if strColType == "float" { - bt.oldvalues[strColName] = fColValue + bt.oldValues[strColName] = fColValue } } else { - if dtOld, ok := bt.oldvaluesage[strColName].(time.Time); ok { + if dtOld, ok := bt.oldValuesAge[strColName].(time.Time); ok { delta := dtNow.Sub(dtOld) if strColType == "int" { var calcVal int64 - oldVal, _ := bt.oldvalues[strColName].(int64) + oldVal, _ := bt.oldValues[strColName].(int64) if nColValue > oldVal { var devRes float64 @@ -303,13 +317,13 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { event[strColName] = calcVal - bt.oldvalues[strColName] = nColValue - bt.oldvaluesage[strColName] = dtNow + bt.oldValues[strColName] = nColValue + bt.oldValuesAge[strColName] = dtNow } else if strColType == "float" { var calcVal float64 - oldVal, _ := bt.oldvalues[strColName].(float64) + oldVal, _ := bt.oldValues[strColName].(float64) if fColValue > oldVal { calcVal = (fColValue - oldVal) / float64(delta.Seconds()) @@ -319,8 +333,8 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { event[strColName] = calcVal - bt.oldvalues[strColName] = fColValue - bt.oldvaluesage[strColName] = dtNow + bt.oldValues[strColName] = fColValue + bt.oldValuesAge[strColName] = dtNow } else { event[strColName] = strColValue } @@ -340,7 +354,7 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { rows.Close() - } else if bt.querytypes[index] == "two-columns" { + } else if bt.queryTypes[index] == "two-columns" { err = rows.Scan(scanArgs...) @@ -366,32 +380,32 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { } } - if strings.HasSuffix(strColName, bt.deltawildcard) { + if strings.HasSuffix(strColName, bt.deltaWildcard) { var exists bool - _, exists = bt.oldvalues[strColName] + _, exists = bt.oldValues[strColName] if !exists { - bt.oldvaluesage[strColName] = dtNow + bt.oldValuesAge[strColName] = dtNow if strColType == "string" { - bt.oldvalues[strColName] = strColValue + bt.oldValues[strColName] = strColValue } else if strColType == "int" { - bt.oldvalues[strColName] = nColValue + bt.oldValues[strColName] = nColValue } else if strColType == "float" { - bt.oldvalues[strColName] = fColValue + bt.oldValues[strColName] = fColValue } } else { - if dtOld, ok := bt.oldvaluesage[strColName].(time.Time); ok { + if dtOld, ok := bt.oldValuesAge[strColName].(time.Time); ok { delta := dtNow.Sub(dtOld) if strColType == "int" { var calcVal int64 - oldVal, _ := bt.oldvalues[strColName].(int64) + oldVal, _ := bt.oldValues[strColName].(int64) if nColValue > oldVal { var devRes float64 @@ -404,15 +418,15 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { event[strColName] = calcVal - bt.oldvalues[strColName] = nColValue - bt.oldvaluesage[strColName] = dtNow + bt.oldValues[strColName] = nColValue + bt.oldValuesAge[strColName] = dtNow //logp.Info("DEBUG: o: %d n: %d time diff: %d calc: %d", oldVal, nColValue, int64(delta.Seconds()), calcVal) } else if strColType == "float" { var calcVal float64 - oldVal, _ := bt.oldvalues[strColName].(float64) + oldVal, _ := bt.oldValues[strColName].(float64) if fColValue > oldVal { calcVal = (fColValue - oldVal) / float64(delta.Seconds()) @@ -422,8 +436,8 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { event[strColName] = calcVal - bt.oldvalues[strColName] = fColValue - bt.oldvaluesage[strColName] = dtNow + bt.oldValues[strColName] = fColValue + bt.oldValuesAge[strColName] = dtNow } else { event[strColName] = strColValue @@ -440,7 +454,7 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { } } - } else if bt.querytypes[index] == "multiple-rows" { + } else if bt.queryTypes[index] == "multiple-rows" { mevent := common.MapStr{ "@timestamp": common.Time(time.Now()), "type": b.Name, @@ -473,7 +487,7 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { b.Events.PublishEvent(mevent) logp.Info("Event sent") - } else if bt.querytypes[index] == "show-slave-delay" && currentRow == 1 { + } else if bt.queryTypes[index] == "show-slave-delay" && currentRow == 1 { err = rows.Scan(scanArgs...) if err != nil { @@ -499,7 +513,7 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { } } - if bt.querytypes[index] != "multiple-rows" && len(event) > 2 { + if bt.queryTypes[index] != "multiple-rows" && len(event) > 2 { b.Events.PublishEvent(event) logp.Info("Event sent") } diff --git a/config/config.go b/config/config.go index 0bf9f96..4aabc42 100644 --- a/config/config.go +++ b/config/config.go @@ -16,5 +16,5 @@ type MysqlbeatConfig struct { EncryptedPassword string `yaml:"encryptedpassword"` Queries []string `yaml:"queries"` QueryTypes []string `yaml:"querytypes"` - DeltaWildCard string `yaml:"deltawildcard"` + DeltaWildcard string `yaml:"deltawildcard"` } From 36ddaa95831b0b13124d46635ec694e06a95c19c Mon Sep 17 00:00:00 2001 From: Adi Ben-Dahan Date: Tue, 10 May 2016 23:34:29 +0300 Subject: [PATCH 2/4] reorder methods location --- beater/mysqlbeat.go | 53 +++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/beater/mysqlbeat.go b/beater/mysqlbeat.go index 553c30d..077e6a5 100644 --- a/beater/mysqlbeat.go +++ b/beater/mysqlbeat.go @@ -81,21 +81,6 @@ func (bt *Mysqlbeat) Config(b *beat.Beat) error { return nil } -// roundF2I is a function that returns a rounded int64 from a float64 -func roundF2I(val float64, roundOn float64) (newVal int64) { - var round float64 - - digit := val - _, div := math.Modf(digit) - if div >= roundOn { - round = math.Ceil(digit) - } else { - round = math.Floor(digit) - } - - return int64(round) -} - // Setup is a function to setup all beat config & info into the beat struct func (bt *Mysqlbeat) Setup(b *beat.Beat) error { @@ -151,23 +136,19 @@ func (bt *Mysqlbeat) Setup(b *beat.Beat) error { return durationParseError } + // Handle password decryption and save in the bt if bt.beatConfig.Mysqlbeat.Password != "" { bt.password = bt.beatConfig.Mysqlbeat.Password } else if bt.beatConfig.Mysqlbeat.EncryptedPassword != "" { - aesCipher, err := aes.NewCipher([]byte(secret)) - if err != nil { return err } - cfbDecrypter := cipher.NewCFBDecrypter(aesCipher, commonIV) chiperText, err := hex.DecodeString(bt.beatConfig.Mysqlbeat.EncryptedPassword) - if err != nil { return err } - plaintextCopy := make([]byte, len(chiperText)) cfbDecrypter.XORKeyStream(plaintextCopy, chiperText) bt.password = string(plaintextCopy) @@ -202,13 +183,24 @@ func (bt *Mysqlbeat) Run(b *beat.Beat) error { } err := bt.beat(b) - if err != nil { return err } } } +// Cleanup is a function that does nothing on this beat :) +func (bt *Mysqlbeat) Cleanup(b *beat.Beat) error { + return nil +} + +// Stop is a function that runs once the beat is stopped +func (bt *Mysqlbeat) Stop() { + close(bt.done) +} + +/// *** mysqlbeat methods ***/// + // beat is a function that connects to the mysql, runs the query and returns the data func (bt *Mysqlbeat) beat(b *beat.Beat) error { @@ -526,12 +518,17 @@ func (bt *Mysqlbeat) beat(b *beat.Beat) error { return nil } -// Cleanup is a function that does nothing on this beat :) -func (bt *Mysqlbeat) Cleanup(b *beat.Beat) error { - return nil -} +// roundF2I is a function that returns a rounded int64 from a float64 +func roundF2I(val float64, roundOn float64) (newVal int64) { + var round float64 -// Stop is a function that runs once the beat is stopped -func (bt *Mysqlbeat) Stop() { - close(bt.done) + digit := val + _, div := math.Modf(digit) + if div >= roundOn { + round = math.Ceil(digit) + } else { + round = math.Floor(digit) + } + + return int64(round) } From f4bbb034ea3eaf43e320946698781448f2442445 Mon Sep 17 00:00:00 2001 From: Adi Ben-Dahan Date: Thu, 12 May 2016 15:13:22 +0300 Subject: [PATCH 3/4] beat Cleanup * Added consts * Break beat into smaller functions * Added comments --- beater/mysqlbeat.go | 541 ++++++++++++++++++++++++-------------------- 1 file changed, 296 insertions(+), 245 deletions(-) diff --git a/beater/mysqlbeat.go b/beater/mysqlbeat.go index 077e6a5..3cb82e3 100644 --- a/beater/mysqlbeat.go +++ b/beater/mysqlbeat.go @@ -22,7 +22,7 @@ import ( _ "github.com/go-sql-driver/mysql" ) -// Mysqlbeat is a struct tol hold the beat config & info +// Mysqlbeat is a struct to hold the beat config & info type Mysqlbeat struct { beatConfig *config.Config done chan struct{} @@ -51,12 +51,27 @@ const ( // (and commonIV if you choose to change it) and compile. secret = "github.com/adibendahan/mysqlbeat" + // default values defaultPeriod = "10s" defaultHostname = "127.0.0.1" defaultPort = "3306" defaultUsername = "mysqlbeat_user" defaultPassword = "mysqlbeat_pass" defaultDeltaWildcard = "__DELTA" + + // query types values + queryTypeSingleRow = "single-row" + queryTypeMultipleRows = "multiple-rows" + queryTypeTwoColumns = "two-columns" + queryTypeSlaveDelay = "show-slave-delay" + + // special column names values + columnNameSlaveDelay = "Seconds_Behind_Master" + + // column types values + columnTypeString = iota + columnTypeInt + columnTypeFloat ) // New Creates beater @@ -66,14 +81,13 @@ func New() *Mysqlbeat { } } -/// *** Beater interface methods ***/// +///*** Beater interface methods ***/// // Config is a function to read config file func (bt *Mysqlbeat) Config(b *beat.Beat) error { // Load beater beatConfig err := cfgfile.Read(&bt.beatConfig, "") - if err != nil { return fmt.Errorf("Error reading config file: %v", err) } @@ -89,10 +103,6 @@ func (bt *Mysqlbeat) Setup(b *beat.Beat) error { return err } - // init the oldValues and oldValuesAge array - bt.oldValues = common.MapStr{"mysqlbeat": "init"} - bt.oldValuesAge = common.MapStr{"mysqlbeat": "init"} - if len(bt.beatConfig.Mysqlbeat.Queries) != len(bt.beatConfig.Mysqlbeat.QueryTypes) { err := fmt.Errorf("error on config file, queries array length != queryTypes array length (each query should have a corresponding type on the same index)") return err @@ -154,6 +164,10 @@ func (bt *Mysqlbeat) Setup(b *beat.Beat) error { bt.password = string(plaintextCopy) } + // init the oldValues and oldValuesAge array + bt.oldValues = common.MapStr{"mysqlbeat": "init"} + bt.oldValuesAge = common.MapStr{"mysqlbeat": "init"} + // Save config values to the bt bt.hostname = bt.beatConfig.Mysqlbeat.Hostname bt.port = bt.beatConfig.Mysqlbeat.Port @@ -199,323 +213,360 @@ func (bt *Mysqlbeat) Stop() { close(bt.done) } -/// *** mysqlbeat methods ***/// +///*** mysqlbeat methods ***/// -// beat is a function that connects to the mysql, runs the query and returns the data +// beat is a function that iterate over the query array, generate and publish events func (bt *Mysqlbeat) beat(b *beat.Beat) error { - connString := bt.username + ":" + bt.password + "@tcp(" + bt.hostname + ":" + bt.port + ")/" + // Build the MySQL connection string + connString := fmt.Sprintf("%v:%v@tcp(%v:%v)/", bt.username, bt.password, bt.hostname, bt.port) db, err := sql.Open("mysql", connString) - if err != nil { return err } defer db.Close() - for index, queryStr := range bt.queries { + // Create a two-columns event for later use + var twoColumnEvent common.MapStr +LoopQueries: + for index, queryStr := range bt.queries { + // Log the query run time and run the query + dtNow := time.Now() rows, err := db.Query(queryStr) - if err != nil { return err } + // Populate columns array columns, err := rows.Columns() if err != nil { return err } - values := make([]sql.RawBytes, len(columns)) - scanArgs := make([]interface{}, len(values)) - - for i := range values { - scanArgs[i] = &values[i] - } - - currentRow := 0 - dtNow := time.Now() - - event := common.MapStr{ - "@timestamp": common.Time(dtNow), - "type": b.Name, + // Populate the two-columns event + if bt.queryTypes[index] == queryTypeTwoColumns { + twoColumnEvent = common.MapStr{ + "@timestamp": common.Time(dtNow), + "type": queryTypeTwoColumns, + } } + LoopRows: for rows.Next() { - currentRow++ + switch bt.queryTypes[index] { + case queryTypeSingleRow, queryTypeSlaveDelay: + // Generate an event from the current row + event, err := bt.generateEventFromRow(rows, columns, bt.queryTypes[index], dtNow) - if bt.queryTypes[index] == "single-row" && currentRow == 1 { - - err = rows.Scan(scanArgs...) if err != nil { - return err + logp.Err("Query #%v error generating event from rows: %v", index, err) + } else if event != nil { + b.Events.PublishEvent(event) + logp.Info("%v event sent", bt.queryTypes[index]) } + // breaking after the first row + break LoopRows - for i, col := range values { - - strColName := string(columns[i]) - strColValue := string(col) - strColType := "string" - - nColValue, err := strconv.ParseInt(strColValue, 0, 64) - - if err == nil { - strColType = "int" - } - - fColValue, err := strconv.ParseFloat(strColValue, 64) - - if err == nil { - if strColType == "string" { - strColType = "float" - } - } - - if strings.HasSuffix(strColName, bt.deltaWildcard) { - - var exists bool - _, exists = bt.oldValues[strColName] - - if !exists { - - bt.oldValuesAge[strColName] = dtNow - - if strColType == "string" { - bt.oldValues[strColName] = strColValue - } else if strColType == "int" { - bt.oldValues[strColName] = nColValue - } else if strColType == "float" { - bt.oldValues[strColName] = fColValue - } - - } else { - - if dtOld, ok := bt.oldValuesAge[strColName].(time.Time); ok { - delta := dtNow.Sub(dtOld) - - if strColType == "int" { - var calcVal int64 - - oldVal, _ := bt.oldValues[strColName].(int64) - - if nColValue > oldVal { - var devRes float64 - devRes = float64((nColValue - oldVal)) / float64(delta.Seconds()) - calcVal = roundF2I(devRes, .5) - } else { - calcVal = 0 - } - - event[strColName] = calcVal - - bt.oldValues[strColName] = nColValue - bt.oldValuesAge[strColName] = dtNow - - } else if strColType == "float" { - var calcVal float64 - - oldVal, _ := bt.oldValues[strColName].(float64) - - if fColValue > oldVal { - calcVal = (fColValue - oldVal) / float64(delta.Seconds()) - } else { - calcVal = 0 - } - - event[strColName] = calcVal - - bt.oldValues[strColName] = fColValue - bt.oldValuesAge[strColName] = dtNow - } else { - event[strColName] = strColValue - } - } - } - } else { - if strColType == "string" { - event[strColName] = strColValue - } else if strColType == "int" { - event[strColName] = nColValue - } else if strColType == "float" { - event[strColName] = fColValue - } - } + case queryTypeMultipleRows: + // Generate an event from the current row + event, err := bt.generateEventFromRow(rows, columns, bt.queryTypes[index], dtNow) + if err != nil { + logp.Err("Query #%v error generating event from rows: %v", index, err) + break LoopRows + } else if event != nil { + b.Events.PublishEvent(event) + logp.Info("%v event sent", bt.queryTypes[index]) } - rows.Close() - - } else if bt.queryTypes[index] == "two-columns" { + // Move to the next row + continue LoopRows - err = rows.Scan(scanArgs...) + case queryTypeTwoColumns: + // append current row to the two-columns event + err := bt.appendRowToEvent(twoColumnEvent, rows, columns, dtNow) if err != nil { - return err + logp.Err("Query #%v error appending two-columns event: %v", index, err) + break LoopRows } - strColName := string(values[0]) - strColValue := string(values[1]) - strColType := "string" + // Move to the next row + continue LoopRows + } + } - nColValue, err := strconv.ParseInt(strColValue, 0, 64) + // If the two-columns event has data, publish it + if bt.queryTypes[index] == queryTypeTwoColumns && len(twoColumnEvent) > 2 { + b.Events.PublishEvent(twoColumnEvent) + logp.Info("%v event sent", queryTypeTwoColumns) + twoColumnEvent = nil + } - if err == nil { - strColType = "int" - } + rows.Close() + if err = rows.Err(); err != nil { + logp.Err("Query #%v error closing rows: %v", index, err) + continue LoopQueries + } + } - fColValue, err := strconv.ParseFloat(strColValue, 64) + // Great success! + return nil +} - if err == nil { - if strColType == "string" { - strColType = "float" - } - } +// appendRowToEvent appends the two-column event the current row data +func (bt *Mysqlbeat) appendRowToEvent(event common.MapStr, row *sql.Rows, columns []string, rowAge time.Time) error { - if strings.HasSuffix(strColName, bt.deltaWildcard) { + // Make a slice for the values + values := make([]sql.RawBytes, len(columns)) - var exists bool - _, exists = bt.oldValues[strColName] + // Copy the references into such a []interface{} for row.Scan + scanArgs := make([]interface{}, len(values)) + for i := range values { + scanArgs[i] = &values[i] + } - if !exists { + // Get RawBytes from data + err := row.Scan(scanArgs...) + if err != nil { + return err + } - bt.oldValuesAge[strColName] = dtNow + // First column is the name, second is the value + strColName := string(values[0]) + strColValue := string(values[1]) + strColType := columnTypeString - if strColType == "string" { - bt.oldValues[strColName] = strColValue - } else if strColType == "int" { - bt.oldValues[strColName] = nColValue - } else if strColType == "float" { - bt.oldValues[strColName] = fColValue - } + // Try to parse the value to an int64 + nColValue, err := strconv.ParseInt(strColValue, 0, 64) + if err == nil { + strColType = columnTypeInt + } - } else { + // Try to parse the value to a float64 + fColValue, err := strconv.ParseFloat(strColValue, 64) + if err == nil { + // If it's not already an established int64, set type to float + if strColType == columnTypeString { + strColType = columnTypeFloat + } + } - if dtOld, ok := bt.oldValuesAge[strColName].(time.Time); ok { - delta := dtNow.Sub(dtOld) + // If the column name ends with the deltaWildcard + if strings.HasSuffix(strColName, bt.deltaWildcard) { + var exists bool + _, exists = bt.oldValues[strColName] + + // If an older value doesn't exist + if !exists { + // Save the current value in the oldValues array + bt.oldValuesAge[strColName] = rowAge + + if strColType == columnTypeString { + bt.oldValues[strColName] = strColValue + } else if strColType == columnTypeInt { + bt.oldValues[strColName] = nColValue + } else if strColType == columnTypeFloat { + bt.oldValues[strColName] = fColValue + } + } else { + // If found the old value's age + if dtOldAge, ok := bt.oldValuesAge[strColName].(time.Time); ok { + delta := rowAge.Sub(dtOldAge) + + if strColType == columnTypeInt { + var calcVal int64 + + // Get old value + oldVal, _ := bt.oldValues[strColName].(int64) + if nColValue > oldVal { + // Calculate the delta + devResult := float64((nColValue - oldVal)) / float64(delta.Seconds()) + // Round the calculated result back to an int64 + calcVal = roundF2I(devResult, .5) + } else { + calcVal = 0 + } - if strColType == "int" { - var calcVal int64 + // Add the delta value to the event + event[strColName] = calcVal - oldVal, _ := bt.oldValues[strColName].(int64) + // Save current values as old values + bt.oldValues[strColName] = nColValue + bt.oldValuesAge[strColName] = rowAge + } else if strColType == columnTypeFloat { + var calcVal float64 - if nColValue > oldVal { - var devRes float64 - devRes = float64((nColValue - oldVal)) / float64(delta.Seconds()) - calcVal = roundF2I(devRes, .5) + // Get old value + oldVal, _ := bt.oldValues[strColName].(float64) + if fColValue > oldVal { + // Calculate the delta + calcVal = (fColValue - oldVal) / float64(delta.Seconds()) + } else { + calcVal = 0 + } - } else { - calcVal = 0 - } + // Add the delta value to the event + event[strColName] = calcVal - event[strColName] = calcVal + // Save current values as old values + bt.oldValues[strColName] = fColValue + bt.oldValuesAge[strColName] = rowAge + } else { + event[strColName] = strColValue + } + } + } + } else { // Not a delta column, add the value to the event as is + if strColType == columnTypeString { + event[strColName] = strColValue + } else if strColType == columnTypeInt { + event[strColName] = nColValue + } else if strColType == columnTypeFloat { + event[strColName] = fColValue + } + } - bt.oldValues[strColName] = nColValue - bt.oldValuesAge[strColName] = dtNow + // Great success! + return nil +} - //logp.Info("DEBUG: o: %d n: %d time diff: %d calc: %d", oldVal, nColValue, int64(delta.Seconds()), calcVal) +// generateEventFromRow creates a new event from the row data and returns it +func (bt *Mysqlbeat) generateEventFromRow(row *sql.Rows, columns []string, queryType string, rowAge time.Time) (common.MapStr, error) { - } else if strColType == "float" { - var calcVal float64 + // Make a slice for the values + values := make([]sql.RawBytes, len(columns)) - oldVal, _ := bt.oldValues[strColName].(float64) + // Copy the references into such a []interface{} for row.Scan + scanArgs := make([]interface{}, len(values)) + for i := range values { + scanArgs[i] = &values[i] + } - if fColValue > oldVal { - calcVal = (fColValue - oldVal) / float64(delta.Seconds()) - } else { - calcVal = 0 - } + // Create the event and populate it + event := common.MapStr{ + "@timestamp": common.Time(rowAge), + "type": queryType, + } - event[strColName] = calcVal + // Get RawBytes from data + err := row.Scan(scanArgs...) + if err != nil { + return nil, err + } - bt.oldValues[strColName] = fColValue - bt.oldValuesAge[strColName] = dtNow + // Loop on all columns + for i, col := range values { + // Get column name and string value + strColName := string(columns[i]) + strColValue := string(col) + strColType := columnTypeString - } else { - event[strColName] = strColValue - } - } - } - } else { - if strColType == "string" { - event[strColName] = strColValue - } else if strColType == "int" { - event[strColName] = nColValue - } else if strColType == "float" { - event[strColName] = fColValue - } - } + // Skip column proccessing when query type is show-slave-delay and the column isn't Seconds_Behind_Master + if queryType == queryTypeSlaveDelay && strColName != columnNameSlaveDelay { + continue + } - } else if bt.queryTypes[index] == "multiple-rows" { - mevent := common.MapStr{ - "@timestamp": common.Time(time.Now()), - "type": b.Name, - } + // Try to parse the value to an int64 + nColValue, err := strconv.ParseInt(strColValue, 0, 64) + if err == nil { + strColType = columnTypeInt + } - err = rows.Scan(scanArgs...) + // Try to parse the value to a float64 + fColValue, err := strconv.ParseFloat(strColValue, 64) + if err == nil { + // If it's not already an established int64, set type to float + if strColType == columnTypeString { + strColType = columnTypeFloat + } + } - if err != nil { - return err + // If query type is single row and the column name ends with the deltaWildcard + if queryType == queryTypeSingleRow && strings.HasSuffix(strColName, bt.deltaWildcard) { + var exists bool + _, exists = bt.oldValues[strColName] + + // If an older value doesn't exist + if !exists { + // Save the current value in the oldValues array + bt.oldValuesAge[strColName] = rowAge + + if strColType == columnTypeString { + bt.oldValues[strColName] = strColValue + } else if strColType == columnTypeInt { + bt.oldValues[strColName] = nColValue + } else if strColType == columnTypeFloat { + bt.oldValues[strColName] = fColValue } - - for i, col := range values { - - strColValue := string(col) - n, err := strconv.ParseInt(strColValue, 0, 64) - - if err == nil { - mevent[columns[i]] = n - } else { - f, err := strconv.ParseFloat(strColValue, 64) - - if err == nil { - mevent[columns[i]] = f + } else { + // If found the old value's age + if dtOldAge, ok := bt.oldValuesAge[strColName].(time.Time); ok { + delta := rowAge.Sub(dtOldAge) + + if strColType == columnTypeInt { + var calcVal int64 + + // Get old value + oldVal, _ := bt.oldValues[strColName].(int64) + + if nColValue > oldVal { + // Calculate the delta + devResult := float64((nColValue - oldVal)) / float64(delta.Seconds()) + // Round the calculated result back to an int64 + calcVal = roundF2I(devResult, .5) } else { - mevent[columns[i]] = strColValue + calcVal = 0 } - } - - } - - b.Events.PublishEvent(mevent) - logp.Info("Event sent") - } else if bt.queryTypes[index] == "show-slave-delay" && currentRow == 1 { - - err = rows.Scan(scanArgs...) - if err != nil { - return err - } - for i, col := range values { + // Add the delta value to the event + event[strColName] = calcVal - if string(columns[i]) == "Seconds_Behind_Master" { + // Save current values as old values + bt.oldValues[strColName] = nColValue + bt.oldValuesAge[strColName] = rowAge + } else if strColType == columnTypeFloat { + var calcVal float64 + oldVal, _ := bt.oldValues[strColName].(float64) - strColName := string(columns[i]) - strColValue := string(col) + if fColValue > oldVal { + // Calculate the delta + calcVal = (fColValue - oldVal) / float64(delta.Seconds()) + } else { + calcVal = 0 + } - nColValue, err := strconv.ParseInt(strColValue, 0, 64) + // Add the delta value to the event + event[strColName] = calcVal - if err == nil { - event[strColName] = nColValue - } + // Save current values as old values + bt.oldValues[strColName] = fColValue + bt.oldValuesAge[strColName] = rowAge + } else { + event[strColName] = strColValue } - rows.Close() - } } + } else { // Not a delta column, add the value to the event as is + if strColType == columnTypeString { + event[strColName] = strColValue + } else if strColType == columnTypeInt { + event[strColName] = nColValue + } else if strColType == columnTypeFloat { + event[strColName] = fColValue + } } + } - if bt.queryTypes[index] != "multiple-rows" && len(event) > 2 { - b.Events.PublishEvent(event) - logp.Info("Event sent") - } - - if err = rows.Err(); err != nil { - return err - } + // If the event has no data, set to nil + if len(event) == 2 { + event = nil } - return nil + return event, nil } // roundF2I is a function that returns a rounded int64 from a float64 From b0dbe1602d605e9c96d923acb66dcb6b5e36f507 Mon Sep 17 00:00:00 2001 From: Adi Ben-Dahan Date: Thu, 12 May 2016 23:02:23 +0300 Subject: [PATCH 4/4] Add mysql driver glide dependency --- glide.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/glide.yaml b/glide.yaml index 516c728..17992f6 100644 --- a/glide.yaml +++ b/glide.yaml @@ -3,7 +3,9 @@ import: - package: github.com/elastic/beats version: d8ca37efa8888ce624e2cfaa90107e79fd41be1e subpackages: - - /libbeat/beat + - libbeat/beat - libbeat/cfgfile - libbeat/common - libbeat/logp +- package: github.com/go-sql-driver/mysql + version: 1421caf44f6464fd2ee8de694c7508ee13f92964