Skip to content

Commit

Permalink
Merge pull request #48 from BaritoLog/grpc-max-recv-size
Browse files Browse the repository at this point in the history
add option to set MaxRecvMsgSize for grpc
  • Loading branch information
fadlinurhasan authored Jan 7, 2021
2 parents cfd153b + 0090268 commit 1c4ae44
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 1 deletion.
2 changes: 2 additions & 0 deletions cmds/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func ActionBaritoProducerService(c *cli.Context) (err error) {
kafkaMaxRetry := configKafkaMaxRetry()
kafkaRetryInterval := configKafkaRetryInterval()
newTopicEventName := configNewTopicEvent()
grpcMaxRecvMsgSize := configGrpcMaxRecvMsgSize()

// kafka producer config
config := sarama.NewConfig()
Expand All @@ -123,6 +124,7 @@ func ActionBaritoProducerService(c *cli.Context) (err error) {
"kafkaMaxRetry": kafkaMaxRetry,
"kafkaRetryInterval": kafkaRetryInterval,
"newEventTopic": newTopicEventName,
"grpcMaxRecvMsgSize": grpcMaxRecvMsgSize,
}

service := flow.NewProducerService(producerParams)
Expand Down
8 changes: 8 additions & 0 deletions cmds/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
EnvEsBulkSize = "BARITO_ELASTICSEARCH_BULK_SIZE"
EnvEsFlushIntervalMs = "BARITO_ELASTICSEARCH_FLUSH_INTERVAL_MS"

EnvGrpcMaxRecvMsgSize = "BARITO_GRPC_MAX_RECV_MSG_SIZE"

EnvPushMetricUrl = "BARITO_PUSH_METRIC_URL"
EnvPushMetricInterval = "BARITO_PUSH_METRIC_INTERVAL"

Expand Down Expand Up @@ -57,6 +59,8 @@ var (

DefaultElasticsearchUrls = []string{"http://localhost:9200"}

DefaultGrpcMaxRecvMsgSize = 20 * 1000 * 1000

DefaultPushMetricUrl = ""
DefaultPushMetricInterval = "30s"

Expand Down Expand Up @@ -132,6 +136,10 @@ func configEsFlushIntervalMs() (i int) {
return intEnvOrDefault(EnvEsFlushIntervalMs, DefaultEsFlushIntervalMs)
}

func configGrpcMaxRecvMsgSize() (i int) {
return intEnvOrDefault(EnvGrpcMaxRecvMsgSize, DefaultGrpcMaxRecvMsgSize)
}

func configConsulElasticsearchName() (s string) {
return stringEnvOrDefault(EnvConsulElasticsearchName, DefaultConsulElasticsearchName)
}
Expand Down
4 changes: 3 additions & 1 deletion flow/grpc_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type producerService struct {
kafkaMaxRetry int
kafkaRetryInterval int
newEventTopic string
grpcMaxRecvMsgSize int

producer sarama.SyncProducer
admin KafkaAdmin
Expand All @@ -58,6 +59,7 @@ func NewProducerService(params map[string]interface{}) ProducerService {
kafkaMaxRetry: params["kafkaMaxRetry"].(int),
kafkaRetryInterval: params["kafkaRetryInterval"].(int),
newEventTopic: params["newEventTopic"].(string),
grpcMaxRecvMsgSize: params["grpcMaxRecvMsgSize"].(int),
}
}

Expand Down Expand Up @@ -118,7 +120,7 @@ func (s *producerService) initGrpcServer() (lis net.Listener, srv *grpc.Server,
return
}

srv = grpc.NewServer()
srv = grpc.NewServer(grpc.MaxRecvMsgSize(s.grpcMaxRecvMsgSize))
pb.RegisterProducerServer(srv, s)

s.grpcServer = srv
Expand Down
2 changes: 2 additions & 0 deletions flow/grpc_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func TestProducerService_Start_ErrorMakeSyncProducer(t *testing.T) {
"kafkaMaxRetry": 2,
"kafkaRetryInterval": 1,
"newEventTopic": "new_topic_events",
"grpcMaxRecvMsgSize": 20000000,
}

service := NewProducerService(producerParams)
Expand Down Expand Up @@ -249,6 +250,7 @@ func TestProducerService_Start_ErrorMakeKafkaAdmin(t *testing.T) {
"kafkaMaxRetry": 1,
"kafkaRetryInterval": 10,
"newEventTopic": "new_topic_events",
"grpcMaxRecvMsgSize": 20000000,
}

service := NewProducerService(producerParams)
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ require (
github.com/BaritoLog/instru v0.0.0-20190715232619-ef001fffe4f0
github.com/Shopify/sarama v1.19.0
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20201120081800-1786d5ef83d4 // indirect
github.com/bouk/monkey v1.0.1 // indirect
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/eapache/go-resiliency v1.2.0 // indirect
Expand All @@ -21,6 +23,8 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.9.5
github.com/hashicorp/consul/api v1.1.0
github.com/mailru/easyjson v0.0.0-20190620125010-da37f6c1e481 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/olivere/elastic v6.2.19+incompatible
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20201120081800-1786d5ef83d4 h1:EBTWhcAX7rNQ80RLwLCpHZBBrJuzallFHnF+yMXo928=
github.com/alecthomas/units v0.0.0-20201120081800-1786d5ef83d4/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
Expand Down Expand Up @@ -155,6 +159,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/vwidjaya/barito-proto v0.0.0-20190820161146-42a41bebaef1 h1:SMyaeqxIZD6gG0GVdIPtPwH/rAvb+lO5dPiN+AWoeLc=
Expand Down Expand Up @@ -210,5 +215,6 @@ gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

0 comments on commit 1c4ae44

Please sign in to comment.