Skip to content

Commit

Permalink
initial implementation of quick mode. added lots of todos
Browse files Browse the repository at this point in the history
  • Loading branch information
shawn hartsell committed Sep 29, 2016
1 parent 2f64eb2 commit 32c53f5
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 57 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
vendor/
glide.lock
debug
launch.json
launch.json
rabbit-probe
30 changes: 20 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
# Rabbit Probe

CLI utility to observe the behavior of a topology/consumers
by publishing messages to an exchange from a provided JSON schema. Also meant as a way to dive into Go :]
CLI utility for observing the behavior of a topology/consumers by publishing messages over time

### Usage

```rabbit-probe quick``` displays available commands
```rabbit-probe \<command\> -h``` displays options and examples for a particular command


### Modes

This section describes the various commands that are available. For a detailed description of available flags and examples
run a command with the -h flag (ex: ```rabbit-probe quick -h```)

quick: publishes a message to an exchange over a duration of time. Only json message bodies are supported


TODOs/Ideas:

- Read schema from a file
- Produce messages from schema
* would be cool to produce malformed messages
- Publish based on a frequency
* would be cool to have a config to drive this
* number of messages, simulate burst, steady stream, etc
- Maybe set up a reply-to queue for consumers that support it
- Support content types other than JSON (thrift, protobuf, etc)
* Support other message bodies besides JSON outside of quick mode (content-type header)
* Provide a mode that starts probes via a yml file with more fliexible options
* options such as warm up time
* variations of the default message (or a collection of messages ) to simulate failure conditions of consumers
* Load testing mode
65 changes: 25 additions & 40 deletions cmd/quick.go
Original file line number Diff line number Diff line change
@@ -1,64 +1,49 @@
package cmd

import (
"encoding/json"
"errors"

"github.com/shawnHartsell/rabbit-probe/probe"
"github.com/shawnHartsell/rabbit-probe/timer"
"github.com/spf13/cobra"
)

var (
duration int
rate int
uri string
exchange string
payload string
)

