From 48cb24fce1a627cb8e76a4cc8e9c2a6577cef161 Mon Sep 17 00:00:00 2001 From: clD11 <23483715+clD11@users.noreply.github.com> Date: Fri, 17 Dec 2021 20:33:45 +0000 Subject: [PATCH 1/3] feat(1010): kafka consumer signalling drain retry (#1063) added retention time and start offset --- utils/kafka/dialer.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/utils/kafka/dialer.go b/utils/kafka/dialer.go index 35c90e681..bbf90e8cb 100644 --- a/utils/kafka/dialer.go +++ b/utils/kafka/dialer.go @@ -40,11 +40,13 @@ func NewKafkaReader(ctx context.Context, groupID string, topic string) (*KafkaRe kafkaBrokers := ctx.Value(appctx.KafkaBrokersCTXKey).(string) kafkaReader := kafka.NewReader(kafka.ReaderConfig{ - Brokers: strings.Split(kafkaBrokers, ","), - GroupID: groupID, - Topic: topic, - Dialer: dialer, - Logger: kafka.LoggerFunc(logger.Printf), // FIXME + Brokers: strings.Split(kafkaBrokers, ","), + GroupID: groupID, + Topic: topic, + Dialer: dialer, + StartOffset: 0, + RetentionTime: 2 * time.Hour, + Logger: kafka.LoggerFunc(logger.Printf), // FIXME }) return &KafkaRead{ From 57599122e29e6d4af832c9f9868e5cc4f22799c4 Mon Sep 17 00:00:00 2001 From: clD11 <23483715+clD11@users.noreply.github.com> Date: Tue, 21 Dec 2021 11:22:36 +0000 Subject: [PATCH 2/3] fixed linter (#1065) --- payment/key_test.go | 2 +- promotion/avro.go | 1 + promotion/drain.go | 6 +++--- promotion/service.go | 1 + utils/clients/client.go | 2 +- utils/kafka/dialer.go | 13 ++++++++----- 6 files changed, 15 insertions(+), 10 deletions(-) diff --git a/payment/key_test.go b/payment/key_test.go index 302747bdb..447174f6d 100644 --- a/payment/key_test.go +++ b/payment/key_test.go @@ -71,7 +71,7 @@ func TestGenerateSecret(t *testing.T) { if len(bareSecretKey) != 32 { t.Error("Secret key does not have correct length", err) } - if len(k) < 0 { + if len(k) <= 0 { t.Error("the key should be bigger than nothing") } } diff --git a/promotion/avro.go b/promotion/avro.go index 0bf856672..16cfbadce 100644 --- a/promotion/avro.go +++ b/promotion/avro.go @@ -42,6 +42,7 @@ const adminAttestationEventSchema = `{ { "name": "created_at", "type": "string" } ]}` +// AdminAttestationEvent - kafka admin attestation event type AdminAttestationEvent struct { WalletID string `json:"wallet_id"` Service string `json:"service"` diff --git a/promotion/drain.go b/promotion/drain.go index fd1eee635..8ea88f427 100644 --- a/promotion/drain.go +++ b/promotion/drain.go @@ -726,13 +726,13 @@ func (service *Service) MintGrant(ctx context.Context, walletID uuid.UUID, total } // FetchAdminAttestationWalletID - retrieves walletID from topic -func (s *Service) FetchAdminAttestationWalletID(ctx context.Context) (*uuid.UUID, error) { - message, err := s.kafkaAdminAttestationReader.ReadMessage(ctx) +func (service *Service) FetchAdminAttestationWalletID(ctx context.Context) (*uuid.UUID, error) { + message, err := service.kafkaAdminAttestationReader.ReadMessage(ctx) if err != nil { return nil, fmt.Errorf("read message: error reading kafka message %w", err) } - codec, ok := s.codecs[adminAttestationTopic] + codec, ok := service.codecs[adminAttestationTopic] if !ok { return nil, fmt.Errorf("read message: could not find codec %s", adminAttestationTopic) } diff --git a/promotion/service.go b/promotion/service.go index 5e1321df7..132a22122 100644 --- a/promotion/service.go +++ b/promotion/service.go @@ -98,6 +98,7 @@ func SetAdminAttestationTopic(newTopic string) { adminAttestationTopic = newTopic } +// KafkaReader - reader interface type KafkaReader interface { ReadMessage(ctx context.Context) (kafka.Message, error) } diff --git a/utils/clients/client.go b/utils/clients/client.go index fea07eeba..b8b7a353c 100644 --- a/utils/clients/client.go +++ b/utils/clients/client.go @@ -270,7 +270,7 @@ func (c *SimpleHTTPClient) do( // helpful if you want to read the body as it is bodyBytes, _ := requestutils.Read(resp.Body) - resp.Body.Close() // must close + _ = resp.Body.Close() // must close resp.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes)) // fmt.Println(req.URL.Host, req.URL.Path, string(bodyBytes)) diff --git a/utils/kafka/dialer.go b/utils/kafka/dialer.go index bbf90e8cb..4d97de6ae 100644 --- a/utils/kafka/dialer.go +++ b/utils/kafka/dialer.go @@ -14,19 +14,21 @@ import ( "time" "github.com/linkedin/goavro" - kafka "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go" appctx "github.com/brave-intl/bat-go/utils/context" errorutils "github.com/brave-intl/bat-go/utils/errors" "github.com/brave-intl/bat-go/utils/logging" ) -type KafkaRead struct { +// Reader - implements KafkaReader +type Reader struct { kafkaReader *kafka.Reader kafkaDialer *kafka.Dialer } -func NewKafkaReader(ctx context.Context, groupID string, topic string) (*KafkaRead, error) { +// NewKafkaReader - creates a new kafka reader for groupID and topic +func NewKafkaReader(ctx context.Context, groupID string, topic string) (*Reader, error) { _, logger := logging.SetupLogger(ctx) dialer, x509Cert, err := TLSDialer() @@ -49,13 +51,14 @@ func NewKafkaReader(ctx context.Context, groupID string, topic string) (*KafkaRe Logger: kafka.LoggerFunc(logger.Printf), // FIXME }) - return &KafkaRead{ + return &Reader{ kafkaReader: kafkaReader, kafkaDialer: dialer, }, nil } -func (k *KafkaRead) ReadMessage(ctx context.Context) (kafka.Message, error) { +// ReadMessage - reads kafka messages +func (k *Reader) ReadMessage(ctx context.Context) (kafka.Message, error) { return k.kafkaReader.ReadMessage(ctx) } From 3812e07f80cd099cd323d1562959a1d969a5fe76 Mon Sep 17 00:00:00 2001 From: husobee Date: Tue, 21 Dec 2021 10:26:59 -0500 Subject: [PATCH 3/3] bf tx consolidation tx id (#1064) * use the batch id as idempotency key bf consolidation * removing comments not needed --- promotion/drain.go | 8 +++----- utils/clients/client.go | 4 ++-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/promotion/drain.go b/promotion/drain.go index 8ea88f427..04867f68a 100644 --- a/promotion/drain.go +++ b/promotion/drain.go @@ -335,9 +335,8 @@ func (service *Service) SubmitBatchTransfer(ctx context.Context, batchID *uuid.U ) var ( - totalF64 float64 - depositID string - transferID string + totalF64 float64 + depositID string ) for _, v := range transfers { @@ -358,7 +357,6 @@ func (service *Service) SubmitBatchTransfer(ctx context.Context, batchID *uuid.U break } depositID = *v.DepositID - transferID = transferID + v.ID.String() } // collapse into one transaction, not multiples in a bulk upload @@ -367,7 +365,7 @@ func (service *Service) SubmitBatchTransfer(ctx context.Context, batchID *uuid.U CurrencyCode: "BAT", Amount: totalF64, DepositID: depositID, - TransferID: transferID, + TransferID: batchID.String(), SourceFrom: "userdrain", }) diff --git a/utils/clients/client.go b/utils/clients/client.go index b8b7a353c..4df1a4506 100644 --- a/utils/clients/client.go +++ b/utils/clients/client.go @@ -160,7 +160,6 @@ func (c *SimpleHTTPClient) newRequest( }) // m, _ := json.MarshalIndent(body, "", " ") - // fmt.Println(path, string(m)) if body != nil && method != "GET" { buf = new(bytes.Buffer) err := json.NewEncoder(buf).Encode(body) @@ -273,7 +272,6 @@ func (c *SimpleHTTPClient) do( _ = resp.Body.Close() // must close resp.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes)) - // fmt.Println(req.URL.Host, req.URL.Path, string(bodyBytes)) if status >= 200 && status <= 299 { if v != nil { err = json.Unmarshal(bodyBytes, v) @@ -284,6 +282,8 @@ func (c *SimpleHTTPClient) do( return resp, nil } + logger.Warn().Int("response_status", status).Err(err).Msg("failed http client call") + logger.Debug().Str("host", req.URL.Host).Str("path", req.URL.Path).Str("body", string(bodyBytes)).Msg("failed http client call") return resp, errors.Wrap(err, ErrProtocolError) }