Skip to content

Commit

Permalink
Merge pull request #133 from lbbniu/tarsAdminRegistry
Browse files Browse the repository at this point in the history
fix: 多副本执行addTaskReq操作
  • Loading branch information
ruanshudong authored Feb 28, 2023
2 parents 1bf348c + c728377 commit 23321fe
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 112 deletions.
15 changes: 12 additions & 3 deletions AdminRegistryServer/AdminRegistryImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ int AdminRegistryImp::restartServer(const string & application, const string & s

bool isDnsServer = false;
int iRet = EM_TARS_SUCCESS;
CurrentPtr nodeCurrent = NodeManager::getInstance()->getNodeCurrent(nodeName);
try
{
if(application == ServerConfig::Application && serverName == ServerConfig::ServerName)
Expand Down Expand Up @@ -629,10 +630,14 @@ int AdminRegistryImp::restartServer(const string & application, const string & s
{
isDnsServer = true;
}
else
else if (nodeCurrent)
{
iRet = NodeManager::getInstance()->stopServer(application, serverName, nodeName, result, current);
iRet = NodeManager::getInstance()->stopServer(application, serverName, nodeName, result, NULL);
}
else
{
iRet = NodeManager::getInstance()->stopServer(application, serverName, nodeName, result, current);
}
TLOG_DEBUG("call node restartServer, stop|" << application << "." << serverName << "_" << nodeName << "|"
<< current->getHostName() << ":" << current->getPort() << endl);
if (iRet != EM_TARS_SUCCESS)
Expand Down Expand Up @@ -664,9 +669,13 @@ int AdminRegistryImp::restartServer(const string & application, const string & s
TLOG_DEBUG( "|" << " '" + application + "." + serverName + "_" + nodeName + "' is tars_dns server" << endl);
return DBPROXY->updateServerState(application, serverName, nodeName, "present_state", tars::Active);
}
else if (nodeCurrent)
{
return NodeManager::getInstance()->startServer(application, serverName, nodeName, result, NULL);
}
else
{
return NodeManager::getInstance()->startServer(application, serverName, nodeName, result, current);
return NodeManager::getInstance()->startServer(application, serverName, nodeName, result, current);
}
}
catch(TarsSyncCallTimeoutException& ex)
Expand Down
1 change: 1 addition & 0 deletions AdminRegistryServer/AdminRegistryImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#ifndef __AMIN_REGISTRY_H__
#define __AMIN_REGISTRY_H__

#include "util/tc_common.h"
#include "AdminReg.h"
#include "Registry.h"
#include "Patch.h"
Expand Down
124 changes: 16 additions & 108 deletions AdminRegistryServer/ExecuteTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

#include "ExecuteTask.h"
#include "servant/Communicator.h"
#include "servant/Application.h"
#include "servant/RemoteLogger.h"
#include "util/tc_timeprovider.h"
#include <thread>
Expand All @@ -24,8 +24,8 @@ extern TC_Config * g_pconf;
TaskList::TaskList(const TaskReq &taskReq, unsigned int t)
: _taskReq(taskReq)
{
//_adminPrx = Communicator::getInstance()->stringToProxy<AdminRegPrx>(g_pconf->get("/tars/objname<AdminRegObjName>", ""));
_adminPrx = ExecuteTask::getInstance()->getAdminImp();
_adminPrx = Application::getCommunicator()->stringToProxy<AdminRegPrx>(g_pconf->get("/tars/objname<AdminRegObjName>", "tars.tarsAdminRegistry.AdminRegObj"));
_adminImp = ExecuteTask::getInstance()->getAdminImp();

_taskRsp.taskNo = _taskReq.taskNo;
_taskRsp.serial = _taskReq.serial;
Expand Down Expand Up @@ -122,7 +122,7 @@ void TaskList::setRspInfo(size_t index, bool start, EMTaskItemStatus status)

try
{
_adminPrx->setTaskItemInfo_inner(rsp.req.itemNo, info);
_adminImp->setTaskItemInfo_inner(rsp.req.itemNo, info);
}
catch (exception &ex)
{
Expand Down Expand Up @@ -153,7 +153,7 @@ void TaskList::setRspLog(size_t index, const string &log)

try
{
_adminPrx->setTaskItemInfo_inner(rsp.req.itemNo, info);
_adminImp->setTaskItemInfo_inner(rsp.req.itemNo, info);
}
catch (exception &ex)
{
Expand Down Expand Up @@ -211,7 +211,7 @@ int TaskList::prepareFile()

string result;

int ret = _adminPrx->prepareInfo_inner(pi, result);
int ret = _adminImp->prepareInfo_inner(pi, result);
if (ret != 0)
{
TLOG_ERROR("preparePatch_inner error:" << result << endl);
Expand All @@ -231,7 +231,7 @@ int TaskList::prepareFile()

TLOG_DEBUG("preparePatch_inner prepare :" << e.second.application << "." << e.second.serverName << ", patchId:" << e.second.patchId << ", file:" << e.second.patchFile << ", docker image:" << e.second.baseImage << endl);

int ret = this->_adminPrx->preparePatch_inner(e.second, result);
int ret = _adminImp->preparePatch_inner(e.second, result);
if(ret != 0)
{
TLOG_ERROR("preparePatch_inner error:" << result << endl);
Expand All @@ -248,7 +248,7 @@ EMTaskItemStatus TaskList::start(const TaskItemReq &req, string &log)
int ret = -1;
try
{
ret = _adminPrx->startServer_inner(req.application, req.serverName, req.nodeName, log);
ret = _adminPrx->tars_hash(tars::hash<string>()(req.nodeName))->startServer(req.application, req.serverName, req.nodeName, log);
if (ret == 0)
return EM_I_SUCCESS;
}
Expand All @@ -268,7 +268,7 @@ EMTaskItemStatus TaskList::restart(const TaskItemReq &req, string &log)
int ret = -1;
try
{
ret = _adminPrx->restartServer_inner(req.application, req.serverName, req.nodeName, log);
ret = _adminPrx->tars_hash(tars::hash<string>()(req.nodeName))->restartServer(req.application, req.serverName, req.nodeName, log);
if (ret == 0)
{
return EM_I_SUCCESS;
Expand Down Expand Up @@ -297,7 +297,7 @@ EMTaskItemStatus TaskList::graceRestart(const TaskItemReq &req, string &log)
TLOG_ERROR("TaskList::graceRestartServer no servers" << endl);
return EM_I_FAILED;
}
ret = _adminPrx->notifyServer_inner(req.application, req.serverName, req.nodeName, "tars.gracerestart",log);
ret = _adminPrx->tars_hash(tars::hash<string>()(req.nodeName))->notifyServer(req.application, req.serverName, req.nodeName, "tars.gracerestart",log);
if (ret == 0) {
return EM_I_SUCCESS;
}
Expand All @@ -317,7 +317,7 @@ EMTaskItemStatus TaskList::undeploy(const TaskItemReq &req, string &log)
int ret = -1;
try
{
ret = _adminPrx->undeploy_inner(req.application, req.serverName, req.nodeName, req.userName, log);
ret = _adminPrx->tars_hash(tars::hash<string>()(req.nodeName))->undeploy(req.application, req.serverName, req.nodeName, req.userName, log);
if (ret == 0)
return EM_I_SUCCESS;
}
Expand All @@ -337,7 +337,7 @@ EMTaskItemStatus TaskList::stop(const TaskItemReq &req, string &log)
int ret = -1;
try
{
ret = _adminPrx->stopServer_inner(req.application, req.serverName, req.nodeName, log);
ret = _adminPrx->tars_hash(tars::hash<string>()(req.nodeName))->stopServer(req.application, req.serverName, req.nodeName, log);
if (ret == 0)
return EM_I_SUCCESS;
}
Expand Down Expand Up @@ -387,7 +387,7 @@ EMTaskItemStatus TaskList::patch(size_t index, const TaskItemReq &req, string &l
//外部是串行处理的
try
{
ret = _adminPrx->batchPatch_inner(patchReq, log);
ret = _adminPrx->tars_hash(tars::hash<string>()(req.nodeName))->batchPatch(patchReq, log);
}
catch (exception &ex)
{
Expand Down Expand Up @@ -416,7 +416,7 @@ EMTaskItemStatus TaskList::patch(size_t index, const TaskItemReq &req, string &l

try
{
ret = _adminPrx->getPatchPercent_inner(req.application, req.serverName, req.nodeName, pi);
ret = _adminPrx->tars_hash(tars::hash<string>()(req.nodeName))->getPatchPercent(req.application, req.serverName, req.nodeName, pi);
}
catch (exception &ex)
{
Expand All @@ -430,14 +430,14 @@ EMTaskItemStatus TaskList::patch(size_t index, const TaskItemReq &req, string &l
if (ret != 0)
{
log = pi.sResult;
_adminPrx->updatePatchLog_inner(req.application, req.serverName, req.nodeName, patchId, req.userName, patchType, false);
_adminImp->updatePatchLog_inner(req.application, req.serverName, req.nodeName, patchId, req.userName, patchType, false);
TLOG_ERROR("getPatchPercent error, ret:" << ret << ", " << req.application << "." << req.serverName << "_" << req.nodeName << ", percent:" << pi.iPercent << ", log:" << pi.sResult << endl);
return EM_I_FAILED;
}

if(pi.iPercent == 100 && pi.bSucc)
{
_adminPrx->updatePatchLog_inner(req.application, req.serverName, req.nodeName, patchId, req.userName, patchType, true);
_adminImp->updatePatchLog_inner(req.application, req.serverName, req.nodeName, patchId, req.userName, patchType, true);
TLOG_DEBUG("getPatchPercent ok, percent:" << pi.iPercent << "%" << endl);
retStatus = EM_I_SUCCESS;
break;
Expand All @@ -460,98 +460,6 @@ EMTaskItemStatus TaskList::patch(size_t index, const TaskItemReq &req, string &l
}
return EM_I_SUCCESS;
}
//
//EMTaskItemStatus TaskList::patch(const TaskItemReq &req, string &log,std::size_t index)
//{
// try
// {
// int ret = EM_TARS_UNKNOWN_ERR;
//
// TLOG_DEBUG("TaskList::patch:" << TC_Common::tostr(req.parameters.begin(), req.parameters.end())
// << req.application << "." << req.serverName << ", " << req.nodeName << endl);
//
// string patchId = get("patch_id", req.parameters);
// string patchType = get("patch_type", req.parameters);
// string runType = get("run_type",req.parameters);
//
// tars::PatchRequest patchReq;
// patchReq.appname = req.application;
// patchReq.servername = req.serverName;
// patchReq.nodename = req.nodeName;
// patchReq.version = patchId;
// patchReq.user = req.userName;
// patchReq.md5 = get("run_type",req.parameters);
//
// //外部是并行处理的!
// try {
//// ret = _adminPrx->preparePatch_inner(patchReq, log, index != 0, taskItemSharedState);
//// if (ret != 0) {
//// log = "tarsAdminRegistry batchPatch err:" + log;
//// TLOG_ERROR("TaskList::patch batchPatch error:" << log << endl);
//// return EM_I_FAILED;
//// }
// ret = _adminPrx->batchPatch_inner(patchReq, runType=="container", log);
// }
// catch (exception &ex)
// {
// log = ex.what();
// TLOG_ERROR("TaskList::patch batchPatch error:" << log << endl);
// return EM_I_FAILED;
// }
//
// if (ret != EM_TARS_SUCCESS)
// {
// log = "tarsAdminRegistry batchPatch err:" + log;
// return EM_I_FAILED;
// }
//
// // 这里做个超时保护, 否则如果tarsnode状态错误, 这里一直循环调用
// time_t tNow = TNOW;
// unsigned int patchTimeout = TC_Common::strto<unsigned int>(g_pconf->get("/tars/patch/<patch_wait_timeout>", "300"));
//
// EMTaskItemStatus retStatus = EM_I_FAILED;
// while (TNOW < tNow + patchTimeout)
// {
// PatchInfo pi;
//
// try
// {
// ret = _adminPrx->getPatchPercent_inner(req.application, req.serverName, req.nodeName, pi);
// }
// catch (exception &ex)
// {
// log = ex.what();
// TLOG_ERROR("TaskList::patch getPatchPercent error, ret:" << ret << endl);
// }
//
// if (ret != 0)
// {
// _adminPrx->updatePatchLog_inner(req.application, req.serverName, req.nodeName, patchId, req.userName, patchType, false);
// TLOG_ERROR("TaskList::patch getPatchPercent error, ret:" << ret << endl);
// return EM_I_FAILED;
// }
//
// if(pi.iPercent == 100 && pi.bSucc)
// {
// _adminPrx->updatePatchLog_inner(req.application, req.serverName, req.nodeName, patchId, req.userName, patchType, true);
// TLOG_DEBUG("TaskList::patch getPatchPercent ok, percent:" << pi.iPercent << "%" << endl);
// retStatus = EM_I_SUCCESS;
// break;
// }
//
// TLOG_DEBUG("TaskList::patch getPatchPercent percent:" << pi.iPercent << "%, succ:" << pi.bSucc << endl);
//
// std::this_thread::sleep_for(std::chrono::seconds(1));
// }
// return retStatus;
// }
// catch (exception &ex)
// {
// TLOG_ERROR("TaskList::patch error:" << ex.what() << endl);
// return EM_I_FAILED;
// }
// return EM_I_FAILED;
//}

EMTaskItemStatus TaskList::executeSingleTask(size_t index, const TaskItemReq &req)
{
Expand Down
3 changes: 2 additions & 1 deletion AdminRegistryServer/ExecuteTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ class TaskList : public TC_ThreadMutex
//返回任务
TaskRsp _taskRsp;

AdminRegistryImp* _adminPrx;
AdminRegPrx _adminPrx;
AdminRegistryImp* _adminImp;
time_t _createTime;
unsigned int _timeout;
bool _finished;
Expand Down

0 comments on commit 23321fe

Please sign in to comment.