Skip to content

Commit

Permalink
[Fix](clean trash) Fix clean trash use agent task (#33912) (#33953)
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng authored Apr 22, 2024
1 parent 98e90dd commit b34000b
Show file tree
Hide file tree
Showing 14 changed files with 759 additions and 39 deletions.
3 changes: 3 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ void AgentServer::start_workers(ExecEnv* exec_env) {

_report_tablet_workers = std::make_unique<ReportWorker>(
"REPORT_OLAP_TABLE", _master_info, config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { report_tablet_callback(engine, master_info); });

_clean_trash_binlog_workers = std::make_unique<TaskWorkerPool>(
"CLEAN_TRASH", 1, [&engine](auto&& task) {return clean_trash_callback(engine, task); });
// clang-format on
}

Expand Down
9 changes: 9 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
#include "runtime/exec_env.h"
#include "runtime/snapshot_loader.h"
#include "service/backend_options.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/random.h"
Expand Down Expand Up @@ -1764,4 +1765,12 @@ void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequ
remove_task_info(req.task_type, req.signature);
}

void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
LOG(INFO) << "clean trash start";
DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); })
static_cast<void>(engine.start_trash_sweep(nullptr, true));
static_cast<void>(engine.notify_listener("REPORT_DISK_STATE"));
LOG(INFO) << "clean trash finish";
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequ

void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void report_task_callback(const TMasterInfo& master_info);

void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info);
Expand Down
5 changes: 0 additions & 5 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -843,11 +843,6 @@ void BackendService::get_stream_load_record(TStreamLoadRecordResult& result,
}
}

void BackendService::clean_trash() {
static_cast<void>(StorageEngine::instance()->start_trash_sweep(nullptr, true));
static_cast<void>(StorageEngine::instance()->notify_listener("REPORT_DISK_STATE"));
}

void BackendService::check_storage_format(TCheckStorageFormatResult& result) {
StorageEngine::instance()->tablet_manager()->get_all_tablets_storage_format(&result);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/service/backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ class BackendService : public BackendServiceIf {
void get_stream_load_record(TStreamLoadRecordResult& result,
const int64_t last_stream_record_time) override;

void clean_trash() override;

void check_storage_format(TCheckStorageFormatResult& result) override;

void ingest_binlog(TIngestBinlogResult& result, const TIngestBinlogRequest& request) override;
Expand Down
25 changes: 6 additions & 19 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import org.apache.doris.clone.TabletScheduler;
import org.apache.doris.clone.TabletSchedulerStat;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.ConfigException;
Expand Down Expand Up @@ -254,11 +253,11 @@
import org.apache.doris.system.SystemInfoService.HostInfo;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.CleanTrashTask;
import org.apache.doris.task.CompactionTask;
import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.task.PriorityMasterTaskExecutor;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TFrontendInfo;
import org.apache.doris.thrift.TGetMetaDBMeta;
Expand Down Expand Up @@ -5803,25 +5802,13 @@ public void onErasePartition(Partition partition) {

public void cleanTrash(AdminCleanTrashStmt stmt) {
List<Backend> backends = stmt.getBackends();
AgentBatchTask batchTask = new AgentBatchTask();
for (Backend backend : backends) {
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
try {
address = new TNetworkAddress(backend.getHost(), backend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
client.cleanTrash(); // async
ok = true;
} catch (Exception e) {
LOG.warn("trash clean exec error. backend[{}]", backend.getId(), e);
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
}
CleanTrashTask cleanTrashTask = new CleanTrashTask(backend.getId());
batchTask.addTask(cleanTrashTask);
LOG.info("clean trash in be {}, beId {}", backend.getHost(), backend.getId());
}
AgentTaskExecutor.submit(batchTask);
}

public void setPartitionVersion(AdminSetPartitionVersionStmt stmt) throws DdlException {
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.thrift.TAlterInvertedIndexReq;
import org.apache.doris.thrift.TAlterTabletReqV2;
import org.apache.doris.thrift.TCheckConsistencyReq;
import org.apache.doris.thrift.TCleanTrashReq;
import org.apache.doris.thrift.TClearAlterTaskRequest;
import org.apache.doris.thrift.TClearTransactionTaskRequest;
import org.apache.doris.thrift.TCloneReq;
Expand Down Expand Up @@ -392,6 +393,15 @@ private TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
tAgentTaskRequest.setGcBinlogReq(request);
return tAgentTaskRequest;
}
case CLEAN_TRASH: {
CleanTrashTask cleanTrashTask = (CleanTrashTask) task;
TCleanTrashReq request = cleanTrashTask.toThrift();
if (LOG.isDebugEnabled()) {
LOG.debug(request.toString());
}
tAgentTaskRequest.setCleanTrashReq(request);
return tAgentTaskRequest;
}
default:
if (LOG.isDebugEnabled()) {
LOG.debug("could not find task type for task [{}]", task);
Expand Down
37 changes: 37 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/task/CleanTrashTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.task;

import org.apache.doris.thrift.TCleanTrashReq;
import org.apache.doris.thrift.TTaskType;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class CleanTrashTask extends AgentTask {
private static final Logger LOG = LogManager.getLogger(CleanTrashTask.class);

public CleanTrashTask(long backendId) {
super(null, backendId, TTaskType.CLEAN_TRASH, -1, -1, -1, -1, -1, -1, -1);
}

public TCleanTrashReq toThrift() {
return new TCleanTrashReq();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,6 @@ public TStreamLoadRecordResult getStreamLoadRecord(long lastStreamRecordTime) th
return null;
}

@Override
public void cleanTrash() throws TException {
// TODO Auto-generated method stub
}

@Override
public TCheckStorageFormatResult checkStorageFormat() throws TException {
return new TCheckStorageFormatResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,6 @@ public TStreamLoadRecordResult getStreamLoadRecord(long lastStreamRecordTime) th
return new TStreamLoadRecordResult(Maps.newHashMap());
}

@Override
public void cleanTrash() throws TException {
return;
}

@Override
public TCheckStorageFormatResult checkStorageFormat() throws TException {
return new TCheckStorageFormatResult();
Expand Down
3 changes: 3 additions & 0 deletions gensrc/thrift/AgentService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ struct TPushStoragePolicyReq {
3: optional list<i64> dropped_storage_policy
}

struct TCleanTrashReq {}

enum TCompressionType {
UNKNOWN_COMPRESSION = 0,
DEFAULT_COMPRESSION = 1,
Expand Down Expand Up @@ -492,6 +494,7 @@ struct TAgentTaskRequest {
31: optional TPushStoragePolicyReq push_storage_policy_req
32: optional TAlterInvertedIndexReq alter_inverted_index_req
33: optional TGcBinlogReq gc_binlog_req
34: optional TCleanTrashReq clean_trash_req
}

struct TAgentResult {
Expand Down
2 changes: 0 additions & 2 deletions gensrc/thrift/BackendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,6 @@ service BackendService {

TStreamLoadRecordResult get_stream_load_record(1: i64 last_stream_record_time);

oneway void clean_trash();

// check tablet rowset type
TCheckStorageFormatResult check_storage_format();

Expand Down
3 changes: 2 additions & 1 deletion gensrc/thrift/Types.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ enum TTaskType {
PUSH_COOLDOWN_CONF,
PUSH_STORAGE_POLICY,
ALTER_INVERTED_INDEX,
GC_BINLOG
GC_BINLOG,
CLEAN_TRASH
}

enum TStmtType {
Expand Down
Loading

0 comments on commit b34000b

Please sign in to comment.