Skip to content

Commit

Permalink
[iman] graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Iman Tung committed Jun 12, 2018
1 parent 0d7da92 commit 77ad1b5
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 0 deletions.
3 changes: 3 additions & 0 deletions cmds/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/BaritoLog/barito-flow/flow"
"github.com/BaritoLog/go-boilerplate/srvkit"
"github.com/BaritoLog/go-boilerplate/timekit"
"github.com/BaritoLog/instru"
cluster "github.com/bsm/sarama-cluster"
Expand Down Expand Up @@ -55,6 +56,8 @@ func Consumer(c *cli.Context) (err error) {
},
}

srvkit.AsyncGracefulShutdown(agent.Close)

return agent.Start()

}
Expand Down
3 changes: 3 additions & 0 deletions cmds/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmds

import (
"github.com/BaritoLog/barito-flow/flow"
"github.com/BaritoLog/go-boilerplate/srvkit"
"github.com/Shopify/sarama"
log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -41,5 +42,7 @@ func Producer(c *cli.Context) (err error) {
producerMaxTps,
)

srvkit.AsyncGracefulShutdown(agent.Close)

return agent.Start()
}
6 changes: 6 additions & 0 deletions flow/kafka_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ func (a *KafkaAgent) Start() (err error) {
return
}

func (a *KafkaAgent) Close() {
if a.Consumer != nil {
a.Consumer.Close()
}
}

func (a *KafkaAgent) loopMain() {
for {
select {
Expand Down
3 changes: 3 additions & 0 deletions flow/kafka_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestKafkaAgent(t *testing.T) {
gotNotification = notification
},
}
defer agent.Close()

go agent.Start()
timekit.Sleep("1ms")
Expand Down Expand Up @@ -74,6 +75,7 @@ func TestKafkaAgent_StoreError(t *testing.T) {
err0 = err
},
}
defer agent.Close()

go agent.Start()
timekit.Sleep("1ms")
Expand Down Expand Up @@ -103,6 +105,7 @@ func TestKafkaAgent_KafkaError(t *testing.T) {
err0 = err
},
}
defer agent.Close()

go agent.Start()
timekit.Sleep("1ms")
Expand Down
5 changes: 5 additions & 0 deletions flow/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type KafkaConsumer interface {
Notifications() <-chan *cluster.Notification
Errors() <-chan error
MarkOffset(msg *sarama.ConsumerMessage, metadata string)
Close() error
}

type dummyKafkaConsumer struct {
Expand All @@ -33,3 +34,7 @@ func (c *dummyKafkaConsumer) Notifications() <-chan *cluster.Notification {

func (c *dummyKafkaConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
}

func (c *dummyKafkaConsumer) Close() error {
return nil
}

0 comments on commit 77ad1b5

Please sign in to comment.