Skip to content

Commit

Permalink
Make it possible to disable state retained flag.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed May 11, 2021
1 parent 08dc3c9 commit 68ed758
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 5 deletions.
15 changes: 11 additions & 4 deletions cmd/chirpstack-gateway-bridge/cmd/configfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,22 @@ marshaler="{{ .Integration.Marshaler }}"
# State topic template.
#
# States are sent by the gateway as retained MQTT messages so that the last
# message will be stored by the MQTT broker. When set to a blank string, this
# feature will be disabled. This feature is only supported when using the
# generic authentication type.
# States are sent by the gateway as retained MQTT messages (by default)
# so that the last message will be stored by the MQTT broker. When set to
# a blank string, this feature will be disabled. This feature is only
# supported when using the generic authentication type.
state_topic_template="{{ .Integration.MQTT.StateTopicTemplate }}"
# Command topic template.
command_topic_template="{{ .Integration.MQTT.CommandTopicTemplate }}"
# State retained.
#
# By default this value is set to true and states are published as retained
# MQTT messages. Setting this to false means that states will not be retained
# by the MQTT broker.
state_retained={{ .Integration.MQTT.StateRetained }}
# Keep alive will set the amount of time (in seconds) that the client should
# wait before sending a PING request to the broker. This will allow the client
# to know that a connection has not been lost with the server.
Expand Down
1 change: 1 addition & 0 deletions cmd/chirpstack-gateway-bridge/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func init() {
viper.SetDefault("integration.mqtt.event_topic_template", "gateway/{{ .GatewayID }}/event/{{ .EventType }}")
viper.SetDefault("integration.mqtt.state_topic_template", "gateway/{{ .GatewayID }}/state/{{ .StateType }}")
viper.SetDefault("integration.mqtt.command_topic_template", "gateway/{{ .GatewayID }}/command/#")
viper.SetDefault("integration.mqtt.state_retained", true)
viper.SetDefault("integration.mqtt.keep_alive", 30*time.Second)
viper.SetDefault("integration.mqtt.max_reconnect_interval", time.Minute)

Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Config struct {
EventTopicTemplate string `mapstructure:"event_topic_template"`
CommandTopicTemplate string `mapstructure:"command_topic_template"`
StateTopicTemplate string `mapstructure:"state_topic_template"`
StateRetained bool `mapstructure:"state_retained"`
KeepAlive time.Duration `mapstructure:"keep_alive"`
MaxReconnectInterval time.Duration `mapstructure:"max_reconnect_interval"`
TerminateOnConnectError bool `mapstructure:"terminate_on_connect_error"`
Expand Down
4 changes: 3 additions & 1 deletion internal/integration/mqtt/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Backend struct {
gatewaysSubscribedMux sync.Mutex
gatewaysSubscribed map[lorawan.EUI64]struct{}
terminateOnConnectError bool
stateRetained bool

qos uint8
eventTopicTemplate *template.Template
Expand All @@ -60,6 +61,7 @@ func NewBackend(conf config.Config) (*Backend, error) {
clientOpts: paho.NewClientOptions(),
gateways: make(map[lorawan.EUI64]struct{}),
gatewaysSubscribed: make(map[lorawan.EUI64]struct{}),
stateRetained: conf.Integration.MQTT.StateRetained,
}

switch conf.Integration.MQTT.Auth.Type {
Expand Down Expand Up @@ -349,7 +351,7 @@ func (b *Backend) PublishState(gatewayID lorawan.EUI64, state string, v proto.Me
"state": state,
"gateway_id": gatewayID,
}).Info("integration/mqtt: publishing state")
if token := b.conn.Publish(topic.String(), b.qos, true, bytes); token.Wait() && token.Error() != nil {
if token := b.conn.Publish(topic.String(), b.qos, b.stateRetained, bytes); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
Expand Down
1 change: 1 addition & 0 deletions internal/integration/mqtt/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (ts *MQTTBackendTestSuite) SetupSuite() {
conf.Integration.MQTT.EventTopicTemplate = "gateway/{{ .GatewayID }}/event/{{ .EventType }}"
conf.Integration.MQTT.StateTopicTemplate = "gateway/{{ .GatewayID }}/state/{{ .StateType }}"
conf.Integration.MQTT.CommandTopicTemplate = "gateway/{{ .GatewayID }}/command/#"
conf.Integration.MQTT.StateRetained = true
conf.Integration.MQTT.Auth.Type = "generic"
conf.Integration.MQTT.Auth.Generic.Servers = []string{server}
conf.Integration.MQTT.Auth.Generic.Username = username
Expand Down

0 comments on commit 68ed758

Please sign in to comment.