diff --git a/script/mongodb.js b/script/mongodb.js index 99e3f19..45470ed 100644 --- a/script/mongodb.js +++ b/script/mongodb.js @@ -58,7 +58,7 @@ db.proposal.createIndex({"proposal_id": 1}, {"unique": true}); db.tx_msg.createIndex({"hash": 1}, {"unique": true}); // init data -db.sync_conf.insert({"block_num_per_worker_handle": 100, "max_worker_sleep_time": 120}); +db.sync_conf.insert({"block_num_per_worker_handle": 50, "max_worker_sleep_time": 120}); // drop collection // db.account.drop(); diff --git a/service/task/task_execute.go b/service/task/task_execute.go index a047a9a..0778fc5 100644 --- a/service/task/task_execute.go +++ b/service/task/task_execute.go @@ -23,7 +23,7 @@ func StartExecuteTask() { blockNumPerWorkerHandle int64 maxWorkerSleepTime int64 ) - log := logger.GetLogger("StartCreateTask") + log := logger.GetLogger("TaskExecutor") // get sync conf syncConf, err := syncConfModel.GetConf() @@ -56,13 +56,14 @@ func executeTask(blockNumPerWorkerHandle, maxWorkerSleepTime int64, chanLimit ch workerId, taskType string blockChainLatestHeight int64 ) - log := logger.GetLogger("StartCreateTask") + log := logger.GetLogger("TaskExecutor") genWorkerId := func() string { // generate worker id use hostname@xxx hostname, _ := os.Hostname() return fmt.Sprintf("%v@%v", hostname, bson.NewObjectId().Hex()) } + healthCheckQuit := make(chan bool) workerId = genWorkerId() client := helper.GetClient() @@ -70,6 +71,7 @@ func executeTask(blockNumPerWorkerHandle, maxWorkerSleepTime int64, chanLimit ch if r := recover(); r != nil { log.Error("execute task fail", logger.Any("err", r)) } + close(healthCheckQuit) <-chanLimit client.Release() }() @@ -92,7 +94,6 @@ func executeTask(blockNumPerWorkerHandle, maxWorkerSleepTime int64, chanLimit ch err = syncTaskModel.TakeOverTask(task, workerId) if err != nil { if err == mgo.ErrNotFound { - // this task has been take over by other goroutine log.Info("Task has been take over by other goroutine") } else { log.Error("Take over task fail", logger.String("err", err.Error())) @@ -109,7 +110,7 @@ func executeTask(blockNumPerWorkerHandle, maxWorkerSleepTime int64, chanLimit ch taskType = document.SyncTaskTypeFollow } log.Info("worker begin execute task", - logger.String("cur_worker", workerId), logger.Any("task_id", task.ID), + logger.String("cur_worker", workerId), logger.String("task_id", task.ID.Hex()), logger.String("from-to", fmt.Sprintf("%v-%v", task.StartHeight, task.EndHeight))) // worker health check, if worker is alive, then update last update time every minute. @@ -124,33 +125,43 @@ func executeTask(blockNumPerWorkerHandle, maxWorkerSleepTime int64, chanLimit ch }() for { - task, err := syncTaskModel.GetTaskByIdAndWorker(taskId, workerId) - if err == nil { - blockChainLatestHeight, err := getBlockChainLatestHeight() + select { + case <-healthCheckQuit: + logger.Info("get health check quit signal, now exit health check") + return + default: + task, err := syncTaskModel.GetTaskByIdAndWorker(taskId, workerId) if err == nil { - if assertTaskValid(task, blockNumPerWorkerHandle, blockChainLatestHeight) { - // update task last update time - if err := syncTaskModel.UpdateLastUpdateTime(task); err != nil { - log.Error("update last update time fail", logger.String("err", err.Error())) + blockChainLatestHeight, err := getBlockChainLatestHeight() + if err == nil { + if assertTaskValid(task, blockNumPerWorkerHandle, blockChainLatestHeight) { + // update task last update time + if err := syncTaskModel.UpdateLastUpdateTime(task); err != nil { + log.Error("update last update time fail", logger.String("err", err.Error()), + logger.String("task_id", task.ID.Hex())) + } + logger.Info("health check success, now sleep one minute", + logger.String("task_id", task.ID.Hex()), + logger.String("task_current_worker", task.WorkerId)) + } else { + log.Info("task is invalid, exit health check", logger.String("task_id", taskId.Hex())) + break } } else { - log.Info("task is invalid, exit health check", logger.String("task_id", taskId.Hex())) - break + log.Error("get block chain latest height fail", logger.String("err", err.Error())) } } else { - log.Error("get block chain latest height fail", logger.String("err", err.Error())) - } - } else { - if err == mgo.ErrNotFound { - log.Info("task may be task over by other goroutine, exit health check", - logger.String("task_id", taskId.Hex()), logger.String("current_worker", workerId)) - break - } else { - log.Error("get task by id and worker fail", logger.String("task_id", taskId.Hex()), - logger.String("current_worker", workerId)) + if err == mgo.ErrNotFound { + log.Info("task may be task over by other goroutine, exit health check", + logger.String("task_id", taskId.Hex()), logger.String("current_worker", workerId)) + break + } else { + log.Error("get task by id and worker fail", logger.String("task_id", taskId.Hex()), + logger.String("current_worker", workerId)) + } } + time.Sleep(1 * time.Minute) } - time.Sleep(1 * time.Minute) } } go workerHealthCheck(task.ID, workerId)