Skip to content

Commit

Permalink
Re-subscribe to tx topics on re-connect.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed Apr 16, 2016
1 parent f376e9b commit c646318
Showing 1 changed file with 41 additions and 1 deletion.
42 changes: 41 additions & 1 deletion backend/mqttpubsub/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package mqttpubsub
import (
"encoding/json"
"fmt"
"sync"
"time"

log "github.com/Sirupsen/logrus"
"github.com/brocaar/loraserver/models"
Expand All @@ -14,19 +16,23 @@ import (
type Backend struct {
conn mqtt.Client
txPacketChan chan models.TXPacket
gateways map[lorawan.EUI64]struct{}
mutex sync.RWMutex
}

// NewBackend creates a new Backend.
func NewBackend(server, username, password string) (*Backend, error) {
b := Backend{
txPacketChan: make(chan models.TXPacket),
gateways: make(map[lorawan.EUI64]struct{}),
}

opts := mqtt.NewClientOptions()
opts.AddBroker(server)
opts.SetUsername(username)
opts.SetPassword(password)
opts.SetClientID("lora-semtech-bridge")
opts.SetOnConnectHandler(b.onConnected)
opts.SetConnectionLostHandler(b.onConnectionLost)

log.WithField("server", server).Info("backend/mqttpubsub: connecting to MQTT server")
b.conn = mqtt.NewClient(opts)
Expand All @@ -50,22 +56,30 @@ func (b *Backend) TXPacketChan() chan models.TXPacket {
// SubscribeGatewayTX subscribes the backend to the gateway TXPacket
// topic (packets the gateway needs to transmit).
func (b *Backend) SubscribeGatewayTX(mac lorawan.EUI64) error {
defer b.mutex.Unlock()
b.mutex.Lock()

topic := fmt.Sprintf("gateway/%s/tx", mac.String())
log.WithField("topic", topic).Info("backend/mqttpubsub: subscribing to topic")
if token := b.conn.Subscribe(topic, 0, b.txPacketHandler); token.Wait() && token.Error() != nil {
return token.Error()
}
b.gateways[mac] = struct{}{}
return nil
}

// UnSubscribeGatewayTX unsubscribes the backend from the gateway TXPacket
// topic.
func (b *Backend) UnSubscribeGatewayTX(mac lorawan.EUI64) error {
defer b.mutex.Unlock()
b.mutex.Lock()

topic := fmt.Sprintf("gateway/%s/tx", mac.String())
log.WithField("topic", topic).Info("backend/mqttpubsub: unsubscribing from topic")
if token := b.conn.Unsubscribe(topic); token.Wait() && token.Error() != nil {
return token.Error()
}
delete(b.gateways, mac)
return nil
}

Expand Down Expand Up @@ -102,3 +116,29 @@ func (b *Backend) txPacketHandler(c mqtt.Client, msg mqtt.Message) {
}
b.txPacketChan <- txPacket
}

func (b *Backend) onConnected(c mqtt.Client) {
defer b.mutex.RUnlock()
b.mutex.RLock()

log.Info("backend/mqttpubsub: connected to mqtt server")
if len(b.gateways) > 0 {
for {
log.WithField("topic_count", len(b.gateways)).Info("Backend/mqttpubsub: re-registering to gateway topics")
topics := make(map[string]byte)
for k := range b.gateways {
topics[fmt.Sprintf("gateway/%s/tx", k)] = 0
}
if token := b.conn.SubscribeMultiple(topics, b.txPacketHandler); token.Wait() && token.Error() != nil {
log.WithField("topic_count", len(topics)).Errorf("backend/mqttpubsub: subscribe multiple failed: %s", token.Error())
time.Sleep(time.Second)
continue
}
return
}
}
}

func (b *Backend) onConnectionLost(c mqtt.Client, reason error) {
log.Errorf("backend/mqttpubsub: mqtt connection error: %s", reason)
}

0 comments on commit c646318

Please sign in to comment.