Skip to content

Commit

Permalink
planner/core: pushdown TiFlash store type check to ColumnToProto (#55463
Browse files Browse the repository at this point in the history
)

close #55462
  • Loading branch information
joechenrh authored Aug 19, 2024
1 parent 7ef2d46 commit 559f634
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 33 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func buildDAGPB(exprCtx exprctx.BuildContext, distSQLCtx *distsqlctx.DistSQLCont
}

func constructTableScanPB(ctx exprctx.BuildContext, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.Executor, error) {
tblScan := tables.BuildTableScanFromInfos(tblInfo, colInfos)
tblScan := tables.BuildTableScanFromInfos(tblInfo, colInfos, false)
tblScan.TableId = tblInfo.ID
err := tables.SetPBColumnsDefaultValue(ctx, tblScan.Columns, colInfos)
return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func (r *reorgInfo) String() string {
}

func constructDescTableScanPB(physicalTableID int64, tblInfo *model.TableInfo, handleCols []*model.ColumnInfo) *tipb.Executor {
tblScan := tables.BuildTableScanFromInfos(tblInfo, handleCols)
tblScan := tables.BuildTableScanFromInfos(tblInfo, handleCols, false)
tblScan.TableId = physicalTableID
tblScan.Desc = true
return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (e *CheckIndexRangeExec) constructIndexScanPB() *tipb.Executor {
idxExec := &tipb.IndexScan{
TableId: e.table.ID,
IndexId: e.index.ID,
Columns: util.ColumnsToProto(e.cols, e.table.PKIsHandle, true),
Columns: util.ColumnsToProto(e.cols, e.table.PKIsHandle, true, false),
}
return &tipb.Executor{Tp: tipb.ExecType_TypeIndexScan, IdxScan: idxExec}
}
Expand Down Expand Up @@ -228,7 +228,7 @@ func (e *RecoverIndexExec) Open(ctx context.Context) error {
}

func (e *RecoverIndexExec) constructTableScanPB(tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.Executor, error) {
tblScan := tables.BuildTableScanFromInfos(tblInfo, colInfos)
tblScan := tables.BuildTableScanFromInfos(tblInfo, colInfos, false)
tblScan.TableId = e.physicalID
err := tables.SetPBColumnsDefaultValue(e.Ctx().GetExprCtx(), tblScan.Columns, colInfos)
return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err
Expand Down Expand Up @@ -880,7 +880,7 @@ func (e *CleanupIndexExec) constructIndexScanPB() *tipb.Executor {
idxExec := &tipb.IndexScan{
TableId: e.physicalID,
IndexId: e.index.Meta().ID,
Columns: util.ColumnsToProto(e.columns, e.table.Meta().PKIsHandle, true),
Columns: util.ColumnsToProto(e.columns, e.table.Meta().PKIsHandle, true, false),
PrimaryColumnIds: tables.TryGetCommonPkColumnIds(e.table.Meta()),
}
return &tipb.Executor{Tp: tipb.ExecType_TypeIndexScan, IdxScan: idxExec}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2881,7 +2881,7 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(
SampleSize: int64(opts[ast.AnalyzeOptNumSamples]),
SampleRate: sampleRate,
SketchSize: statistics.MaxSketchSize,
ColumnsInfo: util.ColumnsToProto(task.ColsInfo, task.TblInfo.PKIsHandle, false),
ColumnsInfo: util.ColumnsToProto(task.ColsInfo, task.TblInfo.PKIsHandle, false, false),
ColumnGroups: colGroups,
}
if task.TblInfo != nil {
Expand Down Expand Up @@ -3011,7 +3011,7 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(
BucketSize: int64(opts[ast.AnalyzeOptNumBuckets]),
SampleSize: MaxRegionSampleSize,
SketchSize: statistics.MaxSketchSize,
ColumnsInfo: util.ColumnsToProto(cols, task.HandleCols != nil && task.HandleCols.IsInt(), false),
ColumnsInfo: util.ColumnsToProto(cols, task.HandleCols != nil && task.HandleCols.IsInt(), false, false),
CmsketchDepth: &depth,
CmsketchWidth: &width,
}
Expand Down
8 changes: 1 addition & 7 deletions pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2458,13 +2458,7 @@ func convertToTableScan(ds *DataSource, prop *property.PhysicalProperty, candida
// TiFlash fast mode(https://github.com/pingcap/tidb/pull/35851) does not keep order in TableScan
return base.InvalidTask, nil
}
if ts.StoreType == kv.TiFlash {
for _, col := range ts.Columns {
if col.IsVirtualGenerated() {
col.AddFlag(mysql.GeneratedColumnFlag)
}
}
}

// In disaggregated tiflash mode, only MPP is allowed, cop and batchCop is deprecated.
// So if prop.TaskTp is RootTaskType, have to use mppTask then convert to rootTask.
isTiFlashPath := ts.StoreType == kv.TiFlash
Expand Down
4 changes: 2 additions & 2 deletions pkg/planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (p *PhysicalTableScan) ToPB(ctx *base.BuildPBContext, storeType kv.StoreTyp
if storeType == kv.TiFlash && p.Table.GetPartitionInfo() != nil && p.IsMPPOrBatchCop && p.SCtx().GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
return p.partitionTableScanToPBForFlash(ctx)
}
tsExec := tables.BuildTableScanFromInfos(p.Table, p.Columns)
tsExec := tables.BuildTableScanFromInfos(p.Table, p.Columns, p.StoreType == kv.TiFlash)
tsExec.Desc = p.Desc
keepOrder := p.KeepOrder
tsExec.KeepOrder = &keepOrder
Expand Down Expand Up @@ -488,7 +488,7 @@ func (p *PhysicalIndexScan) ToPB(_ *base.BuildPBContext, _ kv.StoreType) (*tipb.
idxExec := &tipb.IndexScan{
TableId: p.Table.ID,
IndexId: p.Index.ID,
Columns: util.ColumnsToProto(columns, p.Table.PKIsHandle, true),
Columns: util.ColumnsToProto(columns, p.Table.PKIsHandle, true, false),
Desc: p.Desc,
PrimaryColumnIds: pkColIDs,
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/planner/core/plan_to_pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ func TestColumnToProto(t *testing.T) {
col := &model.ColumnInfo{
FieldType: *tp,
}
pc := util.ColumnToProto(col, false)
pc := util.ColumnToProto(col, false, false)
expect := &tipb.ColumnInfo{ColumnId: 0, Tp: 3, Collation: 83, ColumnLen: 11, Decimal: 0, Flag: 10, Elems: []string(nil), DefaultVal: []uint8(nil), PkHandle: false, XXX_unrecognized: []uint8(nil)}
require.Equal(t, expect, pc)

cols := []*model.ColumnInfo{col, col}
pcs := util.ColumnsToProto(cols, false, false)
pcs := util.ColumnsToProto(cols, false, false, false)
for _, v := range pcs {
require.Equal(t, int32(10), v.GetFlag())
}
pcs = util.ColumnsToProto(cols, true, false)
pcs = util.ColumnsToProto(cols, true, false, false)
for _, v := range pcs {
require.Equal(t, int32(10), v.GetFlag())
}
Expand All @@ -56,19 +56,19 @@ func TestColumnToProto(t *testing.T) {
col1 := &model.ColumnInfo{
FieldType: *tp,
}
pc = util.ColumnToProto(col1, false)
pc = util.ColumnToProto(col1, false, false)
require.Equal(t, int32(8), pc.Collation)

collate.SetNewCollationEnabledForTest(true)

pc = util.ColumnToProto(col, false)
pc = util.ColumnToProto(col, false, false)
expect = &tipb.ColumnInfo{ColumnId: 0, Tp: 3, Collation: -83, ColumnLen: 11, Decimal: 0, Flag: 10, Elems: []string(nil), DefaultVal: []uint8(nil), PkHandle: false, XXX_unrecognized: []uint8(nil)}
require.Equal(t, expect, pc)
pcs = util.ColumnsToProto(cols, true, false)
pcs = util.ColumnsToProto(cols, true, false, false)
for _, v := range pcs {
require.Equal(t, int32(-83), v.Collation)
}
pc = util.ColumnToProto(col1, false)
pc = util.ColumnToProto(col1, false, false)
require.Equal(t, int32(-8), pc.Collation)

tp = types.NewFieldType(mysql.TypeEnum)
Expand All @@ -77,7 +77,7 @@ func TestColumnToProto(t *testing.T) {
col2 := &model.ColumnInfo{
FieldType: *tp,
}
pc = util.ColumnToProto(col2, false)
pc = util.ColumnToProto(col2, false, false)
require.Len(t, pc.Elems, 2)

tp = types.NewFieldTypeBuilder().
Expand All @@ -91,7 +91,7 @@ func TestColumnToProto(t *testing.T) {
col3 := &model.ColumnInfo{
FieldType: *tp,
}
pc = util.ColumnToProto(col3, true)
pc = util.ColumnToProto(col3, true, false)
expect = &tipb.ColumnInfo{ColumnId: 0, Tp: 0xfe, Collation: 63, ColumnLen: 100, Decimal: 0, Flag: 10, Elems: []string(nil), DefaultVal: []uint8(nil), PkHandle: false, XXX_unrecognized: []uint8(nil)}
require.Equal(t, expect, pc)
}
6 changes: 3 additions & 3 deletions pkg/table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1953,11 +1953,11 @@ func getSequenceAllocator(allocs autoid.Allocators) (autoid.Allocator, error) {
}

// BuildTableScanFromInfos build tipb.TableScan with *model.TableInfo and *model.ColumnInfo.
func BuildTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []*model.ColumnInfo) *tipb.TableScan {
func BuildTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []*model.ColumnInfo, isTiFlashStore bool) *tipb.TableScan {
pkColIDs := TryGetCommonPkColumnIds(tableInfo)
tsExec := &tipb.TableScan{
TableId: tableInfo.ID,
Columns: util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle, false),
Columns: util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle, false, isTiFlashStore),
PrimaryColumnIds: pkColIDs,
}
if tableInfo.IsCommonHandle {
Expand All @@ -1971,7 +1971,7 @@ func BuildPartitionTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []
pkColIDs := TryGetCommonPkColumnIds(tableInfo)
tsExec := &tipb.PartitionTableScan{
TableId: tableInfo.ID,
Columns: util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle, false),
Columns: util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle, false, false),
PrimaryColumnIds: pkColIDs,
IsFastScan: &fastScan,
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,10 @@ func TLSCipher2String(n uint16) string {
}

// ColumnsToProto converts a slice of model.ColumnInfo to a slice of tipb.ColumnInfo.
func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool, forIndex bool) []*tipb.ColumnInfo {
func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool, forIndex bool, isTiFlashStore bool) []*tipb.ColumnInfo {
cols := make([]*tipb.ColumnInfo, 0, len(columns))
for _, c := range columns {
col := ColumnToProto(c, forIndex)
col := ColumnToProto(c, forIndex, isTiFlashStore)
// TODO: Here `PkHandle`'s meaning is changed, we will change it to `IsHandle` when tikv's old select logic
// is abandoned.
if (pkIsHandle && mysql.HasPriKeyFlag(c.GetFlag())) || c.ID == model.ExtraHandleID {
Expand All @@ -411,7 +411,7 @@ func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool, forIndex bool)
}

// ColumnToProto converts model.ColumnInfo to tipb.ColumnInfo.
func ColumnToProto(c *model.ColumnInfo, forIndex bool) *tipb.ColumnInfo {
func ColumnToProto(c *model.ColumnInfo, forIndex bool, isTiFlashStore bool) *tipb.ColumnInfo {
pc := &tipb.ColumnInfo{
ColumnId: c.ID,
Collation: collate.RewriteNewCollationIDIfNeeded(int32(mysql.CollationNames[c.GetCollate()])),
Expand All @@ -420,6 +420,9 @@ func ColumnToProto(c *model.ColumnInfo, forIndex bool) *tipb.ColumnInfo {
Flag: int32(c.GetFlag()),
Elems: c.GetElems(),
}
if isTiFlashStore && c.IsVirtualGenerated() {
pc.Flag |= int32(mysql.GeneratedColumnFlag)
}
if forIndex {
// Use array type for read the multi-valued index.
pc.Tp = int32(c.FieldType.ArrayType().GetType())
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func TestToPB(t *testing.T) {
}
column2.SetCollate("utf8mb4_bin")

assert.Equal(t, "column_id:1 collation:-45 columnLen:-1 decimal:-1 ", ColumnToProto(column, false).String())
assert.Equal(t, "column_id:1 collation:-45 columnLen:-1 decimal:-1 ", ColumnsToProto([]*model.ColumnInfo{column, column2}, false, false)[0].String())
assert.Equal(t, "column_id:1 collation:-45 columnLen:-1 decimal:-1 ", ColumnToProto(column, false, false).String())
assert.Equal(t, "column_id:1 collation:-45 columnLen:-1 decimal:-1 ", ColumnsToProto([]*model.ColumnInfo{column, column2}, false, false, false)[0].String())
}

func TestComposeURL(t *testing.T) {
Expand Down

0 comments on commit 559f634

Please sign in to comment.