Skip to content

Commit

Permalink
add megatron parser
Browse files Browse the repository at this point in the history
Signed-off-by: Sunyanan Choochotkaew <[email protected]>
  • Loading branch information
sunya-ch committed Oct 27, 2023
1 parent 1e8caf2 commit 6dca89a
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 2 deletions.
2 changes: 1 addition & 1 deletion controllers/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (c *ResultCollector) Collect(ch chan<- prometheus.Metric) {
values := make(map[string]interface{})
err := json.Unmarshal([]byte(item.Result), &values)
if err != nil {
c.Log.Info(fmt.Sprintf("Cannot parse values of %s from respone: %s", benchmarkName, item.Result))
c.Log.Info(fmt.Sprintf("Cannot parse values of %s from respone: %s: %v", benchmarkName, item.Result, err))
continue
}
jobName := item.JobName
Expand Down
5 changes: 4 additions & 1 deletion cpe-parser/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var stressParser parser.Parser = parser.NewStressParser()
var mlperfParser parser.Parser = parser.NewMlPerfParser()
var fmworkParser parser.Parser = parser.NewFMWorkParser()
var fmtrainParser parser.Parser = parser.NewFMTrainParser()
var megatronParser parser.Parser = parser.NewMegatronParser()

var parserMap map[string]parser.Parser = map[string]parser.Parser{
"codait": codaitParser,
Expand All @@ -53,6 +54,7 @@ var parserMap map[string]parser.Parser = map[string]parser.Parser{
"mlperf": mlperfParser,
"fmwork": fmworkParser,
"fmtrain": fmtrainParser,
"megatron": megatronParser,
}

/////////////////////////////////////////////
Expand Down Expand Up @@ -193,7 +195,8 @@ func ReqPushLog(w http.ResponseWriter, r *http.Request) {
}
}
status = "OK"
msg = fmt.Sprintf("%v", logSpec.ConstLabels)
dataBytes, _ := json.Marshal(values)
msg = string(dataBytes)
pkey = ppkey
pval = ppval
}
Expand Down
109 changes: 109 additions & 0 deletions cpe-parser/parser/megatron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2022- IBM Inc. All rights reserved
* SPDX-License-Identifier: Apache2.0
*/

package parser

import (
"bufio"
"bytes"
"io"
"strings"
)

type MegatronParser struct {
*BaseParser
}

/*
2023-10-26 21:31:16,318 [Rank 15]: iteration 20/ 20 | consumed samples: 20 | elapsed time per iteration (ms): 3874.0 | learning rate: 6.000E-06 | global batch size: 16 | lm loss: 8.777018E+00 | loss scale: 1.0 | grad norm: 12.959 | number of skipped iterations: 0 | number of nan iterations: 0 | TFLOPs: 320.33 | tokens-per-second-per-gpu: 2114.61 |
*/

const (
MEGATRON_PERFORMANCE_KEY = "tokens-per-second-per-gpu"
)

func NewMegatronParser() *MegatronParser {
megatronParser := &MegatronParser{}
abs := &BaseParser{
Parser: megatronParser,
}
megatronParser.BaseParser = abs
return megatronParser
}

func (p *MegatronParser) ParseValue(body []byte) (map[string]interface{}, error) {
values := make(map[string]interface{})
valuesWithLabels := make(map[string][]ValueWithLabels)

bytesReader := bytes.NewReader(body)
bufReader := bufio.NewReader(bytesReader)
for {
line, _, err := bufReader.ReadLine()
linestr := string(line)
if err == io.EOF {
break
} else if err != nil {
return nil, err
}
if strings.Contains(linestr, MEGATRON_PERFORMANCE_KEY) {
labels := make(map[string]string)
splited := strings.Split(linestr, "|")
for _, col := range splited {
if strings.Contains(col, "Rank") && strings.Contains(col, "iteration") {
iterationFields := strings.Fields(col)
for _, field := range iterationFields {
if strings.Contains(field, "/") {
iter := strings.Split(field, "/")[0]
labels["iteration"] = iter
}
if strings.Contains(field, "]") {
rank := strings.Split(field, "]")[0]
labels["rank"] = rank
}
}
} else {
key, value, err := splitValue(col, ":")
if err == nil {
// trim key
keyFields := strings.Fields(key)
key = strings.Join(keyFields, " ")
newValue := ValueWithLabels{
Labels: labels,
Value: value,
}
if valueWithLabelsArr, ok := valuesWithLabels[key]; ok {
valuesWithLabels[key] = append(valueWithLabelsArr, newValue)
} else {
valuesWithLabels[key] = []ValueWithLabels{newValue}
}
}
}
}

}
}
for key, valueWithLabelsArr := range valuesWithLabels {
values[key] = valueWithLabelsArr
}
return values, nil
}

func (p *MegatronParser) GetPerformanceValue(values map[string]interface{}) (string, float64) {
if valuesWithLabelsInterface, ok := values[MEGATRON_PERFORMANCE_KEY]; ok {
valuesWithLabels := valuesWithLabelsInterface.([]ValueWithLabels)
if len(valuesWithLabels) == 0 {
return "NoValue", -1
}
avgValue := float64(0)
for _, valueWithLabel := range valuesWithLabels {
avgValue += valueWithLabel.Value
}
if len(valuesWithLabels) > 0 {
avgValue /= float64(len(valuesWithLabels))
}
return MEGATRON_PERFORMANCE_KEY, avgValue
}
return "NoKey", -1
}
55 changes: 55 additions & 0 deletions cpe-parser/parser/megatron_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2022- IBM Inc. All rights reserved
* SPDX-License-Identifier: Apache2.0
*/

// Run go test -v parser/megatron_test.go

package parser

import (
"fmt"
"io/ioutil"
"math"
"testing"

"github.com/IBM/cpe-operator/cpe-parser/parser"
"github.com/stretchr/testify/assert"
)

// update log key here
const (
LOG_KEY string = "megatron"
)

func getFileName() string {
return fmt.Sprintf("sample/%s_pod_log.log", LOG_KEY)
}

var generalParser parser.Parser

// update parser init function
var testParser = parser.NewMegatronParser()

func TestParseValue(t *testing.T) {
fileName := getFileName()
bytes, err := ioutil.ReadFile(fileName)
generalParser = testParser
assert.Nil(t, err)
values, err := generalParser.ParseValue(bytes)
assert.Nil(t, err)
// update assert value length
assert.Equal(t, len(values), 11)
}

func TestGetPerformanceValue(t *testing.T) {
fileName := getFileName()
bytes, err := ioutil.ReadFile(fileName)
generalParser = testParser
assert.Nil(t, err)
values, err := generalParser.ParseValue(bytes)
key, pvalue := testParser.GetPerformanceValue(values)
fmt.Printf("PKey: %s, Pvalue: %.2f\n", key, pvalue)
// update assert performance value
assert.Equal(t, math.Floor(pvalue), math.Floor((2206.58+2114.61)/2))
}

0 comments on commit 6dca89a

Please sign in to comment.