add metrics when barito-flow failed to connect to ES or Kafka (#40)
* add metrics when the Elasticsearch retrier retries a push to ES

* add metrics when the producer fails to connect to Kafka

* change metrics to count exceeded log messages

* add a log line when the producer fails to send logs to Kafka

Co-authored-by: Fadli Nurhasan <[email protected]>

* create new event topic with replication factor 1 if it does not exist

Co-authored-by: Fadli Nurhasan <[email protected]>

* add a parameter for the Elasticsearch retrier's max retry

Co-authored-by: Fadli Nurhasan <[email protected]>

* make MaxRetry a global var and change its default value

Co-authored-by: bentol <[email protected]>

Co-authored-by: Fadli Nurhasan <[email protected]>
bentol and fadlinurhasan authored Apr 20, 2020
1 parent 67fc035 commit 5baaf82
Showing 12 changed files with 145 additions and 29 deletions.
2 changes: 2 additions & 0 deletions cmds/action.go
@@ -32,6 +32,7 @@ func ActionBaritoConsumerService(c *cli.Context) (err error) {
kafkaRetryInterval := configKafkaRetryInterval()
newTopicEventName := configNewTopicEvent()
elasticRetrierInterval := configElasticsearchRetrierInterval()
elasticRetrierMaxRetry := configElasticsearchRetrierMaxRetry()
esIndexMethod := configEsIndexMethod()
esBulkSize := configEsBulkSize()
esFlushIntervalMs := configEsFlushIntervalMs()
@@ -65,6 +66,7 @@ func ActionBaritoConsumerService(c *cli.Context) (err error) {
"kafkaRetryInterval": kafkaRetryInterval,
"newTopicEventName": newTopicEventName,
"elasticRetrierInterval": elasticRetrierInterval,
"elasticRetrierMaxRetry": elasticRetrierMaxRetry,
"esConfig": esConfig,
"elasticUsername": elasticUsername,
"elasticPassword": elasticPassword,
6 changes: 6 additions & 0 deletions cmds/config.go
@@ -36,6 +36,7 @@ const (

EnvNewTopicEventName = "BARITO_NEW_TOPIC_EVENT"
EnvConsumerElasticsearchRetrierInterval = "BARITO_CONSUMER_ELASTICSEARCH_RETRIER_INTERVAL"
EnvConsumerElasticsearchRetrierMaxRetry = "BARITO_CONSUMER_ELASTICSEARCH_RETRIER_MAX_RETRY"
EnvConsumerRebalancingStrategy = "BARITO_CONSUMER_REBALANCING_STRATEGY"

EnvPrintTPS = "BARITO_PRINT_TPS"
@@ -68,6 +69,7 @@ var (

DefaultNewTopicEventName = "new_topic_events"
DefaultElasticsearchRetrierInterval = "30s"
DefaultElasticsearchRetrierMaxRetry = 10
DefaultConsumerRebalancingStrategy = "RoundRobin"
DefaultEsIndexMethod = "BulkProcessor"
DefaultEsBulkSize = 100
@@ -199,6 +201,10 @@ func configElasticsearchRetrierInterval() string {
return stringEnvOrDefault(EnvConsumerElasticsearchRetrierInterval, DefaultElasticsearchRetrierInterval)
}

func configElasticsearchRetrierMaxRetry() int {
return intEnvOrDefault(EnvConsumerElasticsearchRetrierMaxRetry, DefaultElasticsearchRetrierMaxRetry)
}

func configConsumerRebalancingStrategy() string {
return stringEnvOrDefault(EnvConsumerRebalancingStrategy, DefaultConsumerRebalancingStrategy)
}
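The new setting resolves through the same env-or-default path as its neighbours. intEnvOrDefault is not part of this diff, so the following is only a minimal sketch of the expected behaviour, using a hypothetical stand-in for that helper (assuming a conventional parse-with-fallback implementation):

package main

import (
	"fmt"
	"os"
	"strconv"
)

// Hypothetical stand-in for the repo's intEnvOrDefault helper (not shown in
// this diff): parse the variable as an int, fall back on absence or bad input.
func intEnvOrDefault(key string, def int) int {
	if raw, ok := os.LookupEnv(key); ok {
		if v, err := strconv.Atoi(raw); err == nil {
			return v
		}
	}
	return def
}

func main() {
	const key = "BARITO_CONSUMER_ELASTICSEARCH_RETRIER_MAX_RETRY"
	fmt.Println(intEnvOrDefault(key, 10)) // unset: prints the default, 10
	os.Setenv(key, "3")
	fmt.Println(intEnvOrDefault(key, 10)) // set: prints 3
}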
7 changes: 6 additions & 1 deletion flow/barito_consumer_service.go
@@ -3,6 +3,7 @@ package flow
import (
"context"
"fmt"
"github.com/BaritoLog/barito-flow/prome"
"strings"
"time"

@@ -55,6 +56,7 @@ type baritoConsumerService struct {
lastNewTopic string
isHalt bool
elasticRetrierInterval string
elasticRetrierMaxRetry int

elasticUsername string
elasticPassword string
@@ -71,6 +73,7 @@ func NewBaritoConsumerService(params map[string]interface{}) BaritoConsumerServi
newTopicEventName: params["newTopicEventName"].(string),
workerMap: make(map[string]ConsumerWorker),
elasticRetrierInterval: params["elasticRetrierInterval"].(string),
elasticRetrierMaxRetry: params["elasticRetrierMaxRetry"].(int),
elasticUsername: params["elasticUsername"].(string),
elasticPassword: params["elasticPassword"].(string),
}
@@ -81,6 +84,7 @@ func NewBaritoConsumerService(params map[string]interface{}) BaritoConsumerServi
s.esClient = &elastic
if err != nil {
s.logError(errkit.Concat(ErrElasticsearchClient, err))
prome.IncreaseConsumerElasticsearchClientFailed(prome.ESClientFailedPhaseInit)
}

return s
@@ -210,6 +214,7 @@ func (s *baritoConsumerService) logNewTopic(topic string) {

func (s *baritoConsumerService) onElasticRetry(err error) {
s.logError(errkit.Concat(ErrElasticsearchClient, err))
prome.IncreaseConsumerElasticsearchClientFailed(prome.ESClientFailedPhaseRetry)
s.HaltAllWorker()
}

@@ -275,7 +280,7 @@ func (s *baritoConsumerService) HaltAllWorker() {
}

func (s *baritoConsumerService) elasticRetrier() *ElasticRetrier {
return NewElasticRetrier(timekit.Duration(s.elasticRetrierInterval), s.onElasticRetry)
return NewElasticRetrier(timekit.Duration(s.elasticRetrierInterval), s.elasticRetrierMaxRetry, s.onElasticRetry)
}

func (s *baritoConsumerService) ResumeWorker() (err error) {
55 changes: 51 additions & 4 deletions flow/barito_consumer_service_test.go
@@ -1,14 +1,14 @@
package flow

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/BaritoLog/barito-flow/prome"

"github.com/BaritoLog/barito-flow/mock"
. "github.com/BaritoLog/go-boilerplate/testkit"
"github.com/BaritoLog/go-boilerplate/timekit"
@@ -21,10 +21,10 @@

func init() {
log.SetLevel(log.ErrorLevel)
prome.InitConsumerInstrumentation()
}

func TestBaritConsumerService_MakeKafkaAdminError(t *testing.T) {
resetPrometheusMetrics()
factory := NewDummyKafkaFactory()
factory.Expect_MakeKafkaAdmin_AlwaysError("some-error")

@@ -78,6 +78,52 @@ func TestBaritoConsumerService(t *testing.T) {
FatalIf(t, !worker.IsStart(), "worker of topic abc_logs is not starting")
}

func TestBaritoConsumerService_OnElasticRetry(t *testing.T) {
resetPrometheusMetrics()
ctrl := gomock.NewController(t)
defer ctrl.Finish()

esHandler := &ELasticTestHandler{}
ts := httptest.NewServer(esHandler)
defer ts.Close()
esHandler.CustomHandler = func(w http.ResponseWriter, r *http.Request) {
if r.Method == "HEAD" { // check if index exist
w.WriteHeader(200)
_, _ = w.Write([]byte(`{}`))
go func() {
ts.Close()
}()
}
}

factory := NewDummyKafkaFactory()
factory.Expect_MakeKafkaAdmin_ConsumerServiceSuccess(ctrl, []string{"abc_logs"})
factory.Expect_MakeClusterConsumer_AlwaysSuccess(ctrl)

consumerParams := SampleConsumerParams(factory)
consumerParams["elasticRetrierInterval"] = "1ms"
consumerParams["elasticRetrierMaxRetry"] = 5
consumerParams["elasticUrls"] = []string{ts.URL}
service := NewBaritoConsumerService(consumerParams).(*baritoConsumerService)

err := service.Start()
FatalIfError(t, err)
defer service.Close()
timberBytes, _ := proto.Marshal(pb.SampleTimberProto())

service.onStoreTimber(&sarama.ConsumerMessage{
Value: timberBytes,
})
FatalIf(t, service.lastError == nil, "Should return error because ES is stopped")

expected := `
# HELP barito_consumer_elasticsearch_client_failed Number of elasticsearch client failed
# TYPE barito_consumer_elasticsearch_client_failed counter
barito_consumer_elasticsearch_client_failed{phase="retry"} 5
`
FatalIfError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expected), "barito_consumer_elasticsearch_client_failed"))
}

func TestBaritoConsumerService_SpawnWorkerError(t *testing.T) {

ctrl := gomock.NewController(t)
@@ -301,7 +347,8 @@ func SampleConsumerParams(factory *dummyKafkaFactory) map[string]interface{} {
"kafkaMaxRetry": 1,
"kafkaRetryInterval": 10,
"newTopicEventName": "",
"elasticRetrierInterval": "",
"elasticRetrierInterval": "1s",
"elasticRetrierMaxRetry": 1,
"esConfig": NewEsConfig("SingleInsert", 1, time.Duration(1000), false),
"elasticUsername": "",
"elasticPassword": "",
8 changes: 4 additions & 4 deletions flow/dummy_kafka_factory.go
@@ -51,7 +51,7 @@ func (f *dummyKafkaFactory) Expect_MakeClusterConsumer_AlwaysSuccess(ctrl *gomoc
consumer.EXPECT().Messages().AnyTimes()
consumer.EXPECT().Notifications().AnyTimes()
consumer.EXPECT().Errors().AnyTimes()
consumer.EXPECT().Close()
consumer.EXPECT().Close().AnyTimes()
return consumer, nil
}
}
@@ -63,7 +63,7 @@ func (f *dummyKafkaFactory) Expect_MakeClusterConsumer_ConsumerSpawnWorkerErrorC
consumer.EXPECT().Messages().AnyTimes()
consumer.EXPECT().Notifications().AnyTimes()
consumer.EXPECT().Errors().AnyTimes()
consumer.EXPECT().Close()
consumer.EXPECT().Close().AnyTimes()
return consumer, nil
}

@@ -80,8 +80,8 @@ func (f *dummyKafkaFactory) Expect_MakeKafkaAdmin_AlwaysError(errMsg string) {
func (f *dummyKafkaFactory) Expect_MakeKafkaAdmin_ConsumerServiceSuccess(ctrl *gomock.Controller, topics []string) {
f.MakeKafkaAdminFunc = func() (KafkaAdmin, error) {
admin := mock.NewMockKafkaAdmin(ctrl)
admin.EXPECT().Topics().Return(topics)
admin.EXPECT().Close()
admin.EXPECT().Topics().Return(topics).AnyTimes()
admin.EXPECT().Close().AnyTimes()
return admin, nil
}
}
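For context on the .AnyTimes() changes above: a bare EXPECT() registers an exactly-once expectation, which the new retry test would violate because workers can be halted and closed repeatedly, or torn down before Close is ever reached. A hedged illustration with the repo's generated mock, inside any test that owns a gomock controller:

admin := mock.NewMockKafkaAdmin(ctrl)

// Before: exactly one Close() required; ctrl.Finish() fails on zero or extra calls.
// admin.EXPECT().Close()

// After: zero or more Close() calls are acceptable.
admin.EXPECT().Close().AnyTimes()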
4 changes: 4 additions & 0 deletions flow/elastic.go
@@ -61,6 +61,10 @@ func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string, e
elastic.SetBasicAuth(elasticUsername, elasticPassword),
)

if err != nil {
return
}

beforeBulkFunc, afterBulkFunc := getCommitCallback()

p, err := c.BulkProcessor().
10 changes: 8 additions & 2 deletions flow/elastic_retrier.go
@@ -15,17 +15,18 @@ import (
type ElasticRetrier struct {
backoff elastic.Backoff
onRetryFunc func(err error)
maxRetry int
}

func NewElasticRetrier(t time.Duration, f func(err error)) *ElasticRetrier {
func NewElasticRetrier(t time.Duration, n int, f func(err error)) *ElasticRetrier {
return &ElasticRetrier{
backoff: elastic.NewConstantBackoff(t),
onRetryFunc: f,
maxRetry: n,
}
}

func (r *ElasticRetrier) Retry(ctx context.Context, retry int, req *http.Request, resp *http.Response, err error) (time.Duration, bool, error) {

log.Warn(errors.New(fmt.Sprintf("Elasticsearch Retrier #%d", retry)))

if err == syscall.ECONNREFUSED {
@@ -35,5 +36,10 @@ func (r *ElasticRetrier) Retry(ctx context.Context, retry int, req *http.Request
// Let the backoff strategy decide how long to wait and whether to keep retrying
wait, ok := r.backoff.Next(retry)
r.onRetryFunc(err)

// a maxRetry of 0 means retry forever; otherwise give up once the retry count reaches maxRetry
if r.maxRetry > 0 && retry >= r.maxRetry {
ok = false
}
return wait, ok, nil
}
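A minimal sketch, not part of this commit, of how maxRetry plays out; it drives Retry by hand the way the existing retrier test does, and assumes it sits in the flow package with the context, testing, and time imports:

func TestElasticRetrier_StopsAtMaxRetry(t *testing.T) {
	r := NewElasticRetrier(10*time.Millisecond, 3, func(err error) {})
	for attempt := 1; attempt <= 5; attempt++ {
		_, ok, err := r.Retry(context.TODO(), attempt, nil, nil, nil)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		// ok stays true until attempt reaches maxRetry (3), then flips to
		// false; a maxRetry of 0 would leave it true forever.
		if want := attempt < 3; ok != want {
			t.Fatalf("attempt %d: continue=%v, want %v", attempt, ok, want)
		}
	}
}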
8 changes: 6 additions & 2 deletions flow/elastic_retrier_test.go
@@ -6,16 +6,20 @@ import (
"time"
)

const (
MAX_RETRY = 10
)

func mockElasticRetrier() *ElasticRetrier {
return NewElasticRetrier(1*time.Second, mockRetrier)
return NewElasticRetrier(1*time.Second, MAX_RETRY, mockRetrier)
}

func mockRetrier(err error) {
// Nothing to do
}

func TestNewElasticRetrier(t *testing.T) {
r := NewElasticRetrier(1*time.Second, mockRetrier)
r := NewElasticRetrier(1*time.Second, MAX_RETRY, mockRetrier)
wait, ok, err := r.Retry(context.TODO(), 1, nil, nil, nil)
if want, got := 1*time.Second, wait; want != got {
t.Fatalf("expected %v, got %v", want, got)
21 changes: 13 additions & 8 deletions flow/elastic_test_handler.go
@@ -7,6 +7,7 @@ type ELasticTestHandler struct {
CreateAPIStatus int
PostAPIStatus int
ResponseBody []byte
CustomHandler func(w http.ResponseWriter, r *http.Request)
}

func (handler *ELasticTestHandler) getResponseBody() (body []byte) {
@@ -18,13 +19,17 @@ func (handler *ELasticTestHandler) getResponseBody() (body []byte) {
}

func (handler *ELasticTestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == "HEAD" { // check if index exist
w.WriteHeader(handler.ExistAPIStatus)
} else if r.Method == "PUT" { // create index
w.WriteHeader(handler.CreateAPIStatus)
w.Write(handler.getResponseBody())
} else if r.Method == "POST" { // post message
w.WriteHeader(handler.PostAPIStatus)
w.Write(handler.getResponseBody())
if handler.CustomHandler == nil {
if r.Method == "HEAD" { // check if index exist
w.WriteHeader(handler.ExistAPIStatus)
} else if r.Method == "PUT" { // create index
w.WriteHeader(handler.CreateAPIStatus)
w.Write(handler.getResponseBody())
} else if r.Method == "POST" { // post message
w.WriteHeader(handler.PostAPIStatus)
w.Write(handler.getResponseBody())
}
} else {
handler.CustomHandler(w, r)
}
}
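With the CustomHandler hook, a test can simulate failures the canned status fields cannot express, such as a node that rejects every request. A brief usage sketch, assuming the standard net/http and net/http/httptest imports:

handler := &ELasticTestHandler{}
handler.CustomHandler = func(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusServiceUnavailable) // every request fails, regardless of method
}
ts := httptest.NewServer(handler)
defer ts.Close()
// point an Elasticsearch client at ts.URL to exercise the retrier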
9 changes: 6 additions & 3 deletions flow/grpc_producer.go
@@ -73,6 +73,7 @@ func (s *producerService) initProducer() (err error) {
log.Infof("Retry kafka sync producer successful")
}
} else {
prome.IncreaseProducerKafkaClientFailed()
if (s.kafkaMaxRetry == 0) || (retry < s.kafkaMaxRetry) {
log.Warnf("Cannot connect to kafka: %s, retrying in %d seconds", err, s.kafkaRetryInterval)
time.Sleep(time.Duration(s.kafkaRetryInterval) * time.Second)
@@ -202,7 +203,7 @@ func (s *producerService) Produce(_ context.Context, timber *pb.Timber) (resp *p
maxTokenIfNotExist := timber.GetContext().GetAppMaxTps()
if s.limiter.IsHitLimit(topic, 1, maxTokenIfNotExist) {
err = onLimitExceededGrpc()
prome.IncreaseProducerTPSExceededCounter(topic)
prome.IncreaseProducerTPSExceededCounter(topic, 1)
return
}

@@ -222,9 +223,10 @@ func (s *producerService) ProduceBatch(_ context.Context, timberCollection *pb.T
topic := timberCollection.GetContext().GetKafkaTopic() + s.topicSuffix

maxTokenIfNotExist := timberCollection.GetContext().GetAppMaxTps()
if s.limiter.IsHitLimit(topic, len(timberCollection.GetItems()), maxTokenIfNotExist) {
lengthMessages := len(timberCollection.GetItems())
if s.limiter.IsHitLimit(topic, lengthMessages, maxTokenIfNotExist) {
err = onLimitExceededGrpc()
prome.IncreaseProducerTPSExceededCounter(topic)
prome.IncreaseProducerTPSExceededCounter(topic, lengthMessages)
return
}

@@ -234,6 +236,7 @@ func (s *producerService) ProduceBatch(_ context.Context, timberCollection *pb.T

err = s.handleProduce(timber, topic)
if err != nil {
log.Infof("Failed send logs to kafka: %s", err)
return
}
}
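The prome package is not included in this diff, so the following only sketches the counter semantics the ProduceBatch change implies, assuming the client_golang prometheus package; the metric name and registration are assumptions, not the repository's actual code:

// Hypothetical counter; the real prome package defines its own name and labels.
var producerTPSExceeded = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "barito_producer_tps_exceeded_total",
		Help: "Log messages rejected because a topic hit its TPS limit",
	},
	[]string{"topic"},
)

func IncreaseProducerTPSExceededCounter(topic string, count int) {
	// A rejected batch of N messages now advances the counter by N, where the
	// old one-argument form could only add 1 per rejected call.
	producerTPSExceeded.WithLabelValues(topic).Add(float64(count))
}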