diff --git a/cmd/chirpstack-gateway-bridge/cmd/configfile.go b/cmd/chirpstack-gateway-bridge/cmd/configfile.go index a5958e14..cf8851c0 100644 --- a/cmd/chirpstack-gateway-bridge/cmd/configfile.go +++ b/cmd/chirpstack-gateway-bridge/cmd/configfile.go @@ -221,6 +221,14 @@ marshaler="{{ .Integration.Marshaler }}" # Event topic template. event_topic_template="{{ .Integration.MQTT.EventTopicTemplate }}" + # State topic template. + # + # States are sent by the gateway as retained MQTT messages so that the last + # message will be stored by the MQTT broker. When set to a blank string, this + # feature will be disabled. This feature is only supported when using the + # generic authentication type. + state_topic_template="{{ .Integration.MQTT.StateTopicTemplate }}" + # Command topic template. command_topic_template="{{ .Integration.MQTT.CommandTopicTemplate }}" diff --git a/cmd/chirpstack-gateway-bridge/cmd/root.go b/cmd/chirpstack-gateway-bridge/cmd/root.go index f4b44fb1..d2e1472d 100644 --- a/cmd/chirpstack-gateway-bridge/cmd/root.go +++ b/cmd/chirpstack-gateway-bridge/cmd/root.go @@ -57,6 +57,7 @@ func init() { viper.SetDefault("integration.mqtt.auth.type", "generic") viper.SetDefault("integration.mqtt.event_topic_template", "gateway/{{ .GatewayID }}/event/{{ .EventType }}") + viper.SetDefault("integration.mqtt.state_topic_template", "gateway/{{ .GatewayID }}/state/{{ .StateType }}") viper.SetDefault("integration.mqtt.command_topic_template", "gateway/{{ .GatewayID }}/command/#") viper.SetDefault("integration.mqtt.keep_alive", 30*time.Second) viper.SetDefault("integration.mqtt.max_reconnect_interval", time.Minute) diff --git a/cmd/chirpstack-gateway-bridge/cmd/root_run.go b/cmd/chirpstack-gateway-bridge/cmd/root_run.go index 07529f7b..370f2ac1 100644 --- a/cmd/chirpstack-gateway-bridge/cmd/root_run.go +++ b/cmd/chirpstack-gateway-bridge/cmd/root_run.go @@ -47,6 +47,8 @@ func run(cmd *cobra.Command, args []string) error { log.WithField("signal", <-sigChan).Info("signal received") log.Warning("shutting down server") + integration.GetIntegration().Stop() + return nil } diff --git a/go.mod b/go.mod index 2604d7d3..7d4c3d56 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/brocaar/chirpstack-gateway-bridge go 1.16 require ( - github.com/brocaar/chirpstack-api/go/v3 v3.8.1 + github.com/brocaar/chirpstack-api/go/v3 v3.9.7 github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/eclipse/paho.mqtt.golang v1.3.0 diff --git a/go.sum b/go.sum index d529ab79..523f6e68 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2 h1:oMCHnXa6CCCafdPDbMh/lWRhRByN0VFLvv+g+ayx1SI= github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2/go.mod h1:PkYb9DJNAwrSvRx5DYA+gUcOIgTGVMNkfSCbZM8cWpI= -github.com/brocaar/chirpstack-api/go/v3 v3.8.1 h1:8xNpG/GZqiL8XYkkAUWYNZJu7yn5SamK6oPBx1hCQe0= -github.com/brocaar/chirpstack-api/go/v3 v3.8.1/go.mod h1:ex/wqXQaClwDMa2zDN6crp9ZiMGc1GMVQhjxiB+OJcg= +github.com/brocaar/chirpstack-api/go/v3 v3.9.7 h1:n5Zte6zIg+qbqtb4dwp3vGQwIXpXsk5nMR4WwMUcLgA= +github.com/brocaar/chirpstack-api/go/v3 v3.9.7/go.mod h1:v8AWP19nOJK4rwJsr1+weDfpUc4UNLbRh8Eygn4Oh00= github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303 h1:LkE19tFPfDaRh1HIKWLCZKSBZNonMu0rIOJPCLvEjC0= github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303/go.mod h1:CciUmQHIpUYTHHMeICtyamM7d+47VV+WBZ5ReDozpoc= github.com/caarlos0/ctrlc v1.0.0 h1:2DtF8GSIcajgffDFJzyG15vO+1PuBWOMUdFut7NnXhw= diff --git a/internal/config/config.go b/internal/config/config.go index aa173de7..39a3bbca 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -58,6 +58,7 @@ type Config struct { MQTT struct { EventTopicTemplate string `mapstructure:"event_topic_template"` CommandTopicTemplate string `mapstructure:"command_topic_template"` + StateTopicTemplate string `mapstructure:"state_topic_template"` KeepAlive time.Duration `mapstructure:"keep_alive"` MaxReconnectInterval time.Duration `mapstructure:"max_reconnect_interval"` TerminateOnConnectError bool `mapstructure:"terminate_on_connect_error"` diff --git a/internal/integration/integration.go b/internal/integration/integration.go index 4b1f7885..2d3de1d8 100644 --- a/internal/integration/integration.go +++ b/internal/integration/integration.go @@ -47,6 +47,9 @@ type Integration interface { // PublishEvent publishes the given event. PublishEvent(lorawan.EUI64, string, uuid.UUID, proto.Message) error + // PublishState publishes the given state as retained message. + PublishState(lorawan.EUI64, string, proto.Message) error + // SetDownlinkFrameFunc sets the DownlinkFrame handler func. SetDownlinkFrameFunc(func(gw.DownlinkFrame)) diff --git a/internal/integration/mqtt/auth/auth.go b/internal/integration/mqtt/auth/auth.go index 8fd95c3c..91c57539 100644 --- a/internal/integration/mqtt/auth/auth.go +++ b/internal/integration/mqtt/auth/auth.go @@ -8,6 +8,8 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/pkg/errors" + + "github.com/brocaar/lorawan" ) // Authentication defines the authentication interface. @@ -15,6 +17,9 @@ type Authentication interface { // Init applies the initial configuration. Init(*mqtt.ClientOptions) error + // GetGatewayID returns the GatewayID if available. + GetGatewayID() *lorawan.EUI64 + // Update updates the authentication options. Update(*mqtt.ClientOptions) error diff --git a/internal/integration/mqtt/auth/azure_iot_hub.go b/internal/integration/mqtt/auth/azure_iot_hub.go index d4b466b6..c75e79dd 100644 --- a/internal/integration/mqtt/auth/azure_iot_hub.go +++ b/internal/integration/mqtt/auth/azure_iot_hub.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" "github.com/brocaar/chirpstack-gateway-bridge/internal/config" + "github.com/brocaar/lorawan" ) // See: @@ -201,6 +202,12 @@ func (a *AzureIoTHubAuthentication) Init(opts *mqtt.ClientOptions) error { return nil } +// GetGatewayID returns the GatewayID if available. +// TODO: implement. +func (a *AzureIoTHubAuthentication) GetGatewayID() *lorawan.EUI64 { + return nil +} + // Update updates the authentication options. func (a *AzureIoTHubAuthentication) Update(opts *mqtt.ClientOptions) error { if a.authType == authTypeSymmetric { diff --git a/internal/integration/mqtt/auth/gcp_cloud_iot_core.go b/internal/integration/mqtt/auth/gcp_cloud_iot_core.go index 98cb461b..76e6014b 100644 --- a/internal/integration/mqtt/auth/gcp_cloud_iot_core.go +++ b/internal/integration/mqtt/auth/gcp_cloud_iot_core.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/brocaar/chirpstack-gateway-bridge/internal/config" + "github.com/brocaar/lorawan" ) // GCPCloudIoTCoreAuthentication implements the Google Cloud IoT Core authentication. @@ -59,6 +60,12 @@ func (a *GCPCloudIoTCoreAuthentication) Init(opts *mqtt.ClientOptions) error { return nil } +// GetGatewayID returns the GatewayID if available. +// TODO: implement. +func (a *GCPCloudIoTCoreAuthentication) GetGatewayID() *lorawan.EUI64 { + return nil +} + // Update updates the authentication options. func (a *GCPCloudIoTCoreAuthentication) Update(opts *mqtt.ClientOptions) error { token := jwt.NewWithClaims(a.siginingMethod, jwt.StandardClaims{ diff --git a/internal/integration/mqtt/auth/generic.go b/internal/integration/mqtt/auth/generic.go index 6bc06b9f..a9da23bb 100644 --- a/internal/integration/mqtt/auth/generic.go +++ b/internal/integration/mqtt/auth/generic.go @@ -6,8 +6,10 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" "github.com/brocaar/chirpstack-gateway-bridge/internal/config" + "github.com/brocaar/lorawan" ) // GenericAuthentication implements a generic MQTT authentication. @@ -59,6 +61,24 @@ func (a *GenericAuthentication) Init(opts *mqtt.ClientOptions) error { return nil } +// GetGatewayID returns the GatewayID if available. +func (a *GenericAuthentication) GetGatewayID() *lorawan.EUI64 { + if a.clientID == "" { + return nil + } + + // Try to decode the client ID as gateway ID. + var gatewayID lorawan.EUI64 + if err := gatewayID.UnmarshalText([]byte(a.clientID)); err != nil { + log.WithError(err).WithFields(log.Fields{ + "client_id": a.clientID, + }).Warning("integration/mqtt/auth: could not decode client ID to gateway ID") + return nil + } + + return &gatewayID +} + // Update updates the authentication options. func (a *GenericAuthentication) Update(opts *mqtt.ClientOptions) error { return nil diff --git a/internal/integration/mqtt/auth/generic_test.go b/internal/integration/mqtt/auth/generic_test.go new file mode 100644 index 00000000..21c4fdd4 --- /dev/null +++ b/internal/integration/mqtt/auth/generic_test.go @@ -0,0 +1,37 @@ +package auth + +import ( + "testing" + + "github.com/brocaar/chirpstack-gateway-bridge/internal/config" + "github.com/brocaar/lorawan" + "github.com/stretchr/testify/require" +) + +func TestGenericAuthentication(t *testing.T) { + gatewayID := lorawan.EUI64{1, 2, 3, 4, 5, 6, 7, 8} + + var conf config.Config + conf.Integration.Marshaler = "json" + conf.Integration.MQTT.EventTopicTemplate = "gateway/{{ .GatewayID }}/event/{{ .EventType }}" + conf.Integration.MQTT.StateTopicTemplate = "gateway/{{ .GatewayID }}/state/{{ .StateType }}" + conf.Integration.MQTT.CommandTopicTemplate = "gateway/{{ .GatewayID }}/command/#" + conf.Integration.MQTT.Auth.Type = "generic" + conf.Integration.MQTT.Auth.Generic.Servers = []string{"tcp://localhost:1883"} + conf.Integration.MQTT.Auth.Generic.Username = "foo" + conf.Integration.MQTT.Auth.Generic.Password = "bar" + conf.Integration.MQTT.Auth.Generic.CleanSession = true + conf.Integration.MQTT.Auth.Generic.ClientID = gatewayID.String() + + t.Run("New", func(t *testing.T) { + assert := require.New(t) + + auth, err := NewGenericAuthentication(conf) + assert.NoError(err) + + t.Run("GetGatewayID", func(t *testing.T) { + assert := require.New(t) + assert.Equal(&gatewayID, auth.GetGatewayID()) + }) + }) +} diff --git a/internal/integration/mqtt/backend.go b/internal/integration/mqtt/backend.go index 1a892b04..dd0ca21b 100644 --- a/internal/integration/mqtt/backend.go +++ b/internal/integration/mqtt/backend.go @@ -43,6 +43,7 @@ type Backend struct { qos uint8 eventTopicTemplate *template.Template + stateTopicTemplate *template.Template commandTopicTemplate *template.Template marshal func(msg proto.Message) ([]byte, error) @@ -75,6 +76,7 @@ func NewBackend(conf config.Config) (*Backend, error) { conf.Integration.MQTT.EventTopicTemplate = "/devices/gw-{{ .GatewayID }}/events/{{ .EventType }}" conf.Integration.MQTT.CommandTopicTemplate = "/devices/gw-{{ .GatewayID }}/commands/#" + conf.Integration.MQTT.StateTopicTemplate = "" case "azure_iot_hub": b.auth, err = auth.NewAzureIoTHubAuthentication(conf) if err != nil { @@ -83,6 +85,7 @@ func NewBackend(conf config.Config) (*Backend, error) { conf.Integration.MQTT.EventTopicTemplate = "devices/{{ .GatewayID }}/messages/events/{{ .EventType }}" conf.Integration.MQTT.CommandTopicTemplate = "devices/{{ .GatewayID }}/messages/devicebound/#" + conf.Integration.MQTT.StateTopicTemplate = "" default: return nil, fmt.Errorf("integration/mqtt: unknown auth type: %s", conf.Integration.MQTT.Auth.Type) } @@ -121,6 +124,13 @@ func NewBackend(conf config.Config) (*Backend, error) { return nil, errors.Wrap(err, "integration/mqtt: parse event-topic template error") } + if conf.Integration.MQTT.StateTopicTemplate != "" { + b.stateTopicTemplate, err = template.New("state").Parse(conf.Integration.MQTT.StateTopicTemplate) + if err != nil { + return nil, errors.Wrap(err, "integration/mqtt: parse state-topic template error") + } + } + b.commandTopicTemplate, err = template.New("event").Parse(conf.Integration.MQTT.CommandTopicTemplate) if err != nil { return nil, errors.Wrap(err, "integration/mqtt: parse event-topic template error") @@ -137,6 +147,43 @@ func NewBackend(conf config.Config) (*Backend, error) { return nil, errors.Wrap(err, "mqtt: init authentication error") } + if gatewayID := b.auth.GetGatewayID(); gatewayID != nil { + log.WithFields(log.Fields{ + "gateway_id": gatewayID, + }).Info("integration/mqtt: gateway id provided by authentication method") + + // Add GatewayID to list of gateways we must subscribe to. + b.gateways[*gatewayID] = struct{}{} + + // As we know the Gateway ID and a state topic has been configured, we set + // the last will and testament. + if b.stateTopicTemplate != nil { + pl := gw.ConnState{ + GatewayId: gatewayID[:], + State: gw.ConnState_OFFLINE, + } + bb, err := b.marshal(&pl) + if err != nil { + return nil, errors.Wrap(err, "marshal error") + } + + topic := bytes.NewBuffer(nil) + if err := b.stateTopicTemplate.Execute(topic, struct { + GatewayID lorawan.EUI64 + StateType string + }{*gatewayID, "conn"}); err != nil { + return nil, errors.Wrap(err, "execute state template error") + } + + log.WithFields(log.Fields{ + "gateway_id": gatewayID, + "topic": topic.String(), + }).Info("integration/mqtt: setting last will and testament") + + b.clientOpts.SetBinaryWill(topic.String(), bb, b.qos, true) + } + } + return &b, nil } @@ -153,6 +200,20 @@ func (b *Backend) Stop() error { b.connMux.Lock() defer b.connMux.Unlock() + b.gatewaysMux.Lock() + defer b.gatewaysMux.Unlock() + + // Set gateway state to offline for all gateways. + for gatewayID := range b.gateways { + pl := gw.ConnState{ + GatewayId: gatewayID[:], + State: gw.ConnState_OFFLINE, + } + if err := b.PublishState(gatewayID, "conn", &pl); err != nil { + log.WithError(err).Error("integration/mqtt: publish state error") + } + } + b.conn.Disconnect(250) b.connClosed = true return nil @@ -183,6 +244,16 @@ func (b *Backend) SetRawPacketForwarderCommandFunc(f func(gw.RawPacketForwarderC // race conditions in case of connection issues. This way, the gateways map // always reflect the desired state. func (b *Backend) SetGatewaySubscription(subscribe bool, gatewayID lorawan.EUI64) error { + // In this case we don't want to (un)subscribe as the Gateway ID is provided by + // the authentication and is set before connect. + if id := b.auth.GetGatewayID(); id != nil && *id == gatewayID { + log.WithFields(log.Fields{ + "gateway_id": gatewayID, + "subscribe": subscribe, + }).Debug("integration/mqtt: ignoring SetGatewaySubscription as gateway id is set by authentication") + return nil + } + log.WithFields(log.Fields{ "gateway_id": gatewayID, "subscribe": subscribe, @@ -242,11 +313,48 @@ func (b *Backend) PublishEvent(gatewayID lorawan.EUI64, event string, id uuid.UU "exec": "exec_", "raw": "raw_", } - return b.publish(gatewayID, event, log.Fields{ + return b.publishEvent(gatewayID, event, log.Fields{ idPrefix[event] + "id": id, }, v) } +// PublishState publishes the given state as retained message. +func (b *Backend) PublishState(gatewayID lorawan.EUI64, state string, v proto.Message) error { + if b.stateTopicTemplate == nil { + log.WithFields(log.Fields{ + "state": state, + "gateway_id": gatewayID, + }).Debug("integration/mqtt: ignoring publish state, no state_topic_template configured") + return nil + } + + mqttStateCounter(state).Inc() + + topic := bytes.NewBuffer(nil) + if err := b.stateTopicTemplate.Execute(topic, struct { + GatewayID lorawan.EUI64 + StateType string + }{gatewayID, state}); err != nil { + return errors.Wrap(err, "execute state template error") + } + + bytes, err := b.marshal(v) + if err != nil { + return errors.Wrap(err, "marshal message error") + } + + log.WithFields(log.Fields{ + "topic": topic.String(), + "qos": b.qos, + "state": state, + "gateway_id": gatewayID, + }).Info("integration/mqtt: publishing state") + if token := b.conn.Publish(topic.String(), b.qos, true, bytes); token.Wait() && token.Error() != nil { + return token.Error() + } + return nil +} + func (b *Backend) connect() error { b.connMux.Lock() defer b.connMux.Unlock() @@ -293,13 +401,10 @@ func (b *Backend) disconnect() error { func (b *Backend) reconnectLoop() { if b.auth.ReconnectAfter() > 0 { for { - b.connMux.RLock() - closed := b.connClosed - b.connMux.RUnlock() - - if closed { + if b.isClosed() { break } + time.Sleep(b.auth.ReconnectAfter()) log.Info("mqtt: re-connect triggered") @@ -329,13 +434,16 @@ func (b *Backend) onConnected(c paho.Client) { func (b *Backend) subscribeLoop() { for { - b.connMux.RLock() - closed := b.connClosed - b.connMux.RUnlock() - if closed { + time.Sleep(time.Millisecond * 100) + + if b.isClosed() { break } + if !b.conn.IsConnected() { + continue + } + var subscribe []lorawan.EUI64 var unsubscribe []lorawan.EUI64 @@ -361,23 +469,40 @@ func (b *Backend) subscribeLoop() { b.gatewaysMux.RUnlock() for _, gatewayID := range subscribe { + statePL := gw.ConnState{ + GatewayId: gatewayID[:], + State: gw.ConnState_ONLINE, + } + if err := b.subscribeGateway(gatewayID); err != nil { log.WithError(err).WithField("gateway_id", gatewayID).Error("integration/mqtt: subscribe gateway error") } else { - b.gatewaysSubscribed[gatewayID] = struct{}{} + if err := b.PublishState(gatewayID, "conn", &statePL); err != nil { + log.WithError(err).WithField("gateway_id", gatewayID).Error("integration/mqtt: publish conn state error") + } else { + b.gatewaysSubscribed[gatewayID] = struct{}{} + } } } for _, gatewayID := range unsubscribe { + statePL := gw.ConnState{ + GatewayId: gatewayID[:], + State: gw.ConnState_OFFLINE, + } + if err := b.unsubscribeGateway(gatewayID); err != nil { log.WithError(err).WithField("gateway_id", gatewayID).Error("integration/mqtt: unsubscribe gateway error") } else { - delete(b.gatewaysSubscribed, gatewayID) + if err := b.PublishState(gatewayID, "conn", &statePL); err != nil { + log.WithError(err).WithField("gateway_id", gatewayID).Error("integration/mqtt: publish conn state error") + } else { + delete(b.gatewaysSubscribed, gatewayID) + } } } b.gatewaysSubscribedMux.Unlock() - time.Sleep(time.Millisecond * 100) } } @@ -513,7 +638,7 @@ func (b *Backend) handleCommand(c paho.Client, msg paho.Message) { } } -func (b *Backend) publish(gatewayID lorawan.EUI64, event string, fields log.Fields, msg proto.Message) error { +func (b *Backend) publishEvent(gatewayID lorawan.EUI64, event string, fields log.Fields, msg proto.Message) error { topic := bytes.NewBuffer(nil) if err := b.eventTopicTemplate.Execute(topic, struct { GatewayID lorawan.EUI64 @@ -537,3 +662,10 @@ func (b *Backend) publish(gatewayID lorawan.EUI64, event string, fields log.Fiel } return nil } + +// isClosed returns true when the integration is shutting down. +func (b *Backend) isClosed() bool { + b.connMux.RLock() + defer b.connMux.RUnlock() + return b.connClosed +} diff --git a/internal/integration/mqtt/backend_test.go b/internal/integration/mqtt/backend_test.go index 5a6b8c8c..8e35d64c 100644 --- a/internal/integration/mqtt/backend_test.go +++ b/internal/integration/mqtt/backend_test.go @@ -55,19 +55,23 @@ func (ts *MQTTBackendTestSuite) SetupSuite() { var conf config.Config conf.Integration.Marshaler = "json" conf.Integration.MQTT.EventTopicTemplate = "gateway/{{ .GatewayID }}/event/{{ .EventType }}" + conf.Integration.MQTT.StateTopicTemplate = "gateway/{{ .GatewayID }}/state/{{ .StateType }}" conf.Integration.MQTT.CommandTopicTemplate = "gateway/{{ .GatewayID }}/command/#" conf.Integration.MQTT.Auth.Type = "generic" conf.Integration.MQTT.Auth.Generic.Servers = []string{server} conf.Integration.MQTT.Auth.Generic.Username = username conf.Integration.MQTT.Auth.Generic.Password = password conf.Integration.MQTT.Auth.Generic.CleanSession = true + conf.Integration.MQTT.Auth.Generic.ClientID = ts.gatewayID.String() var err error ts.backend, err = NewBackend(conf) assert.NoError(err) assert.NoError(ts.backend.Start()) - assert.NoError(ts.backend.SetGatewaySubscription(true, ts.gatewayID)) - time.Sleep(100 * time.Millisecond) + + // The subscribe loop runs every 100ms, we will wait twice the time to make + // sure the subscription is set. + time.Sleep(200 * time.Millisecond) } func (ts *MQTTBackendTestSuite) TearDownSuite() { @@ -75,22 +79,81 @@ func (ts *MQTTBackendTestSuite) TearDownSuite() { ts.backend.Stop() } +func (ts *MQTTBackendTestSuite) TestLastWill() { + assert := require.New(ts.T()) + + assert.True(ts.backend.clientOpts.WillEnabled) + assert.Equal("gateway/0807060504030201/state/conn", ts.backend.clientOpts.WillTopic) + assert.Equal(`{"gatewayID":"CAcGBQQDAgE=","state":"OFFLINE"}`, string(ts.backend.clientOpts.WillPayload)) + assert.True(ts.backend.clientOpts.WillRetained) +} + +func (ts *MQTTBackendTestSuite) TestConnStateOnline() { + assert := require.New(ts.T()) + + connStateChan := make(chan gw.ConnState) + token := ts.mqttClient.Subscribe("gateway/0807060504030201/state/conn", 0, func(c paho.Client, msg paho.Message) { + var pl gw.ConnState + assert.NoError(ts.backend.unmarshal(msg.Payload(), &pl)) + connStateChan <- pl + }) + token.Wait() + assert.NoError(token.Error()) + + assert.Equal(gw.ConnState{ + GatewayId: ts.gatewayID[:], + State: gw.ConnState_ONLINE, + }, <-connStateChan) + + token = ts.mqttClient.Unsubscribe("gateway/0807060504030201/state/conn") + token.Wait() + assert.NoError(token.Error()) +} + func (ts *MQTTBackendTestSuite) TestSubscribeGateway() { assert := require.New(ts.T()) gatewayID := lorawan.EUI64{1, 2, 3, 4, 5, 6, 7, 8} + connStateChan := make(chan gw.ConnState) assert.NoError(ts.backend.SetGatewaySubscription(true, gatewayID)) _, ok := ts.backend.gateways[gatewayID] assert.True(ok) + // Wait 200ms to make sure that the subscribe loop has picked up the + // change and set the ConnState. If we subscribe too early, it is + // possible that we get an (old) OFFLINE retained message. + time.Sleep(200 * time.Millisecond) + + token := ts.mqttClient.Subscribe("gateway/0102030405060708/state/conn", 0, func(c paho.Client, msg paho.Message) { + var pl gw.ConnState + assert.NoError(ts.backend.unmarshal(msg.Payload(), &pl)) + connStateChan <- pl + }) + token.Wait() + assert.NoError(token.Error()) + + assert.Equal(gw.ConnState{ + GatewayId: gatewayID[:], + State: gw.ConnState_ONLINE, + }, <-connStateChan) + ts.T().Run("Unsubscribe", func(t *testing.T) { assert := require.New(t) assert.NoError(ts.backend.SetGatewaySubscription(false, gatewayID)) _, ok := ts.backend.gateways[gatewayID] assert.False(ok) + + assert.Equal(gw.ConnState{ + GatewayId: gatewayID[:], + State: gw.ConnState_OFFLINE, + }, <-connStateChan) }) + + token = ts.mqttClient.Unsubscribe("gateway/0102030405060708/state/conn") + token.Wait() + assert.NoError(token.Error()) } func (ts *MQTTBackendTestSuite) TestPublishUplinkFrame() { @@ -169,10 +232,38 @@ func (ts *MQTTBackendTestSuite) TestPublishDownlinkTXAck() { assert.NoError(token.Error()) assert.NoError(ts.backend.PublishEvent(ts.gatewayID, "ack", id, &txAck)) + txAckReceived := <-txAckChan assert.Equal(txAck, txAckReceived) } +func (ts *MQTTBackendTestSuite) TestPublishConnState() { + assert := require.New(ts.T()) + + // We publish first + state := gw.ConnState{ + GatewayId: ts.gatewayID[:], + State: gw.ConnState_ONLINE, + } + assert.NoError(ts.backend.PublishState(ts.gatewayID, "conn", &state)) + + // And then subscribe to test that the message has been retained + stateChan := make(chan gw.ConnState) + token := ts.mqttClient.Subscribe("gateway/0807060504030201/state/conn", 0, func(c paho.Client, msg paho.Message) { + var pl gw.ConnState + assert.NoError(ts.backend.unmarshal(msg.Payload(), &pl)) + stateChan <- pl + }) + token.Wait() + assert.NoError(token.Error()) + + assert.Equal(state, <-stateChan) + + token = ts.mqttClient.Unsubscribe("gateway/0807060504030201/state/conn") + token.Wait() + assert.NoError(token.Error()) +} + func (ts *MQTTBackendTestSuite) TestDownlinkFrameHandler() { assert := require.New(ts.T()) downlinkFrameChan := make(chan gw.DownlinkFrame, 1) diff --git a/internal/integration/mqtt/metrics.go b/internal/integration/mqtt/metrics.go index 7360d252..18fd6387 100644 --- a/internal/integration/mqtt/metrics.go +++ b/internal/integration/mqtt/metrics.go @@ -11,6 +11,11 @@ var ( Help: "The number of gateway events published by the MQTT integration (per event).", }, []string{"event"}) + sc = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "integration_mqtt_state_count", + Help: "The number of gateway states published by the MQTT integration (per state).", + }, []string{"state"}) + cc = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "integration_mqtt_command_count", Help: "The number of commands received by the MQTT integration (per command).", @@ -36,6 +41,10 @@ func mqttEventCounter(e string) prometheus.Counter { return pc.With(prometheus.Labels{"event": e}) } +func mqttStateCounter(s string) prometheus.Counter { + return sc.With(prometheus.Labels{"state": s}) +} + func mqttCommandCounter(c string) prometheus.Counter { return cc.With(prometheus.Labels{"command": c}) }