Skip to content

Commit

Permalink
Configure Barito-flow Consumer to be compatible with ES 6.X and ES 7.X (
Browse files Browse the repository at this point in the history
#44)

* setup basicAuth for elasticClient

Co-authored-by: galuhest <[email protected]>

* remove template and version from index API

Co-authored-by: Fadli <[email protected]>

* rollback remove Template and Version

Co-authored-by: Fadli <[email protected]>

* use es7 API

Co-authored-by: Fadli <[email protected]>

* rollback remove Template and version in elastic.go

Co-authored-by: Fadli <[email protected]>

* roll dynamic_mappings inside mappings

Co-authored-by: Fadli <[email protected]>

* patch_match -> path_match

Co-authored-by: Fadli <[email protected]>

* remove check version

Co-authored-by: Fadli <[email protected]>

* consumer should not set indices config anymore

Co-authored-by: Fadli <[email protected]>

* es and es7 packages are not used anymore

Co-authored-by: Fadli <[email protected]>

* remove documentType from parameter

Co-authored-by: Fadli <[email protected]>

* remove es and es7 from dependency

Co-authored-by: Fadli <[email protected]>

* documentType is set as _doc

Co-authored-by: Fadli <[email protected]>

* bump version to 13.3, list changes in CHANGELOG

Co-authored-by: Fadli <[email protected]>

* put default username and password to variables

Co-authored-by: Fadli <[email protected]>

* put default username and password to contant, and revert remove documentType chang it to contant instead

Co-authored-by: Fadli <[email protected]>

* put documentType to constant

Co-authored-by: Fadli <[email protected]>

* create const for  username and password

Co-authored-by: galuhest <[email protected]>

Co-authored-by: Fadli Nurhasan <[email protected]>
  • Loading branch information
galuhest and fadlinurhasan authored Apr 14, 2020
1 parent 46cfaed commit 67fc035
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 124 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]
### Changed
## [0.13.3]
### Changed
- Make barito-flow-consumer compatible with both ES 6.x and ES 7.x
- Add basic auth for xpack-security feature
- Remove index settings when creating new index
- Set document type to `_doc`

## [0.13.2]
### Changed
Expand Down
7 changes: 6 additions & 1 deletion cmds/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package cmds

