From c1b4525a810ffdaa3f4960d3cd88987a58d8e204 Mon Sep 17 00:00:00 2001 From: Googler Date: Thu, 26 Jan 2023 23:44:58 +0000 Subject: [PATCH] Project import generated by Copybara. FolderOrigin-RevId: /usr/local/google/home/hines/copybara/temp/folder-destination7086375037878102507/. --- .github/workflows/ci-cpp-build-gnmi.yml | 37 ++++ cache/cache.go | 17 +- cache/cache_test.go | 12 +- cli/cli.go | 4 +- cli/cli_test.go | 45 ++++ client/gnmi/client.go | 12 -- client/grpcutil/lookup.go | 59 ------ client/grpcutil/lookup_test.go | 70 ------ cmd/gnmi_cli/gnmi_cli.go | 66 ++++-- cmd/gnmi_collector/gnmi_collector.go | 7 +- latency/latency.go | 80 ++++--- latency/latency_test.go | 100 +++++++-- metadata/metadata.go | 20 ++ metadata/metadata_test.go | 38 +++- subscribe/stats.go | 1 - subscribe/subscribe.go | 189 ++++++++--------- subscribe/subscribe_test.go | 270 ++++-------------------- 17 files changed, 472 insertions(+), 555 deletions(-) create mode 100644 .github/workflows/ci-cpp-build-gnmi.yml delete mode 100644 client/grpcutil/lookup.go delete mode 100644 client/grpcutil/lookup_test.go diff --git a/.github/workflows/ci-cpp-build-gnmi.yml b/.github/workflows/ci-cpp-build-gnmi.yml new file mode 100644 index 0000000..bf317cd --- /dev/null +++ b/.github/workflows/ci-cpp-build-gnmi.yml @@ -0,0 +1,37 @@ +name: "bazel build" + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + schedule: + - cron: "0 0 * * *" + +jobs: + build: + runs-on: ubuntu-latest + env: + BAZEL: bazelisk-linux-amd64 + steps: + - uses: actions/checkout@v2 + with: + submodules: recursive + - name: Mount bazel cache + uses: actions/cache@v2 + with: + # See https://docs.bazel.build/versions/master/output_directories.html + path: "~/.cache/bazel" + # Create a new cache entry whenever Bazel files change. + # See https://docs.github.com/en/actions/guides/caching-dependencies-to-speed-up-workflows + key: bazel-${{ runner.os }}-build-${{ hashFiles('**/*.bzl', '**/*.bazel') }} + restore-keys: | + bazel-${{ runner.os }}-build- + - name: Install bazelisk + run: | + curl -LO "https://github.com/bazelbuild/bazelisk/releases/download/v1.8.1/$BAZEL" + chmod +x $BAZEL + sudo mv $BAZEL /usr/local/bin/bazel + - name: Build + run: bazel build //... + diff --git a/cache/cache.go b/cache/cache.go index 41c9467..45b594d 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -125,7 +125,7 @@ func New(targets []string, opts ...Option) *Cache { opt(&c.opts) } } - latency.RegisterMetadata(c.opts.latencyWindows) + metadata.RegisterLatencyMetadata(c.opts.latencyWindows) for _, t := range targets { c.Add(t) @@ -331,7 +331,12 @@ func (c *Cache) GnmiUpdate(n *pb.Notification) error { // each individual Update/Delete is sent to cache as // a separate gnmi.Notification. func (t *Target) GnmiUpdate(n *pb.Notification) error { - t.checkTimestamp(T(n.GetTimestamp())) + if u := n.GetUpdate(); len(u) > 0 { + if p := u[0].GetPath().GetElem(); len(p) > 0 && p[0].GetName() != metadata.Root { + // Record latest timestamp from the device, excluding all 'meta' paths. + t.checkTimestamp(T(n.GetTimestamp())) + } + } switch { // Store atomic notifications as a single leaf in the tree. 
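The `GnmiUpdate` change above keeps internal `meta/...` notifications from advancing the target's latest-timestamp watermark, which feeds the latency stats. A minimal sketch of that guard as a standalone predicate, assuming only that `metadata.Root` is the literal `"meta"` used by the metadata package:

```go
package main

import (
	"fmt"

	"github.com/openconfig/gnmi/metadata"
	pb "github.com/openconfig/gnmi/proto/gnmi"
)

// countsForLatency mirrors the new guard in Target.GnmiUpdate: only
// notifications whose first path element lies outside the metadata
// subtree should update the device timestamp.
func countsForLatency(n *pb.Notification) bool {
	u := n.GetUpdate()
	if len(u) == 0 {
		return false
	}
	p := u[0].GetPath().GetElem()
	return len(p) > 0 && p[0].GetName() != metadata.Root
}

func main() {
	dev := &pb.Notification{Update: []*pb.Update{{
		Path: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
	}}}
	meta := &pb.Notification{Update: []*pb.Update{{
		Path: &pb.Path{Elem: []*pb.PathElem{{Name: metadata.Root}, {Name: "sync"}}},
	}}}
	fmt.Println(countsForLatency(dev))  // true: device data advances the timestamp
	fmt.Println(countsForLatency(meta)) // false: meta paths are excluded
}
```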
case n.Atomic: @@ -424,6 +429,12 @@ func (t *Target) checkTimestamp(ts time.Time) { } } +func (t *Target) resetTimestamp() { + defer t.tsmu.Unlock() + t.tsmu.Lock() + t.ts = time.Time{} +} + func (t *Target) gnmiUpdate(n *pb.Notification) (*ctree.Leaf, error) { realData := true suffix := n.Update[0].Path @@ -647,6 +658,8 @@ func (t *Target) updateMeta(clients func(*ctree.Leaf)) { // Reset clears the Target of stale data upon a reconnection and notifies // cache client of the removal. func (t *Target) Reset() { + // Clear latest timestamp received from device. + t.resetTimestamp() // Reset metadata to zero values (e.g. connected = false) and notify clients. t.meta.Clear() t.updateMeta(t.client) diff --git a/cache/cache_test.go b/cache/cache_test.go index 3c6174d..223e459 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -221,9 +221,9 @@ func TestMetadataLatency(t *testing.T) { opt, _ := WithLatencyWindows([]string{"2s"}, 2*time.Second) c := New([]string{"dev1"}, opt) for _, path := range [][]string{ - latency.Path(window, latency.Avg), - latency.Path(window, latency.Max), - latency.Path(window, latency.Min), + metadata.LatencyPath(window, latency.Avg), + metadata.LatencyPath(window, latency.Max), + metadata.LatencyPath(window, latency.Min), } { c.Query("dev1", path, func(_ []string, _ *ctree.Leaf, v interface{}) error { t.Errorf("%s exists when device not in sync", strings.Join(path, "/")) @@ -235,9 +235,9 @@ func TestMetadataLatency(t *testing.T) { c.GnmiUpdate(gnmiNotification("dev1", nil, []string{"a", "1"}, timestamp, "b", false)) c.GetTarget("dev1").updateMeta(nil) for _, path := range [][]string{ - latency.Path(window, latency.Avg), - latency.Path(window, latency.Max), - latency.Path(window, latency.Min), + metadata.LatencyPath(window, latency.Avg), + metadata.LatencyPath(window, latency.Max), + metadata.LatencyPath(window, latency.Min), } { c.Query("dev1", path, func(_ []string, _ *ctree.Leaf, v interface{}) error { l := v.(*pb.Notification).Update[0].Val.GetIntVal() diff --git a/cli/cli.go b/cli/cli.go index 037ffe5..0d3c4d6 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -300,8 +300,8 @@ func displayStreamingResults(ctx context.Context, query client.Query, cfg *Confi return } b := make(pathmap) - if cfg.Timestamp != "" { - b.add(append(path, "timestamp"), ts) + if t := formatTime(ts, cfg); t != nil { + b.add(append(path, "timestamp"), t) b.add(append(path, "value"), val) } else { b.add(path, val) diff --git a/cli/cli_test.go b/cli/cli_test.go index 929b2aa..66fcf89 100644 --- a/cli/cli_test.go +++ b/cli/cli_test.go @@ -568,6 +568,51 @@ update: { } } } +`, + }, { + desc: "single target multiple paths with timestamp format (streaming)", + updates: []*fpb.Value{ + {Path: []string{"a", "b"}, Value: &fpb.Value_IntValue{IntValue: &fpb.IntValue{Value: 5}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: 100}}, + {Value: &fpb.Value_Sync{Sync: 1}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: 300}}, + {Path: []string{"a", "b"}, Value: &fpb.Value_IntValue{IntValue: &fpb.IntValue{Value: 6}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: 400}}, + }, + query: client.Query{ + Target: "dev1", + Queries: []client.Path{{"a"}}, + Type: client.Stream, + TLS: &tls.Config{InsecureSkipVerify: true}, + }, + cfg: Config{ + Display: display, + DisplayPrefix: "", + DisplayIndent: " ", + DisplayType: "group", + // StreamingDuration will expire before Count updates are received because + // no updates are being streamed in the test. 
+ Count: 3, + StreamingDuration: 100 * time.Millisecond, + Timestamp: "raw", + }, + want: `{ + "dev1": { + "a": { + "b": { + "timestamp": 100, + "value": 5 + } + } + } +} +{ + "dev1": { + "a": { + "b": { + "timestamp": 400, + "value": 6 + } + } + } +} `, }, { desc: "single target multiple paths (single line)", diff --git a/client/gnmi/client.go b/client/gnmi/client.go index 8475920..781b7df 100644 --- a/client/gnmi/client.go +++ b/client/gnmi/client.go @@ -36,7 +36,6 @@ import ( "google.golang.org/protobuf/proto" "github.com/openconfig/ygot/ygot" "github.com/openconfig/gnmi/client" - "github.com/openconfig/gnmi/client/grpcutil" "github.com/openconfig/gnmi/path" "github.com/openconfig/gnmi/value" @@ -112,17 +111,6 @@ func New(ctx context.Context, d client.Destination) (client.Impl, error) { // NewFromConn creates and returns the client based on the provided transport. func NewFromConn(ctx context.Context, conn *grpc.ClientConn, d client.Destination) (*Client, error) { - ok, err := grpcutil.Lookup(ctx, conn, "gnmi.gNMI") - if err != nil { - log.V(1).Infof("gRPC reflection lookup on %q for service gnmi.gNMI failed: %v", d.Addrs, err) - // This check is disabled for now. Reflection will become part of gNMI - // specification in the near future, so we can't enforce it yet. - } - if !ok { - // This check is disabled for now. Reflection will become part of gNMI - // specification in the near future, so we can't enforce it yet. - } - cl := gpb.NewGNMIClient(conn) return &Client{ conn: conn, diff --git a/client/grpcutil/lookup.go b/client/grpcutil/lookup.go deleted file mode 100644 index fa64e66..0000000 --- a/client/grpcutil/lookup.go +++ /dev/null @@ -1,59 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package grpcutil provides helper functions for working with gRPC targets. -package grpcutil - -import ( - "context" - - "google.golang.org/grpc" - - rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" -) - -// Lookup uses ServerReflection service on conn to find a named service. -// It returns an error if the remote side doesn't support ServerReflection or -// if any other error occurs. -// -// If lookup succeeds and service is found, true is returned. 
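With this change the reflection probe disappears from `NewFromConn`, and the `grpcutil.Lookup` helper deleted below goes with it. A caller that still wants the best-effort check can vendor that function and run it before constructing the client; a sketch of the caller side (the address and dial options are illustrative):

```go
package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/openconfig/gnmi/client"
	gclient "github.com/openconfig/gnmi/client/gnmi"
)

func main() {
	ctx := context.Background()
	conn, err := grpc.Dial("device:10161",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Previously NewFromConn ran a non-enforcing reflection lookup for
	// "gnmi.gNMI" at this point; if the probe is still wanted, call a
	// vendored copy of the Lookup function deleted below and log the result.

	c, err := gclient.NewFromConn(ctx, conn, client.Destination{
		Addrs: []string{"device:10161"},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()
}
```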
-func Lookup(ctx context.Context, conn *grpc.ClientConn, service string) (bool, error) { - c, err := rpb.NewServerReflectionClient(conn).ServerReflectionInfo(ctx) - if err != nil { - return false, err - } - defer c.CloseSend() - - if err := c.Send(&rpb.ServerReflectionRequest{ - MessageRequest: &rpb.ServerReflectionRequest_ListServices{}, - }); err != nil { - return false, err - } - - resp, err := c.Recv() - if err != nil { - return false, err - } - - lsResp := resp.GetListServicesResponse() - for _, s := range lsResp.GetService() { - if s.Name == service { - return true, nil - } - } - - return false, nil -} diff --git a/client/grpcutil/lookup_test.go b/client/grpcutil/lookup_test.go deleted file mode 100644 index fff1a17..0000000 --- a/client/grpcutil/lookup_test.go +++ /dev/null @@ -1,70 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package grpcutil - -import ( - "golang.org/x/net/context" - "net" - "testing" - - log "github.com/golang/glog" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" - - gpb "github.com/openconfig/gnmi/proto/gnmi" -) - -func TestLookup(t *testing.T) { - l, err := net.Listen("tcp", ":0") - if err != nil { - t.Fatal(err) - } - srv := grpc.NewServer() - defer srv.Stop() - - gpb.RegisterGNMIServer(srv, &gpb.UnimplementedGNMIServer{}) - reflection.Register(srv) - - go srv.Serve(l) - - c, err := grpc.Dial(l.Addr().String(), grpc.WithInsecure()) - if err != nil { - log.Fatal(err) - } - - ctx := context.Background() - - t.Run("valid service", func(t *testing.T) { - ok, err := Lookup(ctx, c, "gnmi.gNMI") - if err != nil { - log.Error(err) - } - if !ok { - log.Error("got false, want true") - } - }) - t.Run("unknown service", func(t *testing.T) { - ok, err := Lookup(ctx, c, "unknown.Unknown") - if err != nil { - log.Error(err) - } - if ok { - log.Error("got true, want false") - } - }) - -} diff --git a/cmd/gnmi_cli/gnmi_cli.go b/cmd/gnmi_cli/gnmi_cli.go index 355cb59..d9065e6 100644 --- a/cmd/gnmi_cli/gnmi_cli.go +++ b/cmd/gnmi_cli/gnmi_cli.go @@ -17,10 +17,11 @@ limitations under the License. // The gnmi_cli program implements the GNMI CLI. // // usage: -// gnmi_cli --address=
<address> \ -// -q=<query> \ -// [-qt=<query type>] \ -// [-<additional args>] +// +// gnmi_cli --address=<address>
\ +// -q=<query> \ +// [-qt=<query type>] \ +// [-<additional args>] package main import ( @@ -64,7 +65,8 @@ var ( queryType = flag.String("query_type", client.Once.String(), "Type of result, one of: (o, once, p, polling, s, streaming).") queryAddr = flags.NewStringList(&q.Addrs, nil) - reqProto = flag.String("proto", "", "Text proto for gNMI request.") + reqProto = flag.String("proto", "", "Text proto for gNMI request.") + protoFile = flag.String("proto_file", "", "Text proto file for gNMI request.") capabilitiesFlag = flag.Bool("capabilities", false, `When set, CLI will perform a Capabilities request. Usage: gnmi_cli -capabilities [-proto <gnmi.CapabilityRequest>] -address <address>
[other flags ...]`) getFlag = flag.Bool("get", false, `When set, CLI will perform a Get request. Usage: gnmi_cli -get -proto <gnmi.GetRequest> -address <address>
[other flags ...]`) @@ -189,9 +191,13 @@ func main() { } func executeCapabilities(ctx context.Context) error { + s, err := protoRequestFromFlags() + if err != nil { + return err + } r := &gpb.CapabilityRequest{} - if err := prototext.Unmarshal([]byte(*reqProto), r); err != nil { - return fmt.Errorf("unable to parse gnmi.CapabilityRequest from %q : %v", *reqProto, err) + if err := prototext.Unmarshal([]byte(s), r); err != nil { + return fmt.Errorf("unable to parse gnmi.CapabilityRequest from %q : %v", s, err) } c, err := gclient.New(ctx, client.Destination{ Addrs: q.Addrs, @@ -212,12 +218,16 @@ func executeCapabilities(ctx context.Context) error { } func executeGet(ctx context.Context) error { - if *reqProto == "" { - return errors.New("-proto must be set") + s, err := protoRequestFromFlags() + if err != nil { + return err + } + if s == "" { + return errors.New("-proto must be set or -proto_file must contain proto") } r := &gpb.GetRequest{} - if err := prototext.Unmarshal([]byte(*reqProto), r); err != nil { - return fmt.Errorf("unable to parse gnmi.GetRequest from %q : %v", *reqProto, err) + if err := prototext.Unmarshal([]byte(s), r); err != nil { + return fmt.Errorf("unable to parse gnmi.GetRequest from %q : %v", s, err) } c, err := gclient.New(ctx, client.Destination{ Addrs: q.Addrs, @@ -238,15 +248,19 @@ func executeGet(ctx context.Context) error { } func executeSet(ctx context.Context) error { - if *reqProto == "" { - return errors.New("-proto must be set") + s, err := protoRequestFromFlags() + if err != nil { + return err + } + if s == "" { + return errors.New("-proto must be set or -proto_file must contain proto") } r := &gpb.SetRequest{} - if err := prototext.Unmarshal([]byte(*reqProto), r); err != nil { - return fmt.Errorf("unable to parse gnmi.SetRequest from %q : %v", *reqProto, err) + if err := prototext.Unmarshal([]byte(s), r); err != nil { + return fmt.Errorf("unable to parse gnmi.SetRequest from %q : %v", s, err) } if *setReqFlag { - cfg.Display([]byte(prototext.Format(r))) + cfg.Display([]byte(prototext.Format(r))) } c, err := gclient.New(ctx, client.Destination{ Addrs: q.Addrs, @@ -267,7 +281,11 @@ func executeSet(ctx context.Context) error { } func executeSubscribe(ctx context.Context) error { - if *reqProto != "" { + s, err := protoRequestFromFlags() + if err != nil { + return err + } + if s != "" { // Convert SubscribeRequest to a client.Query tq, err := cli.ParseSubscribeProto(*reqProto) if err != nil { @@ -360,3 +378,17 @@ func parseQuery(query, delim string) ([]string, error) { } return strings.Split(string(buf), string(null)), nil } + +func protoRequestFromFlags() (string, error) { + if *protoFile != "" { + if *reqProto != "" { + return "", errors.New("only one of -proto and -proto_file are allowed to be set") + } + b, err := os.ReadFile(*protoFile) + if err != nil { + return "", fmt.Errorf("could not read %q: %v", *protoFile, err) + } + return string(b), nil + } + return *reqProto, nil +} diff --git a/cmd/gnmi_collector/gnmi_collector.go b/cmd/gnmi_collector/gnmi_collector.go index 7d7f09d..5ce0d99 100644 --- a/cmd/gnmi_collector/gnmi_collector.go +++ b/cmd/gnmi_collector/gnmi_collector.go @@ -181,7 +181,7 @@ func runCollector(ctx context.Context) error { Connect: c.cache.Connect, ConnectError: c.cache.ConnectError, ConnectionManager: c.cm, - Timeout: subscribe.Timeout, + Timeout: *dialTimeout, Update: func(target string, v *gnmipb.Notification) { // Explicitly set all updates as having come from this target even if // the target itself doesn't report or incorrectly 
reports the target @@ -217,10 +217,7 @@ func runCollector(ctx context.Context) error { // Initialize the Collector server. cpb.RegisterCollectorServer(srv, coll.New(c.tm.Reconnect)) // Initialize gNMI Proxy Subscribe server. - subscribeSrv, err := subscribe.NewServer(c.cache) - if err != nil { - return fmt.Errorf("Could not instantiate gNMI server: %v", err) - } + subscribeSrv, _ := subscribe.NewServer(c.cache) gnmipb.RegisterGNMIServer(srv, subscribeSrv) // Forward streaming updates to clients. c.cache.SetClient(subscribeSrv.Update) diff --git a/latency/latency.go b/latency/latency.go index 298a384..f7e372a 100644 --- a/latency/latency.go +++ b/latency/latency.go @@ -22,8 +22,6 @@ import ( "fmt" "sync" "time" - - "github.com/openconfig/gnmi/metadata" ) const ( @@ -41,7 +39,8 @@ const ( metaName = "LatencyWindow" ) -var now = time.Now +// Now is a stub for testing. +var Now = time.Now // StatType is the type of latency statistics supported for a time window. type StatType int @@ -55,6 +54,11 @@ const ( Min ) +// Metadata defines the interface required by latency metadata. +type Metadata interface { + SetInt(name string, value int64) error +} + // CompactDurationString returns a compact string for a time window d. It // removes unnecessary suffixes like "0m0s" and "0s" from the Golang // fmt.Sprint generated string of a time.Duration. @@ -94,14 +98,14 @@ func (s stat) metaName() string { } // metaPath returns the metadata path corresponding to the Stat s. -func (s stat) metaPath() []string { - return []string{metadata.Root, ElemLatency, ElemWindow, CompactDurationString(s.window), s.typ.String()} +func (s stat) metaPath(prefix []string) []string { + return append(prefix, ElemLatency, ElemWindow, CompactDurationString(s.window), s.typ.String()) } // Path returns the metadata path for the latency statistics of window w // and type typ. 
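The new `Metadata` interface above is the entire contract `latency` now needs, which is what lets it drop its import of the metadata package; `*metadata.Metadata` satisfies it via its existing `SetInt`. A sketch with a trivial implementation, using the exported `Path` with a caller-supplied prefix:

```go
package main

import (
	"fmt"
	"time"

	"github.com/openconfig/gnmi/latency"
)

// logMeta satisfies latency.Metadata by printing each stat it is handed.
type logMeta struct{}

func (logMeta) SetInt(name string, v int64) error {
	fmt.Printf("%s = %v\n", name, time.Duration(v))
	return nil
}

func main() {
	win := 2 * time.Second
	// Callers now supply the path prefix ("meta" is what the metadata
	// package passes in).
	fmt.Println(latency.Path(win, latency.Avg, []string{"meta"}))

	lat := latency.New([]time.Duration{win}, nil)
	lat.Compute(time.Now().Add(-time.Second)) // one sample, ~1s of latency
	lat.UpdateLast(logMeta{})                 // publish even before the window is covered
}
```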
-func Path(w time.Duration, typ StatType) []string { - return stat{window: w, typ: typ}.metaPath() +func Path(w time.Duration, typ StatType, prefix []string) []string { + return stat{window: w, typ: typ}.metaPath(prefix) } // MetadataName returns the metadata name for the latency statistics @@ -120,7 +124,7 @@ type slot struct { } type window struct { - stats map[string]func(string, *metadata.Metadata) + stats map[string]func(string, Metadata) size time.Duration // window size total time.Duration // cumulative latency of this time window sf int64 // scaling factor of total @@ -131,11 +135,11 @@ type window struct { func newWindow(size time.Duration, sf int64) *window { w := &window{ - stats: map[string]func(string, *metadata.Metadata){}, + stats: map[string]func(string, Metadata){}, size: size, sf: sf, } - for st, f := range map[StatType]func(string, *metadata.Metadata){ + for st, f := range map[StatType]func(string, Metadata){ Avg: w.setAvg, Max: w.setMax, Min: w.setMin} { @@ -154,7 +158,7 @@ func (w *window) add(ls *slot) { w.slots = append(w.slots, ls) } -func (w *window) setAvg(name string, m *metadata.Metadata) { +func (w *window) setAvg(name string, m Metadata) { if w.count == 0 { return } @@ -164,7 +168,7 @@ func (w *window) setAvg(name string, m *metadata.Metadata) { } } -func (w *window) setMax(name string, m *metadata.Metadata) { +func (w *window) setMax(name string, m Metadata) { var max time.Duration for _, slot := range w.slots { if slot.max > max { @@ -176,7 +180,7 @@ func (w *window) setMax(name string, m *metadata.Metadata) { } } -func (w *window) setMin(name string, m *metadata.Metadata) { +func (w *window) setMin(name string, m Metadata) { if len(w.slots) == 0 { return } @@ -218,8 +222,8 @@ func (w *window) isCovered(ts time.Time) bool { return false } -func (w *window) updateMeta(m *metadata.Metadata, ts time.Time) { - if !w.isCovered(ts) { +func (w *window) updateMeta(m Metadata, ts time.Time, ignoreInitialWindowCoverage bool) { + if !ignoreInitialWindowCoverage && !w.isCovered(ts) { return } w.slide(ts) @@ -239,6 +243,7 @@ type Latency struct { min time.Duration // minimum latency max time.Duration // maximum latency windows []*window + compute func(ts, now time.Time) time.Duration } // Options contains the options for creating a Latency. @@ -250,6 +255,9 @@ type Options struct { // durations needed to calculate averages. The precision of the Max and Min // stats are not affected by this setting. AvgPrecision time.Duration + // ComputeFunc defines how to calculate latency for the timestamp provided + // to Compute. If unspecified, the latency is calculated as now.Sub(ts). 
+ ComputeFunc func(ts, now time.Time) time.Duration } // New returns a Latency object supporting latency stats for time windows @@ -260,11 +268,19 @@ func New(windowSizes []time.Duration, opts *Options) *Latency { precision = opts.AvgPrecision } sf := precision / time.Nanosecond + compute := func(ts, now time.Time) time.Duration { return now.Sub(ts) } + if opts != nil && opts.ComputeFunc != nil { + compute = opts.ComputeFunc + } var windows []*window for _, size := range windowSizes { windows = append(windows, newWindow(size, sf.Nanoseconds())) } - return &Latency{windows: windows, scaleFactor: sf} + return &Latency{ + windows: windows, + scaleFactor: sf, + compute: compute, + } } // Compute calculates the time difference between now and ts (the timestamp @@ -272,8 +288,8 @@ func New(windowSizes []time.Duration, opts *Options) *Latency { func (l *Latency) Compute(ts time.Time) { l.mu.Lock() defer l.mu.Unlock() - nowTime := now() - lat := nowTime.Sub(ts) + nowTime := Now() + lat := l.compute(ts, nowTime) l.totalDiff += lat / l.scaleFactor l.count++ if lat > l.max { @@ -292,13 +308,23 @@ func (l *Latency) Compute(ts time.Time) { // the corresponding stats in Metadata m. // UpdateReset is expected to be called periodically at a fixed interval // (e.g. 2s) of which the time windows should be multiples of this interval. -func (l *Latency) UpdateReset(m *metadata.Metadata) { +func (l *Latency) UpdateReset(m Metadata) { l.update(m, false) } + +// UpdateLast is the same as UpdateReset except that it doesn't require a +// window to meet the initial window size requirement to update its stats. +// UpdateLast is expected to be called at most once at the end of the life +// of Latency to force updating the windows that haven't received latency +// stats long enough to generate even a single update. For a long lasting +// Latency, there is no need to call UpdateLast. +func (l *Latency) UpdateLast(m Metadata) { l.update(m, true) } + +func (l *Latency) update(m Metadata, ignoreInitialWindowCoverage bool) { l.mu.Lock() defer l.mu.Unlock() - ts := now() + ts := Now() defer func() { for _, window := range l.windows { - window.updateMeta(m, ts) + window.updateMeta(m, ts, ignoreInitialWindowCoverage) } l.start = ts }() @@ -322,18 +348,6 @@ func (l *Latency) UpdateReset(m *metadata.Metadata) { l.max = 0 } -// RegisterMetadata registers latency stats metadata for time windows -// specified in windowSizes. RegisterMetadata is not thread-safe and -// should be called before any metadata.Metadata is instantiated. -func RegisterMetadata(windowSizes []time.Duration) { - for _, size := range windowSizes { - for _, typ := range []StatType{Avg, Max, Min} { - st := stat{window: size, typ: typ} - metadata.RegisterIntValue(st.metaName(), &metadata.IntValue{Path: st.metaPath()}) - } - } -} - // ParseWindows parses the time durations of latency windows and verify they // are multiples of the metadata update period. func ParseWindows(tds []string, metaUpdatePeriod time.Duration) ([]time.Duration, error) { diff --git a/latency/latency_test.go b/latency/latency_test.go index 6461ef1..22599be 100644 --- a/latency/latency_test.go +++ b/latency/latency_test.go @@ -17,32 +17,52 @@ limitations under the License. 
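A sketch of the new `ComputeFunc` hook in use; halving the raw difference (as the `TestUpdateLast` case below does) models the hypothetical situation where the timestamp reflects a round trip and one-way latency is wanted:

```go
package main

import (
	"fmt"
	"time"

	"github.com/openconfig/gnmi/latency"
)

type printMeta struct{}

func (printMeta) SetInt(name string, v int64) error {
	fmt.Println(name, time.Duration(v))
	return nil
}

func main() {
	lat := latency.New([]time.Duration{time.Minute}, &latency.Options{
		AvgPrecision: time.Microsecond,
		// Treat the notification timestamp as round-trip: report half.
		ComputeFunc: func(ts, now time.Time) time.Duration { return now.Sub(ts) / 2 },
	})
	lat.Compute(time.Now().Add(-2 * time.Second)) // reported as ~1s
	lat.UpdateLast(printMeta{})                   // flush without waiting a minute
}
```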
package latency import ( + "errors" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/openconfig/gnmi/errdiff" - "github.com/openconfig/gnmi/metadata" ) +type fakeMeta struct { + m map[string]int64 + err error +} + +func (m *fakeMeta) SetInt(name string, val int64) error { + if m.err != nil { + return m.err + } + m.m[name] = val + return nil +} + +func (m *fakeMeta) GetInt(name string) (int64, error) { + v, ok := m.m[name] + if !ok { + return 0, errors.New("error") + } + return v, nil +} + func TestLatencyWithoutWindows(t *testing.T) { defer func() { - now = time.Now + Now = time.Now }() var windows []time.Duration - RegisterMetadata(windows) lat := New(windows, nil) - m := metadata.New() + m := &fakeMeta{err: errors.New("error")} compute := func(ts, nts time.Time) { - now = func() time.Time { return nts } + Now = func() time.Time { return nts } lat.Compute(ts) } updateReset := func(nts time.Time) { - now = func() time.Time { return nts } + Now = func() time.Time { return nts } lat.UpdateReset(m) } - // Make sure it is still ok to call Compute and UpdateReset functions - // even if no latency windows are set. + // Make sure it is still ok (no panics) to call Compute and UpdateReset + // functions even if no latency windows are set. compute(time.Unix(97, 0), time.Unix(98, 0)) // 1 second compute(time.Unix(96, 0), time.Unix(99, 0)) // 3 second updateReset(time.Unix(100, 0)) @@ -53,19 +73,18 @@ func TestLatencyWithoutWindows(t *testing.T) { func TestAvgLatency(t *testing.T) { defer func() { - now = time.Now + Now = time.Now }() win := 2 * time.Second windows := []time.Duration{win} - RegisterMetadata(windows) lat := New(windows, &Options{AvgPrecision: time.Microsecond}) - m := metadata.New() + m := &fakeMeta{m: map[string]int64{}} compute := func(ts, nts time.Time) { - now = func() time.Time { return nts } + Now = func() time.Time { return nts } lat.Compute(ts) } updateReset := func(nts time.Time) { - now = func() time.Time { return nts } + Now = func() time.Time { return nts } lat.UpdateReset(m) } compute(time.Unix(96, 999398800), time.Unix(98, 0)) // 1 second 601200 ns @@ -86,13 +105,56 @@ func TestAvgLatency(t *testing.T) { } } +func TestUpdateLast(t *testing.T) { + defer func() { + Now = time.Now + }() + win := time.Minute + windows := []time.Duration{win} + lat := New(windows, &Options{ + AvgPrecision: time.Microsecond, + ComputeFunc: func(ts time.Time, now time.Time) time.Duration { return now.Sub(ts) / 2 }, + }) + m := &fakeMeta{m: map[string]int64{}} + compute := func(ts, nts time.Time) { + Now = func() time.Time { return nts } + lat.Compute(ts) + } + updateReset := func(nts time.Time) { + Now = func() time.Time { return nts } + lat.UpdateReset(m) + } + compute(time.Unix(96, 999398800), time.Unix(98, 0)) // 1 second 601200 ns + compute(time.Unix(96, 0), time.Unix(99, 803400)) // 3 second 803400 ns + updateReset(time.Unix(100, 0)) // No update because only 2 seconds have passed. 
+ for _, typ := range []StatType{Avg, Max, Min} { + if _, err := m.GetInt(MetadataName(win, typ)); err == nil { + t.Fatalf("metadata %q: didn't get expected error", MetadataName(win, typ)) + } + } + Now = func() time.Time { return time.Unix(100, 0) } + lat.UpdateLast(m) // Force an update even though 2 seconds < 1 minute + for name, want := range map[string]int64{ + MetadataName(win, Avg): 1000350000, + MetadataName(win, Max): 1500401700, + MetadataName(win, Min): 500300600, + } { + val, err := m.GetInt(name) + if err != nil { + t.Fatalf("metadata %q: got unexpected error %v", name, err) + } + if val != want { + t.Errorf("metadata %q: got %d, want %d", name, val, want) + } + } +} + func TestLatency(t *testing.T) { defer func() { - now = time.Now + Now = time.Now }() smWin, mdWin, lgWin := 2*time.Second, 4*time.Second, 8*time.Second windows := []time.Duration{smWin, mdWin, lgWin} - RegisterMetadata(windows) meta := func(w time.Duration, typ StatType) string { return stat{window: w, typ: typ}.metaName() } @@ -118,7 +180,7 @@ func TestLatency(t *testing.T) { for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { lat := New(windows, tt.opts) - m := metadata.New() + m := &fakeMeta{m: map[string]int64{}} checkLatency := func(desc string, lm map[string]time.Duration) { for name, want := range lm { val, err := m.GetInt(name) @@ -141,11 +203,11 @@ func TestLatency(t *testing.T) { checkLatency("initial state", nil) compute := func(ts, nts time.Time) { - now = func() time.Time { return nts } + Now = func() time.Time { return nts } lat.Compute(ts) } updateReset := func(nts time.Time) { - now = func() time.Time { return nts } + Now = func() time.Time { return nts } lat.UpdateReset(m) } @@ -254,7 +316,7 @@ func TestParseWindows(t *testing.T) { windows []string period time.Duration want []time.Duration - err interface{} + err any }{{ desc: "wrong time Duration", windows: []string{"abc"}, diff --git a/metadata/metadata.go b/metadata/metadata.go index 22dbd9c..865889f 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -21,6 +21,9 @@ import ( "errors" "fmt" "sync" + "time" + + "github.com/openconfig/gnmi/latency" ) const ( @@ -314,3 +317,20 @@ func (m *Metadata) GetStr(value string) (string, error) { } return v, nil } + +// LatencyPath returns the metadata path for the latency statistics of +// window w and type typ. +func LatencyPath(w time.Duration, typ latency.StatType) []string { + return latency.Path(w, typ, []string{Root}) +} + +// RegisterLatencyMetadata registers latency stats metadata for time windows +// specified in windowSizes. RegisterLatencyMetadata is not thread-safe and +// should be called before any metadata.Metadata is instantiated. +func RegisterLatencyMetadata(windowSizes []time.Duration) { + for _, size := range windowSizes { + for _, typ := range []latency.StatType{latency.Avg, latency.Max, latency.Min} { + RegisterIntValue(latency.MetadataName(size, typ), &IntValue{Path: LatencyPath(size, typ)}) + } + } +} diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 54c5164..4b82baa 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -16,7 +16,13 @@ limitations under the License. 
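Registration moves here (and `latency.Path` gains a prefix parameter) so that `metadata` depends on `latency` rather than the other way around. A sketch of the new call sequence, which must run before any `metadata.Metadata` is created:

```go
package main

import (
	"fmt"
	"time"

	"github.com/openconfig/gnmi/latency"
	"github.com/openconfig/gnmi/metadata"
)

func main() {
	windows := []time.Duration{2 * time.Second, 5 * time.Minute}
	// Not thread-safe: register before any metadata.Metadata exists.
	metadata.RegisterLatencyMetadata(windows)

	// Paths are rooted under "meta", e.g. [meta latency window 2s ...].
	fmt.Println(metadata.LatencyPath(2*time.Second, latency.Avg))

	m := metadata.New() // carries the registered latency values
	_ = m
}
```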
package metadata -import "testing" +import ( + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/openconfig/gnmi/latency" +) func TestPath(t *testing.T) { if path := Path("invalid"); path != nil { @@ -287,3 +293,33 @@ func TestClear(t *testing.T) { } } + +func TestRegisterLatencyMetadata(t *testing.T) { + windows := []time.Duration{2 * time.Second, 5 * time.Minute} + latMetas := map[string][]string{} + for _, w := range windows { + for _, typ := range []latency.StatType{latency.Avg, latency.Max, latency.Min} { + latMetas[latency.MetadataName(w, typ)] = latency.Path(w, typ, []string{Root}) + } + } + defer func() { // cleanup registered latency metadata + for metaName := range latMetas { + UnregisterIntValue(metaName) + } + }() + for metaName := range latMetas { + if _, exist := TargetIntValues[metaName]; exist { + t.Fatalf("Latency metadata %q existed before registration", metaName) + } + } + RegisterLatencyMetadata(windows) + for metaName, metaPath := range latMetas { + meta, exist := TargetIntValues[metaName] + if !exist { + t.Fatalf("Latency metadata %q does not exist after registration", metaName) + } + if diff := cmp.Diff(metaPath, meta.Path); diff != "" { + t.Errorf("Got wrong path for %q (-want+got): %s", metaName, diff) + } + } +} diff --git a/subscribe/stats.go b/subscribe/stats.go index b8a7ba3..f268194 100644 --- a/subscribe/stats.go +++ b/subscribe/stats.go @@ -24,7 +24,6 @@ import ( // subscribe mode, e.g. stream, once, or poll. type TypeStats struct { ActiveSubscriptionCount int64 // currently active subscription count - PendingSubscriptionCount int64 // currently pending subscription count SubscriptionCount int64 // total subscription count, cumulative } diff --git a/subscribe/subscribe.go b/subscribe/subscribe.go index 3c91c02..2be7a29 100644 --- a/subscribe/subscribe.go +++ b/subscribe/subscribe.go @@ -40,27 +40,6 @@ import ( pb "github.com/openconfig/gnmi/proto/gnmi" ) -var ( - // Value overridden in tests to simulate flow control. - flowControlTest = func() {} - // Timeout specifies how long a send can be pending before the RPC is closed. - Timeout = time.Minute - // SubscriptionLimit specifies how many queries can be processing simultaneously. - // This number includes Once queries, Polling subscriptions, and Streaming - // subscriptions that have not yet synced. Once a streaming subscription has - // synced, it no longer counts against the limit. A polling subscription - // counts against the limit during each polling cycle while it is processed. - SubscriptionLimit = 0 - // Value overridden in tests to evaluate SubscriptionLimit enforcement. - subscriptionLimitTest = func() {} - // Values overridden in tests to verify subscription counters. - updateSubsCountEnterTest = func() {} - updateSubsCountExitTest = func() {} - pendingSubsCountTest = func(int64) {} - // Values overridden in tests to verify subscription client counters. - clientStatsTest = func(int64, int64) {} -) - type aclStub struct{} func (a *aclStub) Check(string) bool { @@ -80,18 +59,69 @@ type ACL interface { // options contains options for creating a Server. type options struct { - enableStats bool noDupReport bool + timeout time.Duration + stats *stats + acl ACL + // Test override functions + flowControlTest func() + clientStatsTest func(int64, int64) + updateSubsCountEnterTest func() + updateSubsCountExitTest func() } // Option defines the function prototype to set options for creating a Server. 
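All of the former package-level knobs (`Timeout`, the test hook variables, the stats flag) now travel through this options struct. A sketch of a server configured through the new constructor options (values illustrative):

```go
package main

import (
	"log"
	"time"

	"github.com/openconfig/gnmi/cache"
	"github.com/openconfig/gnmi/subscribe"
)

func main() {
	c := cache.New([]string{"dev1"})
	// Was: subscribe.Timeout = 30 * time.Second (a shared global).
	srv, err := subscribe.NewServer(c,
		subscribe.WithTimeout(30*time.Second),
		subscribe.WithStats(),
	)
	if err != nil {
		log.Fatal(err)
	}
	c.SetClient(srv.Update) // stream cache updates to subscribers

	// Stats getters return nil unless WithStats was supplied.
	log.Printf("type stats: %v", srv.TypeStats())
}
```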
type Option func(*options) +// WithTimeout returns an Option to specify how long a send can be pending +// before the RPC is closed. +func WithTimeout(t time.Duration) Option { + return func(o *options) { + o.timeout = t + } +} + +// WithFlowControlTest returns an Option to override a test function to +// simulate flow control. +func WithFlowControlTest(f func()) Option { + return func(o *options) { + o.flowControlTest = f + } +} + +// WithClientStatsTest test override function. +func WithClientStatsTest(f func(int64, int64)) Option { + return func(o *options) { + o.clientStatsTest = f + } +} + +// WithUpdateSubsCountEnterTest test override function. +func WithUpdateSubsCountEnterTest(f func()) Option { + return func(o *options) { + o.updateSubsCountEnterTest = f + } +} + +// WithUpdateSubsCountExitTest test override function. +func WithUpdateSubsCountExitTest(f func()) Option { + return func(o *options) { + o.updateSubsCountExitTest = f + } +} + // WithStats returns an Option to enable statistics collection of client // queries to the server. func WithStats() Option { return func(o *options) { - o.enableStats = true + o.stats = newStats() + } +} + +// WithACL sets server ACL. +func WithACL(a ACL) Option { + return func(o *options) { + o.acl = a } } @@ -112,24 +142,7 @@ type Server struct { c *cache.Cache // The cache queries are performed against. m *match.Match // Structure to match updates against active subscriptions. - a ACL // server ACL. - // subscribeSlots is a channel of size SubscriptionLimit to restrict how many - // queries are in flight. - subscribeSlots chan struct{} - timeout time.Duration - stats *stats - noDupReport bool -} - -func newServer(c *cache.Cache, o options) (*Server, error) { - s := &Server{c: c, m: match.New(), timeout: Timeout, noDupReport: o.noDupReport} - if o.enableStats { - s.stats = newStats() - } - if SubscriptionLimit > 0 { - s.subscribeSlots = make(chan struct{}, SubscriptionLimit) - } - return s, nil + o options } // NewServer instantiates server to handle client queries. The cache should be @@ -141,12 +154,10 @@ func NewServer(c *cache.Cache, opts ...Option) (*Server, error) { opt(&o) } } - return newServer(c, o) -} - -// SetACL sets server ACL. This method is called before server starts to run. 
-func (s *Server) SetACL(a ACL) { - s.a = a + if o.timeout == 0 { + o.timeout = time.Minute + } + return &Server{c: c, m: match.New(), o: o}, nil } // UpdateNotification uses paths in a pb.Notification n to match registered @@ -179,26 +190,30 @@ func (s *Server) Update(n *ctree.Leaf) { } func (s *Server) updateTargetCounts(target string) func() { - if s.stats == nil { + if s.o.stats == nil { return func() {} } - st := s.stats.targetStats(target) + st := s.o.stats.targetStats(target) atomic.AddInt64(&st.ActiveSubscriptionCount, 1) atomic.AddInt64(&st.SubscriptionCount, 1) return func() { atomic.AddInt64(&st.ActiveSubscriptionCount, -1) - updateSubsCountExitTest() + if s.o.updateSubsCountExitTest != nil { + s.o.updateSubsCountExitTest() + } } } func (s *Server) updateTypeCounts(typ string) func() { - if s.stats == nil { + if s.o.stats == nil { return func() {} } - st := s.stats.typeStats(typ) + st := s.o.stats.typeStats(typ) atomic.AddInt64(&st.ActiveSubscriptionCount, 1) atomic.AddInt64(&st.SubscriptionCount, 1) - updateSubsCountEnterTest() + if s.o.updateSubsCountEnterTest != nil { + s.o.updateSubsCountEnterTest() + } return func() { atomic.AddInt64(&st.ActiveSubscriptionCount, -1) } @@ -232,8 +247,8 @@ func addSubscription(m *match.Match, s *pb.SubscriptionList, c *matchClient) (re func (s *Server) Subscribe(stream pb.GNMI_SubscribeServer) error { c := streamClient{stream: stream, acl: &aclStub{}} var err error - if s.a != nil { - a, err := s.a.NewRPCACL(stream.Context()) + if s.o.acl != nil { + a, err := s.o.acl.NewRPCACL(stream.Context()) if err != nil { log.Errorf("NewRPCACL fails due to %v", err) return status.Error(codes.Unauthenticated, "no authentication/authorization for requested operation") @@ -334,12 +349,14 @@ func (s *Server) sendSubscribeResponse(r *resp, c *streamClient) error { } // Start the timeout before attempting to send. - r.t.Reset(s.timeout) + r.t.Reset(s.o.timeout) // Clear the timeout upon sending. defer r.t.Stop() // An empty function in production, replaced in test to simulate flow control // by blocking before send. - flowControlTest() + if s.o.flowControlTest != nil { + s.o.flowControlTest() + } return r.stream.Send(notification) } @@ -375,15 +392,6 @@ type streamClient struct { errC chan<- error } -func (s *Server) addPendingSubsCount(mode string, inc int64) { - if s.stats == nil { - return - } - st := s.stats.typeStats(mode) - atomic.AddInt64(&st.PendingSubscriptionCount, inc) - pendingSubsCountTest(inc) -} - // processSubscription walks the cache tree and inserts all of the matching // nodes into the coalesce queue followed by a subscriptionSync response. func (s *Server) processSubscription(c *streamClient) { @@ -393,31 +401,10 @@ func (s *Server) processSubscription(c *streamClient) { defer func() { if err != nil { log.Error(err) - c.queue.Close() c.errC <- err } log.V(2).Infof("end processSubscription for %p", c) }() - if s.subscribeSlots != nil { - select { - // Register a subscription in the channel, which will block if SubscriptionLimit queries - // are already in flight. - case s.subscribeSlots <- struct{}{}: - default: - mode := strings.ToLower(c.sr.GetSubscribe().Mode.String()) - s.addPendingSubsCount(mode, 1) - log.V(2).Infof("subscription %s delayed waiting for 1 of %d subscription slots.", c.sr, len(s.subscribeSlots)) - s.subscribeSlots <- struct{}{} - s.addPendingSubsCount(mode, -1) - log.V(2).Infof("subscription %s resumed", c.sr) - } - // Remove subscription from the channel upon completion. 
- defer func() { - // Artificially hold subscription processing in tests to synchronously test limit. - subscriptionLimitTest() - <-s.subscribeSlots - }() - } if !c.sr.GetSubscribe().GetUpdatesOnly() { for _, subscription := range c.sr.GetSubscribe().Subscription { var fullPath []string @@ -473,13 +460,15 @@ func (s *Server) processPollingSubscription(c *streamClient) { } func (s *Server) updateClientStats(client, target string, dup, queueSize int64) { - if s.stats == nil { + if s.o.stats == nil { return } - st := s.stats.clientStats(client, target) + st := s.o.stats.clientStats(client, target) atomic.AddInt64(&st.CoalesceCount, dup) atomic.StoreInt64(&st.QueueSize, queueSize) - clientStatsTest(dup, queueSize) + if s.o.clientStatsTest != nil { + s.o.clientStatsTest(dup, queueSize) + } } // sendStreamingResults forwards all streaming updates to a given streaming @@ -491,12 +480,12 @@ func (s *Server) sendStreamingResults(c *streamClient) { // same Peer and has no meaning otherwise. szKey := fmt.Sprintf("%s:%p", peer.Addr, c.sr) defer func() { - if s.stats != nil { - s.stats.removeClientStats(szKey) + if s.o.stats != nil { + s.o.stats.removeClientStats(szKey) } }() - t := time.NewTimer(s.timeout) + t := time.NewTimer(s.o.timeout) // Make sure the timer doesn't expire waiting for a value to send, only // waiting to send. t.Stop() @@ -576,7 +565,7 @@ func (s *Server) MakeSubscribeResponse(n interface{}, dup uint32) (*pb.Subscribe // update is set with duplicate count to be on the side of efficiency. // Only attempt to set the duplicate count if it is greater than 0. The default // value in the message is already 0. - if !s.noDupReport && dup > 0 && len(notification.Update) > 0 { + if !s.o.noDupReport && dup > 0 && len(notification.Update) > 0 { // We need a copy of the cached notification before writing a client specific // duplicate count as the notification is shared across all clients. notification = proto.Clone(notification).(*pb.Notification) @@ -613,28 +602,28 @@ func isTargetDelete(l *ctree.Leaf) bool { // stream, once, or poll. Statistics is available only if the Server is // created with NewServerWithStats. func (s *Server) TypeStats() map[string]TypeStats { - if s.stats == nil { + if s.o.stats == nil { return nil } - return s.stats.allTypeStats() + return s.o.stats.allTypeStats() } // TargetStats returns statistics of subscribe queries for all targets. // Statistics is available only if the Server is created with // NewServerWithStats. func (s *Server) TargetStats() map[string]TargetStats { - if s.stats == nil { + if s.o.stats == nil { return nil } - return s.stats.allTargetStats() + return s.o.stats.allTargetStats() } // ClientStats returns states of all subscribe clients such as queue size, // coalesce count. Statistics is available only if the Server is created // with NewServerWithStats. func (s *Server) ClientStats() map[string]ClientStats { - if s.stats == nil { + if s.o.stats == nil { return nil } - return s.stats.allClientStats() + return s.o.stats.allClientStats() } diff --git a/subscribe/subscribe_test.go b/subscribe/subscribe_test.go index 73ebd55..4f0f5f5 100644 --- a/subscribe/subscribe_test.go +++ b/subscribe/subscribe_test.go @@ -25,6 +25,7 @@ import ( "net" "strings" "sync" + "sync/atomic" "testing" "time" @@ -46,10 +47,7 @@ import ( func startServer(targets []string, opts ...Option) (string, *Server, *cache.Cache, func(), error) { c := cache.New(targets) - p, err := NewServer(c, opts...) 
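For context, the test helper being simplified here wires a cache to a subscribe server and a gRPC listener. A standalone sketch of that wiring (port and target names illustrative):

```go
package main

import (
	"log"
	"net"

	"google.golang.org/grpc"

	"github.com/openconfig/gnmi/cache"
	pb "github.com/openconfig/gnmi/proto/gnmi"
	"github.com/openconfig/gnmi/subscribe"
)

func main() {
	c := cache.New([]string{"dev1"})
	srv, err := subscribe.NewServer(c) // still returns an error; the tests now ignore it
	if err != nil {
		log.Fatal(err)
	}
	c.SetClient(srv.Update)

	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		log.Fatal(err)
	}
	g := grpc.NewServer()
	pb.RegisterGNMIServer(g, srv) // same registration the collector performs
	log.Printf("serving gNMI on %v", lis.Addr())
	if err := g.Serve(lis); err != nil {
		log.Fatal(err)
	}
}
```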
- if err != nil { - return "", nil, nil, nil, fmt.Errorf("NewServer: %v", err) - } + p, _ := NewServer(c, opts...) c.SetClient(p.Update) lis, err := net.Listen("tcp", "") if err != nil { @@ -477,7 +475,7 @@ func TestGNMIStreamAtomic(t *testing.T) { ns := []*pb.Notification{} sync := false - count := 0 + var count int32 c := client.BaseClient{} q := client.Query{ Addrs: []string{addr}, @@ -491,10 +489,9 @@ func TestGNMIStreamAtomic(t *testing.T) { } switch r := resp.Response.(type) { case *pb.SubscribeResponse_Update: - count++ ns = append(ns, resp.GetUpdate()) // The total updates received should be 3, 1 before sync, 2 after. - if count == 3 { + if atomic.AddInt32(&count, 1) == 3 { c.Close() } case *pb.SubscribeResponse_Error: @@ -503,8 +500,8 @@ func TestGNMIStreamAtomic(t *testing.T) { if sync { t.Fatal("received more than one sync message") } - if count != 1 { - t.Fatalf("initial updates before sync, got %d, want 1", count) + if x := atomic.LoadInt32(&count); x != 1 { + t.Fatalf("initial updates before sync, got %d, want 1", x) } sync = true // Send some updates after the sync occurred. @@ -740,7 +737,7 @@ func TestGNMISubscribeUnresponsiveClient(t *testing.T) { // Start the second client for the same target and actually accept // responses. - count := 0 + var count int32 client2 := client.BaseClient{} q2 := client.Query{ Addrs: []string{addr}, @@ -754,7 +751,7 @@ func TestGNMISubscribeUnresponsiveClient(t *testing.T) { } switch r := resp.Response.(type) { case *pb.SubscribeResponse_Update: - count++ + atomic.AddInt32(&count, 1) case *pb.SubscribeResponse_SyncResponse: client2.Close() default: @@ -852,14 +849,13 @@ func TestGNMICoalescedDupCount(t *testing.T) { }} { t.Run(tt.desc, func(t *testing.T) { // Inject a simulated flow control to block sends and induce coalescing. - flowControlTest = func() { time.Sleep(10 * time.Millisecond) } - dequeCount := 0 + var dequeCount int32 qszReady := make(chan struct{}) dupReady := make(chan struct{}) qszWait := make(chan struct{}) dupWait := make(chan struct{}) - clientStatsTest = func(dup, qsz int64) { - dequeCount++ + clientStatsTest := func(dup, qsz int64) { + atomic.AddInt32(&dequeCount, 1) if qsz > 0 { close(qszReady) <-qszWait @@ -869,11 +865,10 @@ func TestGNMICoalescedDupCount(t *testing.T) { <-dupWait } } - defer func() { - flowControlTest = func() {} - clientStatsTest = func(int64, int64) {} - }() - addr, s, cache, teardown, err := startServer([]string{"dev1"}, tt.opts...) + flowControlTest := func() { time.Sleep(10 * time.Millisecond) } + opts := append(tt.opts, WithFlowControlTest(flowControlTest)) + opts = append(opts, WithClientStatsTest(clientStatsTest)) + addr, s, cache, teardown, err := startServer([]string{"dev1"}, opts...) if err != nil { t.Fatal(err) } @@ -882,7 +877,7 @@ func TestGNMICoalescedDupCount(t *testing.T) { stall := make(chan struct{}) done := make(chan struct{}) coalesced := uint32(0) - count := 0 + var count int32 c := client.BaseClient{} q := client.Query{ Addrs: []string{addr}, @@ -896,11 +891,11 @@ func TestGNMICoalescedDupCount(t *testing.T) { } switch r := resp.Response.(type) { case *pb.SubscribeResponse_Update: - count++ + atomic.AddInt32(&count, 1) if r.Update.Update[0].GetDuplicates() > 0 { coalesced = r.Update.Update[0].GetDuplicates() } - switch count { + switch atomic.LoadInt32(&count) { case 1: close(stall) case 2: @@ -944,17 +939,20 @@ func TestGNMICoalescedDupCount(t *testing.T) { } func TestGNMISubscribeTimeout(t *testing.T) { - // Set a low timeout that is below the induced flowControl delay. 
- Timeout = 100 * time.Millisecond - // Cause query to hang indefinitely to induce timeout. - flowControlTest = func() { select {} } - // Reset the global variables so as not to interfere with other tests. - defer func() { - Timeout = time.Minute - flowControlTest = func() {} - }() + stall := make(chan struct{}) + defer close(stall) + opts := []Option{ + // Set a low timeout that is below the induced flowControl delay. + WithTimeout(100 * time.Millisecond), + // Cause query to hang indefinitely to induce timeout. + WithFlowControlTest(func() { + select { + case <-stall: + } + }), + } - addr, _, cache, teardown, err := startServer([]string{"dev1", "dev2"}) + addr, _, cache, teardown, err := startServer([]string{"dev1", "dev2"}, opts...) if err != nil { t.Fatal(err) } @@ -984,190 +982,9 @@ func TestGNMISubscribeTimeout(t *testing.T) { } } -func TestSubscriptionLimit(t *testing.T) { - totalQueries := 20 - SubscriptionLimit = 7 - pendingQueries := totalQueries - SubscriptionLimit - causeLimit := make(chan struct{}) - subscriptionLimitTest = func() { - <-causeLimit - } - var pendingSubsMu sync.Mutex - pendingSubs := 0 - pendingSubsReadyCh := make(chan struct{}) - pendingSubsCh := make(chan struct{}) - pendingSubsCountTest = func(inc int64) { - if inc <= 0 { - return - } - pendingSubsMu.Lock() - pendingSubs++ - if pendingSubs == pendingQueries { - close(pendingSubsReadyCh) - pendingSubsMu.Unlock() - <-pendingSubsCh - return - } - pendingSubsMu.Unlock() - } - // Clear the global variables so as not to interfere with other tests. - defer func() { - SubscriptionLimit = 0 - subscriptionLimitTest = func() {} - pendingSubsCountTest = func(int64) {} - }() - - addr, s, _, teardown, err := startServer([]string{"dev1", "dev2"}, WithStats()) - if err != nil { - t.Fatal(err) - } - defer teardown() - - fc := make(chan struct{}) - q := client.Query{ - Addrs: []string{addr}, - Target: "dev1", - Queries: []client.Path{{"a"}}, - Type: client.Once, - NotificationHandler: func(n client.Notification) error { - switch n.(type) { - case client.Sync: - fc <- struct{}{} - } - return nil - }, - TLS: &tls.Config{InsecureSkipVerify: true}, - } - - // Launch parallel queries. - for i := 0; i < totalQueries; i++ { - c := client.BaseClient{} - go c.Subscribe(context.Background(), q, gnmiclient.Type) - } - - timeout := time.After(500 * time.Millisecond) - finished := 0 -firstQueries: - for { - select { - case <-fc: - finished++ - case <-timeout: - break firstQueries - } - } - if finished != SubscriptionLimit { - t.Fatalf("got %d finished queries, want %d", finished, SubscriptionLimit) - } - <-pendingSubsReadyCh - st := s.TypeStats()["once"] - if st.ActiveSubscriptionCount != int64(totalQueries) { - t.Errorf("ActiveSubscriptionCount: got %d, want %d", st.ActiveSubscriptionCount, totalQueries) - } - if st.SubscriptionCount != int64(totalQueries) { - t.Errorf("SubscriptionCount: got %d, want %d", st.SubscriptionCount, totalQueries) - } - if st.PendingSubscriptionCount != int64(pendingQueries) { - t.Errorf("PendingSubscriptionCount: got %d, want %d", st.PendingSubscriptionCount, pendingQueries) - } - close(causeLimit) - close(pendingSubsCh) - timeout = time.After(time.Second) -remainingQueries: - for { - select { - case <-fc: - if finished++; finished == totalQueries { - break remainingQueries - } - case <-timeout: - t.Errorf("Remaining queries did not proceed after limit removed. 
got %d, want %d", finished, totalQueries) - } - } -} - -func TestGNMISubscriptionLimit(t *testing.T) { - totalQueries := 20 - SubscriptionLimit = 7 - causeLimit := make(chan struct{}) - subscriptionLimitTest = func() { - <-causeLimit - } - // Clear the global variables so as not to interfere with other tests. - defer func() { - SubscriptionLimit = 0 - subscriptionLimitTest = func() {} - }() - - addr, _, _, teardown, err := startServer([]string{"dev1", "dev2"}) - if err != nil { - t.Fatal(err) - } - defer teardown() - - fc := make(chan struct{}) - q := client.Query{ - Addrs: []string{addr}, - Target: "dev1", - Queries: []client.Path{{"a"}}, - Type: client.Once, - ProtoHandler: func(msg proto.Message) error { - resp, ok := msg.(*pb.SubscribeResponse) - if !ok { - return fmt.Errorf("failed to type assert message %#v", msg) - } - switch resp.Response.(type) { - case *pb.SubscribeResponse_SyncResponse: - fc <- struct{}{} - } - return nil - }, - TLS: &tls.Config{InsecureSkipVerify: true}, - } - - // Launch parallel queries. - for i := 0; i < totalQueries; i++ { - c := client.BaseClient{} - go c.Subscribe(context.Background(), q, gnmiclient.Type) - } - - timeout := time.After(500 * time.Millisecond) - finished := 0 -firstQueries: - for { - select { - case <-fc: - finished++ - case <-timeout: - break firstQueries - } - } - if finished != SubscriptionLimit { - t.Fatalf("got %d finished queries, want %d", finished, SubscriptionLimit) - } - - close(causeLimit) - timeout = time.After(time.Second) -remainingQueries: - for { - select { - case <-fc: - if finished++; finished == totalQueries { - break remainingQueries - } - case <-timeout: - t.Errorf("Remaining queries did not proceed after limit removed. got %d, want %d", finished, totalQueries) - } - } -} - func TestGNMIMultipleSubscriberCoalescion(t *testing.T) { // Inject a simulated flow control to block sends and induce coalescing. - flowControlTest = func() { time.Sleep(time.Second) } - defer func() { - flowControlTest = func() {} - }() - addr, _, cache, teardown, err := startServer([]string{"dev1"}) + addr, _, cache, teardown, err := startServer([]string{"dev1"}, WithFlowControlTest(func() { time.Sleep(time.Second) })) if err != nil { t.Fatal(err) } @@ -1229,7 +1046,7 @@ func TestGNMIServerStats(t *testing.T) { ready2 := make(chan struct{}) ready3 := make(chan struct{}) subsEnterCount := 0 - updateSubsCountEnterTest = func() { + updateSubsCountEnterTest := func() { subsEnterCount++ switch subsEnterCount { case 1: @@ -1244,7 +1061,7 @@ func TestGNMIServerStats(t *testing.T) { wait2 := make(chan struct{}) wait3 := make(chan struct{}) subsExitCount := 0 - updateSubsCountExitTest = func() { + updateSubsCountExitTest := func() { subsExitCount++ switch subsExitCount { case 1: @@ -1255,11 +1072,12 @@ func TestGNMIServerStats(t *testing.T) { close(wait3) } } - defer func() { - updateSubsCountEnterTest = func() {} - updateSubsCountExitTest = func() {} - }() - addr, s, cache, teardown, err := startServer([]string{"dev1", "dev2"}, WithStats()) + opts := []Option{ + WithStats(), + WithUpdateSubsCountEnterTest(updateSubsCountEnterTest), + WithUpdateSubsCountExitTest(updateSubsCountExitTest), + } + addr, s, cache, teardown, err := startServer([]string{"dev1", "dev2"}, opts...) 
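The hooks are now plumbed per server instance, so concurrent tests no longer race on shared globals; note that per the new `updateTypeCounts`/`updateTargetCounts`, the enter/exit hooks only fire when `WithStats` is also set. A sketch (channel name illustrative):

```go
package main

import (
	"log"

	"github.com/openconfig/gnmi/cache"
	"github.com/openconfig/gnmi/subscribe"
)

func main() {
	entered := make(chan struct{}, 1)
	c := cache.New([]string{"dev1"})
	srv, err := subscribe.NewServer(c,
		subscribe.WithStats(), // counters (and therefore hooks) require stats
		subscribe.WithUpdateSubsCountEnterTest(func() { entered <- struct{}{} }),
		subscribe.WithUpdateSubsCountExitTest(func() { log.Print("subscription ended") }),
	)
	if err != nil {
		log.Fatal(err)
	}
	_ = srv // each Subscribe RPC will now signal the hooks above
}
```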
if err != nil { t.Fatal(err) } @@ -1468,11 +1286,7 @@ func (s *fakeSubServer) Context() context.Context { func TestGNMIACL(t *testing.T) { targets := []string{"dev-pii", "dev-no-pii"} c := cache.New(targets) - p, err := NewServer(c) - if err != nil { - t.Errorf("NewServer: %v", err) - return - } + p, _ := NewServer(c) paths := []client.Path{ {"dev-pii", "a", "b"}, {"dev-pii", "a", "c"}, @@ -1541,7 +1355,7 @@ func TestGNMIACL(t *testing.T) { for _, test := range tests { facl := &fakeACL{} - p.SetACL(facl) + p.o.acl = facl var cancel context.CancelFunc subSvr := &fakeSubServer{user: test.user, req: make(chan *pb.SubscribeRequest, 2),