Skip to content

Commit

Permalink
[fix](conn-pool) Avoid some invalid connections return to pool
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 committed Oct 9, 2024
1 parent 76f2bb5 commit 8a78f04
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long destBe) thr
TNetworkAddress address = null;
Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
Backend destBackend = cloudSystemInfoService.getBackend(destBe);
boolean ok = true;
try {
address = new TNetworkAddress(destBackend.getHost(), destBackend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
Expand All @@ -743,17 +744,22 @@ private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long destBe) thr
}
} catch (Exception e) {
LOG.warn("send pre heating rpc error. backend[{}]", destBackend.getId(), e);
ClientPool.backendPool.invalidateObject(address, client);
ok = false;
throw e;
} finally {
ClientPool.backendPool.returnObject(address, client);
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
}
}

private Map<Long, Boolean> sendCheckWarmUpCacheAsyncRpc(List<Long> tabletIds, long be) {
BackendService.Client client = null;
TNetworkAddress address = null;
Backend destBackend = cloudSystemInfoService.getBackend(be);
boolean ok = true;
try {
address = new TNetworkAddress(destBackend.getHost(), destBackend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
Expand All @@ -770,9 +776,13 @@ private Map<Long, Boolean> sendCheckWarmUpCacheAsyncRpc(List<Long> tabletIds, lo
return result.getTaskDone();
} catch (Exception e) {
LOG.warn("send check pre cache rpc error. backend[{}]", destBackend.getId(), e);
ClientPool.backendPool.invalidateObject(address, client);
ok = false;
} finally {
ClientPool.backendPool.returnObject(address, client);
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,24 @@ private static TGetRealtimeExecStatusResponse getRealtimeQueryProfile(
client = ClientPool.backendPool.borrowObject(targetBackend);
} catch (Exception e) {
LOG.warn("Fetch a agent client failed, address: {}", targetBackend.toString());
ClientPool.backendPool.invalidateObject(targetBackend, client);
return resp;
}

boolean ok = true;
try {
TGetRealtimeExecStatusRequest req = new TGetRealtimeExecStatusRequest();
req.setId(queryID);
resp = client.getRealtimeExecStatus(req);
} catch (TException e) {
LOG.warn("Got exception when getRealtimeExecStatus, query {} backend {}",
DebugUtil.printId(queryID), targetBackend.toString(), e);
ClientPool.backendPool.invalidateObject(targetBackend, client);
ok = false;
} finally {
ClientPool.backendPool.returnObject(targetBackend, client);
if (ok) {
ClientPool.backendPool.returnObject(targetBackend, client);
} else {
ClientPool.backendPool.invalidateObject(targetBackend, client);
}
}

if (!resp.isSetStatus()) {
Expand Down

0 comments on commit 8a78f04

Please sign in to comment.