Skip to content

Commit

Permalink
basicstation: implement forwarding gateway statistics.
Browse files Browse the repository at this point in the history
Note that these stats are aggregated by the ChirpStack Gateway Bridge as
these are not exposed by the BasicStation.
  • Loading branch information
brocaar committed Jul 21, 2020
1 parent 5d4c0d5 commit 365d067
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 23 deletions.
6 changes: 6 additions & 0 deletions cmd/chirpstack-gateway-bridge/cmd/configfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type="{{ .Backend.Type }}"
# certificate of the gateway has been signed by this CA certificate.
ca_cert="{{ .Backend.BasicStation.CACert }}"
# Stats interval.
#
# This defines the interval in which the ChirpStack Gateway Bridge forwards
# the uplink / downlink statistics.
stats_interval="{{ .Backend.BasicStation.StatsInterval }}"
# Ping interval.
ping_interval="{{ .Backend.BasicStation.PingInterval }}"
Expand Down
1 change: 1 addition & 0 deletions cmd/chirpstack-gateway-bridge/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func init() {
viper.SetDefault("backend.concentratord.command_url", "ipc:///tmp/concentratord_command")

viper.SetDefault("backend.basic_station.bind", ":3001")
viper.SetDefault("backend.basic_station.stats_interval", time.Second*30)
viper.SetDefault("backend.basic_station.ping_interval", time.Minute)
viper.SetDefault("backend.basic_station.read_timeout", time.Minute+(5*time.Second))
viper.SetDefault("backend.basic_station.write_timeout", time.Second)
Expand Down
7 changes: 6 additions & 1 deletion docs/content/install/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ type="semtech_udp"
fake_rx_time=false



# ChirpStack Concentratord backend.
[backend.concentratord]

Expand Down Expand Up @@ -175,6 +174,12 @@ type="semtech_udp"
# certificate of the gateway has been signed by this CA certificate.
ca_cert=""

# Stats interval.
#
# This defines the interval in which the ChirpStack Gateway Bridge forwards
# the uplink / downlink statistics.
stats_interval="30s"

# Ping interval.
ping_interval="1m0s"

Expand Down
123 changes: 110 additions & 13 deletions internal/backend/basicstation/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"time"

"github.com/gofrs/uuid"
"github.com/golang/protobuf/ptypes"
"github.com/gorilla/websocket"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

Expand All @@ -43,9 +45,10 @@ type Backend struct {
scheme string
isClosed bool

pingInterval time.Duration
readTimeout time.Duration
writeTimeout time.Duration
statsInterval time.Duration
pingInterval time.Duration
readTimeout time.Duration
writeTimeout time.Duration

gateways gateways

Expand All @@ -62,10 +65,11 @@ type Backend struct {
frequencyMax uint32
routerConfig structs.RouterConfig

// diidMap stores the mapping of diid to UUIDs. This should take ~ 1MB of
// memory. Optionaly this could be optimized by letting keys expire after
// a given time.
diidMap map[uint16][]byte
// Cache to store stats.
statsCache *cache.Cache

// Cache to store diid to UUIDs.
diidCache *cache.Cache
}

// NewBackend creates a new Backend.
Expand All @@ -83,15 +87,17 @@ func NewBackend(conf config.Config) (*Backend, error) {
gatewayStatsChan: make(chan gw.GatewayStats),
rawPacketForwarderEventChan: make(chan gw.RawPacketForwarderEvent),

pingInterval: conf.Backend.BasicStation.PingInterval,
readTimeout: conf.Backend.BasicStation.ReadTimeout,
writeTimeout: conf.Backend.BasicStation.WriteTimeout,
statsInterval: conf.Backend.BasicStation.StatsInterval,
pingInterval: conf.Backend.BasicStation.PingInterval,
readTimeout: conf.Backend.BasicStation.ReadTimeout,
writeTimeout: conf.Backend.BasicStation.WriteTimeout,

region: band.Name(conf.Backend.BasicStation.Region),
frequencyMin: conf.Backend.BasicStation.FrequencyMin,
frequencyMax: conf.Backend.BasicStation.FrequencyMax,

diidMap: make(map[uint16][]byte),
diidCache: cache.New(time.Minute, time.Minute),
statsCache: cache.New(conf.Backend.BasicStation.StatsInterval*2, conf.Backend.BasicStation.StatsInterval*2),
}

for _, n := range conf.Filters.NetIDs {
Expand Down Expand Up @@ -238,8 +244,10 @@ func (b *Backend) SendDownlinkFrame(df gw.DownlinkFrame) error {
copy(gatewayID[:], df.GetGatewayId())
copy(downID[:], df.GetDownlinkId())

b.incrementTxStats(gatewayID)

// store token to UUID mapping
b.diidMap[uint16(df.Token)] = df.GetDownlinkId()
b.diidCache.SetDefault(fmt.Sprintf("%d", df.Token), df.GetDownlinkId())

websocketSendCounter("dnmsg").Inc()
if err := b.sendToGateway(gatewayID, pl); err != nil {
Expand Down Expand Up @@ -382,15 +390,69 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) {
"remote_addr": r.RemoteAddr,
}).Info("backend/basicstation: gateway connected")

done := make(chan struct{})

// remove the gateway on return
defer func() {
done <- struct{}{}
b.gateways.remove(gatewayID)
log.WithFields(log.Fields{
"gateway_id": gatewayID,
"remote_addr": r.RemoteAddr,
}).Info("backend/basicstation: gateway disconnected")
}()

statsTicker := time.NewTicker(b.statsInterval)
defer statsTicker.Stop()

// stats publishing loop
go func() {
gwIDStr := gatewayID.String()

for {
select {
case <-statsTicker.C:
id, err := uuid.NewV4()
if err != nil {
log.WithError(err).Error("backend/basicstation: new uuid error")
continue
}

var rx, rxOK, tx, txOK uint32
if v, ok := b.statsCache.Get(gwIDStr + ":rx"); ok {
rx = v.(uint32)
}
if v, ok := b.statsCache.Get(gwIDStr + ":rxOK"); ok {
rxOK = v.(uint32)
}
if v, ok := b.statsCache.Get(gwIDStr + ":tx"); ok {
tx = v.(uint32)
}
if v, ok := b.statsCache.Get(gwIDStr + ":txOK"); ok {
txOK = v.(uint32)
}

b.statsCache.Delete(gwIDStr + ":rx")
b.statsCache.Delete(gwIDStr + ":rxOK")
b.statsCache.Delete(gwIDStr + ":tx")
b.statsCache.Delete(gwIDStr + ":txOK")

b.gatewayStatsChan <- gw.GatewayStats{
GatewayId: gatewayID[:],
Time: ptypes.TimestampNow(),
StatsId: id[:],
RxPacketsReceived: rx,
RxPacketsReceivedOk: rxOK,
TxPacketsReceived: tx,
TxPacketsEmitted: txOK,
}
case <-done:
return
}
}

}()

// receive data
for {
mt, msg, err := c.ReadMessage()
Expand Down Expand Up @@ -447,6 +509,7 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) {
b.handleVersion(gatewayID, pl)
case structs.UplinkDataFrameMessage:
// handle uplink
b.incrementRxStats(gatewayID)
var pl structs.UplinkDataFrame
if err := json.Unmarshal(msg, &pl); err != nil {
log.WithError(err).WithFields(log.Fields{
Expand All @@ -459,6 +522,7 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) {
b.handleUplinkDataFrame(gatewayID, pl)
case structs.JoinRequestMessage:
// handle join-request
b.incrementRxStats(gatewayID)
var pl structs.JoinRequest
if err := json.Unmarshal(msg, &pl); err != nil {
log.WithError(err).WithFields(log.Fields{
Expand All @@ -471,6 +535,7 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) {
b.handleJoinRequest(gatewayID, pl)
case structs.ProprietaryDataFrameMessage:
// handle proprietary uplink
b.incrementRxStats(gatewayID)
var pl structs.UplinkProprietaryFrame
if err := json.Unmarshal(msg, &pl); err != nil {
log.WithError(err).WithFields(log.Fields{
Expand All @@ -483,6 +548,7 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) {
b.handleProprietaryDataFrame(gatewayID, pl)
case structs.DownlinkTransmittedMessage:
// handle downlink transmitted
b.incrementTxOkStats(gatewayID)
var pl structs.DownlinkTransmitted
if err := json.Unmarshal(msg, &pl); err != nil {
log.WithError(err).WithFields(log.Fields{
Expand Down Expand Up @@ -584,7 +650,10 @@ func (b *Backend) handleDownlinkTransmittedMessage(gatewayID lorawan.EUI64, v st
}).Error("backend/basicstation: error converting downlink transmitted to protobuf message")
return
}
txack.DownlinkId = b.diidMap[uint16(v.DIID)]

if v, ok := b.diidCache.Get(fmt.Sprintf("%d", v.DIID)); ok {
txack.DownlinkId = v.([]byte)
}

var downID uuid.UUID
copy(downID[:], txack.GetDownlinkId())
Expand Down Expand Up @@ -723,3 +792,31 @@ func (b *Backend) websocketWrap(handler func(*http.Request, *websocket.Conn), w
handler(r, conn)
done <- struct{}{}
}

func (b *Backend) incrementRxStats(id lorawan.EUI64) {
idStr := id.String()

if _, err := b.statsCache.IncrementUint32(idStr+":rx", uint32(1)); err != nil {
b.statsCache.SetDefault(idStr+":rx", uint32(1))
}

if _, err := b.statsCache.IncrementUint32(idStr+":rxOK", uint32(1)); err != nil {
b.statsCache.SetDefault(idStr+":rxOK", uint32(1))
}
}

func (b *Backend) incrementTxOkStats(id lorawan.EUI64) {
idStr := id.String()

if _, err := b.statsCache.IncrementUint32(idStr+"txOK", uint32(1)); err != nil {
b.statsCache.SetDefault(idStr+":txOK", uint32(1))
}
}

func (b *Backend) incrementTxStats(id lorawan.EUI64) {
idStr := id.String()

if _, err := b.statsCache.IncrementUint32(idStr+"tx", uint32(1)); err != nil {
b.statsCache.SetDefault(idStr+":tx", uint32(1))
}
}
47 changes: 45 additions & 2 deletions internal/backend/basicstation/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (ts *BackendTestSuite) SetupTest() {
conf.Backend.BasicStation.Region = "EU868"
conf.Backend.BasicStation.FrequencyMin = 867000000
conf.Backend.BasicStation.FrequencyMax = 869000000
conf.Backend.BasicStation.StatsInterval = 30 * time.Second
conf.Backend.BasicStation.PingInterval = time.Minute
conf.Backend.BasicStation.ReadTimeout = 2 * time.Minute
conf.Backend.BasicStation.WriteTimeout = time.Second
Expand Down Expand Up @@ -168,6 +169,14 @@ func (ts *BackendTestSuite) TestUplinkDataFrame() {
CrcStatus: gw.CRCStatus_CRC_OK,
},
}, uplinkFrame)

rx, ok := ts.backend.statsCache.Get("0102030405060708:rx")
assert.True(ok)
assert.Equal(uint32(1), rx)

rxOK, ok := ts.backend.statsCache.Get("0102030405060708:rxOK")
assert.True(ok)
assert.Equal(uint32(1), rxOK)
}

func (ts *BackendTestSuite) TestJoinRequest() {
Expand Down Expand Up @@ -221,6 +230,14 @@ func (ts *BackendTestSuite) TestJoinRequest() {
CrcStatus: gw.CRCStatus_CRC_OK,
},
}, uplinkFrame)

rx, ok := ts.backend.statsCache.Get("0102030405060708:rx")
assert.True(ok)
assert.Equal(uint32(1), rx)

rxOK, ok := ts.backend.statsCache.Get("0102030405060708:rxOK")
assert.True(ok)
assert.Equal(uint32(1), rxOK)
}

func (ts *BackendTestSuite) TestProprietaryDataFrame() {
Expand Down Expand Up @@ -269,14 +286,22 @@ func (ts *BackendTestSuite) TestProprietaryDataFrame() {
CrcStatus: gw.CRCStatus_CRC_OK,
},
}, uplinkFrame)

rx, ok := ts.backend.statsCache.Get("0102030405060708:rx")
assert.True(ok)
assert.Equal(uint32(1), rx)

rxOK, ok := ts.backend.statsCache.Get("0102030405060708:rxOK")
assert.True(ok)
assert.Equal(uint32(1), rxOK)
}

func (ts *BackendTestSuite) TestDownlinkTransmitted() {
assert := require.New(ts.T())
id, err := uuid.NewV4()
assert.NoError(err)

ts.backend.diidMap[12345] = id[:]
ts.backend.diidCache.SetDefault("12345", id[:])

dtx := structs.DownlinkTransmitted{
MessageType: structs.DownlinkTransmittedMessage,
Expand All @@ -292,6 +317,14 @@ func (ts *BackendTestSuite) TestDownlinkTransmitted() {
Token: 12345,
DownlinkId: id[:],
}, txAck)

// this variable is not yet stored
_, ok := ts.backend.statsCache.Get("0102030405060708:tx")
assert.False(ok)

txOK, ok := ts.backend.statsCache.Get("0102030405060708:txOK")
assert.True(ok)
assert.Equal(uint32(1), txOK)
}

func (ts *BackendTestSuite) TestSendDownlinkFrame() {
Expand Down Expand Up @@ -331,7 +364,9 @@ func (ts *BackendTestSuite) TestSendDownlinkFrame() {
})
assert.NoError(err)

assert.Equal(id[:], ts.backend.diidMap[1234])
idResp, ok := ts.backend.diidCache.Get("1234")
assert.True(ok)
assert.Equal(id[:], idResp)

var df structs.DownlinkFrame
assert.NoError(ts.wsClient.ReadJSON(&df))
Expand All @@ -355,6 +390,14 @@ func (ts *BackendTestSuite) TestSendDownlinkFrame() {
RX1DR: &dr2,
RX1Freq: &freq,
}, df)

tx, ok := ts.backend.statsCache.Get("0102030405060708:tx")
assert.True(ok)
assert.Equal(uint32(1), tx)

// this variable is not yet stored
_, ok = ts.backend.statsCache.Get("0102030405060708:txOK")
assert.False(ok)
}

func (ts *BackendTestSuite) TestRawPacketForwarderCommand() {
Expand Down
15 changes: 8 additions & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ type Config struct {
} `mapstructure:"semtech_udp"`

BasicStation struct {
Bind string `mapstructure:"bind"`
TLSCert string `mapstructure:"tls_cert"`
TLSKey string `mapstructure:"tls_key"`
CACert string `mapstructure:"ca_cert"`
PingInterval time.Duration `mapstructure:"ping_interval"`
ReadTimeout time.Duration `mapstructure:"read_timeout"`
WriteTimeout time.Duration `mapstructure:"write_timeout"`
Bind string `mapstructure:"bind"`
TLSCert string `mapstructure:"tls_cert"`
TLSKey string `mapstructure:"tls_key"`
CACert string `mapstructure:"ca_cert"`
StatsInterval time.Duration `mapstructure:"stats_interval"`
PingInterval time.Duration `mapstructure:"ping_interval"`
ReadTimeout time.Duration `mapstructure:"read_timeout"`
WriteTimeout time.Duration `mapstructure:"write_timeout"`
// TODO: remove Filters in the next major release, use global filters instead
Filters struct {
NetIDs []string `mapstructure:"net_ids"`
Expand Down

0 comments on commit 365d067

Please sign in to comment.