// quickCmd represents the quick command
var quickCmd = &cobra.Command{
Use: "quick",
Short: "start a simple job to publish messages to an excahnge",
Long: `
The quick command will publish the provided message to an exchange an an iterval determined by rate and
duration. The command only supports JSON message bodies and does not modify the body
between publishes (to simulate malformed messages, etc).
Use quick mode as an easy way to smoke/stress test consumers
Example usage:
The quick command will publish the provided message to an exchange an an iterval determined by rate and
duration. The command only supports JSON message bodies and does not modify the body
between publishes (to simulate malformed messages, etc).
Use quick mode as an easy way to smoke/stress test consumers
Example usage:
1) rabbit-probe quick
1) rabbit-probe quick
start a probe using defaults
start a probe using defaults
2) rabbit-probe quick -r 2 -d 10 -u amqp://guest:guest@localhost:5672/&2f -e default -p "{\"foo\":\"bar\"}"
publish the provided message at a rate of 2 messages/second over a duration of 10 secs
`,
2) rabbit-probe quick -r 2 -d 10 -u amqp://guest:guest@localhost:5672/&2f -e default -p "{\"foo\":\"bar\"}"
publish the provided message at a rate of 2 messages/second over a duration of 10 secs
`,
}

var quickProbe = &probe.Quick{}

func init() {
RootCmd.AddCommand(quickCmd)
quickCmd.Flags().IntVarP(&duration, "duration", "d", 60, "duration (in seconds) that the probe should run")
quickCmd.Flags().IntVarP(&rate, "rate", "r", 20, "message publish rate (per second) ")
quickCmd.Flags().StringVarP(&uri, "uri", "u", "amqp://guest:guest@localhost:5672/%2f", "uri of the rabbitMQ broker to probe")
quickCmd.Flags().StringVarP(&exchange, "exchange", "e", "default", "exchange to publish the payload to")
quickCmd.Flags().StringVarP(&payload, "payload", "p", "{}", "message to publish (json)")

quickCmd.Flags().IntVarP(&quickProbe.Duration, "duration", "d", 60, "duration (in seconds) that the probe should run")
quickCmd.Flags().IntVarP(&quickProbe.Rate, "rate", "r", 20, "message publish rate (per second) ")
quickCmd.Flags().StringVarP(&quickProbe.URI, "uri", "u", "amqp://guest:guest@localhost:5672/%2f", "uri of the rabbitMQ broker to probe")
quickCmd.Flags().StringVarP(&quickProbe.RoutingKey, "key", "k", "#", "routing key for message")
quickCmd.Flags().StringVarP(&quickProbe.Exchange, "exchange", "e", "", "exchange to publish the payload to")
quickCmd.Flags().StringVarP(&quickProbe.Payload, "payload", "p", "{}", "message to publish (json)")
quickCmd.RunE = quick
}

func quick(cmd *cobra.Command, args []string) error {
if !isJSON(payload) {
return errors.New("payload is not valid JSON")
}

timer.Start(duration, rate)
return nil
}

func isJSON(s string) bool {
var js map[string]interface{}
return json.Unmarshal([]byte(s), &js) == nil
err := timer.Start(quickProbe)
return err
}
23 changes: 23 additions & 0 deletions probe/probe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package probe

import "github.com/streadway/amqp"

//TODO: improve the name
type Actions interface {
GetProbe() *Probe
Validate() error
PublishMessage(channel *amqp.Channel) error
DisplayResults()
}

type Probe struct {
Duration int
Rate int
URI string
Exchange string
RoutingKey string
}

func (p *Probe) GetProbe() *Probe {
return p
}
48 changes: 48 additions & 0 deletions probe/quick.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package probe

import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/streadway/amqp"
)

type Quick struct {
Payload string
Probe
}

func (p *Quick) Validate() error {
var js map[string]interface{}

if err := json.Unmarshal([]byte(p.Payload), &js); err != nil {
errMsg := fmt.Sprintf("payload must be valid JSON \n %s \n", err.Error())
return errors.New(errMsg)
}

if _, err := amqp.ParseURI(p.URI); err != nil {
errMsg := fmt.Sprintf("uri is not a valid amqp uri \n %s \n", err.Error())
return errors.New(errMsg)
}

return nil
}

func (p *Quick) PublishMessage(channel *amqp.Channel) error {

msg := amqp.Publishing{
DeliveryMode: amqp.Transient,
ContentType: "application/json",
Timestamp: time.Now(),
Body: []byte(p.Payload),
MessageId: "muh id",
}

return channel.Publish(p.Exchange, p.RoutingKey, false, false, msg)
}

func (p *Quick) DisplayResults() {
fmt.Println("we done!")
}
36 changes: 30 additions & 6 deletions timer/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,58 @@ import (
"errors"
"fmt"
"time"

"github.com/shawnHartsell/rabbit-probe/probe"
"github.com/streadway/amqp"
)

//Start begins a timer that will invoke an operation at a rate of x times/sec over a duration of y secs.
func Start(duration int, rate int) (err error) {
//TODO: name is terrible, it implies an async operation
func Start(actions probe.Actions) (err error) {
probe := actions.GetProbe()
if err := actions.Validate(); err != nil {
return err
}

//TODO: abstract, abstract, abstract
fmt.Println("opening amqp connection")
conn, err := amqp.Dial(probe.URI)
if err != nil {
return err
}

channel, err := conn.Channel()
if err != nil {
return err
}

tickerRate, err := getTickerRate(rate)
tickerRate, err := getTickerRate(probe.Rate)
if err != nil {
return err
}

ticker := time.NewTicker(tickerRate)
doneChan := make(chan bool)

fmt.Printf("probe started at a rate of %d/s over %d seconds\n", probe.Rate, probe.Duration)
go func() {
time.Sleep(time.Second * time.Duration(duration))
time.Sleep(time.Second * time.Duration(probe.Duration))
doneChan <- true
}()

for {
select {
case <-ticker.C:
fmt.Println("got a tick!")
//TODO: handle channel errors (restablish connection, new channel, etc)
actions.PublishMessage(channel)
case <-doneChan:
fmt.Println("we done")
fmt.Println("probe has completed")
ticker.Stop()
return
actions.DisplayResults()
return nil
}
}

}

func getTickerRate(rate int) (time.Duration, error) {
Expand Down

0 comments on commit 32c53f5

Please sign in to comment.