import (
"fmt"
"github.com/BaritoLog/barito-flow/prome"
"time"

"github.com/BaritoLog/barito-flow/prome"

"github.com/BaritoLog/barito-flow/flow"
"github.com/BaritoLog/go-boilerplate/srvkit"
"github.com/BaritoLog/go-boilerplate/timekit"
Expand Down Expand Up @@ -35,6 +36,8 @@ func ActionBaritoConsumerService(c *cli.Context) (err error) {
esBulkSize := configEsBulkSize()
esFlushIntervalMs := configEsFlushIntervalMs()
printTPS := configPrintTPS()
elasticUsername := configElasticUsername()
elasticPassword := configElasticPassword()

config := sarama.NewConfig()
config.Version = sarama.V0_10_2_1 // TODO: get version from env
Expand Down Expand Up @@ -63,6 +66,8 @@ func ActionBaritoConsumerService(c *cli.Context) (err error) {
"newTopicEventName": newTopicEventName,
"elasticRetrierInterval": elasticRetrierInterval,
"esConfig": esConfig,
"elasticUsername": elasticUsername,
"elasticPassword": elasticPassword,
}

service := flow.NewBaritoConsumerService(consumerParams)
Expand Down
18 changes: 16 additions & 2 deletions cmds/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ const (
EnvConsumerRebalancingStrategy = "BARITO_CONSUMER_REBALANCING_STRATEGY"

EnvPrintTPS = "BARITO_PRINT_TPS"

EnvElasticUsername = "ELASTIC_USERNAME"
EnvElasticPassword = "ELASTIC_PASSWORD"
)

var (
Expand Down Expand Up @@ -71,6 +74,9 @@ var (
DefaultEsFlushIntervalMs = 500

DefaultPrintTPS = "false"

DefaultElasticUsername = ""
DefaultElasticPassword = ""
)

func configKafkaBrokers() (brokers []string) {
Expand All @@ -93,9 +99,9 @@ func configKafkaBrokers() (brokers []string) {
}

func configElasticsearchUrls() (urls []string) {
urls = sliceEnvOrDefault(EnvElasticsearchUrls, ",", []string{})
urls = sliceEnvOrDefault(EnvElasticsearchUrls, ",", []string{})

if len(urls) > 0 {
if len(urls) > 0 {
return
}

Expand Down Expand Up @@ -201,6 +207,14 @@ func configPrintTPS() bool {
return (stringEnvOrDefault(EnvPrintTPS, DefaultPrintTPS) == "true")
}

func configElasticUsername() (s string) {
return stringEnvOrDefault(EnvElasticUsername, DefaultElasticUsername)
}

func configElasticPassword() (s string) {
return stringEnvOrDefault(EnvElasticPassword, DefaultElasticPassword)
}

func stringEnvOrDefault(key, defaultValue string) string {
s := os.Getenv(key)
if len(s) > 0 {
Expand Down
20 changes: 0 additions & 20 deletions es/index.go

This file was deleted.

32 changes: 0 additions & 32 deletions es/mapping.go

This file was deleted.

20 changes: 0 additions & 20 deletions es/match_conditions.go

This file was deleted.

7 changes: 6 additions & 1 deletion flow/barito_consumer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type baritoConsumerService struct {
lastNewTopic string
isHalt bool
elasticRetrierInterval string

elasticUsername string
elasticPassword string
}

func NewBaritoConsumerService(params map[string]interface{}) BaritoConsumerService {
Expand All @@ -68,11 +71,13 @@ func NewBaritoConsumerService(params map[string]interface{}) BaritoConsumerServi
newTopicEventName: params["newTopicEventName"].(string),
workerMap: make(map[string]ConsumerWorker),
elasticRetrierInterval: params["elasticRetrierInterval"].(string),
elasticUsername: params["elasticUsername"].(string),
elasticPassword: params["elasticPassword"].(string),
}

retrier := s.elasticRetrier()
esConfig := params["esConfig"].(esConfig)
elastic, err := NewElastic(retrier, esConfig, s.elasticUrls)
elastic, err := NewElastic(retrier, esConfig, s.elasticUrls, s.elasticUsername, s.elasticPassword)
s.esClient = &elastic
if err != nil {
s.logError(errkit.Concat(ErrElasticsearchClient, err))
Expand Down
11 changes: 9 additions & 2 deletions flow/barito_consumer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ func TestBaritoConsumerService_onStoreTimber_ErrorStore(t *testing.T) {
defer ts.Close()

elasticUrls := []string{ts.URL}
elasticUsername := ""
elasticPassword := ""
service := &baritoConsumerService{
elasticUrls: elasticUrls,
}

retrier := service.elasticRetrier()
esConfig := NewEsConfig("SingleInsert", 1, time.Duration(1000), false)
elastic, _ := NewElastic(retrier, esConfig, elasticUrls)
elastic, _ := NewElastic(retrier, esConfig, elasticUrls, elasticUsername, elasticPassword)
service.esClient = &elastic

timberBytes, _ := proto.Marshal(pb.SampleTimberProto())
Expand All @@ -151,9 +153,12 @@ func TestBaritoConsumerService_onStoreTimber(t *testing.T) {
elasticUrls: elasticUrls,
}

elasticUsername := ""
elasticPassword := ""

retrier := service.elasticRetrier()
esConfig := NewEsConfig("SingleInsert", 1, time.Duration(1000), false)
elastic, _ := NewElastic(retrier, esConfig, elasticUrls)
elastic, _ := NewElastic(retrier, esConfig, elasticUrls, elasticUsername, elasticPassword)
service.esClient = &elastic

timberBytes, _ := proto.Marshal(pb.SampleTimberProto())
Expand Down Expand Up @@ -298,5 +303,7 @@ func SampleConsumerParams(factory *dummyKafkaFactory) map[string]interface{} {
"newTopicEventName": "",
"elasticRetrierInterval": "",
"esConfig": NewEsConfig("SingleInsert", 1, time.Duration(1000), false),
"elasticUsername": "",
"elasticPassword": "",
}
}
49 changes: 8 additions & 41 deletions flow/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package flow
import (
"context"
"fmt"

"github.com/BaritoLog/barito-flow/prome"

"time"

"github.com/BaritoLog/barito-flow/es"
"github.com/golang/protobuf/jsonpb"
"github.com/olivere/elastic"
log "github.com/sirupsen/logrus"
Expand All @@ -16,6 +16,10 @@ import (

var counter int = 0

const (
DEFAULT_ELASTIC_DOCUMENT_TYPE = "_doc"
)

type Elastic interface {
OnFailure(f func(*pb.Timber))
Store(ctx context.Context, timber pb.Timber) error
Expand Down Expand Up @@ -47,13 +51,14 @@ func NewEsConfig(indexMethod string, bulkSize int, flushMs time.Duration, printT
}
}

func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string) (client elasticClient, err error) {
func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string, elasticUsername string, elasticPassword string) (client elasticClient, err error) {

c, err := elastic.NewClient(
elastic.SetURL(urls...),
elastic.SetSniff(false),
elastic.SetHealthcheck(false),
elastic.SetRetrier(retrierFunc),
elastic.SetBasicAuth(elasticUsername, elasticPassword),
)

beforeBulkFunc, afterBulkFunc := getCommitCallback()
Expand Down Expand Up @@ -124,16 +129,13 @@ func printThroughputPerSecond() {
func (e *elasticClient) Store(ctx context.Context, timber pb.Timber) (err error) {
indexPrefix := timber.GetContext().GetEsIndexPrefix()
indexName := fmt.Sprintf("%s-%s", indexPrefix, time.Now().Format("2006.01.02"))
documentType := timber.GetContext().GetEsDocumentType()
documentType := DEFAULT_ELASTIC_DOCUMENT_TYPE
appSecret := timber.GetContext().GetAppSecret()

exists, _ := e.client.IndexExists(indexName).Do(ctx)

if !exists {
log.Warnf("ES index '%s' is not exist", indexName)
index := elasticCreateIndex(indexPrefix)
_, err = e.client.CreateIndex(indexName).
BodyJson(index).
Do(ctx)
instruESCreateIndex(err)
if err != nil {
Expand Down Expand Up @@ -171,38 +173,3 @@ func (e *elasticClient) singleInsert(ctx context.Context, indexName, documentTyp
Do(ctx)
return
}

func elasticCreateIndex(indexPrefix string) *es.Index {

return &es.Index{
Template: fmt.Sprintf("%s-*", indexPrefix),
Version: 60001,
Settings: map[string]interface{}{
"index.refresh_interval": "5s",
},
Doc: es.NewMappings().
AddDynamicTemplate("message_field", es.MatchConditions{
PathMatch: "@message",
MatchMappingType: "string",
Mapping: es.MatchMapping{
Type: "text",
Norms: false,
},
}).
AddDynamicTemplate("string_fields", es.MatchConditions{
Match: "*",
MatchMappingType: "string",
Mapping: es.MatchMapping{
Type: "text",
Norms: false,
Fields: map[string]es.Field{
"keyword": es.Field{
Type: "keyword",
IgnoreAbove: 256,
},
},
},
}).
AddPropertyWithType("@timestamp", "date"),
}
}
13 changes: 9 additions & 4 deletions flow/elastic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
pb "github.com/vwidjaya/barito-proto/producer"
)

const (
BARITO_DEFAULT_USERNAME = ""
BARITO_DEFAULT_PASSWORD = ""
)

func TestElasticStore_CreateIndexError(t *testing.T) {
defer instru.Flush()

Expand All @@ -30,7 +35,7 @@ func TestElasticStore_CreateIndexError(t *testing.T) {

retrier := mockElasticRetrier()
esConfig := NewEsConfig("SingleInsert", 100, time.Duration(1000), false)
client, err := NewElastic(retrier, esConfig, []string{ts.URL})
client, err := NewElastic(retrier, esConfig, []string{ts.URL}, BARITO_DEFAULT_USERNAME, BARITO_DEFAULT_PASSWORD)
FatalIfError(t, err)

err = client.Store(context.Background(), timber)
Expand All @@ -52,7 +57,7 @@ func TestElasticStore_CreateindexSuccess(t *testing.T) {

retrier := mockElasticRetrier()
esConfig := NewEsConfig("SingleInsert", 100, time.Duration(1000), false)
client, err := NewElastic(retrier, esConfig, []string{ts.URL})
client, err := NewElastic(retrier, esConfig, []string{ts.URL}, BARITO_DEFAULT_USERNAME, BARITO_DEFAULT_PASSWORD)
FatalIfError(t, err)

appSecret := timber.GetContext().GetAppSecret()
Expand All @@ -77,7 +82,7 @@ func TestElasticStoreman_store_SaveError(t *testing.T) {

retrier := mockElasticRetrier()
esConfig := NewEsConfig("SingleInsert", 100, time.Duration(1000), false)
client, err := NewElastic(retrier, esConfig, []string{ts.URL})
client, err := NewElastic(retrier, esConfig, []string{ts.URL}, BARITO_DEFAULT_USERNAME, BARITO_DEFAULT_PASSWORD)
FatalIfError(t, err)

appSecret := timber.GetContext().GetAppSecret()
Expand Down Expand Up @@ -137,7 +142,7 @@ func TestElasticStore_ExportMetrics(t *testing.T) {

retrier := mockElasticRetrier()
esConfig := NewEsConfig("BulkProcessor", 100, time.Duration(1000), false)
client, err := NewElastic(retrier, esConfig, []string{ts.URL})
client, err := NewElastic(retrier, esConfig, []string{ts.URL}, BARITO_DEFAULT_USERNAME, BARITO_DEFAULT_PASSWORD)
FatalIfError(t, err)

err = client.Store(context.Background(), timber)
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

const (
Name = "barito-flow"
Version = "0.13.2"
Version = "0.13.3"
)

var (
Expand Down

0 comments on commit 67fc035

Please sign in to comment.