Skip to content

Commit

Permalink
Merge pull request #14 from auxten/float32
Browse files Browse the repository at this point in the history
Mem usage optimization
  • Loading branch information
auxten authored Nov 1, 2022
2 parents 1745897 + 85e96f0 commit 0d284b7
Show file tree
Hide file tree
Showing 27 changed files with 486 additions and 1,127 deletions.
9 changes: 5 additions & 4 deletions example/futuresales/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/auxten/edgeRec/feature"
"github.com/auxten/edgeRec/nn"
"github.com/auxten/edgeRec/ps"
"github.com/auxten/edgeRec/recommend"
"github.com/auxten/edgeRec/schema"
"github.com/auxten/edgeRec/utils"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -84,7 +85,7 @@ func TestTrain(t *testing.T) {

fmt.Printf("training data count: %d\n", trainCount)
// training
trainSample := make(ps.Samples, trainCount)
trainSample := make(recommend.Samples, trainCount)
{
trainRows, err := scanner.GetRows(fmt.Sprintf(`select date, date_block_num, shop_id, s.item_id, item_price, item_category_id, item_name, item_cnt_day from sales_train s
left join items i on s.item_id = i.item_id limit %d`, trainCount))
Expand All @@ -104,7 +105,7 @@ func TestTrain(t *testing.T) {
if err != nil {
log.Fatal(err)
}
trainSample[i] = ps.Sample{
trainSample[i] = recommend.Sample{
Input: featureTransform(date, date_block_num, shop_id, item_id, item_price, item_category_id.Float64, item_name.String),
Response: []float64{outputTransform(item_cnt_day)}}
i++
Expand All @@ -118,7 +119,7 @@ func TestTrain(t *testing.T) {
// 10% test data
fmt.Printf("test data count: %d\n", testCount)
i = 0
testSample := make(ps.Samples, testCount)
testSample := make(recommend.Samples, testCount)
rows, err := scanner.GetRows(fmt.Sprintf(`select date, date_block_num, shop_id, s.item_id, item_price, item_category_id, item_name,
item_cnt_day from sales_train s
left join items i on s.item_id = i.item_id limit %d, %d`, trainCount, testCount))
Expand All @@ -138,7 +139,7 @@ func TestTrain(t *testing.T) {
if err != nil {
log.Fatal(err)
}
testSample[i] = ps.Sample{
testSample[i] = recommend.Sample{
Input: featureTransform(date, date_block_num, shop_id, item_id, item_price, item_category_id.Float64, item_name.String),
Response: []float64{outputTransform(item_cnt_day)}}
i++
Expand Down
47 changes: 15 additions & 32 deletions example/movielens/dinimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/auxten/edgeRec/model/din"
rcmd "github.com/auxten/edgeRec/recommend"
log "github.com/sirupsen/logrus"
"gonum.org/v1/gonum/mat"
"gorgonia.org/tensor"
)

Expand All @@ -17,8 +16,8 @@ type dinImpl struct {
iFeatureDim int
cFeatureDim int

predBatchSize int
batchSize, epochs int
PredBatchSize int
BatchSize, epochs int
sampleInfo *rcmd.SampleInfo

// stop training on earlyStop count of no cost improvement
Expand All @@ -29,18 +28,14 @@ type dinImpl struct {
pred *din.DinNet
}

func (d *dinImpl) Predict(X mat.Matrix, Y mat.Mutable) *mat.Dense {
numPred, _ := X.Dims()
inputTensor := tensor.New(tensor.WithShape(X.Dims()), tensor.WithBacking(X.(*mat.Dense).RawMatrix().Data))
y, err := din.Predict(d.pred, numPred, d.predBatchSize, d.sampleInfo, inputTensor)
func (d *dinImpl) Predict(X tensor.Tensor) tensor.Tensor {
numPred := X.Shape()[0]
y, err := din.Predict(d.pred, numPred, d.PredBatchSize, d.sampleInfo, X)
if err != nil {
log.Errorf("predict din model failed: %v", err)
return nil
}
yDense := mat.NewDense(numPred, 1, y)
if Y != nil {
Y.(*mat.Dense).SetRawMatrix(yDense.RawMatrix())
}
yDense := tensor.NewDense(din.DT, tensor.Shape{numPred, 1}, tensor.WithBacking(y))

return yDense
}
Expand All @@ -53,31 +48,19 @@ func (d *dinImpl) Fit(trainSample *rcmd.TrainSample) (pred rcmd.PredictAbstract,
d.cFeatureDim = trainSample.Info.CtxFeatureRange[1] - trainSample.Info.CtxFeatureRange[0]
d.sampleInfo = &trainSample.Info

sampleLen := len(trainSample.Data)
X := mat.NewDense(sampleLen, len(trainSample.Data[0].Input), nil)
for i, sample := range trainSample.Data {
X.SetRow(i, sample.Input)
}
Y := mat.NewDense(sampleLen, 1, nil)
for i, sample := range trainSample.Data {
Y.Set(i, 0, sample.Response[0])
if trainSample.Rows != len(trainSample.Y) {
err = fmt.Errorf("number of examples %d and labels %d do not match",
trainSample.Rows, len(trainSample.Y))
return
}

inputs := tensor.New(tensor.WithShape(trainSample.Rows, trainSample.XCols), tensor.WithBacking(trainSample.X))
labels := tensor.New(tensor.WithShape(trainSample.Rows, 1), tensor.WithBacking(trainSample.Y))

d.learner = din.NewDinNet(d.uProfileDim, d.uBehaviorSize, d.uBehaviorDim, d.iFeatureDim, d.cFeatureDim)
var (
inputs, labels tensor.Tensor
numExamples, _ = X.Dims()
numLabels, _ = Y.Dims()
)
if numExamples != numLabels {
err = fmt.Errorf("number of examples and labels do not match")
return
}

inputs = tensor.New(tensor.WithShape(X.Dims()), tensor.WithBacking(X.RawMatrix().Data))
labels = tensor.New(tensor.WithShape(Y.Dims()), tensor.WithBacking(Y.RawMatrix().Data))
err = din.Train(d.uProfileDim, d.uBehaviorSize, d.uBehaviorDim, d.iFeatureDim, d.cFeatureDim,
numExamples, d.batchSize, d.epochs, d.earlyStop,
trainSample.Rows, d.BatchSize, d.epochs, d.earlyStop,
d.sampleInfo,
inputs, labels,
d.learner,
Expand All @@ -97,7 +80,7 @@ func (d *dinImpl) Fit(trainSample *rcmd.TrainSample) (pred rcmd.PredictAbstract,
return
}
err = din.InitForwardOnlyVm(d.uProfileDim, d.uBehaviorSize, d.uBehaviorDim, d.iFeatureDim, d.cFeatureDim,
d.predBatchSize, dinPred)
d.PredBatchSize, dinPred)
if err != nil {
log.Errorf("init forward only vm failed: %v", err)
return
Expand Down
28 changes: 17 additions & 11 deletions example/movielens/dinimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
"math/rand"
"testing"

"github.com/auxten/edgeRec/nn/metrics"
rcmd "github.com/auxten/edgeRec/recommend"
"github.com/auxten/edgeRec/utils"
. "github.com/smartystreets/goconvey/convey"
"gonum.org/v1/gonum/mat"
)

type dinPredictor struct {
Expand All @@ -23,6 +22,8 @@ func TestDinOnMovielens(t *testing.T) {

var (
movielens = &MovielensRec{
//DataPath: "movielens-20m.db",
//SampleCnt: 14400000,
DataPath: "movielens.db",
SampleCnt: 79948,
//SampleCnt: 10000,
Expand All @@ -33,10 +34,12 @@ func TestDinOnMovielens(t *testing.T) {

Convey("Train din model", t, func() {
dinModel := &dinImpl{
predBatchSize: 100,
batchSize: 200,
epochs: 100,
earlyStop: 20,
//PredBatchSize: 5000,
PredBatchSize: 100,
//BatchSize: 5000,
BatchSize: 200,
epochs: 200,
earlyStop: 20,
}
trainCtx := context.Background()
model, err = rcmd.Train(trainCtx, movielens, dinModel)
Expand All @@ -45,24 +48,26 @@ func TestDinOnMovielens(t *testing.T) {
})

Convey("Predict din model", t, func() {
//testCount := 5610000
testCount := 20600
rows, err := db.Query(
"SELECT userId, movieId, rating, timestamp FROM ratings_test ORDER BY timestamp, userId ASC LIMIT ?", testCount)
So(err, ShouldBeNil)
var (
userId int
itemId int
rating float64
rating float32
timestamp int64
yTrue = mat.NewDense(testCount, 1, nil)
yTrue []float32
sampleKeys = make([]rcmd.Sample, 0, testCount)
)
for i := 0; rows.Next(); i++ {
err = rows.Scan(&userId, &itemId, &rating, &timestamp)
if err != nil {
t.Errorf("scan error: %v", err)
}
yTrue.Set(i, 0, BinarizeLabel(rating))
//yTrue.Set(i, 0, BinarizeLabel(rating))
yTrue = append(yTrue, BinarizeLabel32(rating))
sampleKeys = append(sampleKeys, rcmd.Sample{userId, itemId, 0, timestamp})
}
batchPredictCtx := context.Background()
Expand All @@ -73,8 +78,9 @@ func TestDinOnMovielens(t *testing.T) {
}
yPred, err := rcmd.BatchPredict(batchPredictCtx, dinPred, sampleKeys)
So(err, ShouldBeNil)
rocAuc := metrics.ROCAUCScore(yTrue, yPred, "", nil)
rowCount, _ := yTrue.Dims()
rocAuc := utils.RocAuc32(yPred.Data().([]float32), yTrue)
//rocAuc := metrics.ROCAUCScore(yTrue, yPred, "", nil)
rowCount := len(yTrue)
fmt.Printf("rocAuc on test set %d: %f\n", rowCount, rocAuc)
})
}
35 changes: 21 additions & 14 deletions example/movielens/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func initDb(dbPath string) (err error) {
type MovielensRec struct {
DataPath string
SampleCnt int
mRatingMap map[int][2]float64
mRatingMap map[int][2]float32
ubcTrain *ubcache.UserBehaviorCache
ubcPredict *ubcache.UserBehaviorCache
}
Expand Down Expand Up @@ -103,8 +103,8 @@ func (recSys *MovielensRec) GetItemFeature(ctx context.Context, itemId int) (ten
var (
movieYear int
itemTitle, itemGenres string
avgRating, cntRating float64
GenreTensor [50]float64 // 5 * 10
avgRating, cntRating float32
GenreTensor [50]float32 // 5 * 10
)
if err = rows.Scan(&itemTitle, &itemGenres); err != nil {
log.Errorf("failed to scan item %d: %v", itemId, err)
Expand All @@ -129,11 +129,11 @@ func (recSys *MovielensRec) GetItemFeature(ctx context.Context, itemId int) (ten
}
if mr, ok := recSys.mRatingMap[itemId]; ok {
avgRating = mr[0] / 5.
cntRating = math.Log2(mr[1])
cntRating = float32(math.Log2(float64(mr[1])))
}

tensor = utils.ConcatSlice(tensor, GenreTensor[:], rcmd.Tensor{
float64(movieYear-1990) / 20.0, avgRating, cntRating,
tensor = utils.ConcatSlice32(tensor, GenreTensor[:], rcmd.Tensor{
float32(movieYear-1990) / 20.0, avgRating, cntRating,
})
return
} else {
Expand All @@ -149,7 +149,7 @@ func (recSys *MovielensRec) GetUserFeature(ctx context.Context, userId int) (ten
ugenres sql.NullString
avgRating sql.NullFloat64
cntRating sql.NullFloat64
top5GenresTensor [50]float64
top5GenresTensor [50]float32
)
// get stage value from ctx
stage := ctx.Value(rcmd.StageKey).(rcmd.Stage)
Expand Down Expand Up @@ -183,7 +183,7 @@ func (recSys *MovielensRec) GetUserFeature(ctx context.Context, userId int) (ten
for i, genre := range top5Genres {
copy(top5GenresTensor[i*10:], genreFeature(genre.Key))
}
tensor = utils.ConcatSlice(rcmd.Tensor{avgRating.Float64 / 5., cntRating.Float64 / 100.}, top5GenresTensor[:])
tensor = utils.ConcatSlice32(rcmd.Tensor{float32(avgRating.Float64) / 5., float32(cntRating.Float64) / 100.}, top5GenresTensor[:])
if rcmd.DebugItemId != 0 && userId == rcmd.DebugUserId {
log.Infof("user %d: %v ", userId, tensor)
}
Expand All @@ -196,7 +196,7 @@ func (recSys *MovielensRec) GetUserFeature(ctx context.Context, userId int) (ten
}

func genreFeature(genre string) (tensor rcmd.Tensor) {
return feature.HashOneHot([]byte(genre), 10)
return feature.HashOneHot32([]byte(genre), 10)
}

func (recSys *MovielensRec) SampleGenerator(_ context.Context) (ret <-chan rcmd.Sample, err error) {
Expand Down Expand Up @@ -228,14 +228,14 @@ func (recSys *MovielensRec) SampleGenerator(_ context.Context) (ret <-chan rcmd.
i++
var (
userId, movieId int
rating, label float64
rating, label float32
timestamp int64
)
if err = rows.Scan(&userId, &movieId, &rating, &timestamp); err != nil {
log.Errorf("failed to scan ratings: %v", err)
return
}
label = BinarizeLabel(rating)
label = BinarizeLabel32(rating)
// label = rating / 5.0

sampleCh <- rcmd.Sample{
Expand Down Expand Up @@ -268,18 +268,18 @@ func (recSys *MovielensRec) PreTrain(ctx context.Context) (err error) {
return
}
defer rows1.Close()
recSys.mRatingMap = make(map[int][2]float64)
recSys.mRatingMap = make(map[int][2]float32)
for rows1.Next() {
var (
movieId int
avgR float64
avgR float32
cntR int
)
if err = rows1.Scan(&movieId, &avgR, &cntR); err != nil {
log.Errorf("failed to scan movieId: %v", err)
return
}
recSys.mRatingMap[movieId] = [2]float64{avgR, float64(cntR)}
recSys.mRatingMap[movieId] = [2]float32{avgR, float32(cntR)}
}

// fill user behavior cache
Expand Down Expand Up @@ -389,3 +389,10 @@ func BinarizeLabel(rating float64) float64 {
}
return 0.0
}

func BinarizeLabel32(rating float32) float32 {
if rating > 3.5 {
return 1.0
}
return 0.0
}
20 changes: 14 additions & 6 deletions example/movielens/feature_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestFeatureEngineer(t *testing.T) {
testData := []struct {
userId int
itemId int
expected float64
expected float32
}{
{8, 527, 1.},
{8, 432, 0.},
Expand All @@ -68,8 +68,8 @@ func TestFeatureEngineer(t *testing.T) {
fmt.Printf("userId:%d, itemId:%d, expected:%f, pred:%f\n",
test.userId, test.itemId, test.expected, score[0].Score)
//So(pred.At(0, 0), ShouldAlmostEqual, test.expected)
yTrue.Set(i, 0, test.expected)
yPred.Set(i, 0, score[0].Score)
yTrue.Set(i, 0, float64(test.expected))
yPred.Set(i, 0, float64(score[0].Score))
}

rocAuc := metrics.ROCAUCScore(yTrue, yPred, "", nil)
Expand All @@ -84,23 +84,31 @@ func TestFeatureEngineer(t *testing.T) {
var (
userId int
itemId int
rating float64
rating float32
timestamp int64
yTrue = mat.NewDense(testCount, 1, nil)
yPredDense = mat.NewDense(testCount, 1, nil)
sampleKeys = make([]rcmd.Sample, 0, testCount)
)
for i := 0; rows.Next(); i++ {
err = rows.Scan(&userId, &itemId, &rating, &timestamp)
if err != nil {
t.Errorf("scan error: %v", err)
}
yTrue.Set(i, 0, BinarizeLabel(rating))
yTrue.Set(i, 0, BinarizeLabel(float64(rating)))
sampleKeys = append(sampleKeys, rcmd.Sample{userId, itemId, 0, timestamp})
}
batchPredictCtx := context.Background()
yPred, err := rcmd.BatchPredict(batchPredictCtx, model, sampleKeys)
So(err, ShouldBeNil)
rocAuc := metrics.ROCAUCScore(yTrue, yPred, "", nil)
for i := 0; i < testCount; i++ {
val, err := yPred.At(i, 0)
if err != nil {
t.Errorf("yPred.At error: %v", err)
}
yPredDense.Set(i, 0, float64(val.(float32)))
}
rocAuc := metrics.ROCAUCScore(yTrue, yPredDense, "", nil)
rowCount, _ := yTrue.Dims()
fmt.Printf("rocAuc on test set %d: %f\n", rowCount, rocAuc)
})
Expand Down
Loading

0 comments on commit 0d284b7

Please sign in to comment.