Skip to content

Commit

Permalink
[back-37] jellyfish data migration (#665)
Browse files Browse the repository at this point in the history
* migrate existing jellyfish records

* migrate jellyfish pumpSettings bolus if present
  • Loading branch information
jh-bate authored Nov 8, 2023
1 parent b2242ed commit e8e07ff
Show file tree
Hide file tree
Showing 2 changed files with 389 additions and 0 deletions.
279 changes: 279 additions & 0 deletions migrations/back_37/back_37.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
package main

import (
"context"
"fmt"
"strconv"
"time"

"github.com/urfave/cli"
"go.mongodb.org/mongo-driver/bson"

"github.com/tidepool-org/platform/application"
"github.com/tidepool-org/platform/data/blood/glucose"
"github.com/tidepool-org/platform/data/deduplicator/deduplicator"
"github.com/tidepool-org/platform/data/types"
"github.com/tidepool-org/platform/data/types/basal"
"github.com/tidepool-org/platform/data/types/blood/glucose/continuous"
"github.com/tidepool-org/platform/data/types/blood/glucose/selfmonitored"
"github.com/tidepool-org/platform/data/types/blood/ketone"
"github.com/tidepool-org/platform/data/types/bolus"
"github.com/tidepool-org/platform/data/types/device"
"github.com/tidepool-org/platform/errors"
migrationMongo "github.com/tidepool-org/platform/migration/mongo"
storeStructuredMongo "github.com/tidepool-org/platform/store/structured/mongo"
)

func main() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
application.RunAndExit(NewMigration(ctx))
}

type Migration struct {
ctx context.Context
*migrationMongo.Migration
dataRepository *storeStructuredMongo.Repository
}

func getValidatedString(bsonData bson.M, fieldName string) (string, error) {
if valRaw, ok := bsonData[fieldName]; !ok {
return "", errors.Newf("%s is missing", fieldName)
} else if val, ok := valRaw.(string); !ok {
return "", errors.Newf("%s is not of expected type", fieldName)
} else if val == "" {
return "", errors.Newf("%s is empty", fieldName)
} else {
return val, nil
}
}

func getValidatedTime(bsonData bson.M, fieldName string) (time.Time, error) {
if valRaw, ok := bsonData[fieldName]; !ok {
return time.Time{}, errors.Newf("%s is missing", fieldName)
} else if val, ok := valRaw.(time.Time); !ok {
return time.Time{}, errors.Newf("%s is not of expected type", fieldName)
} else if val.IsZero() {
return time.Time{}, errors.Newf("%s is empty", fieldName)
} else {
return val, nil
}
}

func createDatumHash(bsonData bson.M) (string, error) {
identityFields := []string{}
if datumUserID, err := getValidatedString(bsonData, "_userId"); err != nil {
return "", err
} else {
identityFields = append(identityFields, datumUserID)
}
if deviceID, err := getValidatedString(bsonData, "deviceId"); err != nil {
return "", err
} else {
identityFields = append(identityFields, deviceID)
}
if datumTime, err := getValidatedTime(bsonData, "time"); err != nil {
return "", err
} else {
identityFields = append(identityFields, datumTime.Format(types.TimeFormat))
}
datumType, err := getValidatedString(bsonData, "type")
if err != nil {
return "", err
}
identityFields = append(identityFields, datumType)

switch datumType {
case basal.Type:
if deliveryType, err := getValidatedString(bsonData, "deliveryType"); err != nil {
return "", err
} else {
identityFields = append(identityFields, deliveryType)
}
case bolus.Type, device.Type:
if subType, err := getValidatedString(bsonData, "subType"); err != nil {
return "", err
} else {
identityFields = append(identityFields, subType)
}
case selfmonitored.Type, ketone.Type, continuous.Type:
units, err := getValidatedString(bsonData, "units")
if err != nil {
return "", err
} else {
identityFields = append(identityFields, units)
}

if valueRaw, ok := bsonData["value"]; !ok {
return "", errors.New("value is missing")
} else if val, ok := valueRaw.(float64); !ok {
return "", errors.New("value is not of expected type")
} else {
if units != glucose.MgdL && units != glucose.Mgdl {
// NOTE: we need to ensure the same precision for the
// converted value as it is used to calculate the hash
val = getBGValuePlatformPrecision(val)
}
identityFields = append(identityFields, strconv.FormatFloat(val, 'f', -1, 64))
}
}
return deduplicator.GenerateIdentityHash(identityFields)
}

