Skip to content

Commit

Permalink
timing for processing
Browse files Browse the repository at this point in the history
  • Loading branch information
jh-bate committed Jan 8, 2024
1 parent b1466b4 commit 97cfc0c
Showing 1 changed file with 10 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,20 @@ func (m *Migration) prepare() error {
}

func (m *Migration) execute() error {
log.Printf("configured read batch size %d nop percent %d", m.config.readBatchSize, m.config.nopPercent)
totalMigrated := 0
migrateStart := time.Now()
for m.fetchAndUpdateBatch() {
writeStart := time.Now()

updatedCount, err := m.writeBatchUpdates()
if err != nil {
log.Printf("failed writing batch: %s", err)
return err
}
log.Printf("4. data write took [%s] for [%d] items", time.Since(writeStart), updatedCount)
totalMigrated = totalMigrated + updatedCount
log.Printf("migrated %d for a total of %d migrated items", updatedCount, totalMigrated)
}
log.Printf("migration took [%s] for [%d] items ", time.Since(migrateStart), totalMigrated)
return nil
}

Expand Down Expand Up @@ -360,8 +362,6 @@ func (m *Migration) blockUntilDBReady() error {
}

func (m *Migration) fetchAndUpdateBatch() bool {
fetchAndUpdateStart := time.Now()

selector := bson.M{
"_deduplicator": bson.M{"$exists": false},
// testing based on _userId for [email protected]
Expand Down Expand Up @@ -393,12 +393,14 @@ func (m *Migration) fetchAndUpdateBatch() bool {
BatchSize: &size,
},
)

if err != nil {
log.Printf("failed to select data: %s", err)
return false
}

log.Printf("1. data fetch took [%s]", time.Since(fetchStart))

decodeStart := time.Now()
err = dDataCursor.All(m.ctx, &dataSet)

if err != nil {
Expand All @@ -408,16 +410,9 @@ func (m *Migration) fetchAndUpdateBatch() bool {

defer dDataCursor.Close(m.ctx)

log.Printf("fetch took %s for %d items", time.Since(fetchStart), len(dataSet))
log.Printf("2. data decode took [%s] for [%d] items", time.Since(decodeStart), len(dataSet))
updateStart := time.Now()
for _, item := range dataSet {

//for dDataCursor.Next(m.ctx) {
// var dDataResult bson.M
// if err = dDataCursor.Decode(&dDataResult); err != nil {
// log.Printf("failed decoding data: %s", err)
// return false
// }
datumID, datumUpdates, err := utils.GetDatumUpdates(item)
if err != nil {
m.onError(err, datumID, "failed getting updates")
Expand All @@ -429,10 +424,8 @@ func (m *Migration) fetchAndUpdateBatch() bool {
m.updates = append(m.updates, updateOp)
m.lastUpdatedId = datumID
}
//}

log.Printf("batch iteration took %s", time.Since(updateStart))
log.Printf("fetch and update took %s", time.Since(fetchAndUpdateStart))
log.Printf("3. data update took [%s] for [%d] items", time.Since(updateStart), len(m.updates))
return len(m.updates) > 0
}
return false
Expand Down Expand Up @@ -481,6 +474,6 @@ func (m *Migration) writeBatchUpdates() (int, error) {
updateCount += int(results.ModifiedCount)
}
}
log.Printf("update took %s", time.Since(start))
log.Printf("mongo bulk write took %s", time.Since(start))
return updateCount, nil
}

0 comments on commit 97cfc0c

Please sign in to comment.