From 1aa2d91574f7bd77f1e625e5af7f0f0b5b265d43 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 12 Mar 2024 22:44:30 +0800 Subject: [PATCH] [fix](audit-loader) fix invalid token check logic (#32095) The check of the token should be forwarded to Master FE. I add a new RPC method `checkToken()` in Frontend for this logic. Otherwise, after enable the audit loader, the log from non-master FE can not be loaded to audit table with `Invalid token` error. --- .../apache/doris/httpv2/rest/LoadAction.java | 7 ++- .../doris/load/loadv2/TokenManager.java | 61 ++++++++++++++++--- .../doris/service/FrontendServiceImpl.java | 28 ++++++--- gensrc/thrift/FrontendService.thrift | 2 + 4 files changed, 81 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 6952bd37b5c91b..6be5654a2ea112 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -23,6 +23,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; +import org.apache.doris.common.UserException; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.httpv2.exception.UnauthorizedException; @@ -362,7 +363,11 @@ private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadEx // temporarily addressing the users' needs for audit logs. // So this function is not widely tested under general scenario private boolean checkClusterToken(String token) { - return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token); + try { + return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token); + } catch (UserException e) { + throw new UnauthorizedException(e.getMessage()); + } } // NOTE: This function can only be used for AuditlogPlugin stream load for now. diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java index 6443e6b2322687..ca714d66b29d29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java @@ -63,11 +63,6 @@ private String generateNewToken() { return UUID.randomUUID().toString(); } - // this method only will be called in master node, since stream load only send message to master. - public boolean checkAuthToken(String token) { - return tokenQueue.contains(token); - } - public String acquireToken() throws UserException { if (Env.getCurrentEnv().isMaster() || FeConstants.runningUnitTest) { return tokenQueue.peek(); @@ -81,9 +76,8 @@ public String acquireToken() throws UserException { } } - public String acquireTokenFromMaster() throws TException { + private String acquireTokenFromMaster() throws TException { TNetworkAddress thriftAddress = getMasterAddress(); - FrontendService.Client client = getClient(thriftAddress); if (LOG.isDebugEnabled()) { @@ -108,7 +102,7 @@ public String acquireTokenFromMaster() throws TException { } else { TMySqlLoadAcquireTokenResult result = client.acquireToken(); if (result.getStatus().getStatusCode() != TStatusCode.OK) { - throw new TException("commit failed."); + throw new TException("acquire token from master failed. " + result.getStatus()); } isReturnToPool = true; return result.getToken(); @@ -122,6 +116,57 @@ public String acquireTokenFromMaster() throws TException { } } + /** + * Check if the token is valid. + * If this is not Master FE, will send the request to Master FE. + */ + public boolean checkAuthToken(String token) throws UserException { + if (Env.getCurrentEnv().isMaster() || FeConstants.runningUnitTest) { + return tokenQueue.contains(token); + } else { + try { + return checkTokenFromMaster(token); + } catch (TException e) { + LOG.warn("check token error", e); + throw new UserException("Check token from master failed", e); + } + } + } + + private boolean checkTokenFromMaster(String token) throws TException { + TNetworkAddress thriftAddress = getMasterAddress(); + FrontendService.Client client = getClient(thriftAddress); + + if (LOG.isDebugEnabled()) { + LOG.debug("Send check token to Master {}", thriftAddress); + } + + boolean isReturnToPool = false; + try { + boolean result = client.checkToken(token); + isReturnToPool = true; + return result; + } catch (TTransportException e) { + boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs); + if (!ok) { + throw e; + } + if (e.getType() == TTransportException.TIMED_OUT) { + throw e; + } else { + boolean result = client.checkToken(token); + isReturnToPool = true; + return result; + } + } finally { + if (isReturnToPool) { + ClientPool.frontendPool.returnObject(thriftAddress, client); + } else { + ClientPool.frontendPool.invalidateObject(thriftAddress, client); + } + } + } + private TNetworkAddress getMasterAddress() throws TException { Env.getCurrentEnv().checkReadyOrThrowTException(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index eb62cd9c75a25e..eface922ef97eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1062,12 +1062,6 @@ private void checkPasswordAndPrivs(String user, String passwd, String db, List