From 365d06735514ccdba9ba71e005bd9c2150df191e Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Tue, 21 Jul 2020 11:52:40 +0200 Subject: [PATCH] basicstation: implement forwarding gateway statistics. Note that these stats are aggregated by the ChirpStack Gateway Bridge as these are not exposed by the BasicStation. --- .../cmd/configfile.go | 6 + cmd/chirpstack-gateway-bridge/cmd/root.go | 1 + docs/content/install/config.md | 7 +- internal/backend/basicstation/backend.go | 123 ++++++++++++++++-- internal/backend/basicstation/backend_test.go | 47 ++++++- internal/config/config.go | 15 ++- 6 files changed, 176 insertions(+), 23 deletions(-) diff --git a/cmd/chirpstack-gateway-bridge/cmd/configfile.go b/cmd/chirpstack-gateway-bridge/cmd/configfile.go index e33122fb..16f62b65 100644 --- a/cmd/chirpstack-gateway-bridge/cmd/configfile.go +++ b/cmd/chirpstack-gateway-bridge/cmd/configfile.go @@ -124,6 +124,12 @@ type="{{ .Backend.Type }}" # certificate of the gateway has been signed by this CA certificate. ca_cert="{{ .Backend.BasicStation.CACert }}" + # Stats interval. + # + # This defines the interval in which the ChirpStack Gateway Bridge forwards + # the uplink / downlink statistics. + stats_interval="{{ .Backend.BasicStation.StatsInterval }}" + # Ping interval. ping_interval="{{ .Backend.BasicStation.PingInterval }}" diff --git a/cmd/chirpstack-gateway-bridge/cmd/root.go b/cmd/chirpstack-gateway-bridge/cmd/root.go index acd4e798..1e2d4ba5 100644 --- a/cmd/chirpstack-gateway-bridge/cmd/root.go +++ b/cmd/chirpstack-gateway-bridge/cmd/root.go @@ -45,6 +45,7 @@ func init() { viper.SetDefault("backend.concentratord.command_url", "ipc:///tmp/concentratord_command") viper.SetDefault("backend.basic_station.bind", ":3001") + viper.SetDefault("backend.basic_station.stats_interval", time.Second*30) viper.SetDefault("backend.basic_station.ping_interval", time.Minute) viper.SetDefault("backend.basic_station.read_timeout", time.Minute+(5*time.Second)) viper.SetDefault("backend.basic_station.write_timeout", time.Second) diff --git a/docs/content/install/config.md b/docs/content/install/config.md index cf9f6df6..f05f80e3 100644 --- a/docs/content/install/config.md +++ b/docs/content/install/config.md @@ -142,7 +142,6 @@ type="semtech_udp" fake_rx_time=false - # ChirpStack Concentratord backend. [backend.concentratord] @@ -175,6 +174,12 @@ type="semtech_udp" # certificate of the gateway has been signed by this CA certificate. ca_cert="" + # Stats interval. + # + # This defines the interval in which the ChirpStack Gateway Bridge forwards + # the uplink / downlink statistics. + stats_interval="30s" + # Ping interval. ping_interval="1m0s" diff --git a/internal/backend/basicstation/backend.go b/internal/backend/basicstation/backend.go index 971d221b..8e6e8249 100644 --- a/internal/backend/basicstation/backend.go +++ b/internal/backend/basicstation/backend.go @@ -16,7 +16,9 @@ import ( "time" "github.com/gofrs/uuid" + "github.com/golang/protobuf/ptypes" "github.com/gorilla/websocket" + "github.com/patrickmn/go-cache" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -43,9 +45,10 @@ type Backend struct { scheme string isClosed bool - pingInterval time.Duration - readTimeout time.Duration - writeTimeout time.Duration + statsInterval time.Duration + pingInterval time.Duration + readTimeout time.Duration + writeTimeout time.Duration gateways gateways @@ -62,10 +65,11 @@ type Backend struct { frequencyMax uint32 routerConfig structs.RouterConfig - // diidMap stores the mapping of diid to UUIDs. This should take ~ 1MB of - // memory. Optionaly this could be optimized by letting keys expire after - // a given time. - diidMap map[uint16][]byte + // Cache to store stats. + statsCache *cache.Cache + + // Cache to store diid to UUIDs. + diidCache *cache.Cache } // NewBackend creates a new Backend. @@ -83,15 +87,17 @@ func NewBackend(conf config.Config) (*Backend, error) { gatewayStatsChan: make(chan gw.GatewayStats), rawPacketForwarderEventChan: make(chan gw.RawPacketForwarderEvent), - pingInterval: conf.Backend.BasicStation.PingInterval, - readTimeout: conf.Backend.BasicStation.ReadTimeout, - writeTimeout: conf.Backend.BasicStation.WriteTimeout, + statsInterval: conf.Backend.BasicStation.StatsInterval, + pingInterval: conf.Backend.BasicStation.PingInterval, + readTimeout: conf.Backend.BasicStation.ReadTimeout, + writeTimeout: conf.Backend.BasicStation.WriteTimeout, region: band.Name(conf.Backend.BasicStation.Region), frequencyMin: conf.Backend.BasicStation.FrequencyMin, frequencyMax: conf.Backend.BasicStation.FrequencyMax, - diidMap: make(map[uint16][]byte), + diidCache: cache.New(time.Minute, time.Minute), + statsCache: cache.New(conf.Backend.BasicStation.StatsInterval*2, conf.Backend.BasicStation.StatsInterval*2), } for _, n := range conf.Filters.NetIDs { @@ -238,8 +244,10 @@ func (b *Backend) SendDownlinkFrame(df gw.DownlinkFrame) error { copy(gatewayID[:], df.GetGatewayId()) copy(downID[:], df.GetDownlinkId()) + b.incrementTxStats(gatewayID) + // store token to UUID mapping - b.diidMap[uint16(df.Token)] = df.GetDownlinkId() + b.diidCache.SetDefault(fmt.Sprintf("%d", df.Token), df.GetDownlinkId()) websocketSendCounter("dnmsg").Inc() if err := b.sendToGateway(gatewayID, pl); err != nil { @@ -382,8 +390,11 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) { "remote_addr": r.RemoteAddr, }).Info("backend/basicstation: gateway connected") + done := make(chan struct{}) + // remove the gateway on return defer func() { + done <- struct{}{} b.gateways.remove(gatewayID) log.WithFields(log.Fields{ "gateway_id": gatewayID, @@ -391,6 +402,57 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) { }).Info("backend/basicstation: gateway disconnected") }() + statsTicker := time.NewTicker(b.statsInterval) + defer statsTicker.Stop() + + // stats publishing loop + go func() { + gwIDStr := gatewayID.String() + + for { + select { + case <-statsTicker.C: + id, err := uuid.NewV4() + if err != nil { + log.WithError(err).Error("backend/basicstation: new uuid error") + continue + } + + var rx, rxOK, tx, txOK uint32 + if v, ok := b.statsCache.Get(gwIDStr + ":rx"); ok { + rx = v.(uint32) + } + if v, ok := b.statsCache.Get(gwIDStr + ":rxOK"); ok { + rxOK = v.(uint32) + } + if v, ok := b.statsCache.Get(gwIDStr + ":tx"); ok { + tx = v.(uint32) + } + if v, ok := b.statsCache.Get(gwIDStr + ":txOK"); ok { + txOK = v.(uint32) + } + + b.statsCache.Delete(gwIDStr + ":rx") + b.statsCache.Delete(gwIDStr + ":rxOK") + b.statsCache.Delete(gwIDStr + ":tx") + b.statsCache.Delete(gwIDStr + ":txOK") + + b.gatewayStatsChan <- gw.GatewayStats{ + GatewayId: gatewayID[:], + Time: ptypes.TimestampNow(), + StatsId: id[:], + RxPacketsReceived: rx, + RxPacketsReceivedOk: rxOK, + TxPacketsReceived: tx, + TxPacketsEmitted: txOK, + } + case <-done: + return + } + } + + }() + // receive data for { mt, msg, err := c.ReadMessage() @@ -447,6 +509,7 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) { b.handleVersion(gatewayID, pl) case structs.UplinkDataFrameMessage: // handle uplink + b.incrementRxStats(gatewayID) var pl structs.UplinkDataFrame if err := json.Unmarshal(msg, &pl); err != nil { log.WithError(err).WithFields(log.Fields{ @@ -459,6 +522,7 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) { b.handleUplinkDataFrame(gatewayID, pl) case structs.JoinRequestMessage: // handle join-request + b.incrementRxStats(gatewayID) var pl structs.JoinRequest if err := json.Unmarshal(msg, &pl); err != nil { log.WithError(err).WithFields(log.Fields{ @@ -471,6 +535,7 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) { b.handleJoinRequest(gatewayID, pl) case structs.ProprietaryDataFrameMessage: // handle proprietary uplink + b.incrementRxStats(gatewayID) var pl structs.UplinkProprietaryFrame if err := json.Unmarshal(msg, &pl); err != nil { log.WithError(err).WithFields(log.Fields{ @@ -483,6 +548,7 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) { b.handleProprietaryDataFrame(gatewayID, pl) case structs.DownlinkTransmittedMessage: // handle downlink transmitted + b.incrementTxOkStats(gatewayID) var pl structs.DownlinkTransmitted if err := json.Unmarshal(msg, &pl); err != nil { log.WithError(err).WithFields(log.Fields{ @@ -584,7 +650,10 @@ func (b *Backend) handleDownlinkTransmittedMessage(gatewayID lorawan.EUI64, v st }).Error("backend/basicstation: error converting downlink transmitted to protobuf message") return } - txack.DownlinkId = b.diidMap[uint16(v.DIID)] + + if v, ok := b.diidCache.Get(fmt.Sprintf("%d", v.DIID)); ok { + txack.DownlinkId = v.([]byte) + } var downID uuid.UUID copy(downID[:], txack.GetDownlinkId()) @@ -723,3 +792,31 @@ func (b *Backend) websocketWrap(handler func(*http.Request, *websocket.Conn), w handler(r, conn) done <- struct{}{} } + +func (b *Backend) incrementRxStats(id lorawan.EUI64) { + idStr := id.String() + + if _, err := b.statsCache.IncrementUint32(idStr+":rx", uint32(1)); err != nil { + b.statsCache.SetDefault(idStr+":rx", uint32(1)) + } + + if _, err := b.statsCache.IncrementUint32(idStr+":rxOK", uint32(1)); err != nil { + b.statsCache.SetDefault(idStr+":rxOK", uint32(1)) + } +} + +func (b *Backend) incrementTxOkStats(id lorawan.EUI64) { + idStr := id.String() + + if _, err := b.statsCache.IncrementUint32(idStr+"txOK", uint32(1)); err != nil { + b.statsCache.SetDefault(idStr+":txOK", uint32(1)) + } +} + +func (b *Backend) incrementTxStats(id lorawan.EUI64) { + idStr := id.String() + + if _, err := b.statsCache.IncrementUint32(idStr+"tx", uint32(1)); err != nil { + b.statsCache.SetDefault(idStr+":tx", uint32(1)) + } +} diff --git a/internal/backend/basicstation/backend_test.go b/internal/backend/basicstation/backend_test.go index e9af30d6..bf486599 100644 --- a/internal/backend/basicstation/backend_test.go +++ b/internal/backend/basicstation/backend_test.go @@ -44,6 +44,7 @@ func (ts *BackendTestSuite) SetupTest() { conf.Backend.BasicStation.Region = "EU868" conf.Backend.BasicStation.FrequencyMin = 867000000 conf.Backend.BasicStation.FrequencyMax = 869000000 + conf.Backend.BasicStation.StatsInterval = 30 * time.Second conf.Backend.BasicStation.PingInterval = time.Minute conf.Backend.BasicStation.ReadTimeout = 2 * time.Minute conf.Backend.BasicStation.WriteTimeout = time.Second @@ -168,6 +169,14 @@ func (ts *BackendTestSuite) TestUplinkDataFrame() { CrcStatus: gw.CRCStatus_CRC_OK, }, }, uplinkFrame) + + rx, ok := ts.backend.statsCache.Get("0102030405060708:rx") + assert.True(ok) + assert.Equal(uint32(1), rx) + + rxOK, ok := ts.backend.statsCache.Get("0102030405060708:rxOK") + assert.True(ok) + assert.Equal(uint32(1), rxOK) } func (ts *BackendTestSuite) TestJoinRequest() { @@ -221,6 +230,14 @@ func (ts *BackendTestSuite) TestJoinRequest() { CrcStatus: gw.CRCStatus_CRC_OK, }, }, uplinkFrame) + + rx, ok := ts.backend.statsCache.Get("0102030405060708:rx") + assert.True(ok) + assert.Equal(uint32(1), rx) + + rxOK, ok := ts.backend.statsCache.Get("0102030405060708:rxOK") + assert.True(ok) + assert.Equal(uint32(1), rxOK) } func (ts *BackendTestSuite) TestProprietaryDataFrame() { @@ -269,6 +286,14 @@ func (ts *BackendTestSuite) TestProprietaryDataFrame() { CrcStatus: gw.CRCStatus_CRC_OK, }, }, uplinkFrame) + + rx, ok := ts.backend.statsCache.Get("0102030405060708:rx") + assert.True(ok) + assert.Equal(uint32(1), rx) + + rxOK, ok := ts.backend.statsCache.Get("0102030405060708:rxOK") + assert.True(ok) + assert.Equal(uint32(1), rxOK) } func (ts *BackendTestSuite) TestDownlinkTransmitted() { @@ -276,7 +301,7 @@ func (ts *BackendTestSuite) TestDownlinkTransmitted() { id, err := uuid.NewV4() assert.NoError(err) - ts.backend.diidMap[12345] = id[:] + ts.backend.diidCache.SetDefault("12345", id[:]) dtx := structs.DownlinkTransmitted{ MessageType: structs.DownlinkTransmittedMessage, @@ -292,6 +317,14 @@ func (ts *BackendTestSuite) TestDownlinkTransmitted() { Token: 12345, DownlinkId: id[:], }, txAck) + + // this variable is not yet stored + _, ok := ts.backend.statsCache.Get("0102030405060708:tx") + assert.False(ok) + + txOK, ok := ts.backend.statsCache.Get("0102030405060708:txOK") + assert.True(ok) + assert.Equal(uint32(1), txOK) } func (ts *BackendTestSuite) TestSendDownlinkFrame() { @@ -331,7 +364,9 @@ func (ts *BackendTestSuite) TestSendDownlinkFrame() { }) assert.NoError(err) - assert.Equal(id[:], ts.backend.diidMap[1234]) + idResp, ok := ts.backend.diidCache.Get("1234") + assert.True(ok) + assert.Equal(id[:], idResp) var df structs.DownlinkFrame assert.NoError(ts.wsClient.ReadJSON(&df)) @@ -355,6 +390,14 @@ func (ts *BackendTestSuite) TestSendDownlinkFrame() { RX1DR: &dr2, RX1Freq: &freq, }, df) + + tx, ok := ts.backend.statsCache.Get("0102030405060708:tx") + assert.True(ok) + assert.Equal(uint32(1), tx) + + // this variable is not yet stored + _, ok = ts.backend.statsCache.Get("0102030405060708:txOK") + assert.False(ok) } func (ts *BackendTestSuite) TestRawPacketForwarderCommand() { diff --git a/internal/config/config.go b/internal/config/config.go index a54d9b53..259e0a6d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -26,13 +26,14 @@ type Config struct { } `mapstructure:"semtech_udp"` BasicStation struct { - Bind string `mapstructure:"bind"` - TLSCert string `mapstructure:"tls_cert"` - TLSKey string `mapstructure:"tls_key"` - CACert string `mapstructure:"ca_cert"` - PingInterval time.Duration `mapstructure:"ping_interval"` - ReadTimeout time.Duration `mapstructure:"read_timeout"` - WriteTimeout time.Duration `mapstructure:"write_timeout"` + Bind string `mapstructure:"bind"` + TLSCert string `mapstructure:"tls_cert"` + TLSKey string `mapstructure:"tls_key"` + CACert string `mapstructure:"ca_cert"` + StatsInterval time.Duration `mapstructure:"stats_interval"` + PingInterval time.Duration `mapstructure:"ping_interval"` + ReadTimeout time.Duration `mapstructure:"read_timeout"` + WriteTimeout time.Duration `mapstructure:"write_timeout"` // TODO: remove Filters in the next major release, use global filters instead Filters struct { NetIDs []string `mapstructure:"net_ids"`