From b68b7cde2472aa177329a19eddb19b92ad6c9f24 Mon Sep 17 00:00:00 2001 From: Iman Tung Date: Mon, 11 Jun 2018 13:44:52 +0700 Subject: [PATCH] [iman] testing for command producer and consumer --- cmds/consumer.go | 38 ++++++++++++++++++++++---------------- cmds/consumer_test.go | 30 ++++++++++++++++++++++++++++++ cmds/producer_test.go | 18 ++++++++++++++++++ 3 files changed, 70 insertions(+), 16 deletions(-) create mode 100644 cmds/consumer_test.go create mode 100644 cmds/producer_test.go diff --git a/cmds/consumer.go b/cmds/consumer.go index 5bea48a7..daec7b4d 100644 --- a/cmds/consumer.go +++ b/cmds/consumer.go @@ -19,28 +19,13 @@ func Consumer(c *cli.Context) (err error) { topics := getKafkaConsumerTopics() esUrl := getElasticsearchUrl() - pushMetricUrl := getPushMetricUrl() - pushMetricToken := getPushMetricToken() - pushMetricInterval := getPushMetricInterval() - log.Infof("[Start Consumer]") log.Infof("KafkaBrokers: %v", EnvKafkaBrokers, brokers) log.Infof("KafkaGroupID: %s", EnvKafkaGroupID, groupID) log.Infof("KafkaConsumerTopics:%v", EnvKafkaConsumerTopics, topics) log.Infof("ElasticsearchUrl:%v", EnvElasticsearchUrl, esUrl) - log.Infof("PushMetricUrl: %v", EnvPushMetricUrl, pushMetricUrl) - log.Infof("PushMetricToken: %v", EnvPushMetricToken, pushMetricToken) - log.Infof("PushMetricInterval: %v", EnvPushMetricInterval, pushMetricInterval) - if pushMetricToken != "" && pushMetricUrl != "" { - log.Infof("Set callback to instrumentation") - instru.SetCallback( - timekit.Duration(pushMetricInterval), - flow.NewMetricMarketCallback(pushMetricUrl, pushMetricToken), - ) - } else { - log.Infof("No callback for instrumentation") - } + callbackInstrumentation() // elastic client client, err := elastic.NewClient( @@ -73,3 +58,24 @@ func Consumer(c *cli.Context) (err error) { return agent.Start() } + +func callbackInstrumentation() bool { + pushMetricUrl := getPushMetricUrl() + pushMetricToken := getPushMetricToken() + pushMetricInterval := getPushMetricInterval() + + if pushMetricToken == "" || pushMetricUrl == "" { + log.Infof("No callback for instrumentation") + return false + } + + log.Infof("PushMetricUrl: %v", EnvPushMetricUrl, pushMetricUrl) + log.Infof("PushMetricToken: %v", EnvPushMetricToken, pushMetricToken) + log.Infof("PushMetricInterval: %v", EnvPushMetricInterval, pushMetricInterval) + instru.SetCallback( + timekit.Duration(pushMetricInterval), + flow.NewMetricMarketCallback(pushMetricUrl, pushMetricToken), + ) + return true + +} diff --git a/cmds/consumer_test.go b/cmds/consumer_test.go new file mode 100644 index 00000000..6f462391 --- /dev/null +++ b/cmds/consumer_test.go @@ -0,0 +1,30 @@ +package cmds + +import ( + "os" + "testing" + + . "github.com/BaritoLog/go-boilerplate/testkit" + "github.com/BaritoLog/instru" + log "github.com/sirupsen/logrus" +) + +func TestConsumer(t *testing.T) { + log.SetLevel(log.ErrorLevel) + os.Setenv(EnvKafkaBrokers, "wronghost:2349") + defer os.Clearenv() + + err := Consumer(nil) + + FatalIfWrongError(t, err, "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)") +} + +func TestCallbackInstrumentation(t *testing.T) { + + FatalIf(t, callbackInstrumentation(), "it should be return false") + + os.Setenv(EnvPushMetricToken, "some-token") + os.Setenv(EnvPushMetricUrl, "http://some-url") + FatalIf(t, !callbackInstrumentation(), "it should be return true") + FatalIf(t, instru.DefaultCallback == nil, "it should be set callback") +} diff --git a/cmds/producer_test.go b/cmds/producer_test.go new file mode 100644 index 00000000..221ab7cc --- /dev/null +++ b/cmds/producer_test.go @@ -0,0 +1,18 @@ +package cmds + +import ( + "os" + "testing" + + . "github.com/BaritoLog/go-boilerplate/testkit" + log "github.com/sirupsen/logrus" +) + +func TestProducer(t *testing.T) { + log.SetLevel(log.ErrorLevel) + os.Setenv(EnvKafkaBrokers, "wronghost:2349") + defer os.Clearenv() + + err := Producer(nil) + FatalIfWrongError(t, err, "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)") +}