Skip to content

Commit

Permalink
[iman] testing for command producer and consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Iman Tung committed Jun 11, 2018
1 parent 5ae3115 commit b68b7cd
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 16 deletions.
38 changes: 22 additions & 16 deletions cmds/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,13 @@ func Consumer(c *cli.Context) (err error) {
topics := getKafkaConsumerTopics()
esUrl := getElasticsearchUrl()

pushMetricUrl := getPushMetricUrl()
pushMetricToken := getPushMetricToken()
pushMetricInterval := getPushMetricInterval()

log.Infof("[Start Consumer]")
log.Infof("KafkaBrokers: %v", EnvKafkaBrokers, brokers)
log.Infof("KafkaGroupID: %s", EnvKafkaGroupID, groupID)
log.Infof("KafkaConsumerTopics:%v", EnvKafkaConsumerTopics, topics)
log.Infof("ElasticsearchUrl:%v", EnvElasticsearchUrl, esUrl)
log.Infof("PushMetricUrl: %v", EnvPushMetricUrl, pushMetricUrl)
log.Infof("PushMetricToken: %v", EnvPushMetricToken, pushMetricToken)
log.Infof("PushMetricInterval: %v", EnvPushMetricInterval, pushMetricInterval)

if pushMetricToken != "" && pushMetricUrl != "" {
log.Infof("Set callback to instrumentation")
instru.SetCallback(
timekit.Duration(pushMetricInterval),
flow.NewMetricMarketCallback(pushMetricUrl, pushMetricToken),
)
} else {
log.Infof("No callback for instrumentation")
}
callbackInstrumentation()

// elastic client
client, err := elastic.NewClient(
Expand Down Expand Up @@ -73,3 +58,24 @@ func Consumer(c *cli.Context) (err error) {
return agent.Start()

}

func callbackInstrumentation() bool {
pushMetricUrl := getPushMetricUrl()
pushMetricToken := getPushMetricToken()
pushMetricInterval := getPushMetricInterval()

if pushMetricToken == "" || pushMetricUrl == "" {
log.Infof("No callback for instrumentation")
return false
}

log.Infof("PushMetricUrl: %v", EnvPushMetricUrl, pushMetricUrl)
log.Infof("PushMetricToken: %v", EnvPushMetricToken, pushMetricToken)
log.Infof("PushMetricInterval: %v", EnvPushMetricInterval, pushMetricInterval)
instru.SetCallback(
timekit.Duration(pushMetricInterval),
flow.NewMetricMarketCallback(pushMetricUrl, pushMetricToken),
)
return true

}
30 changes: 30 additions & 0 deletions cmds/consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package cmds

import (
"os"
"testing"

. "github.com/BaritoLog/go-boilerplate/testkit"
"github.com/BaritoLog/instru"
log "github.com/sirupsen/logrus"
)

func TestConsumer(t *testing.T) {
log.SetLevel(log.ErrorLevel)
os.Setenv(EnvKafkaBrokers, "wronghost:2349")
defer os.Clearenv()

err := Consumer(nil)

FatalIfWrongError(t, err, "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")
}

func TestCallbackInstrumentation(t *testing.T) {

FatalIf(t, callbackInstrumentation(), "it should be return false")

os.Setenv(EnvPushMetricToken, "some-token")
os.Setenv(EnvPushMetricUrl, "http://some-url")
FatalIf(t, !callbackInstrumentation(), "it should be return true")
FatalIf(t, instru.DefaultCallback == nil, "it should be set callback")
}
18 changes: 18 additions & 0 deletions cmds/producer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package cmds

import (
"os"
"testing"

. "github.com/BaritoLog/go-boilerplate/testkit"
log "github.com/sirupsen/logrus"
)

func TestProducer(t *testing.T) {
log.SetLevel(log.ErrorLevel)
os.Setenv(EnvKafkaBrokers, "wronghost:2349")
defer os.Clearenv()

err := Producer(nil)
FatalIfWrongError(t, err, "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")
}

0 comments on commit b68b7cd

Please sign in to comment.