Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/implement autopaho rpc #98

Merged
merged 13 commits into from
Aug 26, 2022
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
bin/
39 changes: 35 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,40 @@
.PHONY: test unittest

unittest:
go test -race -tags=unittest ./autopaho/ -v -count 1
go test -coverprofile /tmp/autopaho_coverage.out -race -tags=unittest ./autopaho/ -v -count 1

test: unittest
go test -race ./packets/ -v -count 1
go test -race ./paho/ -v -count 1

go test -coverprofile /tmp/packets_coverage.out -race ./packets/ -v -count 1
go test -coverprofile /tmp/paho_coverage.out -race ./paho/ -v -count 1

cover:
go tool cover -func=/tmp/autopaho_coverage.out
go tool cover -func=/tmp/packets_coverage.out
go tool cover -func=/tmp/paho_coverage.out

cover_browser:
go tool cover -html=/tmp/autopaho_coverage.out
go tool cover -html=/tmp/packets_coverage.out
go tool cover -html=/tmp/paho_coverage.out

.PHONY: download
download:
go mod download

build_chat:
go build -o ./bin/chat ./paho/cmd/chat

build_rpc:
go build -o ./bin/rpc ./paho/cmd/rpc

build_rpc_cm:
go build -o ./bin/rpc_auto ./autopaho/cmd/rpc

build_pub:
go build -o ./bin/stdinpub ./paho/cmd/stdinpub

build_sub:
go build -o ./bin/stdoutsub ./paho/cmd/stdoutsub

.PHONY: build
build: build_chat build_rpc build_pub build_sub build_rpc_cm
10 changes: 10 additions & 0 deletions autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,13 @@ func (c *ConnectionManager) Publish(ctx context.Context, p *paho.Publish) (*paho
}
return cli.Publish(ctx, p)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exported functions need a comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function has now been removed

func (c *ConnectionManager) UseClient(fn func(*paho.Client) error) error {
c.mu.Lock()
defer c.mu.Unlock()

if c.cli == nil {
return ConnectionDownError
}
return fn(c.cli)
}
10 changes: 10 additions & 0 deletions autopaho/cmd/rpc/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export subdemo_serverURL=tcp://127.0.0.1:1883
export subdemo_clientID=mqtt_subscriber
export subdemo_topic=topic1
export subdemo_qos=1
export subdemo_keepAlive=30
export subdemo_connectRetryDelay=10000
export subdemo_writeToStdout="true"
export subdemo_writeToDisk="false"
export subdemo_OutputFile="/binds/receivedMessages.txt"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to leave the .env file in the PR? If so I don't think a default like this starting from / is appropriate, even if it's not used by default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did, I thought it may be useful for the RPC example. I've removed it now

export subdemo_debug="true"
193 changes: 193 additions & 0 deletions autopaho/cmd/rpc/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package main

import (
"context"
"fmt"
"log"
"net/url"
"os"
"strconv"
"strings"
"time"

"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
)

// Retrieve config from environmental variables

// Configuration will be pulled from the environment using the following keys
const (
envServerURL = "subdemo_serverURL" // server URL
envClientID = "subdemo_clientID" // client id to connect with
envTopic = "subdemo_topic" // topic to publish on
envQos = "subdemo_qos" // qos to utilise when publishing

envKeepAlive = "subdemo_keepAlive" // seconds between keepalive packets
envConnectRetryDelay = "subdemo_connectRetryDelay" // milliseconds to delay between connection attempts

envWriteToStdOut = "subdemo_writeToStdout" // if "true" then received packets will be written stdout
envWriteToDisk = "subdemo_writeToDisk" // if "true" then received packets will be written to file
envOutputFile = "subdemo_OutputFile" // name of file to use if above is true

envDebug = "subdemo_debug" // if "true" then the libraries will be instructed to print debug info
)

// config holds the configuration
type config struct {
serverURL *url.URL // MQTT server URL
clientID string // Client ID to use when connecting to server
topic string // Topic on which to publish messaged
qos byte // QOS to use when publishing

keepAlive uint16 // seconds between keepalive packets
connectRetryDelay time.Duration // Period between connection attempts

writeToStdOut bool // If true received messages will be written to stdout
writeToDisk bool // if true received messages will be written to below file
outputFileName string // filename to save messages to

debug bool // autopaho and paho debug output requested
}

