diff --git a/backend/mqttpubsub/backend.go b/backend/mqttpubsub/backend.go index 465ae593..4e67b7f4 100644 --- a/backend/mqttpubsub/backend.go +++ b/backend/mqttpubsub/backend.go @@ -3,6 +3,8 @@ package mqttpubsub import ( "encoding/json" "fmt" + "sync" + "time" log "github.com/Sirupsen/logrus" "github.com/brocaar/loraserver/models" @@ -14,19 +16,23 @@ import ( type Backend struct { conn mqtt.Client txPacketChan chan models.TXPacket + gateways map[lorawan.EUI64]struct{} + mutex sync.RWMutex } // NewBackend creates a new Backend. func NewBackend(server, username, password string) (*Backend, error) { b := Backend{ txPacketChan: make(chan models.TXPacket), + gateways: make(map[lorawan.EUI64]struct{}), } opts := mqtt.NewClientOptions() opts.AddBroker(server) opts.SetUsername(username) opts.SetPassword(password) - opts.SetClientID("lora-semtech-bridge") + opts.SetOnConnectHandler(b.onConnected) + opts.SetConnectionLostHandler(b.onConnectionLost) log.WithField("server", server).Info("backend/mqttpubsub: connecting to MQTT server") b.conn = mqtt.NewClient(opts) @@ -50,22 +56,30 @@ func (b *Backend) TXPacketChan() chan models.TXPacket { // SubscribeGatewayTX subscribes the backend to the gateway TXPacket // topic (packets the gateway needs to transmit). func (b *Backend) SubscribeGatewayTX(mac lorawan.EUI64) error { + defer b.mutex.Unlock() + b.mutex.Lock() + topic := fmt.Sprintf("gateway/%s/tx", mac.String()) log.WithField("topic", topic).Info("backend/mqttpubsub: subscribing to topic") if token := b.conn.Subscribe(topic, 0, b.txPacketHandler); token.Wait() && token.Error() != nil { return token.Error() } + b.gateways[mac] = struct{}{} return nil } // UnSubscribeGatewayTX unsubscribes the backend from the gateway TXPacket // topic. func (b *Backend) UnSubscribeGatewayTX(mac lorawan.EUI64) error { + defer b.mutex.Unlock() + b.mutex.Lock() + topic := fmt.Sprintf("gateway/%s/tx", mac.String()) log.WithField("topic", topic).Info("backend/mqttpubsub: unsubscribing from topic") if token := b.conn.Unsubscribe(topic); token.Wait() && token.Error() != nil { return token.Error() } + delete(b.gateways, mac) return nil } @@ -102,3 +116,29 @@ func (b *Backend) txPacketHandler(c mqtt.Client, msg mqtt.Message) { } b.txPacketChan <- txPacket } + +func (b *Backend) onConnected(c mqtt.Client) { + defer b.mutex.RUnlock() + b.mutex.RLock() + + log.Info("backend/mqttpubsub: connected to mqtt server") + if len(b.gateways) > 0 { + for { + log.WithField("topic_count", len(b.gateways)).Info("Backend/mqttpubsub: re-registering to gateway topics") + topics := make(map[string]byte) + for k := range b.gateways { + topics[fmt.Sprintf("gateway/%s/tx", k)] = 0 + } + if token := b.conn.SubscribeMultiple(topics, b.txPacketHandler); token.Wait() && token.Error() != nil { + log.WithField("topic_count", len(topics)).Errorf("backend/mqttpubsub: subscribe multiple failed: %s", token.Error()) + time.Sleep(time.Second) + continue + } + return + } + } +} + +func (b *Backend) onConnectionLost(c mqtt.Client, reason error) { + log.Errorf("backend/mqttpubsub: mqtt connection error: %s", reason) +}