From 77ad1b5b70abe7e1673f662425e4c6bb21238173 Mon Sep 17 00:00:00 2001 From: Iman Tung Date: Tue, 12 Jun 2018 15:06:56 +0700 Subject: [PATCH] [iman] graceful shutdown --- cmds/consumer.go | 3 +++ cmds/producer.go | 3 +++ flow/kafka_agent.go | 6 ++++++ flow/kafka_agent_test.go | 3 +++ flow/kafka_consumer.go | 5 +++++ 5 files changed, 20 insertions(+) diff --git a/cmds/consumer.go b/cmds/consumer.go index daec7b4d..88d1dd89 100644 --- a/cmds/consumer.go +++ b/cmds/consumer.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/BaritoLog/barito-flow/flow" + "github.com/BaritoLog/go-boilerplate/srvkit" "github.com/BaritoLog/go-boilerplate/timekit" "github.com/BaritoLog/instru" cluster "github.com/bsm/sarama-cluster" @@ -55,6 +56,8 @@ func Consumer(c *cli.Context) (err error) { }, } + srvkit.AsyncGracefulShutdown(agent.Close) + return agent.Start() } diff --git a/cmds/producer.go b/cmds/producer.go index 03700c3e..24ad8c1e 100644 --- a/cmds/producer.go +++ b/cmds/producer.go @@ -2,6 +2,7 @@ package cmds import ( "github.com/BaritoLog/barito-flow/flow" + "github.com/BaritoLog/go-boilerplate/srvkit" "github.com/Shopify/sarama" log "github.com/sirupsen/logrus" @@ -41,5 +42,7 @@ func Producer(c *cli.Context) (err error) { producerMaxTps, ) + srvkit.AsyncGracefulShutdown(agent.Close) + return agent.Start() } diff --git a/flow/kafka_agent.go b/flow/kafka_agent.go index 1fcaf34e..e06ff70e 100644 --- a/flow/kafka_agent.go +++ b/flow/kafka_agent.go @@ -20,6 +20,12 @@ func (a *KafkaAgent) Start() (err error) { return } +func (a *KafkaAgent) Close() { + if a.Consumer != nil { + a.Consumer.Close() + } +} + func (a *KafkaAgent) loopMain() { for { select { diff --git a/flow/kafka_agent_test.go b/flow/kafka_agent_test.go index 17926ab2..c2ca7fb7 100644 --- a/flow/kafka_agent_test.go +++ b/flow/kafka_agent_test.go @@ -43,6 +43,7 @@ func TestKafkaAgent(t *testing.T) { gotNotification = notification }, } + defer agent.Close() go agent.Start() timekit.Sleep("1ms") @@ -74,6 +75,7 @@ func TestKafkaAgent_StoreError(t *testing.T) { err0 = err }, } + defer agent.Close() go agent.Start() timekit.Sleep("1ms") @@ -103,6 +105,7 @@ func TestKafkaAgent_KafkaError(t *testing.T) { err0 = err }, } + defer agent.Close() go agent.Start() timekit.Sleep("1ms") diff --git a/flow/kafka_consumer.go b/flow/kafka_consumer.go index 02aac8bd..806bb9c2 100644 --- a/flow/kafka_consumer.go +++ b/flow/kafka_consumer.go @@ -11,6 +11,7 @@ type KafkaConsumer interface { Notifications() <-chan *cluster.Notification Errors() <-chan error MarkOffset(msg *sarama.ConsumerMessage, metadata string) + Close() error } type dummyKafkaConsumer struct { @@ -33,3 +34,7 @@ func (c *dummyKafkaConsumer) Notifications() <-chan *cluster.Notification { func (c *dummyKafkaConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { } + +func (c *dummyKafkaConsumer) Close() error { + return nil +}