From d59c4473682853e979849cf7abfbf40d4e6dc5e6 Mon Sep 17 00:00:00 2001 From: chengshiwen Date: Tue, 13 Aug 2024 11:01:41 +0800 Subject: [PATCH] optimize code style --- backend/backend.go | 8 ++--- backend/executor.go | 24 ++++++------- backend/http.go | 12 +++---- backend/proxy.go | 28 +++++++-------- backend/shard.go | 6 ++-- service/http.go | 12 ++++--- transfer/transfer.go | 82 ++++++++++++++++++++++---------------------- 7 files changed, 88 insertions(+), 84 deletions(-) diff --git a/backend/backend.go b/backend/backend.go index 06c5ca5..21b96cf 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -338,9 +338,9 @@ func (ib *Backend) GetHealth(ic *Circle, withStats bool) interface{} { go func(db string) { defer wg.Done() inplace, incorrect := 0, 0 - measurements := ib.GetMeasurements(db) - for _, meas := range measurements { - key := ic.getKeyFn(db, meas) + mms := ib.GetMeasurements(db) + for _, mm := range mms { + key := ic.getKeyFn(db, mm) nb := ic.GetBackend(key) if nb.Url == ib.Url { inplace++ @@ -349,7 +349,7 @@ func (ib *Backend) GetHealth(ic *Circle, withStats bool) interface{} { } } smap.Store(db, map[string]int{ - "measurements": len(measurements), + "measurements": len(mms), "inplace": inplace, "incorrect": incorrect, }) diff --git a/backend/executor.go b/backend/executor.go index 90b7996..e382b96 100644 --- a/backend/executor.go +++ b/backend/executor.go @@ -59,9 +59,9 @@ func query(w http.ResponseWriter, req *http.Request, ip *Proxy, key string, fn f return nil, ErrBackendsUnavailable } -func ReadProm(w http.ResponseWriter, req *http.Request, ip *Proxy, db, meas string) (err error) { - // all circles -> backend by key(db,meas) -> select or show - key := ip.GetKey(db, meas) +func ReadProm(w http.ResponseWriter, req *http.Request, ip *Proxy, db, mm string) (err error) { + // all circles -> backend by key(db,mm) -> select or show + key := ip.GetKey(db, mm) fn := func(be *Backend, req *http.Request, w http.ResponseWriter) ([]byte, error) { err = be.ReadProm(req, w) return nil, err @@ -70,9 +70,9 @@ func ReadProm(w http.ResponseWriter, req *http.Request, ip *Proxy, db, meas stri return } -func QueryFlux(w http.ResponseWriter, req *http.Request, ip *Proxy, bucket, meas string) (err error) { - // all circles -> backend by key(org,bucket,meas) -> query flux - key := ip.GetKey(bucket, meas) +func QueryFlux(w http.ResponseWriter, req *http.Request, ip *Proxy, bucket, measurement string) (err error) { + // all circles -> backend by key(bucket,measurement) -> query flux + key := ip.GetKey(bucket, measurement) fn := func(be *Backend, req *http.Request, w http.ResponseWriter) ([]byte, error) { err = be.QueryFlux(req, w) return nil, err @@ -82,12 +82,12 @@ func QueryFlux(w http.ResponseWriter, req *http.Request, ip *Proxy, bucket, meas } func QueryFromQL(w http.ResponseWriter, req *http.Request, ip *Proxy, tokens []string, db string) (body []byte, err error) { - // all circles -> backend by key(db,meas) -> select or show - meas, err := GetMeasurementFromTokens(tokens) + // all circles -> backend by key(db,mm) -> select or show + mm, err := GetMeasurementFromTokens(tokens) if err != nil { return nil, ErrGetMeasurement } - key := ip.GetKey(db, meas) + key := ip.GetKey(db, mm) fn := func(be *Backend, req *http.Request, w http.ResponseWriter) ([]byte, error) { qr := be.Query(req, w, false) return qr.Body, qr.Err @@ -151,12 +151,12 @@ func QueryShowQL(w http.ResponseWriter, req *http.Request, ip *Proxy, tokens []s } func QueryDeleteOrDropQL(w http.ResponseWriter, req *http.Request, ip *Proxy, tokens []string, db string) (body []byte, err error) { - // all circles -> backend by key(db,meas) -> delete or drop measurement/series - meas, err := GetMeasurementFromTokens(tokens) + // all circles -> backend by key(db,mm) -> delete or drop measurement/series + mm, err := GetMeasurementFromTokens(tokens) if err != nil { return nil, err } - key := ip.GetKey(db, meas) + key := ip.GetKey(db, mm) backends := ip.GetBackends(key) return QueryBackends(backends, req, w) } diff --git a/backend/http.go b/backend/http.go index 3da3029..63d4976 100644 --- a/backend/http.go +++ b/backend/http.go @@ -463,13 +463,13 @@ func (hb *HttpBackend) GetMeasurements(db string) []string { return hb.GetSeriesValues(db, "show measurements") } -func (hb *HttpBackend) GetTagKeys(db, rp, meas string) []string { - return hb.GetSeriesValues(db, fmt.Sprintf("show tag keys from \"%s\".\"%s\"", util.EscapeIdentifier(rp), util.EscapeIdentifier(meas))) +func (hb *HttpBackend) GetTagKeys(db, rp, mm string) []string { + return hb.GetSeriesValues(db, fmt.Sprintf("show tag keys from \"%s\".\"%s\"", util.EscapeIdentifier(rp), util.EscapeIdentifier(mm))) } -func (hb *HttpBackend) GetFieldKeys(db, rp, meas string) map[string][]string { +func (hb *HttpBackend) GetFieldKeys(db, rp, mm string) map[string][]string { fieldKeys := make(map[string][]string) - q := fmt.Sprintf("show field keys from \"%s\".\"%s\"", util.EscapeIdentifier(rp), util.EscapeIdentifier(meas)) + q := fmt.Sprintf("show field keys from \"%s\".\"%s\"", util.EscapeIdentifier(rp), util.EscapeIdentifier(mm)) qr := hb.Query(NewQueryRequest("GET", db, q, ""), nil, true) if qr.Err != nil { return fieldKeys @@ -484,8 +484,8 @@ func (hb *HttpBackend) GetFieldKeys(db, rp, meas string) map[string][]string { return fieldKeys } -func (hb *HttpBackend) DropMeasurement(db, meas string) ([]byte, error) { - q := fmt.Sprintf("drop measurement \"%s\"", util.EscapeIdentifier(meas)) +func (hb *HttpBackend) DropMeasurement(db, mm string) ([]byte, error) { + q := fmt.Sprintf("drop measurement \"%s\"", util.EscapeIdentifier(mm)) qr := hb.Query(NewQueryRequest("POST", db, q, ""), nil, true) return qr.Body, qr.Err } diff --git a/backend/proxy.go b/backend/proxy.go index a2dae51..2c41517 100644 --- a/backend/proxy.go +++ b/backend/proxy.go @@ -45,8 +45,8 @@ func NewProxy(cfg *ProxyConfig) (ip *Proxy) { return } -func (ip *Proxy) GetKey(db, meas string) string { - return ip.sTpl.GetKey(db, meas) +func (ip *Proxy) GetKey(db, mm string) string { + return ip.sTpl.GetKey(db, mm) } func (ip *Proxy) GetBackends(key string) []*Backend { @@ -88,11 +88,11 @@ func (ip *Proxy) IsForbiddenDB(db string) bool { } func (ip *Proxy) QueryFlux(w http.ResponseWriter, req *http.Request, qr *QueryRequest) (err error) { - var bucket, meas string + var bucket, measurement string if qr.Query != "" { - bucket, meas, err = ScanQuery(qr.Query) + bucket, measurement, err = ScanQuery(qr.Query) } else if qr.Spec != nil { - bucket, meas, err = ScanSpec(qr.Spec) + bucket, measurement, err = ScanSpec(qr.Spec) } if err != nil { return @@ -102,10 +102,10 @@ func (ip *Proxy) QueryFlux(w http.ResponseWriter, req *http.Request, qr *QueryRe } else if ip.IsForbiddenDB(bucket) { return fmt.Errorf("database forbidden: %s", bucket) } - if meas == "" { + if measurement == "" { return ErrGetMeasurement } - return QueryFlux(w, req, ip, bucket, meas) + return QueryFlux(w, req, ip, bucket, measurement) } func (ip *Proxy) Query(w http.ResponseWriter, req *http.Request) (body []byte, err error) { @@ -177,20 +177,20 @@ func (ip *Proxy) Write(p []byte, db, rp, precision string) (err error) { func (ip *Proxy) WriteRow(line []byte, db, rp, precision string) { nanoLine := AppendNano(line, precision) - meas, err := ScanKey(nanoLine) + mm, err := ScanKey(nanoLine) if err != nil { log.Printf("scan key error: %s", err) return } - if !RapidCheck(nanoLine[len(meas):]) { + if !RapidCheck(nanoLine[len(mm):]) { log.Printf("invalid format, db: %s, rp: %s, precision: %s, line: %s", db, rp, precision, string(line)) return } - key := ip.GetKey(db, meas) + key := ip.GetKey(db, mm) backends := ip.GetBackends(key) if len(backends) == 0 { - log.Printf("write data error: can't get backends, db: %s, meas: %s", db, meas) + log.Printf("write data error: can't get backends, db: %s, mm: %s", db, mm) return } @@ -206,11 +206,11 @@ func (ip *Proxy) WriteRow(line []byte, db, rp, precision string) { func (ip *Proxy) WritePoints(points []models.Point, db, rp string) error { var err error for _, pt := range points { - meas := string(pt.Name()) - key := ip.GetKey(db, meas) + mm := string(pt.Name()) + key := ip.GetKey(db, mm) backends := ip.GetBackends(key) if len(backends) == 0 { - log.Printf("write point error: can't get backends, db: %s, meas: %s", db, meas) + log.Printf("write point error: can't get backends, db: %s, mm: %s", db, mm) err = ErrEmptyBackends continue } diff --git a/backend/shard.go b/backend/shard.go index 4f9d183..bdb83af 100644 --- a/backend/shard.go +++ b/backend/shard.go @@ -41,14 +41,14 @@ func newShardTpl(tpl string) *shardTpl { return sTpl } -func (sTpl *shardTpl) GetKey(db, meas string) string { +func (sTpl *shardTpl) GetKey(db, mm string) string { var b strings.Builder - b.Grow(len(sTpl.tpl) + (len(db)-len(ShardKeyVarDb))*sTpl.dbCnt + (len(meas)-len(ShardKeyVarMm))*sTpl.mmCnt) + b.Grow(len(sTpl.tpl) + (len(db)-len(ShardKeyVarDb))*sTpl.dbCnt + (len(mm)-len(ShardKeyVarMm))*sTpl.mmCnt) for _, item := range sTpl.items { if item == ShardKeyVarDb { b.WriteString(db) } else if item == ShardKeyVarMm { - b.WriteString(meas) + b.WriteString(mm) } else { b.WriteString(item) } diff --git a/service/http.go b/service/http.go index 1f477d7..e8ce9a3 100644 --- a/service/http.go +++ b/service/http.go @@ -288,9 +288,13 @@ func (hs *HttpService) HandlerReplica(w http.ResponseWriter, req *http.Request) } db := req.URL.Query().Get("db") - meas := req.URL.Query().Get("meas") - if db != "" && meas != "" { - key := hs.ip.GetKey(db, meas) + mm := req.URL.Query().Get("mm") + if mm == "" { + // compatible with version <= 2.5.11 + mm = req.URL.Query().Get("meas") + } + if db != "" && mm != "" { + key := hs.ip.GetKey(db, mm) backends := hs.ip.GetBackends(key) data := make([]map[string]interface{}, len(backends)) for i, b := range backends { @@ -302,7 +306,7 @@ func (hs *HttpService) HandlerReplica(w http.ResponseWriter, req *http.Request) } hs.Write(w, req, http.StatusOK, data) } else { - hs.WriteError(w, req, http.StatusBadRequest, "invalid db or meas") + hs.WriteError(w, req, http.StatusBadRequest, "invalid db or mm") } } diff --git a/transfer/transfer.go b/transfer/transfer.go index 4138cf3..b6c64ac 100644 --- a/transfer/transfer.go +++ b/transfer/transfer.go @@ -49,7 +49,7 @@ type Transfer struct { pool *ants.Pool tlogDir string CircleStates []*CircleState - getKeyFn func(db, meas string) string + getKeyFn func(string, string) string Worker int Batch int Tick int64 @@ -203,7 +203,7 @@ func reformFieldKeys(fieldKeys map[string][]string) map[string]string { return fieldMap } -func (tx *Transfer) write(ch chan *QueryResult, dsts []*backend.Backend, db, rp, meas string, tagMap util.Set, fieldMap map[string]string) error { +func (tx *Transfer) write(ch chan *QueryResult, dsts []*backend.Backend, db, rp, mm string, tagMap util.Set, fieldMap map[string]string) error { var buf bytes.Buffer var wg sync.WaitGroup pool, err := ants.NewPool(len(dsts) * 20) @@ -219,7 +219,7 @@ func (tx *Transfer) write(ch chan *QueryResult, dsts []*backend.Backend, db, rp, columns := serie.Columns valen := len(serie.Values) for idx, value := range serie.Values { - mtagSet := []string{util.EscapeMeasurement(meas)} + mtagSet := []string{util.EscapeMeasurement(mm)} fieldSet := make([]string, 0) for i := 1; i < len(value); i++ { k := columns[i] @@ -255,7 +255,7 @@ func (tx *Transfer) write(ch chan *QueryResult, dsts []*backend.Backend, db, rp, for i := 0; i <= RetryCount; i++ { if i > 0 { time.Sleep(time.Duration(RetryInterval) * time.Second) - tlog.Printf("transfer write retry: %d, err:%s dst:%s db:%s rp:%s meas:%s", i, err, dst.Url, db, rp, meas) + tlog.Printf("transfer write retry: %d, err:%s dst:%s db:%s rp:%s mm:%s", i, err, dst.Url, db, rp, mm) } err = dst.Write(db, rp, p) if err == nil { @@ -263,7 +263,7 @@ func (tx *Transfer) write(ch chan *QueryResult, dsts []*backend.Backend, db, rp, } } if err != nil { - tlog.Printf("transfer write error: %s, dst:%s db:%s rp:%s meas:%s", err, dst.Url, db, rp, meas) + tlog.Printf("transfer write error: %s, dst:%s db:%s rp:%s mm:%s", err, dst.Url, db, rp, mm) } }) } @@ -275,18 +275,18 @@ func (tx *Transfer) write(ch chan *QueryResult, dsts []*backend.Backend, db, rp, return nil } -func (tx *Transfer) query(ch chan *QueryResult, src *backend.Backend, db, rp, meas string) { +func (tx *Transfer) query(ch chan *QueryResult, src *backend.Backend, db, rp, mm string) { defer close(ch) var rsp *backend.ChunkedResponse var err error - q := fmt.Sprintf("select * from \"%s\".\"%s\"", util.EscapeIdentifier(rp), util.EscapeIdentifier(meas)) + q := fmt.Sprintf("select * from \"%s\".\"%s\"", util.EscapeIdentifier(rp), util.EscapeIdentifier(mm)) if tx.Tick > 0 { q = fmt.Sprintf("%s where time >= %ds", q, tx.Tick) } for i := 0; i <= RetryCount; i++ { if i > 0 { time.Sleep(time.Duration(RetryInterval) * time.Second) - tlog.Printf("transfer query retry: %d, err:%s src:%s db:%s rp:%s meas:%s batch:%d tick:%d", i, err, src.Url, db, rp, meas, tx.Batch, tx.Tick) + tlog.Printf("transfer query retry: %d, err:%s src:%s db:%s rp:%s mm:%s batch:%d tick:%d", i, err, src.Url, db, rp, mm, tx.Batch, tx.Tick) } rsp, err = src.QueryChunk("GET", db, q, "ns", tx.Batch) if err == nil { @@ -327,9 +327,9 @@ func (tx *Transfer) query(ch chan *QueryResult, src *backend.Backend, db, rp, me } } -func (tx *Transfer) transfer(src *backend.Backend, dsts []*backend.Backend, db, rp, meas string) error { +func (tx *Transfer) transfer(src *backend.Backend, dsts []*backend.Backend, db, rp, mm string) error { ch := make(chan *QueryResult, 20) - go tx.query(ch, src, db, rp, meas) + go tx.query(ch, src, db, rp, mm) var tagMap util.Set var fieldMap map[string]string @@ -337,45 +337,45 @@ func (tx *Transfer) transfer(src *backend.Backend, dsts []*backend.Backend, db, wg.Add(1) go func() { defer wg.Done() - tagKeys := src.GetTagKeys(db, rp, meas) + tagKeys := src.GetTagKeys(db, rp, mm) tagMap = util.NewSetFromSlice(tagKeys) }() wg.Add(1) go func() { defer wg.Done() - fieldKeys := src.GetFieldKeys(db, rp, meas) + fieldKeys := src.GetFieldKeys(db, rp, mm) fieldMap = reformFieldKeys(fieldKeys) }() wg.Wait() - return tx.write(ch, dsts, db, rp, meas, tagMap, fieldMap) + return tx.write(ch, dsts, db, rp, mm, tagMap, fieldMap) } -func (tx *Transfer) submitTransfer(cs *CircleState, src *backend.Backend, dsts []*backend.Backend, db, meas string) { +func (tx *Transfer) submitTransfer(cs *CircleState, src *backend.Backend, dsts []*backend.Backend, db, mm string) { rps := src.GetRetentionPolicies(db) for _, rp := range rps { rp := rp cs.wg.Add(1) tx.pool.Submit(func() { defer cs.wg.Done() - err := tx.transfer(src, dsts, db, rp, meas) + err := tx.transfer(src, dsts, db, rp, mm) if err == nil { - tlog.Printf("transfer done, src:%s dst:%v db:%s rp:%s meas:%s batch:%d tick:%d", src.Url, getBackendUrls(dsts), db, rp, meas, tx.Batch, tx.Tick) + tlog.Printf("transfer done, src:%s dst:%v db:%s rp:%s mm:%s batch:%d tick:%d", src.Url, getBackendUrls(dsts), db, rp, mm, tx.Batch, tx.Tick) } else { - tlog.Printf("transfer error: %s, src:%s dst:%v db:%s rp:%s meas:%s batch:%d tick:%d", err, src.Url, getBackendUrls(dsts), db, rp, meas, tx.Batch, tx.Tick) + tlog.Printf("transfer error: %s, src:%s dst:%v db:%s rp:%s mm:%s batch:%d tick:%d", err, src.Url, getBackendUrls(dsts), db, rp, mm, tx.Batch, tx.Tick) } }) } } -func (tx *Transfer) submitCleanup(cs *CircleState, be *backend.Backend, db, meas string) { +func (tx *Transfer) submitCleanup(cs *CircleState, be *backend.Backend, db, mm string) { cs.wg.Add(1) tx.pool.Submit(func() { defer cs.wg.Done() - _, err := be.DropMeasurement(db, meas) + _, err := be.DropMeasurement(db, mm) if err == nil { - tlog.Printf("cleanup done, backend:%s db:%s meas:%s", be.Url, db, meas) + tlog.Printf("cleanup done, backend:%s db:%s mm:%s", be.Url, db, mm) } else { - tlog.Printf("cleanup error: %s, backend:%s db:%s meas:%s", err, be.Url, db, meas) + tlog.Printf("cleanup error: %s, backend:%s db:%s mm:%s", err, be.Url, db, mm) } }) } @@ -389,23 +389,23 @@ func (tx *Transfer) runTransfer(cs *CircleState, be *backend.Backend, dbs []stri stats := cs.Stats[be.Url] stats.DatabaseTotal = int32(len(dbs)) - measures := make([][]string, len(dbs)) + mms := make([][]string, len(dbs)) var wg sync.WaitGroup for i, db := range dbs { wg.Add(1) go func(i int, db string) { defer wg.Done() - measures[i] = be.GetMeasurements(db) + mms[i] = be.GetMeasurements(db) }(i, db) } wg.Wait() - for i := range measures { - stats.MeasurementTotal += int32(len(measures[i])) + for i := range mms { + stats.MeasurementTotal += int32(len(mms[i])) } for i, db := range dbs { - for _, meas := range measures[i] { - require := fn(cs, be, db, meas, args) + for _, mm := range mms[i] { + require := fn(cs, be, db, mm, args) if require { atomic.AddInt32(&stats.TransferCount, 1) } else { @@ -444,12 +444,12 @@ func (tx *Transfer) Rebalance(circleId int, backends []*backend.Backend, dbs []s tlog.Printf("rebalance done: circle %d", circleId) } -func (tx *Transfer) runRebalance(cs *CircleState, be *backend.Backend, db string, meas string, _ []interface{}) (require bool) { - key := tx.getKeyFn(db, meas) +func (tx *Transfer) runRebalance(cs *CircleState, be *backend.Backend, db string, mm string, _ []interface{}) (require bool) { + key := tx.getKeyFn(db, mm) dst := cs.GetBackend(key) require = dst.Url != be.Url if require { - tx.submitTransfer(cs, be, []*backend.Backend{dst}, db, meas) + tx.submitTransfer(cs, be, []*backend.Backend{dst}, db, mm) } return } @@ -492,14 +492,14 @@ func (tx *Transfer) Recovery(fromCircleId, toCircleId int, backendUrls []string, tlog.Printf("recovery done: circle from %d to %d", fromCircleId, toCircleId) } -func (tx *Transfer) runRecovery(fcs *CircleState, be *backend.Backend, db string, meas string, args []interface{}) (require bool) { +func (tx *Transfer) runRecovery(fcs *CircleState, be *backend.Backend, db string, mm string, args []interface{}) (require bool) { tcs := args[0].(*CircleState) backendUrlSet := args[1].(util.Set) //nolint:all - key := tx.getKeyFn(db, meas) + key := tx.getKeyFn(db, mm) dst := tcs.GetBackend(key) require = backendUrlSet[dst.Url] if require { - tx.submitTransfer(fcs, be, []*backend.Backend{dst}, db, meas) + tx.submitTransfer(fcs, be, []*backend.Backend{dst}, db, mm) } return } @@ -534,8 +534,8 @@ func (tx *Transfer) Resync(dbs []string) { tlog.Printf("resync done") } -func (tx *Transfer) runResync(cs *CircleState, be *backend.Backend, db string, meas string, _ []interface{}) (require bool) { - key := tx.getKeyFn(db, meas) +func (tx *Transfer) runResync(cs *CircleState, be *backend.Backend, db string, mm string, _ []interface{}) (require bool) { + key := tx.getKeyFn(db, mm) dsts := make([]*backend.Backend, 0) for _, tcs := range tx.CircleStates { if tcs.CircleId != cs.CircleId { @@ -545,7 +545,7 @@ func (tx *Transfer) runResync(cs *CircleState, be *backend.Backend, db string, m } require = len(dsts) > 0 if require { - tx.submitTransfer(cs, be, dsts, db, meas) + tx.submitTransfer(cs, be, dsts, db, mm) } return } @@ -577,15 +577,15 @@ func (tx *Transfer) Cleanup(circleId int) { //nolint:all tlog.Printf("cleanup done: circle %d", circleId) } -func (tx *Transfer) runCleanup(cs *CircleState, be *backend.Backend, db string, meas string, _ []interface{}) (require bool) { - key := tx.getKeyFn(db, meas) +func (tx *Transfer) runCleanup(cs *CircleState, be *backend.Backend, db string, mm string, _ []interface{}) (require bool) { + key := tx.getKeyFn(db, mm) dst := cs.GetBackend(key) require = dst.Url != be.Url if require { - tlog.Printf("backend:%s db:%s meas:%s require to cleanup", be.Url, db, meas) - tx.submitCleanup(cs, be, db, meas) + tlog.Printf("backend:%s db:%s mm:%s require to cleanup", be.Url, db, mm) + tx.submitCleanup(cs, be, db, mm) } else { - tlog.Printf("backend:%s db:%s meas:%s checked", be.Url, db, meas) + tlog.Printf("backend:%s db:%s mm:%s checked", be.Url, db, mm) } return }