func updateIfExistsPumpSettingsBolus(bsonData bson.M) (interface{}, error) {
dataType, err := getValidatedString(bsonData, "type")
if err != nil {
return nil, err
}
if dataType == "pumpSettings" {
if bolus := bsonData["bolus"]; bolus != nil {
boluses, ok := bolus.(map[string]interface{})
if !ok {
return nil, errors.Newf("pumpSettings.bolus is not the expected type %v", bolus)
}
return boluses, nil
}
}
return nil, nil
}

func getBGValuePlatformPrecision(mmolVal float64) float64 {
if len(fmt.Sprintf("%v", mmolVal)) > 7 {
mgdlVal := mmolVal * glucose.MmolLToMgdLConversionFactor
mgdL := glucose.MgdL
mmolVal = *glucose.NormalizeValueForUnits(&mgdlVal, &mgdL)
}
return mmolVal
}

func NewMigration(ctx context.Context) *Migration {
return &Migration{
ctx: ctx,
Migration: migrationMongo.NewMigration(),
}
}

func (m *Migration) Initialize(provider application.Provider) error {
if err := m.Migration.Initialize(provider); err != nil {
return err
}

m.CLI().Usage = "BACK-37: Migrate all existing data to add required Platform deduplication hash fields"
m.CLI().Description = "BACK-37: To fully migrate devices from the `jellyfish` upload API to the `platform` upload API"
m.CLI().Authors = []cli.Author{
{
Name: "J H BATE",
Email: "[email protected]",
},
}

m.CLI().Action = func(ctx *cli.Context) error {
if !m.ParseContext(ctx) {
return nil
}
return m.execute()
}

return nil
}

func (m *Migration) execute() error {
m.Logger().Debug("Migrate jellyfish API data")
m.Logger().Debug("Creating data store")

mongoConfig := m.NewMongoConfig()
mongoConfig.Database = "data"
mongoConfig.Timeout = 60 * time.Minute
dataStore, err := storeStructuredMongo.NewStore(mongoConfig)
if err != nil {
return errors.Wrap(err, "unable to create data store")
}
defer dataStore.Terminate(m.ctx)

m.Logger().Debug("Creating data repository")
m.dataRepository = dataStore.GetRepository("deviceData")
m.Logger().Info("Migration of jellyfish documents has begun")
hashUpdatedCount, errorCount := m.migrateJellyfishDocuments()
m.Logger().Infof("Migrated %d jellyfish documents", hashUpdatedCount)
m.Logger().Infof("%d errors occurred", errorCount)

return nil
}

func (m *Migration) migrateJellyfishDocuments() (int, int) {
logger := m.Logger()
logger.Debug("Finding jellyfish data")
var hashUpdatedCount, errorCount int
selector := bson.M{
// jellyfish uses a generated _id that is not an mongo objectId
"_id": bson.M{"$not": bson.M{"$type": "objectId"}},
"_deduplicator": bson.M{"$exists": false},
}

var jellyfishResult bson.M
jellyfishDocCursor, err := m.dataRepository.Find(m.ctx, selector)
if err != nil {
logger.WithError(err).Error("Unable to find jellyfish data")
errorCount++
return hashUpdatedCount, errorCount
}
defer jellyfishDocCursor.Close(m.ctx)
for jellyfishDocCursor.Next(m.ctx) {
err = jellyfishDocCursor.Decode(&jellyfishResult)
if err != nil {
logger.WithError(err).Error("Could not decode mongo doc")
errorCount++
continue
}
if !m.DryRun() {
if updated, err := m.migrateDocument(jellyfishResult); err != nil {
logger.WithError(err).Errorf("Unable to migrate jellyfish document %s.", jellyfishResult["_id"])
errorCount++
continue
} else if updated {
hashUpdatedCount++
}
}
}
if err := jellyfishDocCursor.Err(); err != nil {
logger.WithError(err).Error("Error while fetching data. Please re-run to complete the migration.")
errorCount++
}
return hashUpdatedCount, errorCount
}

