-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/implement autopaho rpc #98
Changes from 5 commits
2db2542
e4fda94
e818d6a
6eedf91
8780381
e66bbf6
1b134ad
10f9f48
e095ad9
5549e39
3d51531
ace041a
c650e72
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
bin/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,40 @@ | ||
.PHONY: test unittest | ||
|
||
unittest: | ||
go test -race -tags=unittest ./autopaho/ -v -count 1 | ||
go test -coverprofile /tmp/autopaho_coverage.out -race -tags=unittest ./autopaho/ -v -count 1 | ||
|
||
test: unittest | ||
go test -race ./packets/ -v -count 1 | ||
go test -race ./paho/ -v -count 1 | ||
|
||
go test -coverprofile /tmp/packets_coverage.out -race ./packets/ -v -count 1 | ||
go test -coverprofile /tmp/paho_coverage.out -race ./paho/ -v -count 1 | ||
|
||
cover: | ||
go tool cover -func=/tmp/autopaho_coverage.out | ||
go tool cover -func=/tmp/packets_coverage.out | ||
go tool cover -func=/tmp/paho_coverage.out | ||
|
||
cover_browser: | ||
go tool cover -html=/tmp/autopaho_coverage.out | ||
go tool cover -html=/tmp/packets_coverage.out | ||
go tool cover -html=/tmp/paho_coverage.out | ||
|
||
.PHONY: download | ||
download: | ||
go mod download | ||
|
||
build_chat: | ||
go build -o ./bin/chat ./paho/cmd/chat | ||
|
||
build_rpc: | ||
go build -o ./bin/rpc ./paho/cmd/rpc | ||
|
||
build_rpc_cm: | ||
go build -o ./bin/rpc_auto ./autopaho/cmd/rpc | ||
|
||
build_pub: | ||
go build -o ./bin/stdinpub ./paho/cmd/stdinpub | ||
|
||
build_sub: | ||
go build -o ./bin/stdoutsub ./paho/cmd/stdoutsub | ||
|
||
.PHONY: build | ||
build: build_chat build_rpc build_pub build_sub build_rpc_cm |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
export subdemo_serverURL=tcp://127.0.0.1:1883 | ||
export subdemo_clientID=mqtt_subscriber | ||
export subdemo_topic=topic1 | ||
export subdemo_qos=1 | ||
export subdemo_keepAlive=30 | ||
export subdemo_connectRetryDelay=10000 | ||
export subdemo_writeToStdout="true" | ||
export subdemo_writeToDisk="false" | ||
export subdemo_OutputFile="/binds/receivedMessages.txt" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you mean to leave the .env file in the PR? If so I don't think a default like this starting from / is appropriate, even if it's not used by default There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did, I thought it may be useful for the RPC example. I've removed it now |
||
export subdemo_debug="true" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"log" | ||
"net/url" | ||
"os" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/eclipse/paho.golang/autopaho" | ||
"github.com/eclipse/paho.golang/paho" | ||
) | ||
|
||
// Retrieve config from environmental variables | ||
|
||
// Configuration will be pulled from the environment using the following keys | ||
const ( | ||
envServerURL = "subdemo_serverURL" // server URL | ||
envClientID = "subdemo_clientID" // client id to connect with | ||
envTopic = "subdemo_topic" // topic to publish on | ||
envQos = "subdemo_qos" // qos to utilise when publishing | ||
|
||
envKeepAlive = "subdemo_keepAlive" // seconds between keepalive packets | ||
envConnectRetryDelay = "subdemo_connectRetryDelay" // milliseconds to delay between connection attempts | ||
|
||
envWriteToStdOut = "subdemo_writeToStdout" // if "true" then received packets will be written stdout | ||
envWriteToDisk = "subdemo_writeToDisk" // if "true" then received packets will be written to file | ||
envOutputFile = "subdemo_OutputFile" // name of file to use if above is true | ||
|
||
envDebug = "subdemo_debug" // if "true" then the libraries will be instructed to print debug info | ||
) | ||
|
||
// config holds the configuration | ||
type config struct { | ||
serverURL *url.URL // MQTT server URL | ||
clientID string // Client ID to use when connecting to server | ||
topic string // Topic on which to publish messaged | ||
qos byte // QOS to use when publishing | ||
|
||
keepAlive uint16 // seconds between keepalive packets | ||
connectRetryDelay time.Duration // Period between connection attempts | ||
|
||
writeToStdOut bool // If true received messages will be written to stdout | ||
writeToDisk bool // if true received messages will be written to below file | ||
outputFileName string // filename to save messages to | ||
|
||
debug bool // autopaho and paho debug output requested | ||
} | ||
|
||
// getConfig - Retrieves the configuration from the environment | ||
func getConfig() (config, error) { | ||
var cfg config | ||
var err error | ||
|
||
srvURL, err := stringFromEnv(envServerURL) | ||
if err != nil { | ||
return config{}, err | ||
} | ||
cfg.serverURL, err = url.Parse(srvURL) | ||
if err != nil { | ||
return config{}, fmt.Errorf("environmental variable %s must be a valid URL (%w)", envServerURL, err) | ||
} | ||
|
||
if cfg.clientID, err = stringFromEnv(envClientID); err != nil { | ||
return config{}, err | ||
} | ||
if cfg.topic, err = stringFromEnv(envTopic); err != nil { | ||
return config{}, err | ||
} | ||
|
||
iQos, err := intFromEnv(envQos) | ||
if err != nil { | ||
return config{}, err | ||
} | ||
cfg.qos = byte(iQos) | ||
|
||
iKa, err := intFromEnv(envKeepAlive) | ||
if err != nil { | ||
return config{}, err | ||
} | ||
cfg.keepAlive = uint16(iKa) | ||
|
||
if cfg.connectRetryDelay, err = milliSecondsFromEnv(envConnectRetryDelay); err != nil { | ||
return config{}, err | ||
} | ||
|
||
if cfg.writeToStdOut, err = booleanFromEnv(envWriteToStdOut); err != nil { | ||
return config{}, err | ||
} | ||
if cfg.writeToDisk, err = booleanFromEnv(envWriteToDisk); err != nil { | ||
return config{}, err | ||
} | ||
if cfg.outputFileName, err = stringFromEnv(envOutputFile); cfg.writeToDisk && err != nil { | ||
return config{}, err | ||
} | ||
|
||
if cfg.debug, err = booleanFromEnv(envDebug); err != nil { | ||
return config{}, err | ||
} | ||
|
||
return cfg, nil | ||
} | ||
|
||
func getCmConfig(cfg config) autopaho.ClientConfig { | ||
return autopaho.ClientConfig{ | ||
BrokerUrls: []*url.URL{cfg.serverURL}, | ||
KeepAlive: cfg.keepAlive, | ||
ConnectRetryDelay: cfg.connectRetryDelay, | ||
ConnectTimeout: time.Duration(5 * time.Second), | ||
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { | ||
fmt.Println("mqtt connection up") | ||
ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) | ||
if _, err := cm.Subscribe(ctx, &paho.Subscribe{ | ||
Subscriptions: map[string]paho.SubscribeOptions{ | ||
cfg.topic: {QoS: cfg.qos}, | ||
}, | ||
}); err != nil { | ||
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) | ||
return | ||
} | ||
fmt.Println("mqtt subscription made") | ||
}, | ||
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) }, | ||
ClientConfig: paho.ClientConfig{ | ||
ClientID: cfg.clientID, | ||
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) { | ||
log.Printf("%v+", m) | ||
}), | ||
OnClientError: func(err error) { fmt.Printf("%s requested disconnect: %s\n", cfg.clientID, err) }, | ||
OnServerDisconnect: func(d *paho.Disconnect) { | ||
if d.Properties != nil { | ||
fmt.Printf("%s requested disconnect: %s\n", cfg.clientID, d.Properties.ReasonString) | ||
} else { | ||
fmt.Printf("%s requested disconnect; reason code: %d\n", cfg.clientID, d.ReasonCode) | ||
} | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
// stringFromEnv - Retrieves a string from the environment and ensures it is not blank (ort non-existent) | ||
func stringFromEnv(key string) (string, error) { | ||
s := os.Getenv(key) | ||
if len(s) == 0 { | ||
return "", fmt.Errorf("environmental variable %s must not be blank", key) | ||
} | ||
return s, nil | ||
} | ||
|
||
// intFromEnv - Retrieves an integer from the environment (must be present and valid) | ||
func intFromEnv(key string) (int, error) { | ||
s := os.Getenv(key) | ||
if len(s) == 0 { | ||
return 0, fmt.Errorf("environmental variable %s must not be blank", key) | ||
} | ||
i, err := strconv.Atoi(s) | ||
if err != nil { | ||
return 0, fmt.Errorf("environmental variable %s must be an integer", key) | ||
} | ||
return i, nil | ||
} | ||
|
||
// milliSecondsFromEnv - Retrieves milliseconds (as time.Duration) from the environment (must be present and valid) | ||
func milliSecondsFromEnv(key string) (time.Duration, error) { | ||
s := os.Getenv(key) | ||
if len(s) == 0 { | ||
return 0, fmt.Errorf("environmental variable %s must not be blank", key) | ||
} | ||
i, err := strconv.Atoi(s) | ||
if err != nil { | ||
return 0, fmt.Errorf("environmental variable %s must be an integer", key) | ||
} | ||
return time.Duration(i) * time.Millisecond, nil | ||
} | ||
|
||
// booleanFromEnv - Retrieves boolean from the environment (must be present and valid) | ||
func booleanFromEnv(key string) (bool, error) { | ||
s := os.Getenv(key) | ||
if len(s) == 0 { | ||
return false, fmt.Errorf("environmental variable %s must not be blank", key) | ||
} | ||
switch strings.ToUpper(s) { | ||
case "TRUE", "T", "1": | ||
return true, nil | ||
case "FALSE", "F", "0": | ||
return false, nil | ||
default: | ||
return false, fmt.Errorf("environmental variable %s be a valid boolean option (is %s)", key, s) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exported functions need a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function has now been removed