Skip to content

Commit

Permalink
Merge pull request #35 from BaritoLog/issue-#34
Browse files Browse the repository at this point in the history
Issue #34
  • Loading branch information
ekarisky authored Aug 30, 2019
2 parents 34f7c85 + 72cd74b commit cfd8569
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 51 deletions.
4 changes: 2 additions & 2 deletions cmds/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func ActionBaritoConsumerService(c *cli.Context) (err error) {

brokers := configKafkaBrokers()
groupID := configKafkaGroupId()
esUrl := configElasticsearchUrl()
esUrls := configElasticsearchUrls()
topicSuffix := configKafkaTopicSuffix()
kafkaMaxRetry := configKafkaMaxRetry()
kafkaRetryInterval := configKafkaRetryInterval()
Expand Down Expand Up @@ -53,7 +53,7 @@ func ActionBaritoConsumerService(c *cli.Context) (err error) {
service := flow.NewBaritoConsumerService(
factory,
groupID,
esUrl,
esUrls,
topicSuffix,
kafkaMaxRetry,
kafkaRetryInterval,
Expand Down
13 changes: 7 additions & 6 deletions cmds/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
EnvKafkaMaxRetry = "BARITO_KAFKA_MAX_RETRY"
EnvKafkaRetryInterval = "BARITO_KAFKA_RETRY_INTERVAL"

EnvElasticsearchUrl = "BARITO_ELASTICSEARCH_URL"
EnvElasticsearchUrls = "BARITO_ELASTICSEARCH_URLS"
EnvEsIndexMethod = "BARITO_ELASTICSEARCH_INDEX_METHOD"
EnvEsBulkSize = "BARITO_ELASTICSEARCH_BULK_SIZE"
EnvEsFlushIntervalMs = "BARITO_ELASTICSEARCH_FLUSH_INTERVAL_MS"
Expand Down Expand Up @@ -49,7 +49,7 @@ var (
DefaultKafkaMaxRetry = 0
DefaultKafkaRetryInterval = 10

DefaultElasticsearchUrl = "http://localhost:9200"
DefaultElasticsearchUrls = []string{"http://localhost:9200"}

DefaultPushMetricUrl = ""
DefaultPushMetricInterval = "30s"
Expand Down Expand Up @@ -82,16 +82,17 @@ func configKafkaBrokers() (brokers []string) {
return
}

func configElasticsearchUrl() (url string) {
func configElasticsearchUrls() (urls []string) {
consulUrl := configConsulUrl()
name := configConsulElasticsearchName()
url, err := consulElasticsearchUrl(consulUrl, name)
urls, err := consulElasticsearchUrl(consulUrl, name)

if err != nil {
url = stringEnvOrDefault(EnvElasticsearchUrl, DefaultElasticsearchUrl)
urls = sliceEnvOrDefault(EnvElasticsearchUrls, ",", DefaultElasticsearchUrls)
return
}

logConfig("consul", EnvElasticsearchUrl, url)
logConfig("consul", EnvElasticsearchUrls, urls)
return
}

Expand Down
28 changes: 16 additions & 12 deletions cmds/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"testing"

"github.com/BaritoLog/go-boilerplate/slicekit"
. "github.com/BaritoLog/go-boilerplate/testkit"
log "github.com/sirupsen/logrus"
)
Expand All @@ -12,14 +13,15 @@ func init() {
log.SetLevel(log.ErrorLevel)
}

// func TestGetKafkaBrokers(t *testing.T) {
// FatalIf(t, !slicekit.StringSliceEqual(getKafkaBrokers(), DefaultKafkaBrokers), "should return default ")
//
// os.Setenv(EnvKafkaBrokers, "kafka-broker-1:1278,kafka-broker-2:1288")
// defer os.Clearenv()
//
// FatalIf(t, !slicekit.StringSliceEqual(getKafkaBrokers(), []string{"kafka-broker-1:1278", "kafka-broker-2:1288"}), "should return default ")
// }
// TestGetKafkaBrokers verifies that configKafkaBrokers returns
// DefaultKafkaBrokers when EnvKafkaBrokers is unset, and reads the
// comma-separated broker list from the environment when it is set.
func TestGetKafkaBrokers(t *testing.T) {
	// No env var set yet: expect the package default.
	FatalIf(t, !slicekit.StringSliceEqual(configKafkaBrokers(), DefaultKafkaBrokers), "should return default")

	os.Setenv(EnvKafkaBrokers, "some-kafka-01:9092, some-kafka-02:9092")
	// NOTE(review): os.Clearenv wipes the ENTIRE process environment, not just
	// EnvKafkaBrokers — os.Unsetenv(EnvKafkaBrokers) would be safer, though the
	// rest of this file follows the same Clearenv pattern. Confirm no test in
	// the suite depends on unrelated env vars surviving.
	defer os.Clearenv()

	// NOTE(review): the expected value is derived via the same
	// sliceEnvOrDefault helper the implementation presumably uses, which makes
	// this assertion circular — it cannot catch a trimming/splitting bug in
	// that helper (note the space after the comma in the value above). An
	// explicit expected slice would be a stronger check.
	envKafkaBrokers := sliceEnvOrDefault(EnvKafkaBrokers, ",", DefaultKafkaBrokers)
	FatalIf(t, !slicekit.StringSliceEqual(configKafkaBrokers(), envKafkaBrokers), "should get from env variable")
}

func TestGetConsulElastisearchName(t *testing.T) {
FatalIf(t, configConsulElasticsearchName() != DefaultConsulElasticsearchName, "should return default ")
Expand All @@ -29,12 +31,14 @@ func TestGetConsulElastisearchName(t *testing.T) {
FatalIf(t, configConsulElasticsearchName() != "elastic11", "should get from env variable")
}

func TestGetElasticsearchUrl(t *testing.T) {
FatalIf(t, configElasticsearchUrl() != DefaultElasticsearchUrl, "should return default ")
func TestGetElasticsearchUrls(t *testing.T) {
FatalIf(t, !slicekit.StringSliceEqual(configElasticsearchUrls(), DefaultElasticsearchUrls), "should return default")

os.Setenv(EnvElasticsearchUrl, "http://some-elasticsearch")
os.Setenv(EnvElasticsearchUrls, "http://some-elasticsearch-01:9200, http://some-elasticsearch-02:9200")
defer os.Clearenv()
FatalIf(t, configElasticsearchUrl() != "http://some-elasticsearch", "should get from env variable")

envEsUrls := sliceEnvOrDefault(EnvElasticsearchUrls, ",", DefaultElasticsearchUrls)
FatalIf(t, !slicekit.StringSliceEqual(configElasticsearchUrls(), envEsUrls), "should get from env variable")
}

func TestGetKafkaGroupID(t *testing.T) {
Expand Down
17 changes: 9 additions & 8 deletions cmds/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func consulKafkaBroker(address, name string) (brokers []string, err error) {
return
}

func consulElasticsearchUrl(address, name string) (url string, err error) {
func consulElasticsearchUrl(address, name string) (urls []string, err error) {
client, err := consulClient(address)
if err != nil {
return
Expand All @@ -37,14 +37,15 @@ func consulElasticsearchUrl(address, name string) (url string, err error) {
return
}

srvAddress := services[0].ServiceAddress
srvPort := services[0].ServicePort
srvSchema, ok := services[0].ServiceMeta["http_schema"]
if !ok {
srvSchema = "http"
for _, service := range services {
srvAddress := service.ServiceAddress
srvPort := service.ServicePort
srvSchema, ok := service.ServiceMeta["http_schema"]
if !ok {
srvSchema = "http"
}
urls = append(urls, fmt.Sprintf("%s://%s:%d", srvSchema, srvAddress, srvPort))
}

url = fmt.Sprintf("%s://%s:%d", srvSchema, srvAddress, srvPort)
return
}

Expand Down
27 changes: 22 additions & 5 deletions cmds/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,35 @@ import (
func TestConsulElasticsearch(t *testing.T) {
ts := NewTestServer(http.StatusOK, []byte(`[
{
"ServiceAddress": "172.17.0.3",
"ServiceAddress": "172.17.0.1",
"ServicePort": 5000,
"ServiceMeta": {
"http_schema": "https"
}
},
{
"ServiceAddress": "172.17.0.2",
"ServicePort": 5001,
"ServiceMeta": {
"http_schema": "https"
}
},
{
"ServiceAddress": "172.17.0.3",
"ServicePort": 5002,
"ServiceMeta": {
"http_schema": "https"
}
}
]`))
defer ts.Close()

url, err := consulElasticsearchUrl(ts.URL, "name")
urls, err := consulElasticsearchUrl(ts.URL, "name")
FatalIfError(t, err)
FatalIf(t, url != "https://172.17.0.3:5000", "wrong url")
FatalIf(t, len(urls) != 3, "return wrong urls")
FatalIf(t, urls[0] != "https://172.17.0.1:5000", "return wrong urls[0]")
FatalIf(t, urls[1] != "https://172.17.0.2:5001", "return wrong urls[1]")
FatalIf(t, urls[2] != "https://172.17.0.3:5002", "return wrong urls[2]")
}

func TestConsulElasticsearch_NoHttpSchema(t *testing.T) {
Expand All @@ -34,9 +51,9 @@ func TestConsulElasticsearch_NoHttpSchema(t *testing.T) {
]`))
defer ts.Close()

url, err := consulElasticsearchUrl(ts.URL, "name")
urls, err := consulElasticsearchUrl(ts.URL, "name")
FatalIfError(t, err)
FatalIf(t, url != "http://172.17.0.3:5000", "wrong url")
FatalIf(t, urls[0] != "http://172.17.0.3:5000", "wrong url[0]")
}

func TestConsulElasticsearch_NoService(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions flow/barito_consumer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type BaritoConsumerService interface {
type baritoConsumerService struct {
factory KafkaFactory
groupID string
elasticUrl string
elasticUrls []string
esClient *elasticClient
topicSuffix string
kafkaMaxRetry int
Expand All @@ -56,12 +56,12 @@ type baritoConsumerService struct {
elasticRetrierInterval string
}

func NewBaritoConsumerService(factory KafkaFactory, groupID string, elasticURL string, topicSuffix string, kafkaMaxRetry int, kafkaRetryInterval int, newTopicEventName string, elasticRetrierInterval string, esConfig esConfig) BaritoConsumerService {
func NewBaritoConsumerService(factory KafkaFactory, groupID string, elasticURLs []string, topicSuffix string, kafkaMaxRetry int, kafkaRetryInterval int, newTopicEventName string, elasticRetrierInterval string, esConfig esConfig) BaritoConsumerService {

s := &baritoConsumerService{
factory: factory,
groupID: groupID,
elasticUrl: elasticURL,
elasticUrls: elasticURLs,
topicSuffix: topicSuffix,
kafkaMaxRetry: kafkaMaxRetry,
kafkaRetryInterval: kafkaRetryInterval,
Expand All @@ -71,7 +71,7 @@ func NewBaritoConsumerService(factory KafkaFactory, groupID string, elasticURL s
}

retrier := s.elasticRetrier()
elastic, err := NewElastic(retrier, esConfig, s.elasticUrl)
elastic, err := NewElastic(retrier, esConfig, s.elasticUrls)
s.esClient = &elastic
if err != nil {
s.logError(errkit.Concat(ErrElasticsearchClient, err))
Expand Down
22 changes: 12 additions & 10 deletions flow/barito_consumer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestBaritConsumerService_MakeKafkaAdminError(t *testing.T) {
factory.Expect_MakeKafkaAdmin_AlwaysError("some-error")
esConfig := NewEsConfig("BulkProcessor", 100, time.Duration(1000), false)

service := NewBaritoConsumerService(factory, "groupID", "elasticURL", "topicSuffix", 1, 10, "newTopicEventName", "", esConfig)
service := NewBaritoConsumerService(factory, "groupID", []string{"elasticURL-1", "elasticURL-2"}, "topicSuffix", 1, 10, "newTopicEventName", "", esConfig)
err := service.Start()
FatalIfWrongError(t, err, "Make kafka admin failed: Error connecting to kafka, retry limit reached")
}
Expand All @@ -34,7 +34,7 @@ func TestBaritoConsumerService_MakeNewTopicWorkerError(t *testing.T) {
factory.Expect_MakeClusterConsumer_AlwaysError("some-error")
esConfig := NewEsConfig("BulkProcessor", 100, time.Duration(1000), false)

service := NewBaritoConsumerService(factory, "groupID", "elasticURL", "topicSuffix", 0, 10, "newTopicEventName", "", esConfig)
service := NewBaritoConsumerService(factory, "groupID", []string{"elasticURL-1", "elasticURL-2"}, "topicSuffix", 0, 10, "newTopicEventName", "", esConfig)
err := service.Start()

FatalIfWrongError(t, err, "Make new topic worker failed: some-error")
Expand All @@ -50,7 +50,7 @@ func TestBaritoConsumerService(t *testing.T) {
factory.Expect_MakeClusterConsumer_AlwaysSuccess(ctrl)
esConfig := NewEsConfig("BulkProcessor", 100, time.Duration(1000), false)

var v interface{} = NewBaritoConsumerService(factory, "", "", "_logs", 0, 10, "", "", esConfig)
var v interface{} = NewBaritoConsumerService(factory, "", []string{""}, "_logs", 0, 10, "", "", esConfig)
service := v.(*baritoConsumerService)

err := service.Start()
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestBaritoConsumerService_onStoreTimber_ErrorConvertKafkaMessage(t *testing
defer ts.Close()

service := &baritoConsumerService{
elasticUrl: ts.URL,
elasticUrls: []string{ts.URL},
}

service.onStoreTimber(&sarama.ConsumerMessage{})
Expand All @@ -116,13 +116,14 @@ func TestBaritoConsumerService_onStoreTimber_ErrorStore(t *testing.T) {
})
defer ts.Close()

elasticUrls := []string{ts.URL}
service := &baritoConsumerService{
elasticUrl: ts.URL,
elasticUrls: elasticUrls,
}

retrier := service.elasticRetrier()
esConfig := NewEsConfig("SingleInsert", 1, time.Duration(1000), false)
elastic, _ := NewElastic(retrier, esConfig, ts.URL)
elastic, _ := NewElastic(retrier, esConfig, elasticUrls)
service.esClient = &elastic

service.onStoreTimber(&sarama.ConsumerMessage{
Expand All @@ -135,13 +136,14 @@ func TestBaritoConsumerService_onStoreTimber(t *testing.T) {
ts := NewTestServer(http.StatusOK, []byte(`{}`))
defer ts.Close()

elasticUrls := []string{ts.URL}
service := &baritoConsumerService{
elasticUrl: ts.URL,
elasticUrls: elasticUrls,
}

retrier := service.elasticRetrier()
esConfig := NewEsConfig("SingleInsert", 1, time.Duration(1000), false)
elastic, _ := NewElastic(retrier, esConfig, ts.URL)
elastic, _ := NewElastic(retrier, esConfig, elasticUrls)
service.esClient = &elastic

service.onStoreTimber(&sarama.ConsumerMessage{
Expand Down Expand Up @@ -227,7 +229,7 @@ func TestHaltAllWorker(t *testing.T) {
factory.Expect_MakeClusterConsumer_AlwaysSuccess(ctrl)
esConfig := NewEsConfig("BulkProcessor", 100, time.Duration(1000), false)

var v interface{} = NewBaritoConsumerService(factory, "", "", "_logs", 0, 10, "", "", esConfig)
var v interface{} = NewBaritoConsumerService(factory, "", []string{""}, "_logs", 0, 10, "", "", esConfig)
service := v.(*baritoConsumerService)

err := service.Start()
Expand Down Expand Up @@ -260,7 +262,7 @@ func TestResumeWorker(t *testing.T) {
factory.Expect_MakeClusterConsumer_AlwaysSuccess(ctrl)
esConfig := NewEsConfig("BulkProcessor", 100, time.Duration(1000), false)

var v interface{} = NewBaritoConsumerService(factory, "", "", "_logs", 0, 10, "", "", esConfig)
var v interface{} = NewBaritoConsumerService(factory, "", []string{""}, "_logs", 0, 10, "", "", esConfig)
service := v.(*baritoConsumerService)

err := service.ResumeWorker()
Expand Down
2 changes: 1 addition & 1 deletion flow/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewEsConfig(indexMethod string, bulkSize int, flushMs time.Duration, printT
}
}

func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls ...string) (client elasticClient, err error) {
func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string) (client elasticClient, err error) {

c, err := elastic.NewClient(
elastic.SetURL(urls...),
Expand Down
6 changes: 3 additions & 3 deletions flow/elastic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestElasticStore_CreateIndexError(t *testing.T) {

retrier := mockElasticRetrier()
esConfig := NewEsConfig("SingleInsert", 100, time.Duration(1000), false)
client, err := NewElastic(retrier, esConfig, ts.URL)
client, err := NewElastic(retrier, esConfig, []string{ts.URL})
FatalIfError(t, err)

err = client.Store(context.Background(), timber)
Expand All @@ -50,7 +50,7 @@ func TestElasticStore_CreateindexSuccess(t *testing.T) {

retrier := mockElasticRetrier()
esConfig := NewEsConfig("SingleInsert", 100, time.Duration(1000), false)
client, err := NewElastic(retrier, esConfig, ts.URL)
client, err := NewElastic(retrier, esConfig, []string{ts.URL})
FatalIfError(t, err)

appSecret := timber.Context().AppSecret
Expand All @@ -76,7 +76,7 @@ func TestElasticStoreman_store_SaveError(t *testing.T) {

retrier := mockElasticRetrier()
esConfig := NewEsConfig("SingleInsert", 100, time.Duration(1000), false)
client, err := NewElastic(retrier, esConfig, ts.URL)
client, err := NewElastic(retrier, esConfig, []string{ts.URL})
FatalIfError(t, err)

appSecret := timber.Context().AppSecret
Expand Down

0 comments on commit cfd8569

Please sign in to comment.