Skip to content

Commit

Permalink
[fix](schema change) reduce memory usage in schema change process (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang authored Jun 25, 2024
1 parent d163979 commit 39a8600
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -247,6 +249,7 @@ protected void runPendingJob() throws AlterCancelException {
try {
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
Expand Down Expand Up @@ -291,7 +294,7 @@ protected void runPendingJob() throws AlterCancelException {
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
tbl.isDynamicSchema(),
binlogConfig);
binlogConfig, objectPool);
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
Expand Down Expand Up @@ -401,6 +404,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
}

tbl.readLock();
Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
Expand Down Expand Up @@ -479,7 +483,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId,
rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId,
JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true),
whereClause);
whereClause, objectPool);
rollupBatchTask.addTask(rollupTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@

import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -238,6 +240,7 @@ protected void runPendingJob() throws AlterCancelException {

Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
Expand Down Expand Up @@ -286,7 +289,7 @@ protected void runPendingJob() throws AlterCancelException {
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
tbl.isDynamicSchema(),
binlogConfig);
binlogConfig, objectPool);

createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId)
.get(shadowTabletId), originSchemaHash);
Expand Down Expand Up @@ -410,7 +413,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
}

tbl.readLock();

Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
try {
Map<String, Column> indexColumnMap = Maps.newHashMap();
for (Map.Entry<Long, List<Column>> entry : indexSchemaMap.entrySet()) {
Expand Down Expand Up @@ -468,7 +471,8 @@ protected void runWaitingTxnJob() throws AlterCancelException {
AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId,
tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId,
shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId,
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, null);
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, null,
objectPool);
schemaChangeBatchTask.addTask(rollupTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class DescriptorTable {

private final HashMap<SlotDescriptor, SlotDescriptor> outToIntermediateSlots = new HashMap<>();

private TDescriptorTable thriftDescTable = null; // serialized version of this

public DescriptorTable() {
}

Expand Down Expand Up @@ -182,6 +184,9 @@ public void materializeIntermediateSlots() {
}

public TDescriptorTable toThrift() {
if (thriftDescTable != null) {
return thriftDescTable;
}
TDescriptorTable result = new TDescriptorTable();
Map<Long, TableIf> referencedTbls = Maps.newHashMap();
for (TupleDescriptor tupleD : tupleDescs.values()) {
Expand All @@ -208,6 +213,7 @@ public TDescriptorTable toThrift() {
for (TableIf tbl : referencedTbls.values()) {
result.addToTableDescriptors(tbl.toThrift());
}
thriftDescTable = result;
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -1035,6 +1036,7 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc
} finally {
localTbl.readUnlock();
}
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId());
for (Tablet restoreTablet : restoredIdx.getTablets()) {
Expand Down Expand Up @@ -1067,7 +1069,7 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc
localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.storeRowColumn(),
localTbl.isDynamicSchema(),
binlogConfig);
binlogConfig, objectPool);

task.setInRestoreMode(true);
batchTask.addTask(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,7 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long

short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
TStorageMedium realStorageMedium = null;
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
MaterializedIndex index = entry.getValue();
Expand Down Expand Up @@ -1909,7 +1910,7 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long
compactionPolicy, timeSeriesCompactionGoalSizeMbytes,
timeSeriesCompactionFileCountThreshold, timeSeriesCompactionTimeThresholdSeconds,
timeSeriesCompactionEmptyRowsetsThreshold, timeSeriesCompactionLevelThreshold,
storeRowColumn, isDynamicSchema, binlogConfig);
storeRowColumn, isDynamicSchema, binlogConfig, objectPool);

task.setStorageFormat(storageFormat);
batchTask.addTask(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
long backendReportVersion) {
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
Expand Down Expand Up @@ -827,7 +828,7 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
olapTable.getTimeSeriesCompactionLevelThreshold(),
olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
binlogConfig);
binlogConfig, objectPool);

createReplicaTask.setIsRecoverTask(true);
createReplicaBatchTask.addTask(createReplicaTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class AlterReplicaTask extends AgentTask {
private Expr whereClause;
private DescriptorTable descTable;
private List<Column> baseSchemaColumns;
private Map<Object, Object> objectPool;

/**
* AlterReplicaTask constructor.
Expand All @@ -62,7 +63,8 @@ public class AlterReplicaTask extends AgentTask {
public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId,
long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash,
int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs,
DescriptorTable descTable, List<Column> baseSchemaColumns, Expr whereClause) {
DescriptorTable descTable, List<Column> baseSchemaColumns, Expr whereClause,
Map<Object, Object> objectPool) {
super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);

this.baseTabletId = baseTabletId;
Expand All @@ -79,6 +81,7 @@ public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionI
this.whereClause = whereClause;
this.descTable = descTable;
this.baseSchemaColumns = baseSchemaColumns;
this.objectPool = objectPool;
}

public long getBaseTabletId() {
Expand Down Expand Up @@ -125,27 +128,47 @@ public TAlterTabletReqV2 toThrift() {
}
if (defineExprs != null) {
for (Map.Entry<String, Expr> entry : defineExprs.entrySet()) {
List<SlotRef> slots = Lists.newArrayList();
entry.getValue().collect(SlotRef.class, slots);
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
mvParam.setOriginColumnName(slots.get(0).getColumnName());
mvParam.setMvExpr(entry.getValue().treeToThrift());
req.addToMaterializedViewParams(mvParam);
Object value = objectPool.get(entry.getKey());
if (value == null) {
List<SlotRef> slots = Lists.newArrayList();
entry.getValue().collect(SlotRef.class, slots);
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
mvParam.setOriginColumnName(slots.get(0).getColumnName());
mvParam.setMvExpr(entry.getValue().treeToThrift());
req.addToMaterializedViewParams(mvParam);
objectPool.put(entry.getKey(), mvParam);
} else {
TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
req.addToMaterializedViewParams(mvParam);
}
}
}
if (whereClause != null) {
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN);
mvParam.setMvExpr(whereClause.treeToThrift());
req.addToMaterializedViewParams(mvParam);
Object value = objectPool.get(Column.WHERE_SIGN);
if (value == null) {
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN);
mvParam.setMvExpr(whereClause.treeToThrift());
req.addToMaterializedViewParams(mvParam);
} else {
TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
req.addToMaterializedViewParams(mvParam);
}
}
req.setDescTbl(descTable.toThrift());

if (baseSchemaColumns != null) {
List<TColumn> columns = new ArrayList<TColumn>();
for (Column column : baseSchemaColumns) {
columns.add(column.toThrift());
Object value = objectPool.get(baseSchemaColumns);
if (value == null) {
List<TColumn> columns = new ArrayList<TColumn>();
for (Column column : baseSchemaColumns) {
columns.add(column.toThrift());
}
req.setColumns(columns);
objectPool.put(baseSchemaColumns, columns);
} else {
List<TColumn> columns = (List<TColumn>) value;
req.setColumns(columns);
}
req.setColumns(columns);
}
return req;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -121,6 +122,8 @@ public class CreateReplicaTask extends AgentTask {

private BinlogConfig binlogConfig;

private Map<Object, Object> objectPool;

public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
long replicaId, short shortKeyColumnCount, int schemaHash, long version,
KeysType keysType, TStorageType storageType,
Expand All @@ -143,7 +146,8 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition
long timeSeriesCompactionLevelThreshold,
boolean storeRowColumn,
boolean isDynamicSchema,
BinlogConfig binlogConfig) {
BinlogConfig binlogConfig,
Map<Object, Object> objectPool) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);

this.replicaId = replicaId;
Expand Down Expand Up @@ -188,6 +192,7 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition
this.timeSeriesCompactionLevelThreshold = timeSeriesCompactionLevelThreshold;
this.storeRowColumn = storeRowColumn;
this.binlogConfig = binlogConfig;
this.objectPool = objectPool;
}

public void setIsRecoverTask(boolean isRecoverTask) {
Expand Down Expand Up @@ -248,21 +253,32 @@ public TCreateTabletReq toThrift() {
int deleteSign = -1;
int sequenceCol = -1;
int versionCol = -1;
List<TColumn> tColumns = new ArrayList<TColumn>();
List<TColumn> tColumns = null;
Object tCols = objectPool.get(columns);
if (tCols != null) {
tColumns = (List<TColumn>) tCols;
} else {
tColumns = new ArrayList<>();
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
TColumn tColumn = column.toThrift();
// is bloom filter column
if (bfColumns != null && bfColumns.contains(column.getName())) {
tColumn.setIsBloomFilterColumn(true);
}
// when doing schema change, some modified column has a prefix in name.
// this prefix is only used in FE, not visible to BE, so we should remove this prefix.
if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
tColumn.setColumnName(
column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
}
tColumn.setVisible(column.isVisible());
tColumns.add(tColumn);
}
objectPool.put(columns, tColumns);
}
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
TColumn tColumn = column.toThrift();
// is bloom filter column
if (bfColumns != null && bfColumns.contains(column.getName())) {
tColumn.setIsBloomFilterColumn(true);
}
// when doing schema change, some modified column has a prefix in name.
// this prefix is only used in FE, not visible to BE, so we should remove this prefix.
if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
tColumn.setColumnName(column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
}
tColumn.setVisible(column.isVisible());
tColumns.add(tColumn);
if (column.isDeleteSignColumn()) {
deleteSign = i;
}
Expand All @@ -279,9 +295,15 @@ public TCreateTabletReq toThrift() {
tSchema.setVersionColIdx(versionCol);

if (CollectionUtils.isNotEmpty(indexes)) {
List<TOlapTableIndex> tIndexes = new ArrayList<>();
for (Index index : indexes) {
tIndexes.add(index.toThrift());
List<TOlapTableIndex> tIndexes = null;
Object value = objectPool.get(indexes);
if (value != null) {
tIndexes = (List<TOlapTableIndex>) value;
} else {
tIndexes = new ArrayList<>();
for (Index index : indexes) {
tIndexes.add(index.toThrift());
}
}
tSchema.setIndexes(tIndexes);
storageFormat = TStorageFormat.V2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ public void setUp() throws AnalysisException {
range2 = Range.closedOpen(pk2, pk3);

// create tasks

Map<Object, Object> objectPool = new HashMap<Object, Object>();
// create
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType,
TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null,
TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, false, null);
TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, false, null, objectPool);

// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false);
Expand Down

0 comments on commit 39a8600

Please sign in to comment.