Skip to content

Commit

Permalink
Merge pull request #6206 from TheThingsNetwork/fix/monotonic-timestamps
Browse files Browse the repository at this point in the history
Fix connection timestamp monotonicity
  • Loading branch information
adriansmares authored May 11, 2023
2 parents 1e06ffe + 46bfb91 commit 1da8d60
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 64 deletions.
47 changes: 40 additions & 7 deletions pkg/gatewayserver/gatewayserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,22 +955,44 @@ func (gs *GatewayServer) handleUpstream(ctx context.Context, conn connectionEntr
}
}

func earliestTimestamp(a, b *timestamppb.Timestamp) *timestamppb.Timestamp {
switch {
case a == nil && b == nil:
return nil
case a == nil:
return b
case b == nil:
return a
default:
if aT, bT := a.AsTime(), b.AsTime(); aT.Before(bT) {
return a
}
return b
}
}

func (gs *GatewayServer) updateConnStats(ctx context.Context, conn connectionEntry) {
decoupledCtx := gs.FromRequestContext(ctx)
logger := log.FromContext(ctx)

ids := conn.Connection.Gateway().GetIds()
connectTime := conn.Connection.ConnectTime()

// Initial update, so that the gateway appears connected.
stats := &ttnpb.GatewayConnectionStats{
ConnectedAt: timestamppb.New(connectTime),
ConnectedAt: timestamppb.New(conn.Connection.ConnectTime()),
Protocol: conn.Connection.Frontend().Protocol(),
GatewayRemoteAddress: conn.Connection.GatewayRemoteAddress(),
}
registerGatewayConnectionStats(ctx, ids, stats)
if gs.statsRegistry != nil {
if err := gs.statsRegistry.Set(decoupledCtx, ids, stats, ttnpb.GatewayConnectionStatsFieldPathsTopLevel, gs.config.ConnectionStatsTTL); err != nil {
if err := gs.statsRegistry.Set(
decoupledCtx,
ids,
func(*ttnpb.GatewayConnectionStats) (*ttnpb.GatewayConnectionStats, []string, error) {
return stats, ttnpb.GatewayConnectionStatsFieldPathsTopLevel, nil
},
gs.config.ConnectionStatsTTL,
); err != nil {
logger.WithError(err).Warn("Failed to initialize connection stats")
}
}
Expand All @@ -986,8 +1008,11 @@ func (gs *GatewayServer) updateConnStats(ctx context.Context, conn connectionEnt
return
}
if err := gs.statsRegistry.Set(
decoupledCtx, ids, stats,
[]string{"connected_at", "disconnected_at"},
decoupledCtx,
ids,
func(*ttnpb.GatewayConnectionStats) (*ttnpb.GatewayConnectionStats, []string, error) {
return stats, []string{"connected_at", "disconnected_at"}, nil
},
gs.config.ConnectionStatsDisconnectTTL,
); err != nil {
logger.WithError(err).Warn("Failed to clear connection stats")
Expand Down Expand Up @@ -1023,12 +1048,20 @@ func (gs *GatewayServer) updateConnStats(ctx context.Context, conn connectionEnt
lastUpdate = time.Now()

stats, paths := conn.Stats()
paths = ttnpb.ExcludeFields(paths, "connected_at", "disconnected_at", "protocol", "gateway_remote_address")
registerGatewayConnectionStats(decoupledCtx, ids, stats)
if gs.statsRegistry == nil {
continue
}
if err := gs.statsRegistry.Set(decoupledCtx, ids, stats, paths, gs.config.ConnectionStatsTTL); err != nil {
if err := gs.statsRegistry.Set(
decoupledCtx,
ids,
func(pb *ttnpb.GatewayConnectionStats) (*ttnpb.GatewayConnectionStats, []string, error) {
stats.ConnectedAt = earliestTimestamp(stats.ConnectedAt, pb.ConnectedAt)
return stats, paths, nil
},
gs.config.ConnectionStatsTTL,
"connected_at",
); err != nil {
logger.WithError(err).Warn("Failed to update connection stats")
}
}
Expand Down
18 changes: 7 additions & 11 deletions pkg/gatewayserver/gatewayserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ func TestGatewayServer(t *testing.T) {
gsConfig := &gatewayserver.Config{
RequireRegisteredGateways: false,
UpdateGatewayLocationDebounceTime: 0,
UpdateConnectionStatsInterval: time.Second,
ConnectionStatsTTL: (1 << 3) * test.Delay,
ConnectionStatsDisconnectTTL: (1 << 6) * test.Delay,
UpdateConnectionStatsInterval: (1 << 5) * test.Delay,
ConnectionStatsTTL: (1 << 6) * test.Delay,
ConnectionStatsDisconnectTTL: (1 << 7) * test.Delay,
Stats: statsRegistry,
FetchGatewayInterval: time.Minute,
FetchGatewayJitter: 1,
Expand Down Expand Up @@ -734,7 +734,7 @@ func TestGatewayServer(t *testing.T) {
})

// Wait for gateway disconnection to be processed.
time.Sleep(6 * timeout)
time.Sleep(2 * config.ConnectionStatsDisconnectTTL)

t.Run(fmt.Sprintf("Traffic/%v", ptc.Protocol), func(t *testing.T) {
a := assertions.New(t)
Expand Down Expand Up @@ -1472,9 +1472,7 @@ func TestGatewayServer(t *testing.T) {

stats, paths := conn.Stats()
a.So(stats, should.NotBeNil)
if statsRegistry != nil {
a.So(statsRegistry.Set(conn.Context(), ids, stats, paths, 0), should.BeNil)
}
a.So(paths, should.NotBeEmpty)

stats, err := statsClient.GetGatewayConnectionStats(statsCtx, ids)
if !a.So(err, should.BeNil) {
Expand Down Expand Up @@ -1803,9 +1801,7 @@ func TestGatewayServer(t *testing.T) {

stats, paths := conn.Stats()
a.So(stats, should.NotBeNil)
if config.Stats != nil {
a.So(config.Stats.Set(conn.Context(), ids, stats, paths, 0), should.BeNil)
}
a.So(paths, should.NotBeEmpty)

stats, err = statsClient.GetGatewayConnectionStats(statsCtx, ids)
if !a.So(err, should.BeNil) {
Expand All @@ -1823,7 +1819,7 @@ func TestGatewayServer(t *testing.T) {
}

// Wait for disconnection to be processed.
time.Sleep(4 * config.ConnectionStatsDisconnectTTL)
time.Sleep(2 * config.ConnectionStatsDisconnectTTL)

// After canceling the context and awaiting the link, the connection should be gone.
t.Run("Disconnected", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/gatewayserver/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (gs *GatewayServer) BatchGetGatewayConnectionStats(
}

if gs.statsRegistry != nil {
entries, err := gs.statsRegistry.BatchGet(ctx, req.GatewayIds, req.FieldMask.GetPaths())
entries, err := gs.statsRegistry.BatchGet(ctx, req.GatewayIds, req.FieldMask.GetPaths()...)
if err != nil {
return nil, err
}
Expand Down
79 changes: 62 additions & 17 deletions pkg/gatewayserver/redis/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,21 @@ type GatewayConnectionStatsRegistry struct {

// Init initializes the GatewayConnectionStatsRegistry.
func (r *GatewayConnectionStatsRegistry) Init(ctx context.Context) error {
if err := ttnredis.InitMutex(ctx, r.Redis); err != nil {
return err
}
return nil
return ttnredis.InitMutex(ctx, r.Redis)
}

func (r *GatewayConnectionStatsRegistry) key(uid string) string {
return r.Redis.Key("uid", uid)
}

// Set sets or clears the connection stats for a gateway.
func (r *GatewayConnectionStatsRegistry) Set(ctx context.Context, ids *ttnpb.GatewayIdentifiers, stats *ttnpb.GatewayConnectionStats, paths []string, ttl time.Duration) error {
func (r *GatewayConnectionStatsRegistry) Set(
ctx context.Context,
ids *ttnpb.GatewayIdentifiers,
f func(*ttnpb.GatewayConnectionStats) (*ttnpb.GatewayConnectionStats, []string, error),
ttl time.Duration,
gets ...string,
) error {
uid := unique.ID(ctx, ids)

lockerID, err := ttnredis.GenerateLockerID()
Expand All @@ -57,29 +60,71 @@ func (r *GatewayConnectionStatsRegistry) Set(ctx context.Context, ids *ttnpb.Gat
defer trace.StartRegion(ctx, "set gateway connection stats").End()

uk := r.key(uid)
if stats == nil {
err = r.Redis.Del(ctx, uk).Err()
} else {
err = ttnredis.LockedWatch(ctx, r.Redis, uk, lockerID, r.LockTTL, func(tx *redis.Tx) error {
pb := &ttnpb.GatewayConnectionStats{}
if err := ttnredis.GetProto(ctx, tx, uk).ScanProto(pb); err != nil && !errors.IsNotFound(err) {
err = ttnredis.LockedWatch(ctx, r.Redis, uk, lockerID, r.LockTTL, func(tx *redis.Tx) error {
stored := &ttnpb.GatewayConnectionStats{}
cmd := ttnredis.GetProto(ctx, tx, uk)
if err := cmd.ScanProto(stored); errors.IsNotFound(err) {
stored = nil
} else if err != nil {
return err
}

var pb *ttnpb.GatewayConnectionStats
if stored != nil {
pb = &ttnpb.GatewayConnectionStats{}
if err := cmd.ScanProto(pb); err != nil {
return err
}
if err := pb.SetFields(stats, paths...); err != nil {
if pb, err = applyGatewayConnectionStatsFieldMask(nil, pb, gets...); err != nil {
return err
}
_, err := ttnredis.SetProto(ctx, tx, uk, pb, ttl)
}

var sets []string
pb, sets, err = f(pb)
if err != nil {
return err
})
}
}
if stored == nil && pb == nil {
return nil
}
var pipelined func(redis.Pipeliner) error
if pb == nil {
pipelined = func(p redis.Pipeliner) error {
p.Del(ctx, uk)
return nil
}
} else {
updated := &ttnpb.GatewayConnectionStats{}
if stored != nil {
if err := cmd.ScanProto(updated); err != nil {
return err
}
}
if updated, err = applyGatewayConnectionStatsFieldMask(updated, pb, sets...); err != nil {
return err
}
if err := updated.ValidateFields(); err != nil {
return err
}
pipelined = func(p redis.Pipeliner) error {
_, err = ttnredis.SetProto(ctx, p, uk, updated, ttl)
return err
}
}
_, err = tx.TxPipelined(ctx, pipelined)
return err
})
if err != nil {
return ttnredis.ConvertError(err)
}
return nil
}

// Get returns the connection stats for a gateway.
func (r *GatewayConnectionStatsRegistry) Get(ctx context.Context, ids *ttnpb.GatewayIdentifiers) (*ttnpb.GatewayConnectionStats, error) {
func (r *GatewayConnectionStatsRegistry) Get(
ctx context.Context, ids *ttnpb.GatewayIdentifiers,
) (*ttnpb.GatewayConnectionStats, error) {
uid := unique.ID(ctx, ids)
result := &ttnpb.GatewayConnectionStats{}
if err := ttnredis.GetProto(ctx, r.Redis, r.key(uid)).ScanProto(result); err != nil {
Expand All @@ -104,7 +149,7 @@ func applyGatewayConnectionStatsFieldMask(
func (r *GatewayConnectionStatsRegistry) BatchGet(
ctx context.Context,
ids []*ttnpb.GatewayIdentifiers,
paths []string,
paths ...string,
) (map[string]*ttnpb.GatewayConnectionStats, error) {
ret := make(map[string]*ttnpb.GatewayConnectionStats, len(ids))
keys := make([]string, 0, len(ids))
Expand Down
Loading

0 comments on commit 1da8d60

Please sign in to comment.