// getConfig - Retrieves the configuration from the environment
func getConfig() (config, error) {
var cfg config
var err error

srvURL, err := stringFromEnv(envServerURL)
if err != nil {
return config{}, err
}
cfg.serverURL, err = url.Parse(srvURL)
if err != nil {
return config{}, fmt.Errorf("environmental variable %s must be a valid URL (%w)", envServerURL, err)
}

if cfg.clientID, err = stringFromEnv(envClientID); err != nil {
return config{}, err
}
if cfg.topic, err = stringFromEnv(envTopic); err != nil {
return config{}, err
}

iQos, err := intFromEnv(envQos)
if err != nil {
return config{}, err
}
cfg.qos = byte(iQos)

iKa, err := intFromEnv(envKeepAlive)
if err != nil {
return config{}, err
}
cfg.keepAlive = uint16(iKa)

if cfg.connectRetryDelay, err = milliSecondsFromEnv(envConnectRetryDelay); err != nil {
return config{}, err
}

if cfg.writeToStdOut, err = booleanFromEnv(envWriteToStdOut); err != nil {
return config{}, err
}
if cfg.writeToDisk, err = booleanFromEnv(envWriteToDisk); err != nil {
return config{}, err
}
if cfg.outputFileName, err = stringFromEnv(envOutputFile); cfg.writeToDisk && err != nil {
return config{}, err
}

if cfg.debug, err = booleanFromEnv(envDebug); err != nil {
return config{}, err
}

return cfg, nil
}

func getCmConfig(cfg config) autopaho.ClientConfig {
return autopaho.ClientConfig{
BrokerUrls: []*url.URL{cfg.serverURL},
KeepAlive: cfg.keepAlive,
ConnectRetryDelay: cfg.connectRetryDelay,
ConnectTimeout: time.Duration(5 * time.Second),
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
fmt.Println("mqtt connection up")
ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
if _, err := cm.Subscribe(ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
cfg.topic: {QoS: cfg.qos},
},
}); err != nil {
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
return
}
fmt.Println("mqtt subscription made")
},
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
ClientConfig: paho.ClientConfig{
ClientID: cfg.clientID,
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
log.Printf("%v+", m)
}),
OnClientError: func(err error) { fmt.Printf("%s requested disconnect: %s\n", cfg.clientID, err) },
OnServerDisconnect: func(d *paho.Disconnect) {
if d.Properties != nil {
fmt.Printf("%s requested disconnect: %s\n", cfg.clientID, d.Properties.ReasonString)
} else {
fmt.Printf("%s requested disconnect; reason code: %d\n", cfg.clientID, d.ReasonCode)
}
},
},
}
}

// stringFromEnv - Retrieves a string from the environment and ensures it is not blank (ort non-existent)
func stringFromEnv(key string) (string, error) {
s := os.Getenv(key)
if len(s) == 0 {
return "", fmt.Errorf("environmental variable %s must not be blank", key)
}
return s, nil
}

// intFromEnv - Retrieves an integer from the environment (must be present and valid)
func intFromEnv(key string) (int, error) {
s := os.Getenv(key)
if len(s) == 0 {
return 0, fmt.Errorf("environmental variable %s must not be blank", key)
}
i, err := strconv.Atoi(s)
if err != nil {
return 0, fmt.Errorf("environmental variable %s must be an integer", key)
}
return i, nil
}

// milliSecondsFromEnv - Retrieves milliseconds (as time.Duration) from the environment (must be present and valid)
func milliSecondsFromEnv(key string) (time.Duration, error) {
s := os.Getenv(key)
if len(s) == 0 {
return 0, fmt.Errorf("environmental variable %s must not be blank", key)
}
i, err := strconv.Atoi(s)
if err != nil {
return 0, fmt.Errorf("environmental variable %s must be an integer", key)
}
return time.Duration(i) * time.Millisecond, nil
}

// booleanFromEnv - Retrieves boolean from the environment (must be present and valid)
func booleanFromEnv(key string) (bool, error) {
s := os.Getenv(key)
if len(s) == 0 {
return false, fmt.Errorf("environmental variable %s must not be blank", key)
}
switch strings.ToUpper(s) {
case "TRUE", "T", "1":
return true, nil
case "FALSE", "F", "0":
return false, nil
default:
return false, fmt.Errorf("environmental variable %s be a valid boolean option (is %s)", key, s)
}
}
Loading