diff --git a/CHANGELOG.md b/CHANGELOG.md index 43dfc934..5120044f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Changed +## [0.13.3] +### Changed +- Make barito-flow-consumer compatible with both ES 6.x and ES 7.x +- Add basic auth for xpack-security feature +- Remove index settings when creating new index +- Set document type to `_doc` ## [0.13.2] ### Changed diff --git a/cmds/action.go b/cmds/action.go index a066e7ff..4038ce63 100644 --- a/cmds/action.go +++ b/cmds/action.go @@ -2,9 +2,10 @@ package cmds import ( "fmt" - "github.com/BaritoLog/barito-flow/prome" "time" + "github.com/BaritoLog/barito-flow/prome" + "github.com/BaritoLog/barito-flow/flow" "github.com/BaritoLog/go-boilerplate/srvkit" "github.com/BaritoLog/go-boilerplate/timekit" @@ -35,6 +36,8 @@ func ActionBaritoConsumerService(c *cli.Context) (err error) { esBulkSize := configEsBulkSize() esFlushIntervalMs := configEsFlushIntervalMs() printTPS := configPrintTPS() + elasticUsername := configElasticUsername() + elasticPassword := configElasticPassword() config := sarama.NewConfig() config.Version = sarama.V0_10_2_1 // TODO: get version from env @@ -63,6 +66,8 @@ func ActionBaritoConsumerService(c *cli.Context) (err error) { "newTopicEventName": newTopicEventName, "elasticRetrierInterval": elasticRetrierInterval, "esConfig": esConfig, + "elasticUsername": elasticUsername, + "elasticPassword": elasticPassword, } service := flow.NewBaritoConsumerService(consumerParams) diff --git a/cmds/config.go b/cmds/config.go index 05a4388a..924d98e1 100644 --- a/cmds/config.go +++ b/cmds/config.go @@ -39,6 +39,9 @@ const ( EnvConsumerRebalancingStrategy = "BARITO_CONSUMER_REBALANCING_STRATEGY" EnvPrintTPS = "BARITO_PRINT_TPS" + + EnvElasticUsername = "ELASTIC_USERNAME" + EnvElasticPassword = "ELASTIC_PASSWORD" ) var ( @@ -71,6 +74,9 @@ var ( DefaultEsFlushIntervalMs = 500 DefaultPrintTPS = "false" + + DefaultElasticUsername = "" + DefaultElasticPassword = "" ) func configKafkaBrokers() (brokers []string) { @@ -93,9 +99,9 @@ func configKafkaBrokers() (brokers []string) { } func configElasticsearchUrls() (urls []string) { - urls = sliceEnvOrDefault(EnvElasticsearchUrls, ",", []string{}) + urls = sliceEnvOrDefault(EnvElasticsearchUrls, ",", []string{}) - if len(urls) > 0 { + if len(urls) > 0 { return } @@ -201,6 +207,14 @@ func configPrintTPS() bool { return (stringEnvOrDefault(EnvPrintTPS, DefaultPrintTPS) == "true") } +func configElasticUsername() (s string) { + return stringEnvOrDefault(EnvElasticUsername, DefaultElasticUsername) +} + +func configElasticPassword() (s string) { + return stringEnvOrDefault(EnvElasticPassword, DefaultElasticPassword) +} + func stringEnvOrDefault(key, defaultValue string) string { s := os.Getenv(key) if len(s) > 0 { diff --git a/es/index.go b/es/index.go deleted file mode 100644 index 2cf40f07..00000000 --- a/es/index.go +++ /dev/null @@ -1,20 +0,0 @@ -// For elasticsearch 6.2: https://www.elastic.co/guide/en/elasticsearch/reference/6.2/index.html -package es - -type Index struct { - Template string `json:"template,omitempty"` - Version int `json:"version,omitempty"` - Settings map[string]interface{} `json:"settings,omitempty"` - Doc *Mappings `json:"_doc,omitempty"` -} - -// NewESIndex -func NewIndex() *Index { - return &Index{ - Settings: make(map[string]interface{}), - } -} - -func (es *Index) AddSetting(name string, value interface{}) { - es.Settings[name] = value -} diff --git a/es/mapping.go b/es/mapping.go deleted file mode 100644 index 3a34374e..00000000 --- a/es/mapping.go +++ /dev/null @@ -1,32 +0,0 @@ -package es - -type Mappings struct { - DynamicTemplates []map[string]MatchConditions `json:"dynamic_templates"` - Properties map[string]Property `json:"properties"` -} - -type Property struct { - Type string `json:"type"` -} - -// NewESMappings -func NewMappings() *Mappings { - return &Mappings{ - Properties: make(map[string]Property), - } -} - -// AddDynamicTemplate -func (es *Mappings) AddDynamicTemplate(name string, matchCondition MatchConditions) *Mappings { - m := make(map[string]MatchConditions) - m[name] = matchCondition - - es.DynamicTemplates = append(es.DynamicTemplates, m) - return es -} - -// AddPropertyWithType -func (es *Mappings) AddPropertyWithType(name, typ string) *Mappings { - es.Properties[name] = Property{Type: typ} - return es -} diff --git a/es/match_conditions.go b/es/match_conditions.go deleted file mode 100644 index e789c62a..00000000 --- a/es/match_conditions.go +++ /dev/null @@ -1,20 +0,0 @@ -package es - -// ESDynamicTemplate https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html -type MatchConditions struct { - Match string `json:"match,omitempty"` - PathMatch string `json:"patch_match,omitempty"` - MatchMappingType string `json:"match_mapping_type,omitempty"` - Mapping MatchMapping `json:"mapping,omitempty"` -} - -type MatchMapping struct { - Type string `json:"type"` - Norms bool `json:"norms"` - Fields map[string]Field `json:"fields,omitempty"` -} - -type Field struct { - Type string `json:"type"` - IgnoreAbove int `json:"ignore_above,omitempty"` -} diff --git a/flow/barito_consumer_service.go b/flow/barito_consumer_service.go index 84b28ebd..f406a523 100644 --- a/flow/barito_consumer_service.go +++ b/flow/barito_consumer_service.go @@ -55,6 +55,9 @@ type baritoConsumerService struct { lastNewTopic string isHalt bool elasticRetrierInterval string + + elasticUsername string + elasticPassword string } func NewBaritoConsumerService(params map[string]interface{}) BaritoConsumerService { @@ -68,11 +71,13 @@ func NewBaritoConsumerService(params map[string]interface{}) BaritoConsumerServi newTopicEventName: params["newTopicEventName"].(string), workerMap: make(map[string]ConsumerWorker), elasticRetrierInterval: params["elasticRetrierInterval"].(string), + elasticUsername: params["elasticUsername"].(string), + elasticPassword: params["elasticPassword"].(string), } retrier := s.elasticRetrier() esConfig := params["esConfig"].(esConfig) - elastic, err := NewElastic(retrier, esConfig, s.elasticUrls) + elastic, err := NewElastic(retrier, esConfig, s.elasticUrls, s.elasticUsername, s.elasticPassword) s.esClient = &elastic if err != nil { s.logError(errkit.Concat(ErrElasticsearchClient, err)) diff --git a/flow/barito_consumer_service_test.go b/flow/barito_consumer_service_test.go index 25ad7b61..8e5f8d88 100644 --- a/flow/barito_consumer_service_test.go +++ b/flow/barito_consumer_service_test.go @@ -125,13 +125,15 @@ func TestBaritoConsumerService_onStoreTimber_ErrorStore(t *testing.T) { defer ts.Close() elasticUrls := []string{ts.URL} + elasticUsername := "" + elasticPassword := "" service := &baritoConsumerService{ elasticUrls: elasticUrls, } retrier := service.elasticRetrier() esConfig := NewEsConfig("SingleInsert", 1, time.Duration(1000), false) - elastic, _ := NewElastic(retrier, esConfig, elasticUrls) + elastic, _ := NewElastic(retrier, esConfig, elasticUrls, elasticUsername, elasticPassword) service.esClient = &elastic timberBytes, _ := proto.Marshal(pb.SampleTimberProto()) @@ -151,9 +153,12 @@ func TestBaritoConsumerService_onStoreTimber(t *testing.T) { elasticUrls: elasticUrls, } + elasticUsername := "" + elasticPassword := "" + retrier := service.elasticRetrier() esConfig := NewEsConfig("SingleInsert", 1, time.Duration(1000), false) - elastic, _ := NewElastic(retrier, esConfig, elasticUrls) + elastic, _ := NewElastic(retrier, esConfig, elasticUrls, elasticUsername, elasticPassword) service.esClient = &elastic timberBytes, _ := proto.Marshal(pb.SampleTimberProto()) @@ -298,5 +303,7 @@ func SampleConsumerParams(factory *dummyKafkaFactory) map[string]interface{} { "newTopicEventName": "", "elasticRetrierInterval": "", "esConfig": NewEsConfig("SingleInsert", 1, time.Duration(1000), false), + "elasticUsername": "", + "elasticPassword": "", } } diff --git a/flow/elastic.go b/flow/elastic.go index 81e911b3..df1e496f 100644 --- a/flow/elastic.go +++ b/flow/elastic.go @@ -3,11 +3,11 @@ package flow import ( "context" "fmt" + "github.com/BaritoLog/barito-flow/prome" "time" - "github.com/BaritoLog/barito-flow/es" "github.com/golang/protobuf/jsonpb" "github.com/olivere/elastic" log "github.com/sirupsen/logrus" @@ -16,6 +16,10 @@ import ( var counter int = 0 +const ( + DEFAULT_ELASTIC_DOCUMENT_TYPE = "_doc" +) + type Elastic interface { OnFailure(f func(*pb.Timber)) Store(ctx context.Context, timber pb.Timber) error @@ -47,13 +51,14 @@ func NewEsConfig(indexMethod string, bulkSize int, flushMs time.Duration, printT } } -func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string) (client elasticClient, err error) { +func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string, elasticUsername string, elasticPassword string) (client elasticClient, err error) { c, err := elastic.NewClient( elastic.SetURL(urls...), elastic.SetSniff(false), elastic.SetHealthcheck(false), elastic.SetRetrier(retrierFunc), + elastic.SetBasicAuth(elasticUsername, elasticPassword), ) beforeBulkFunc, afterBulkFunc := getCommitCallback() @@ -124,16 +129,13 @@ func printThroughputPerSecond() { func (e *elasticClient) Store(ctx context.Context, timber pb.Timber) (err error) { indexPrefix := timber.GetContext().GetEsIndexPrefix() indexName := fmt.Sprintf("%s-%s", indexPrefix, time.Now().Format("2006.01.02")) - documentType := timber.GetContext().GetEsDocumentType() + documentType := DEFAULT_ELASTIC_DOCUMENT_TYPE appSecret := timber.GetContext().GetAppSecret() - exists, _ := e.client.IndexExists(indexName).Do(ctx) if !exists { log.Warnf("ES index '%s' is not exist", indexName) - index := elasticCreateIndex(indexPrefix) _, err = e.client.CreateIndex(indexName). - BodyJson(index). Do(ctx) instruESCreateIndex(err) if err != nil { @@ -171,38 +173,3 @@ func (e *elasticClient) singleInsert(ctx context.Context, indexName, documentTyp Do(ctx) return } - -func elasticCreateIndex(indexPrefix string) *es.Index { - - return &es.Index{ - Template: fmt.Sprintf("%s-*", indexPrefix), - Version: 60001, - Settings: map[string]interface{}{ - "index.refresh_interval": "5s", - }, - Doc: es.NewMappings(). - AddDynamicTemplate("message_field", es.MatchConditions{ - PathMatch: "@message", - MatchMappingType: "string", - Mapping: es.MatchMapping{ - Type: "text", - Norms: false, - }, - }). - AddDynamicTemplate("string_fields", es.MatchConditions{ - Match: "*", - MatchMappingType: "string", - Mapping: es.MatchMapping{ - Type: "text", - Norms: false, - Fields: map[string]es.Field{ - "keyword": es.Field{ - Type: "keyword", - IgnoreAbove: 256, - }, - }, - }, - }). - AddPropertyWithType("@timestamp", "date"), - } -} diff --git a/flow/elastic_test.go b/flow/elastic_test.go index c79fe2b0..eab7f5bc 100644 --- a/flow/elastic_test.go +++ b/flow/elastic_test.go @@ -16,6 +16,11 @@ import ( pb "github.com/vwidjaya/barito-proto/producer" ) +const ( + BARITO_DEFAULT_USERNAME = "" + BARITO_DEFAULT_PASSWORD = "" +) + func TestElasticStore_CreateIndexError(t *testing.T) { defer instru.Flush() @@ -30,7 +35,7 @@ func TestElasticStore_CreateIndexError(t *testing.T) { retrier := mockElasticRetrier() esConfig := NewEsConfig("SingleInsert", 100, time.Duration(1000), false) - client, err := NewElastic(retrier, esConfig, []string{ts.URL}) + client, err := NewElastic(retrier, esConfig, []string{ts.URL}, BARITO_DEFAULT_USERNAME, BARITO_DEFAULT_PASSWORD) FatalIfError(t, err) err = client.Store(context.Background(), timber) @@ -52,7 +57,7 @@ func TestElasticStore_CreateindexSuccess(t *testing.T) { retrier := mockElasticRetrier() esConfig := NewEsConfig("SingleInsert", 100, time.Duration(1000), false) - client, err := NewElastic(retrier, esConfig, []string{ts.URL}) + client, err := NewElastic(retrier, esConfig, []string{ts.URL}, BARITO_DEFAULT_USERNAME, BARITO_DEFAULT_PASSWORD) FatalIfError(t, err) appSecret := timber.GetContext().GetAppSecret() @@ -77,7 +82,7 @@ func TestElasticStoreman_store_SaveError(t *testing.T) { retrier := mockElasticRetrier() esConfig := NewEsConfig("SingleInsert", 100, time.Duration(1000), false) - client, err := NewElastic(retrier, esConfig, []string{ts.URL}) + client, err := NewElastic(retrier, esConfig, []string{ts.URL}, BARITO_DEFAULT_USERNAME, BARITO_DEFAULT_PASSWORD) FatalIfError(t, err) appSecret := timber.GetContext().GetAppSecret() @@ -137,7 +142,7 @@ func TestElasticStore_ExportMetrics(t *testing.T) { retrier := mockElasticRetrier() esConfig := NewEsConfig("BulkProcessor", 100, time.Duration(1000), false) - client, err := NewElastic(retrier, esConfig, []string{ts.URL}) + client, err := NewElastic(retrier, esConfig, []string{ts.URL}, BARITO_DEFAULT_USERNAME, BARITO_DEFAULT_PASSWORD) FatalIfError(t, err) err = client.Store(context.Background(), timber) diff --git a/main.go b/main.go index a02eee8c..c388eef8 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,7 @@ import ( const ( Name = "barito-flow" - Version = "0.13.2" + Version = "0.13.3" ) var (