Skip to content

Commit

Permalink
Implement additional gateway stats aggregations.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed Aug 4, 2021
1 parent bbb0d54 commit e39633f
Show file tree
Hide file tree
Showing 10 changed files with 689 additions and 124 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/brocaar/chirpstack-gateway-bridge
go 1.16

require (
github.com/brocaar/chirpstack-api/go/v3 v3.10.1
github.com/brocaar/chirpstack-api/go/v3 v3.11.0
github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/eclipse/paho.mqtt.golang v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2 h1:oMCHnXa6CCCafdPDbMh/lWRhRByN0VFLvv+g+ayx1SI=
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2/go.mod h1:PkYb9DJNAwrSvRx5DYA+gUcOIgTGVMNkfSCbZM8cWpI=
github.com/brocaar/chirpstack-api/go/v3 v3.10.1 h1:ijf3AKDw++aSPJE41lkiPxXtshxZF++cXzV2l9e7EYk=
github.com/brocaar/chirpstack-api/go/v3 v3.10.1/go.mod h1:v8AWP19nOJK4rwJsr1+weDfpUc4UNLbRh8Eygn4Oh00=
github.com/brocaar/chirpstack-api/go/v3 v3.11.0 h1:RV4DVNRlDqilrv7ttqg4REdTxFyQ8F8ETmymUgpC/cI=
github.com/brocaar/chirpstack-api/go/v3 v3.11.0/go.mod h1:v8AWP19nOJK4rwJsr1+weDfpUc4UNLbRh8Eygn4Oh00=
github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303 h1:LkE19tFPfDaRh1HIKWLCZKSBZNonMu0rIOJPCLvEjC0=
github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303/go.mod h1:CciUmQHIpUYTHHMeICtyamM7d+47VV+WBZ5ReDozpoc=
github.com/caarlos0/ctrlc v1.0.0 h1:2DtF8GSIcajgffDFJzyG15vO+1PuBWOMUdFut7NnXhw=
Expand Down
101 changes: 29 additions & 72 deletions internal/backend/basicstation/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/brocaar/chirpstack-api/go/v3/gw"
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/basicstation/structs"
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/events"
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/stats"
"github.com/brocaar/chirpstack-gateway-bridge/internal/config"
"github.com/brocaar/lorawan"
"github.com/brocaar/lorawan/band"
Expand Down Expand Up @@ -71,9 +72,6 @@ type Backend struct {
frequencyMax uint32
routerConfig structs.RouterConfig

// Cache to store stats.
statsCache *cache.Cache

// Cache to store diid to UUIDs.
diidCache *cache.Cache
}
Expand All @@ -100,8 +98,7 @@ func NewBackend(conf config.Config) (*Backend, error) {
frequencyMin: conf.Backend.BasicStation.FrequencyMin,
frequencyMax: conf.Backend.BasicStation.FrequencyMax,

diidCache: cache.New(time.Minute, time.Minute),
statsCache: cache.New(conf.Backend.BasicStation.StatsInterval*2, conf.Backend.BasicStation.StatsInterval*2),
diidCache: cache.New(time.Minute, time.Minute),
}

for _, n := range conf.Filters.NetIDs {
Expand Down Expand Up @@ -226,10 +223,8 @@ func (b *Backend) SendDownlinkFrame(df gw.DownlinkFrame) error {
copy(gatewayID[:], df.GetGatewayId())
copy(downID[:], df.GetDownlinkId())

b.incrementTxStats(gatewayID)

// store token to UUID mapping
b.diidCache.SetDefault(fmt.Sprintf("%d", df.Token), df.GetDownlinkId())
// Store downlink under DIID in cache
b.diidCache.SetDefault(fmt.Sprintf("%d", pl.DIID), df)

websocketSendCounter("dnmsg").Inc()
if err := b.sendToGateway(gatewayID, pl); err != nil {
Expand Down Expand Up @@ -419,8 +414,6 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {

// stats publishing loop
go func() {
gwIDStr := gatewayID.String()

for {
select {
case <-statsTicker.C:
Expand All @@ -430,35 +423,13 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {
continue
}

var rx, rxOK, tx, txOK uint32
if v, ok := b.statsCache.Get(gwIDStr + ":rx"); ok {
rx = v.(uint32)
}
if v, ok := b.statsCache.Get(gwIDStr + ":rxOK"); ok {
rxOK = v.(uint32)
}
if v, ok := b.statsCache.Get(gwIDStr + ":tx"); ok {
tx = v.(uint32)
}
if v, ok := b.statsCache.Get(gwIDStr + ":txOK"); ok {
txOK = v.(uint32)
}

b.statsCache.Delete(gwIDStr + ":rx")
b.statsCache.Delete(gwIDStr + ":rxOK")
b.statsCache.Delete(gwIDStr + ":tx")
b.statsCache.Delete(gwIDStr + ":txOK")
stats := conn.stats.ExportStats()
stats.GatewayId = gatewayID[:]
stats.Time = ptypes.TimestampNow()
stats.StatsId = id[:]

if b.gatewayStatsFunc != nil {
b.gatewayStatsFunc(gw.GatewayStats{
GatewayId: gatewayID[:],
Time: ptypes.TimestampNow(),
StatsId: id[:],
RxPacketsReceived: rx,
RxPacketsReceivedOk: rxOK,
TxPacketsReceived: tx,
TxPacketsEmitted: txOK,
})
b.gatewayStatsFunc(stats)
}
case <-done:
return
Expand Down Expand Up @@ -523,7 +494,6 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {
b.handleVersion(gatewayID, pl)
case structs.UplinkDataFrameMessage:
// handle uplink
b.incrementRxStats(gatewayID)
var pl structs.UplinkDataFrame
if err := json.Unmarshal(msg, &pl); err != nil {
log.WithError(err).WithFields(log.Fields{
Expand All @@ -536,7 +506,6 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {
b.handleUplinkDataFrame(gatewayID, pl)
case structs.JoinRequestMessage:
// handle join-request
b.incrementRxStats(gatewayID)
var pl structs.JoinRequest
if err := json.Unmarshal(msg, &pl); err != nil {
log.WithError(err).WithFields(log.Fields{
Expand All @@ -549,7 +518,6 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {
b.handleJoinRequest(gatewayID, pl)
case structs.ProprietaryDataFrameMessage:
// handle proprietary uplink
b.incrementRxStats(gatewayID)
var pl structs.UplinkProprietaryFrame
if err := json.Unmarshal(msg, &pl); err != nil {
log.WithError(err).WithFields(log.Fields{
Expand All @@ -562,7 +530,6 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {
b.handleProprietaryDataFrame(gatewayID, pl)
case structs.DownlinkTransmittedMessage:
// handle downlink transmitted
b.incrementTxOkStats(gatewayID)
var pl structs.DownlinkTransmitted
if err := json.Unmarshal(msg, &pl); err != nil {
log.WithError(err).WithFields(log.Fields{
Expand Down Expand Up @@ -630,6 +597,10 @@ func (b *Backend) handleJoinRequest(gatewayID lorawan.EUI64, v structs.JoinReque
}
uplinkFrame.RxInfo.UplinkId = uplinkID[:]

if conn, err := b.gateways.get(gatewayID); err == nil {
conn.stats.CountUplink(&uplinkFrame)
}

log.WithFields(log.Fields{
"gateway_id": gatewayID,
"uplink_id": uplinkID,
Expand Down Expand Up @@ -659,6 +630,10 @@ func (b *Backend) handleProprietaryDataFrame(gatewayID lorawan.EUI64, v structs.
}
uplinkFrame.RxInfo.UplinkId = uplinkID[:]

if conn, err := b.gateways.get(gatewayID); err == nil {
conn.stats.CountUplink(&uplinkFrame)
}

log.WithFields(log.Fields{
"gateway_id": gatewayID,
"uplink_id": uplinkID,
Expand All @@ -682,7 +657,12 @@ func (b *Backend) handleDownlinkTransmittedMessage(gatewayID lorawan.EUI64, v st
}

if v, ok := b.diidCache.Get(fmt.Sprintf("%d", v.DIID)); ok {
txack.DownlinkId = v.([]byte)
pl := v.(gw.DownlinkFrame)
txack.DownlinkId = pl.DownlinkId

if conn, err := b.gateways.get(gatewayID); err == nil {
conn.stats.CountDownlink(&pl, &txack)
}
}

var downID uuid.UUID
Expand Down Expand Up @@ -717,6 +697,11 @@ func (b *Backend) handleUplinkDataFrame(gatewayID lorawan.EUI64, v structs.Uplin
}
uplinkFrame.RxInfo.UplinkId = uplinkID[:]

// count metrics
if conn, err := b.gateways.get(gatewayID); err == nil {
conn.stats.CountUplink(&uplinkFrame)
}

log.WithFields(log.Fields{
"gateway_id": gatewayID,
"uplink_id": uplinkID,
Expand Down Expand Up @@ -835,7 +820,7 @@ func (b *Backend) websocketWrap(handler func(*http.Request, *connection), w http

// Wrap the conn inside a gateway struct, so that we can lock it when writing
// data.
c := connection{conn: conn}
c := connection{conn: conn, stats: stats.NewCollector()}

go func() {
for {
Expand All @@ -858,31 +843,3 @@ func (b *Backend) websocketWrap(handler func(*http.Request, *connection), w http
handler(r, &c)
done <- struct{}{}
}

func (b *Backend) incrementRxStats(id lorawan.EUI64) {
idStr := id.String()

if _, err := b.statsCache.IncrementUint32(idStr+":rx", uint32(1)); err != nil {
b.statsCache.SetDefault(idStr+":rx", uint32(1))
}

if _, err := b.statsCache.IncrementUint32(idStr+":rxOK", uint32(1)); err != nil {
b.statsCache.SetDefault(idStr+":rxOK", uint32(1))
}
}

func (b *Backend) incrementTxOkStats(id lorawan.EUI64) {
idStr := id.String()

if _, err := b.statsCache.IncrementUint32(idStr+"txOK", uint32(1)); err != nil {
b.statsCache.SetDefault(idStr+":txOK", uint32(1))
}
}

func (b *Backend) incrementTxStats(id lorawan.EUI64) {
idStr := id.String()

if _, err := b.statsCache.IncrementUint32(idStr+"tx", uint32(1)); err != nil {
b.statsCache.SetDefault(idStr+":tx", uint32(1))
}
}
Loading

0 comments on commit e39633f

Please sign in to comment.