diff --git a/cmds/action.go b/cmds/action.go
index 4038ce63..44afdf21 100644
--- a/cmds/action.go
+++ b/cmds/action.go
@@ -32,6 +32,7 @@ func ActionBaritoConsumerService(c *cli.Context) (err error) {
 	kafkaRetryInterval := configKafkaRetryInterval()
 	newTopicEventName := configNewTopicEvent()
 	elasticRetrierInterval := configElasticsearchRetrierInterval()
+	elasticRetrierMaxRetry := configElasticsearchRetrierMaxRetry()
 	esIndexMethod := configEsIndexMethod()
 	esBulkSize := configEsBulkSize()
 	esFlushIntervalMs := configEsFlushIntervalMs()
@@ -65,6 +66,7 @@ func ActionBaritoConsumerService(c *cli.Context) (err error) {
 		"kafkaRetryInterval": kafkaRetryInterval,
 		"newTopicEventName": newTopicEventName,
 		"elasticRetrierInterval": elasticRetrierInterval,
+		"elasticRetrierMaxRetry": elasticRetrierMaxRetry,
 		"esConfig": esConfig,
 		"elasticUsername": elasticUsername,
 		"elasticPassword": elasticPassword,
diff --git a/cmds/config.go b/cmds/config.go
index 924d98e1..83567e0e 100644
--- a/cmds/config.go
+++ b/cmds/config.go
@@ -36,6 +36,7 @@ const (
 	EnvNewTopicEventName = "BARITO_NEW_TOPIC_EVENT"
 	EnvConsumerElasticsearchRetrierInterval = "BARITO_CONSUMER_ELASTICSEARCH_RETRIER_INTERVAL"
+	EnvConsumerElasticsearchRetrierMaxRetry = "BARITO_CONSUMER_ELASTICSEARCH_RETRIER_MAX_RETRY"
 	EnvConsumerRebalancingStrategy = "BARITO_CONSUMER_REBALANCING_STRATEGY"
 	EnvPrintTPS = "BARITO_PRINT_TPS"
@@ -68,6 +69,7 @@ var (
 	DefaultNewTopicEventName = "new_topic_events"
 	DefaultElasticsearchRetrierInterval = "30s"
+	DefaultElasticsearchRetrierMaxRetry = 10
 	DefaultConsumerRebalancingStrategy = "RoundRobin"
 	DefaultEsIndexMethod = "BulkProcessor"
 	DefaultEsBulkSize = 100
@@ -199,6 +201,10 @@ func configElasticsearchRetrierInterval() string {
 	return stringEnvOrDefault(EnvConsumerElasticsearchRetrierInterval, DefaultElasticsearchRetrierInterval)
 }
 
+func configElasticsearchRetrierMaxRetry() int {
+	return intEnvOrDefault(EnvConsumerElasticsearchRetrierMaxRetry, DefaultElasticsearchRetrierMaxRetry)
+}
+
 func configConsumerRebalancingStrategy() string {
 	return stringEnvOrDefault(EnvConsumerRebalancingStrategy, DefaultConsumerRebalancingStrategy)
 }
diff --git a/flow/barito_consumer_service.go b/flow/barito_consumer_service.go
index f406a523..ca690280 100644
--- a/flow/barito_consumer_service.go
+++ b/flow/barito_consumer_service.go
@@ -3,6 +3,7 @@ package flow
 import (
 	"context"
 	"fmt"
+	"github.com/BaritoLog/barito-flow/prome"
 	"strings"
 	"time"
 
@@ -55,6 +56,7 @@ type baritoConsumerService struct {
 	lastNewTopic string
 	isHalt bool
 	elasticRetrierInterval string
+	elasticRetrierMaxRetry int
 
 	elasticUsername string
 	elasticPassword string
@@ -71,6 +73,7 @@ func NewBaritoConsumerService(params map[string]interface{}) BaritoConsumerServi
 		newTopicEventName: params["newTopicEventName"].(string),
 		workerMap: make(map[string]ConsumerWorker),
 		elasticRetrierInterval: params["elasticRetrierInterval"].(string),
+		elasticRetrierMaxRetry: params["elasticRetrierMaxRetry"].(int),
 		elasticUsername: params["elasticUsername"].(string),
 		elasticPassword: params["elasticPassword"].(string),
 	}
@@ -81,6 +84,7 @@ func NewBaritoConsumerService(params map[string]interface{}) BaritoConsumerServi
 	s.esClient = &elastic
 	if err != nil {
 		s.logError(errkit.Concat(ErrElasticsearchClient, err))
+		prome.IncreaseConsumerElasticsearchClientFailed(prome.ESClientFailedPhaseInit)
 	}
 
 	return s
@@ -210,6 +214,7 @@ func (s *baritoConsumerService) logNewTopic(topic string) {
 
 func (s *baritoConsumerService) onElasticRetry(err error) {
 	s.logError(errkit.Concat(ErrElasticsearchClient, err))
+	prome.IncreaseConsumerElasticsearchClientFailed(prome.ESClientFailedPhaseRetry)
 	s.HaltAllWorker()
 }
 
@@ -275,7 +280,7 @@ func (s *baritoConsumerService) HaltAllWorker() {
 }
 
 func (s *baritoConsumerService) elasticRetrier() *ElasticRetrier {
-	return NewElasticRetrier(timekit.Duration(s.elasticRetrierInterval), s.onElasticRetry)
+	return NewElasticRetrier(timekit.Duration(s.elasticRetrierInterval), s.elasticRetrierMaxRetry, s.onElasticRetry)
 }
 
 func (s *baritoConsumerService) ResumeWorker() (err error) {
diff --git a/flow/barito_consumer_service_test.go b/flow/barito_consumer_service_test.go
index 8e5f8d88..70c4f7e7 100644
--- a/flow/barito_consumer_service_test.go
+++ b/flow/barito_consumer_service_test.go
@@ -1,14 +1,14 @@
 package flow
 
 import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"net/http"
 	"net/http/httptest"
 	"strings"
 	"testing"
 	"time"
 
-	"github.com/BaritoLog/barito-flow/prome"
-
 	"github.com/BaritoLog/barito-flow/mock"
 	. "github.com/BaritoLog/go-boilerplate/testkit"
 	"github.com/BaritoLog/go-boilerplate/timekit"
@@ -21,10 +21,10 @@ import (
 
 func init() {
 	log.SetLevel(log.ErrorLevel)
-	prome.InitConsumerInstrumentation()
 }
 
 func TestBaritConsumerService_MakeKafkaAdminError(t *testing.T) {
+	resetPrometheusMetrics()
 	factory := NewDummyKafkaFactory()
 	factory.Expect_MakeKafkaAdmin_AlwaysError("some-error")
 
@@ -78,6 +78,52 @@ func TestBaritoConsumerService(t *testing.T) {
 	FatalIf(t, !worker.IsStart(), "worker of topic abc_logs is not starting")
 }
 
+func TestBaritoConsumerService_OnElasticRetry(t *testing.T) {
+	resetPrometheusMetrics()
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	esHandler := &ELasticTestHandler{}
+	ts := httptest.NewServer(esHandler)
+	defer ts.Close()
+	esHandler.CustomHandler = func(w http.ResponseWriter, r *http.Request) {
+		if r.Method == "HEAD" { // check if index exist
+			w.WriteHeader(200)
+			_, _ = w.Write([]byte(`{}`))
+			go func() {
+				ts.Close()
+			}()
+		}
+	}
+
+	factory := NewDummyKafkaFactory()
+	factory.Expect_MakeKafkaAdmin_ConsumerServiceSuccess(ctrl, []string{"abc_logs"})
+	factory.Expect_MakeClusterConsumer_AlwaysSuccess(ctrl)
+
+	consumerParams := SampleConsumerParams(factory)
+	consumerParams["elasticRetrierInterval"] = "1ms"
+	consumerParams["elasticRetrierMaxRetry"] = 5
+	consumerParams["elasticUrls"] = []string{ts.URL}
+	service := NewBaritoConsumerService(consumerParams).(*baritoConsumerService)
+
+	err := service.Start()
+	FatalIfError(t, err)
+	defer service.Close()
+	timberBytes, _ := proto.Marshal(pb.SampleTimberProto())
+
+	service.onStoreTimber(&sarama.ConsumerMessage{
+		Value: timberBytes,
+	})
+	FatalIf(t, service.lastError == nil, "Should return error because ES is stopped")
+
+	expected := `
+	# HELP barito_consumer_elasticsearch_client_failed Number of elasticsearch client failed
+	# TYPE barito_consumer_elasticsearch_client_failed counter
+	barito_consumer_elasticsearch_client_failed{phase="retry"} 5
+	`
+	FatalIfError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expected), "barito_consumer_elasticsearch_client_failed"))
+}
+
 func TestBaritoConsumerService_SpawnWorkerError(t *testing.T) {
 
 	ctrl := gomock.NewController(t)
@@ -301,7 +347,8 @@ func SampleConsumerParams(factory *dummyKafkaFactory) map[string]interface{} {
 		"kafkaMaxRetry": 1,
 		"kafkaRetryInterval": 10,
 		"newTopicEventName": "",
-		"elasticRetrierInterval": "",
+		"elasticRetrierInterval": "1s",
+		"elasticRetrierMaxRetry": 1,
 		"esConfig": NewEsConfig("SingleInsert", 1, time.Duration(1000), false),
 		"elasticUsername": "",
 		"elasticPassword": "",
diff --git a/flow/dummy_kafka_factory.go b/flow/dummy_kafka_factory.go
index 1c75f7f7..b48c5dc8 100644
--- a/flow/dummy_kafka_factory.go
+++ b/flow/dummy_kafka_factory.go
@@ -51,7 +51,7 @@ func (f *dummyKafkaFactory) Expect_MakeClusterConsumer_AlwaysSuccess(ctrl *gomoc
 		consumer.EXPECT().Messages().AnyTimes()
 		consumer.EXPECT().Notifications().AnyTimes()
 		consumer.EXPECT().Errors().AnyTimes()
-		consumer.EXPECT().Close()
+		consumer.EXPECT().Close().AnyTimes()
 		return consumer, nil
 	}
 }
@@ -63,7 +63,7 @@ func (f *dummyKafkaFactory) Expect_MakeClusterConsumer_ConsumerSpawnWorkerErrorC
 		consumer.EXPECT().Messages().AnyTimes()
 		consumer.EXPECT().Notifications().AnyTimes()
 		consumer.EXPECT().Errors().AnyTimes()
-		consumer.EXPECT().Close()
+		consumer.EXPECT().Close().AnyTimes()
 		return consumer, nil
 	}
 
@@ -80,8 +80,8 @@ func (f *dummyKafkaFactory) Expect_MakeKafkaAdmin_AlwaysError(errMsg string) {
 func (f *dummyKafkaFactory) Expect_MakeKafkaAdmin_ConsumerServiceSuccess(ctrl *gomock.Controller, topics []string) {
 	f.MakeKafkaAdminFunc = func() (KafkaAdmin, error) {
 		admin := mock.NewMockKafkaAdmin(ctrl)
-		admin.EXPECT().Topics().Return(topics)
-		admin.EXPECT().Close()
+		admin.EXPECT().Topics().Return(topics).AnyTimes()
+		admin.EXPECT().Close().AnyTimes()
 		return admin, nil
 	}
 }
diff --git a/flow/elastic.go b/flow/elastic.go
index df1e496f..9a7a46ee 100644
--- a/flow/elastic.go
+++ b/flow/elastic.go
@@ -61,6 +61,10 @@ func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string, e
 		elastic.SetBasicAuth(elasticUsername, elasticPassword),
 	)
 
+	if err != nil {
+		return
+	}
+
 	beforeBulkFunc, afterBulkFunc := getCommitCallback()
 
 	p, err := c.BulkProcessor().
diff --git a/flow/elastic_retrier.go b/flow/elastic_retrier.go
index 192cb8c5..d64c15a3 100644
--- a/flow/elastic_retrier.go
+++ b/flow/elastic_retrier.go
@@ -15,17 +15,18 @@
 type ElasticRetrier struct {
 	backoff elastic.Backoff
 	onRetryFunc func(err error)
+	maxRetry int
 }
 
-func NewElasticRetrier(t time.Duration, f func(err error)) *ElasticRetrier {
+func NewElasticRetrier(t time.Duration, n int, f func(err error)) *ElasticRetrier {
 	return &ElasticRetrier{
 		backoff: elastic.NewConstantBackoff(t),
 		onRetryFunc: f,
+		maxRetry: n,
 	}
 }
 
 func (r *ElasticRetrier) Retry(ctx context.Context, retry int, req *http.Request, resp *http.Response, err error) (time.Duration, bool, error) {
-
 	log.Warn(errors.New(fmt.Sprintf("Elasticsearch Retrier #%d", retry)))
 
 	if err == syscall.ECONNREFUSED {
@@ -35,5 +36,10 @@
 	// Let the backoff strategy decide how long to wait and whether to stop
 	wait, stop := r.backoff.Next(retry)
 	r.onRetryFunc(err)
+
+	// a maxRetry of 0 means retry forever
+	if r.maxRetry > 0 && retry >= r.maxRetry {
+		stop = false
+	}
 	return wait, stop, nil
 }
diff --git a/flow/elastic_retrier_test.go b/flow/elastic_retrier_test.go
index 39ba2ceb..94fd2521 100644
--- a/flow/elastic_retrier_test.go
+++ b/flow/elastic_retrier_test.go
@@ -6,8 +6,12 @@ import (
 	"time"
 )
 
+const (
+	MAX_RETRY = 10
+)
+
 func mockElasticRetrier() *ElasticRetrier {
-	return NewElasticRetrier(1*time.Second, mockRetrier)
+	return NewElasticRetrier(1*time.Second, MAX_RETRY, mockRetrier)
 }
 func mockRetrier(err error) {
 }
@@ -15,7 +19,7 @@
 func TestNewElasticRetrier(t *testing.T) {
-	r := NewElasticRetrier(1*time.Second, mockRetrier)
+	r := NewElasticRetrier(1*time.Second, MAX_RETRY, mockRetrier)
 	wait, ok, err := r.Retry(context.TODO(), 1, nil, nil, nil)
 	if want, got := 1*time.Second, wait; want != got {
 		t.Fatalf("expected %v, got %v", want, got)
 	}
 
diff --git a/flow/elastic_test_handler.go b/flow/elastic_test_handler.go
index b4c6bf2b..4c373188 100644
--- a/flow/elastic_test_handler.go
+++ b/flow/elastic_test_handler.go
@@ -7,6 +7,7 @@ type ELasticTestHandler struct {
 	CreateAPIStatus int
 	PostAPIStatus int
 	ResponseBody []byte
+	CustomHandler func(w http.ResponseWriter, r *http.Request)
 }
 
 func (handler *ELasticTestHandler) getResponseBody() (body []byte) {
@@ -18,13 +19,17 @@ func (handler *ELasticTestHandler) getResponseBody() (body []byte) {
 }
 
 func (handler *ELasticTestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	if r.Method == "HEAD" { // check if index exist
-		w.WriteHeader(handler.ExistAPIStatus)
-	} else if r.Method == "PUT" { // create index
-		w.WriteHeader(handler.CreateAPIStatus)
-		w.Write(handler.getResponseBody())
-	} else if r.Method == "POST" { // post message
-		w.WriteHeader(handler.PostAPIStatus)
-		w.Write(handler.getResponseBody())
+	if handler.CustomHandler == nil {
+		if r.Method == "HEAD" { // check if index exist
+			w.WriteHeader(handler.ExistAPIStatus)
+		} else if r.Method == "PUT" { // create index
+			w.WriteHeader(handler.CreateAPIStatus)
+			w.Write(handler.getResponseBody())
+		} else if r.Method == "POST" { // post message
+			w.WriteHeader(handler.PostAPIStatus)
+			w.Write(handler.getResponseBody())
+		}
+	} else {
+		handler.CustomHandler(w, r)
 	}
 }
diff --git a/flow/grpc_producer.go b/flow/grpc_producer.go
index de799ece..40f59f92 100644
--- a/flow/grpc_producer.go
+++ b/flow/grpc_producer.go
@@ -73,6 +73,7 @@ func (s *producerService) initProducer() (err error) {
 				log.Infof("Retry kafka sync producer successful")
 			}
 		} else {
+			prome.IncreaseProducerKafkaClientFailed()
 			if (s.kafkaMaxRetry == 0) || (retry < s.kafkaMaxRetry) {
 				log.Warnf("Cannot connect to kafka: %s, retrying in %d seconds", err, s.kafkaRetryInterval)
 				time.Sleep(time.Duration(s.kafkaRetryInterval) * time.Second)
@@ -202,7 +203,7 @@ func (s *producerService) Produce(_ context.Context, timber *pb.Timber) (resp *p
 	maxTokenIfNotExist := timber.GetContext().GetAppMaxTps()
 	if s.limiter.IsHitLimit(topic, 1, maxTokenIfNotExist) {
 		err = onLimitExceededGrpc()
-		prome.IncreaseProducerTPSExceededCounter(topic)
+		prome.IncreaseProducerTPSExceededCounter(topic, 1)
 		return
 	}
 
@@ -222,9 +223,10 @@ func (s *producerService) ProduceBatch(_ context.Context, timberCollection *pb.T
 	topic := timberCollection.GetContext().GetKafkaTopic() + s.topicSuffix
 	maxTokenIfNotExist := timberCollection.GetContext().GetAppMaxTps()
 
-	if s.limiter.IsHitLimit(topic, len(timberCollection.GetItems()), maxTokenIfNotExist) {
+	lengthMessages := len(timberCollection.GetItems())
+	if s.limiter.IsHitLimit(topic, lengthMessages, maxTokenIfNotExist) {
 		err = onLimitExceededGrpc()
-		prome.IncreaseProducerTPSExceededCounter(topic)
+		prome.IncreaseProducerTPSExceededCounter(topic, lengthMessages)
 		return
 	}
 
@@ -234,6 +236,7 @@ func (s *producerService) ProduceBatch(_ context.Context, timberCollection *pb.T
 		err = s.handleProduce(timber, topic)
 		if err != nil {
+			log.Infof("Failed to send logs to kafka: %s", err)
 			return
 		}
 	}
 
diff --git a/flow/grpc_producer_test.go b/flow/grpc_producer_test.go
index 8bd61a5c..4ff1be37 100644
--- a/flow/grpc_producer_test.go
+++ b/flow/grpc_producer_test.go
@@ -22,6 +22,7 @@ func resetPrometheusMetrics() {
 	prometheus.DefaultRegisterer = registry
 
 	prome.InitProducerInstrumentation()
+	prome.InitConsumerInstrumentation()
 }
 
 func TestProducerService_Produce_OnLimitExceeded(t *testing.T) {
@@ -67,7 +68,7 @@ func TestProducerService_ProduceBatch_OnLimitExceeded(t *testing.T) {
 	expected := `
 	# HELP barito_producer_tps_exceeded_total Number of TPS exceeded event
 	# TYPE barito_producer_tps_exceeded_total counter
-	barito_producer_tps_exceeded_total{topic="some_topic"} 1
+	barito_producer_tps_exceeded_total{topic="some_topic"} 2
 	`
 	FatalIfError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expected), "barito_producer_tps_exceeded_total"))
 }
@@ -159,6 +160,8 @@ func TestProducerService_Produce_OnSuccess(t *testing.T) {
 		topicSuffix: "_logs",
 		admin: admin,
 		limiter: limiter,
+		kafkaMaxRetry: 5,
+		kafkaRetryInterval: 10,
 	}
 
 	resp, err := srv.Produce(nil, pb.SampleTimberProto())
@@ -205,6 +208,8 @@ func TestProducerService_ProduceBatch_OnSuccess(t *testing.T) {
 }
 
 func TestProducerService_Start_ErrorMakeSyncProducer(t *testing.T) {
+	resetPrometheusMetrics()
+
 	factory := NewDummyKafkaFactory()
 	factory.Expect_MakeSyncProducerFunc_AlwaysError("some-error")
 
@@ -214,8 +219,8 @@ func TestProducerService_Start_ErrorMakeSyncProducer(t *testing.T) {
 		"restAddr": "rest",
 		"rateLimitResetInterval": 1,
 		"topicSuffix": "_logs",
-		"kafkaMaxRetry": 1,
-		"kafkaRetryInterval": 10,
+		"kafkaMaxRetry": 2,
+		"kafkaRetryInterval": 1,
 		"newEventTopic": "new_topic_events",
 	}
 
@@ -223,6 +228,12 @@
 	err := service.Start()
 	FatalIfWrongError(t, err, "Make sync producer failed: Error connecting to kafka, retry limit reached")
+	expected := `
+	# HELP barito_producer_kafka_client_failed Number of client failed to connect to kafka
+	# TYPE barito_producer_kafka_client_failed counter
+	barito_producer_kafka_client_failed 2
+	`
+	FatalIfError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expected), "barito_producer_kafka_client_failed"))
 }
 
 func TestProducerService_Start_ErrorMakeKafkaAdmin(t *testing.T) {
diff --git a/prome/prometheus_instrumentation.go b/prome/prometheus_instrumentation.go
index ab8558d6..fe21ff9a 100644
--- a/prome/prometheus_instrumentation.go
+++ b/prome/prometheus_instrumentation.go
@@ -7,12 +7,19 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 )
 
+const (
+	ESClientFailedPhaseInit = "init"
+	ESClientFailedPhaseRetry = "retry"
+)
+
 var consumerLogStoredCounter *prometheus.CounterVec
 var consumerBulkProcessTimeSecond prometheus.Summary
 var consumerKafkaMessagesIncomingCounter *prometheus.CounterVec
+var consumerElasticsearchClientFailed *prometheus.CounterVec
 var producerKafkaMessageStoredTotal *prometheus.CounterVec
 var producerTPSExceededCounter *prometheus.CounterVec
 var producerSendToKafkaTimeSecond *prometheus.SummaryVec
+var producerKafkaClientFailed *prometheus.CounterVec
 
 func InitConsumerInstrumentation() {
 	consumerLogStoredCounter = promauto.NewCounterVec(prometheus.CounterOpts{
@@ -28,6 +35,10 @@
 		Name: "barito_consumer_kafka_message_incoming_total",
 		Help: "Number of messages incoming from kafka",
 	}, []string{"topic"})
+	consumerElasticsearchClientFailed = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "barito_consumer_elasticsearch_client_failed",
+		Help: "Number of elasticsearch client failed",
+	}, []string{"phase"})
 }
 
 func InitProducerInstrumentation() {
@@ -39,6 +50,10 @@
 		Name: "barito_producer_tps_exceeded_total",
 		Help: "Number of TPS exceeded event",
 	}, []string{"topic"})
+	producerKafkaClientFailed = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "barito_producer_kafka_client_failed",
+		Help: "Number of client failed to connect to kafka",
+	}, []string{})
 	producerSendToKafkaTimeSecond = promauto.NewSummaryVec(prometheus.SummaryOpts{
 		Name: "barito_producer_send_to_kafka_time_second",
 		Help: "Send to Kafka time in second",
@@ -58,6 +73,10 @@ func ObserveBulkProcessTime(elapsedTime float64) {
 	consumerBulkProcessTimeSecond.Observe(elapsedTime)
 }
 
+func IncreaseConsumerElasticsearchClientFailed(phase string) {
+	consumerElasticsearchClientFailed.WithLabelValues(phase).Inc()
+}
+
 func IncreaseKafkaMessagesStoredTotal(topic string) {
 	producerKafkaMessageStoredTotal.WithLabelValues(topic, "").Inc()
 }
@@ -66,10 +85,14 @@ func IncreaseKafkaMessagesStoredTotalWithError(topic string, errorType string) {
 	producerKafkaMessageStoredTotal.WithLabelValues(topic, errorType).Inc()
 }
 
-func IncreaseProducerTPSExceededCounter(topic string) {
-	producerTPSExceededCounter.WithLabelValues(topic).Inc()
+func IncreaseProducerTPSExceededCounter(topic string, n int) {
+	producerTPSExceededCounter.WithLabelValues(topic).Add(float64(n))
 }
 
 func ObserveSendToKafkaTime(topic string, elapsedTime float64) {
 	producerSendToKafkaTimeSecond.WithLabelValues(topic).Observe(elapsedTime)
 }
+
+func IncreaseProducerKafkaClientFailed() {
+	producerKafkaClientFailed.WithLabelValues().Inc()
+}
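
Note (not part of the diff): the new max-retry knob is easiest to reason about in isolation. The sketch below is a hedged illustration only; it assumes the flow package is importable as github.com/BaritoLog/barito-flow/flow, and it uses nothing beyond the three-argument NewElasticRetrier and the Retry method introduced above. The attempt loop simply mimics how the Elasticsearch client keeps calling the retrier; in the real consumer the callback is onElasticRetry, which logs the error, increments barito_consumer_elasticsearch_client_failed{phase="retry"} and halts the workers, and the max-retry value comes from BARITO_CONSUMER_ELASTICSEARCH_RETRIER_MAX_RETRY (default 10, 0 meaning retry forever).

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/BaritoLog/barito-flow/flow" // assumed import path of the flow package
)

func main() {
	// Stand-in for the consumer's onElasticRetry callback.
	onRetry := func(err error) { fmt.Println("retry callback:", err) }

	// interval, max retry, callback: the new constructor signature from this patch.
	r := flow.NewElasticRetrier(10*time.Millisecond, 3, onRetry)

	// Mimic the elastic client: keep calling Retry with an increasing attempt
	// number until the retrier reports that retrying should stop.
	for attempt := 1; ; attempt++ {
		wait, ok, _ := r.Retry(context.TODO(), attempt, nil, nil, errors.New("elasticsearch unreachable"))
		fmt.Printf("attempt %d: wait=%v retryAgain=%v\n", attempt, wait, ok)
		if !ok {
			break // with maxRetry=3 this happens on the 3rd attempt; maxRetry=0 never stops
		}
		time.Sleep(wait)
	}
}

This mirrors what TestBaritoConsumerService_OnElasticRetry asserts: with elasticRetrierMaxRetry set to 5, the retry-phase counter ends at exactly 5 because the callback fires once per attempt before the retrier gives up.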