From 68ed758faef4410e279e6997e55cb4f84f3bfcc6 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Tue, 11 May 2021 10:50:13 +0100 Subject: [PATCH] Make it possible to disable state retained flag. --- cmd/chirpstack-gateway-bridge/cmd/configfile.go | 15 +++++++++++---- cmd/chirpstack-gateway-bridge/cmd/root.go | 1 + internal/config/config.go | 1 + internal/integration/mqtt/backend.go | 4 +++- internal/integration/mqtt/backend_test.go | 1 + 5 files changed, 17 insertions(+), 5 deletions(-) diff --git a/cmd/chirpstack-gateway-bridge/cmd/configfile.go b/cmd/chirpstack-gateway-bridge/cmd/configfile.go index cf8851c0..96e4906b 100644 --- a/cmd/chirpstack-gateway-bridge/cmd/configfile.go +++ b/cmd/chirpstack-gateway-bridge/cmd/configfile.go @@ -223,15 +223,22 @@ marshaler="{{ .Integration.Marshaler }}" # State topic template. # - # States are sent by the gateway as retained MQTT messages so that the last - # message will be stored by the MQTT broker. When set to a blank string, this - # feature will be disabled. This feature is only supported when using the - # generic authentication type. + # States are sent by the gateway as retained MQTT messages (by default) + # so that the last message will be stored by the MQTT broker. When set to + # a blank string, this feature will be disabled. This feature is only + # supported when using the generic authentication type. state_topic_template="{{ .Integration.MQTT.StateTopicTemplate }}" # Command topic template. command_topic_template="{{ .Integration.MQTT.CommandTopicTemplate }}" + # State retained. + # + # By default this value is set to true and states are published as retained + # MQTT messages. Setting this to false means that states will not be retained + # by the MQTT broker. + state_retained={{ .Integration.MQTT.StateRetained }} + # Keep alive will set the amount of time (in seconds) that the client should # wait before sending a PING request to the broker. This will allow the client # to know that a connection has not been lost with the server. diff --git a/cmd/chirpstack-gateway-bridge/cmd/root.go b/cmd/chirpstack-gateway-bridge/cmd/root.go index d2e1472d..f5603f4d 100644 --- a/cmd/chirpstack-gateway-bridge/cmd/root.go +++ b/cmd/chirpstack-gateway-bridge/cmd/root.go @@ -59,6 +59,7 @@ func init() { viper.SetDefault("integration.mqtt.event_topic_template", "gateway/{{ .GatewayID }}/event/{{ .EventType }}") viper.SetDefault("integration.mqtt.state_topic_template", "gateway/{{ .GatewayID }}/state/{{ .StateType }}") viper.SetDefault("integration.mqtt.command_topic_template", "gateway/{{ .GatewayID }}/command/#") + viper.SetDefault("integration.mqtt.state_retained", true) viper.SetDefault("integration.mqtt.keep_alive", 30*time.Second) viper.SetDefault("integration.mqtt.max_reconnect_interval", time.Minute) diff --git a/internal/config/config.go b/internal/config/config.go index 39a3bbca..6eb064f1 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -59,6 +59,7 @@ type Config struct { EventTopicTemplate string `mapstructure:"event_topic_template"` CommandTopicTemplate string `mapstructure:"command_topic_template"` StateTopicTemplate string `mapstructure:"state_topic_template"` + StateRetained bool `mapstructure:"state_retained"` KeepAlive time.Duration `mapstructure:"keep_alive"` MaxReconnectInterval time.Duration `mapstructure:"max_reconnect_interval"` TerminateOnConnectError bool `mapstructure:"terminate_on_connect_error"` diff --git a/internal/integration/mqtt/backend.go b/internal/integration/mqtt/backend.go index dd0ca21b..0a8a759b 100644 --- a/internal/integration/mqtt/backend.go +++ b/internal/integration/mqtt/backend.go @@ -40,6 +40,7 @@ type Backend struct { gatewaysSubscribedMux sync.Mutex gatewaysSubscribed map[lorawan.EUI64]struct{} terminateOnConnectError bool + stateRetained bool qos uint8 eventTopicTemplate *template.Template @@ -60,6 +61,7 @@ func NewBackend(conf config.Config) (*Backend, error) { clientOpts: paho.NewClientOptions(), gateways: make(map[lorawan.EUI64]struct{}), gatewaysSubscribed: make(map[lorawan.EUI64]struct{}), + stateRetained: conf.Integration.MQTT.StateRetained, } switch conf.Integration.MQTT.Auth.Type { @@ -349,7 +351,7 @@ func (b *Backend) PublishState(gatewayID lorawan.EUI64, state string, v proto.Me "state": state, "gateway_id": gatewayID, }).Info("integration/mqtt: publishing state") - if token := b.conn.Publish(topic.String(), b.qos, true, bytes); token.Wait() && token.Error() != nil { + if token := b.conn.Publish(topic.String(), b.qos, b.stateRetained, bytes); token.Wait() && token.Error() != nil { return token.Error() } return nil diff --git a/internal/integration/mqtt/backend_test.go b/internal/integration/mqtt/backend_test.go index 8e35d64c..ef0380ef 100644 --- a/internal/integration/mqtt/backend_test.go +++ b/internal/integration/mqtt/backend_test.go @@ -57,6 +57,7 @@ func (ts *MQTTBackendTestSuite) SetupSuite() { conf.Integration.MQTT.EventTopicTemplate = "gateway/{{ .GatewayID }}/event/{{ .EventType }}" conf.Integration.MQTT.StateTopicTemplate = "gateway/{{ .GatewayID }}/state/{{ .StateType }}" conf.Integration.MQTT.CommandTopicTemplate = "gateway/{{ .GatewayID }}/command/#" + conf.Integration.MQTT.StateRetained = true conf.Integration.MQTT.Auth.Type = "generic" conf.Integration.MQTT.Auth.Generic.Servers = []string{server} conf.Integration.MQTT.Auth.Generic.Username = username