func (m *Migration) migrateDocument(jfDatum bson.M) (bool, error) {

datumID, err := getValidatedString(jfDatum, "_id")
if err != nil {
return false, err
}

updates := bson.M{}
hash, err := createDatumHash(jfDatum)
if err != nil {
return false, err
}

updates["_deduplicator"] = bson.M{"hash": hash}

if boluses, err := updateIfExistsPumpSettingsBolus(jfDatum); err != nil {
return false, err
} else if boluses != nil {
updates["pumpSettings"] = bson.M{"boluses": boluses}
}

result, err := m.dataRepository.UpdateOne(m.ctx, bson.M{
"_id": datumID,
"modifiedTime": jfDatum["modifiedTime"],
}, bson.M{
"$set": updates,
})

if err != nil {
return false, err
}

return result.ModifiedCount == 1, nil
}
110 changes: 110 additions & 0 deletions migrations/back_37/back_37_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package main

import (
"reflect"
"testing"

"go.mongodb.org/mongo-driver/bson"

pumpTest "github.com/tidepool-org/platform/data/types/settings/pump/test"
)

func Test_getBGValuePlatformPrecision(t *testing.T) {

tests := []struct {
name string
mmolJellyfishVal float64
mmolPlatformVal float64
}{
{
name: "original mmol/L value",
mmolJellyfishVal: 10.1,
mmolPlatformVal: 10.1,
},
{
name: "converted mgd/L of 100",
mmolJellyfishVal: 5.550747991045533,
mmolPlatformVal: 5.55075,
},
{
name: "converted mgd/L of 150.0",
mmolJellyfishVal: 8.3261219865683,
mmolPlatformVal: 8.32612,
},
{
name: "converted mgd/L of 65.0",
mmolJellyfishVal: 3.6079861941795968,
mmolPlatformVal: 3.60799,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getBGValuePlatformPrecision(tt.mmolJellyfishVal); got != tt.mmolPlatformVal {
t.Errorf("getBGValuePlatformPrecision() mmolJellyfishVal = %v, want %v", got, tt.mmolPlatformVal)
}
})
}
}

func Test_updateIfExistsPumpSettingsBolus(t *testing.T) {
type args struct {
bsonData bson.M
}

bolusData := map[string]interface{}{
"bolous-1": pumpTest.NewBolus(),
"bolous-2": pumpTest.NewBolus(),
}

tests := []struct {
name string
args args
want interface{}
wantErr bool
}{
{
name: "when not pumpSettings",
args: args{
bsonData: bson.M{"type": "other"},
},
want: nil,
wantErr: false,
},
{
name: "pumpSettings but no bolus",
args: args{
bsonData: bson.M{"type": "pumpSettings"},
},
want: nil,
wantErr: false,
},
{
name: "pumpSettings bolus wrong type",
args: args{
bsonData: bson.M{"type": "pumpSettings", "bolus": "wrong"},
},
want: nil,
wantErr: true,
},
{
name: "pumpSettings bolus valid type",
args: args{
bsonData: bson.M{"type": "pumpSettings", "bolus": bolusData},
},
want: bolusData,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := updateIfExistsPumpSettingsBolus(tt.args.bsonData)
if (err != nil) != tt.wantErr {
t.Errorf("updateIfExistsPumpSettingsBolus() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("updateIfExistsPumpSettingsBolus() = %v, want %v", got, tt.want)
}
})
}
}

0 comments on commit e8e07ff

Please sign in to comment.