Skip to content

Commit

Permalink
cleanup and get time for select iteration
Browse files (browse the repository at this point in the history)
  • Loading branch information
jh-bate committed Dec 21, 2023
1 parent 72fcc84 commit 00b3e7c
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 58 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/tidepool-org/platform/migrations/20231128_jellyfish_migration/utils"
)

type Config struct {
Expand Down Expand Up @@ -390,6 +388,8 @@ func (m *Migration) fetchAndUpdateBatch() bool {
Sort: bson.M{"_id": 1},
},
)
//dDataCursor.SetBatchSize(1000)

if err != nil {
log.Printf("failed to select data: %s", err)
return false
Expand All @@ -399,40 +399,40 @@ func (m *Migration) fetchAndUpdateBatch() bool {

log.Printf("fetch took %s", time.Since(fetchStart))
updateStart := time.Now()
var totalDuration time.Duration
for dDataCursor.Next(m.ctx) {

start := time.Now()
var dDataResult bson.M
if err = dDataCursor.Decode(&dDataResult); err != nil {
log.Printf("failed decoding data: %s", err)
return false
}
log.Printf("cursor decode %s", time.Since(start))
datumID, datumUpdates, err := utils.GetDatumUpdates(dDataResult)
if err != nil {
m.onError(err, datumID, "failed getting updates")
continue
}
log.Printf("datum updates %s", time.Since(start))
updateOp := mongo.NewUpdateOneModel()
updateOp.SetFilter(bson.M{"_id": datumID, "modifiedTime": dDataResult["modifiedTime"]})
updateOp.SetUpdate(datumUpdates)
m.updates = append(m.updates, updateOp)
m.lastUpdatedId = datumID
log.Printf("added to updates %s", time.Since(start))
totalDuration += time.Since(start)
// start := time.Now()
// var dDataResult bson.M
// if err = dDataCursor.Decode(&dDataResult); err != nil {
// log.Printf("failed decoding data: %s", err)
// return false
// }
// log.Printf("cursor decode %s", time.Since(start))
// datumID, datumUpdates, err := utils.GetDatumUpdates(dDataResult)
// if err != nil {
// m.onError(err, datumID, "failed getting updates")
// continue
// }
// log.Printf("datum updates %s", time.Since(start))
// updateOp := mongo.NewUpdateOneModel()
// updateOp.SetFilter(bson.M{"_id": datumID, "modifiedTime": dDataResult["modifiedTime"]})
// updateOp.SetUpdate(datumUpdates)
// m.updates = append(m.updates, updateOp)
// m.lastUpdatedId = datumID
// log.Printf("added to updates %s", time.Since(start))
}
log.Printf("all datum %s", totalDuration)

log.Printf("batch update took %s", time.Since(updateStart))
log.Printf("batch iteration took %s", time.Since(updateStart))
log.Printf("fetch and update took %s", time.Since(fetchAndUpdateStart))
return len(m.updates) > 0
}
return false
}

func (m *Migration) writeBatchUpdates() (int, error) {
if len(m.updates) == 0 {
return 0, nil
}
start := time.Now()
var getBatches = func(chunkSize int) [][]mongo.WriteModel {
batches := [][]mongo.WriteModel{}
Expand Down
86 changes: 53 additions & 33 deletions migrations/20231128_jellyfish_migration/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,72 +83,76 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) {
var rename bson.M
var identityFields []string

var errorHandler = func(id string, err error) (string, bson.M, error) {
return id, nil, err
}

datumID, ok := bsonData["_id"].(string)
if !ok {
return errorHandler("", errors.New("cannot get the datum id"))
return "", nil, errors.New("cannot get the datum id")
}

datumType, ok := bsonData["type"].(string)
if !ok {
return errorHandler(datumID, errors.New("cannot get the datum type"))
}

//log.Printf("updates bsonData marshal start %s", time.Since(start))
dataBytes, err := bson.Marshal(bsonData)
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, errors.New("cannot get the datum type")
}

//log.Printf("updates bsonData marshal end %s", time.Since(start))

switch datumType {
case basal.Type:
//log.Printf("updating basal start %s", time.Since(start))
var datum *basal.Basal
dataBytes, err := bson.Marshal(bsonData)
if err != nil {
return datumID, nil, err
}
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
identityFields, err = datum.IdentityFields()
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
case bolus.Type:
//log.Printf("updating bolus start %s", time.Since(start))
var datum *bolus.Bolus
dataBytes, err := bson.Marshal(bsonData)
if err != nil {
return datumID, nil, err
}
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
identityFields, err = datum.IdentityFields()
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
case device.Type:
//log.Printf("updating device event start %s", time.Since(start))
var datum *bolus.Bolus
dataBytes, err := bson.Marshal(bsonData)
if err != nil {
return datumID, nil, err
}
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
identityFields, err = datum.IdentityFields()
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
case pump.Type:
//log.Printf("updating pump settings start %s", time.Since(start))
var datum *types.Base
var datum types.Base
dataBytes, err := bson.Marshal(bsonData)
if err != nil {
return datumID, nil, err
}
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
identityFields, err = datum.IdentityFields()
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}

if pumpSettingsHasBolus(bsonData) {
Expand All @@ -157,16 +161,20 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) {

sleepSchedules, err := updateIfExistsPumpSettingsSleepSchedules(bsonData)
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
} else if sleepSchedules != nil {
set["sleepSchedules"] = sleepSchedules
}
case selfmonitored.Type:
//log.Printf("updating smbg start %s", time.Since(start))
var datum *selfmonitored.SelfMonitored
dataBytes, err := bson.Marshal(bsonData)
if err != nil {
return datumID, nil, err
}
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl {
// NOTE: we need to ensure the same precision for the
Expand All @@ -176,14 +184,18 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) {
}
identityFields, err = datum.IdentityFields()
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
case ketone.Type:
//log.Printf("updating ketone start %s", time.Since(start))
var datum *ketone.Ketone
dataBytes, err := bson.Marshal(bsonData)
if err != nil {
return datumID, nil, err
}
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl {
// NOTE: we need to ensure the same precision for the
Expand All @@ -193,14 +205,18 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) {
}
identityFields, err = datum.IdentityFields()
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
case continuous.Type:
//log.Printf("updating cbg start %s", time.Since(start))
var datum *continuous.Continuous
dataBytes, err := bson.Marshal(bsonData)
if err != nil {
return datumID, nil, err
}
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl {
// NOTE: we need to ensure the same precision for the
Expand All @@ -210,26 +226,30 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) {
}
identityFields, err = datum.IdentityFields()
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
default:
//log.Printf("updating generic start %s", time.Since(start))
var datum *types.Base
dataBytes, err := bson.Marshal(bsonData)
if err != nil {
return datumID, nil, err
}
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
identityFields, err = datum.IdentityFields()
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}
}

//log.Printf("updates made end %s", time.Since(start))
//log.Printf("generate hash start %s", time.Since(start))
hash, err := deduplicator.GenerateIdentityHash(identityFields)
if err != nil {
return errorHandler(datumID, err)
return datumID, nil, err
}

//log.Printf("generate hash end %s", time.Since(start))
Expand Down

0 comments on commit 00b3e7c

Please sign in to comment.