Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Apr 19, 2024
1 parent 049729a commit 9cc2c3c
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 3 deletions.
2 changes: 2 additions & 0 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/network_util.h"
#include "util/perf_counters.h"
#include "util/thrift_server.h"
#include "util/time.h"

Expand Down Expand Up @@ -88,6 +89,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
get_fragment_executing_count());
heartbeat_result.backend_info.__set_fragment_last_active_time(
get_fragment_last_active_time());
heartbeat_result.backend_info.__set_be_mem(PerfCounters::get_vm_rss());
}
watch.stop();
if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TUnit;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
Expand All @@ -48,7 +50,7 @@ public class BackendsProcDir implements ProcDirInterface {
.add("LastStartTime").add("LastHeartbeat").add("Alive").add("SystemDecommissioned").add("TabletNum")
.add("DataUsedCapacity").add("TrashUsedCapcacity").add("AvailCapacity").add("TotalCapacity").add("UsedPct")
.add("MaxDiskUsedPct").add("RemoteUsedCapacity").add("Tag").add("ErrMsg").add("Version").add("Status")
.add("HeartbeatFailureCounter").add("NodeRole")
.add("HeartbeatFailureCounter").add("NodeRole").add("CpuCores").add("Memory")
.build();

public static final ImmutableList<String> DISK_TITLE_NAMES = new ImmutableList.Builder<String>()
Expand Down Expand Up @@ -166,6 +168,11 @@ public static List<List<String>> getBackendInfos() {
// node role, show the value only when backend is alive.
backendInfo.add(backend.isAlive() ? backend.getNodeRoleTag().value : "");

// cpu cores
backendInfo.add(String.valueOf(backend.getCputCores()));

// memory
backendInfo.add(RuntimeProfile.printCounter(backend.getBeMemoryUse(), TUnit.BYTES));
comparableBackendInfos.add(backendInfo);
}

Expand Down
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public class Backend implements Writable {
@SerializedName("pipelineExecutorSize")
private int pipelineExecutorSize = 1;

// Memory value copied from a backend heartbeat response (assigned in handleHbResponse).
// NOTE(review): unlike pipelineExecutorSize above, this field carries no @SerializedName;
// presumably it is runtime-only state that is not meant to be persisted -- confirm.
private long beMemoryUse = 0;

// Counter of heartbeat failure.
// Once a heartbeat failed, increase this counter by one.
// And if it reaches Config.max_backend_heartbeat_failure_tolerance_count, this backend
Expand Down Expand Up @@ -379,6 +381,10 @@ public int getCputCores() {
return cpuCores;
}

/**
 * Returns the memory value stored for this backend.
 *
 * <p>The field starts at {@code 0} and is overwritten from the heartbeat response in
 * {@code handleHbResponse}.
 *
 * @return the currently stored backend memory value
 */
public long getBeMemoryUse() {
    return this.beMemoryUse;
}

/**
 * Returns the pipeline executor size held by this backend object.
 *
 * @return the value of the {@code pipelineExecutorSize} field (declared with a default of 1)
 */
public int getPipelineExecutorSize() {
    return this.pipelineExecutorSize;
}
Expand Down Expand Up @@ -771,6 +777,7 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay)
this.nodeRoleTag = Tag.createNotCheck(Tag.TYPE_ROLE, hbResponse.getNodeRole());
}

this.beMemoryUse = hbResponse.getBeMemory();
this.lastUpdateMs = hbResponse.getHbTime();
if (!isAlive.get()) {
isChanged = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable {
private long lastFragmentUpdateTime;
@SerializedName(value = "isShutDown")
private boolean isShutDown = false;
// Memory value carried by this heartbeat response; populated by the constructor overload
// that takes a beMemory argument, otherwise left at 0.
// NOTE(review): no @SerializedName here, unlike isShutDown above -- confirm whether this
// value is intentionally left out of the serialized form.
private long beMemory = 0;

public BackendHbResponse() {
super(HeartbeatResponse.Type.BACKEND);
Expand All @@ -76,6 +77,26 @@ public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long
this.arrowFlightSqlPort = arrowFlightSqlPort;
}

/**
 * Creates a backend heartbeat response with status {@code HbStatus.OK} that additionally
 * carries the backend's reported memory value.
 *
 * @param beId                   id of the backend this response belongs to
 * @param bePort                 backend port
 * @param httpPort               backend HTTP port
 * @param brpcPort               backend bRPC port
 * @param hbTime                 heartbeat time
 * @param beStartTime            backend start time
 * @param version                backend version string
 * @param nodeRole               backend node role
 * @param fragmentNum            fragment count reported by the backend
 * @param lastFragmentUpdateTime last fragment update time reported by the backend
 * @param isShutDown             whether the backend reports that it is shutting down
 * @param arrowFlightSqlPort     backend Arrow Flight SQL port
 * @param beMemory               memory value reported by the backend
 */
public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long hbTime, long beStartTime,
        String version, String nodeRole, long fragmentNum, long lastFragmentUpdateTime,
        boolean isShutDown, int arrowFlightSqlPort, long beMemory) {
    super(HeartbeatResponse.Type.BACKEND);

    // A response built through this constructor is always marked as OK.
    this.status = HbStatus.OK;
    this.hbTime = hbTime;

    // Backend identity and descriptive attributes.
    this.beId = beId;
    this.version = version;
    this.nodeRole = nodeRole;
    this.beStartTime = beStartTime;

    // Service ports.
    this.bePort = bePort;
    this.httpPort = httpPort;
    this.brpcPort = brpcPort;
    this.arrowFlightSqlPort = arrowFlightSqlPort;

    // Runtime state reported by the backend.
    this.fragmentNum = fragmentNum;
    this.lastFragmentUpdateTime = lastFragmentUpdateTime;
    this.isShutDown = isShutDown;
    this.beMemory = beMemory;
}

public BackendHbResponse(long beId, String errMsg) {
super(HeartbeatResponse.Type.BACKEND);
this.status = HbStatus.BAD;
Expand Down Expand Up @@ -135,6 +156,10 @@ public boolean isShutDown() {
return isShutDown;
}

/**
 * Returns the memory value carried by this heartbeat response.
 *
 * @return the value of the {@code beMemory} field; {@code 0} unless set through the
 *         constructor overload that accepts it
 */
public long getBeMemory() {
    return this.beMemory;
}

@Override
protected void readFields(DataInput in) throws IOException {
super.readFields(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,10 @@ public HeartbeatResponse call() {
if (tBackendInfo.isSetIsShutdown()) {
isShutDown = tBackendInfo.isIsShutdown();
}
long beMemory = tBackendInfo.getBeMem();
return new BackendHbResponse(backendId, bePort, httpPort, brpcPort,
System.currentTimeMillis(), beStartTime, version, nodeRole,
fragmentNum, lastFragmentUpdateTime, isShutDown, arrowFlightSqlPort);
fragmentNum, lastFragmentUpdateTime, isShutDown, arrowFlightSqlPort, beMemory);
} else {
return new BackendHbResponse(backendId, backend.getHost(),
result.getStatus().getErrorMsgs().isEmpty()
Expand Down
2 changes: 1 addition & 1 deletion gensrc/thrift/HeartbeatService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ struct TBackendInfo {
7: optional string be_node_role
8: optional bool is_shutdown
9: optional Types.TPort arrow_flight_sql_port

10: optional i64 be_mem
// For cloud
1000: optional i64 fragment_executing_count
1001: optional i64 fragment_last_active_time
Expand Down

0 comments on commit 9cc2c3c

Please sign in to comment.