Skip to content

Commit

Permalink
Merge pull request #1066 from brave-intl/master
Browse files Browse the repository at this point in the history
production 2021-12-21_1
  • Loading branch information
husobee committed Dec 21, 2021
2 parents 3a2230b + 3812e07 commit afc036c
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 22 deletions.
2 changes: 1 addition & 1 deletion payment/key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestGenerateSecret(t *testing.T) {
if len(bareSecretKey) != 32 {
t.Error("Secret key does not have correct length", err)
}
if len(k) < 0 {
if len(k) <= 0 {
t.Error("the key should be bigger than nothing")
}
}
Expand Down
1 change: 1 addition & 0 deletions promotion/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const adminAttestationEventSchema = `{
{ "name": "created_at", "type": "string" }
]}`

// AdminAttestationEvent - kafka admin attestation event
type AdminAttestationEvent struct {
WalletID string `json:"wallet_id"`
Service string `json:"service"`
Expand Down
14 changes: 6 additions & 8 deletions promotion/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,8 @@ func (service *Service) SubmitBatchTransfer(ctx context.Context, batchID *uuid.U
)

var (
totalF64 float64
depositID string
transferID string
totalF64 float64
depositID string
)

for _, v := range transfers {
Expand All @@ -358,7 +357,6 @@ func (service *Service) SubmitBatchTransfer(ctx context.Context, batchID *uuid.U
break
}
depositID = *v.DepositID
transferID = transferID + v.ID.String()
}

// collapse into one transaction, not multiples in a bulk upload
Expand All @@ -367,7 +365,7 @@ func (service *Service) SubmitBatchTransfer(ctx context.Context, batchID *uuid.U
CurrencyCode: "BAT",
Amount: totalF64,
DepositID: depositID,
TransferID: transferID,
TransferID: batchID.String(),
SourceFrom: "userdrain",
})

Expand Down Expand Up @@ -726,13 +724,13 @@ func (service *Service) MintGrant(ctx context.Context, walletID uuid.UUID, total
}

// FetchAdminAttestationWalletID - retrieves walletID from topic
func (s *Service) FetchAdminAttestationWalletID(ctx context.Context) (*uuid.UUID, error) {
message, err := s.kafkaAdminAttestationReader.ReadMessage(ctx)
func (service *Service) FetchAdminAttestationWalletID(ctx context.Context) (*uuid.UUID, error) {
message, err := service.kafkaAdminAttestationReader.ReadMessage(ctx)
if err != nil {
return nil, fmt.Errorf("read message: error reading kafka message %w", err)
}

codec, ok := s.codecs[adminAttestationTopic]
codec, ok := service.codecs[adminAttestationTopic]
if !ok {
return nil, fmt.Errorf("read message: could not find codec %s", adminAttestationTopic)
}
Expand Down
1 change: 1 addition & 0 deletions promotion/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func SetAdminAttestationTopic(newTopic string) {
adminAttestationTopic = newTopic
}

// KafkaReader - reader interface
type KafkaReader interface {
ReadMessage(ctx context.Context) (kafka.Message, error)
}
Expand Down
6 changes: 3 additions & 3 deletions utils/clients/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func (c *SimpleHTTPClient) newRequest(
})

// m, _ := json.MarshalIndent(body, "", " ")
// fmt.Println(path, string(m))
if body != nil && method != "GET" {
buf = new(bytes.Buffer)
err := json.NewEncoder(buf).Encode(body)
Expand Down Expand Up @@ -270,10 +269,9 @@ func (c *SimpleHTTPClient) do(

// helpful if you want to read the body as it is
bodyBytes, _ := requestutils.Read(resp.Body)
resp.Body.Close() // must close
_ = resp.Body.Close() // must close
resp.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes))

// fmt.Println(req.URL.Host, req.URL.Path, string(bodyBytes))
if status >= 200 && status <= 299 {
if v != nil {
err = json.Unmarshal(bodyBytes, v)
Expand All @@ -284,6 +282,8 @@ func (c *SimpleHTTPClient) do(
return resp, nil
}

logger.Warn().Int("response_status", status).Err(err).Msg("failed http client call")
logger.Debug().Str("host", req.URL.Host).Str("path", req.URL.Path).Str("body", string(bodyBytes)).Msg("failed http client call")
return resp, errors.Wrap(err, ErrProtocolError)
}

Expand Down
25 changes: 15 additions & 10 deletions utils/kafka/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@ import (
"time"

"github.com/linkedin/goavro"
kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go"

appctx "github.com/brave-intl/bat-go/utils/context"
errorutils "github.com/brave-intl/bat-go/utils/errors"
"github.com/brave-intl/bat-go/utils/logging"
)

type KafkaRead struct {
// Reader - implements KafkaReader
type Reader struct {
kafkaReader *kafka.Reader
kafkaDialer *kafka.Dialer
}

func NewKafkaReader(ctx context.Context, groupID string, topic string) (*KafkaRead, error) {
// NewKafkaReader - creates a new kafka reader for groupID and topic
func NewKafkaReader(ctx context.Context, groupID string, topic string) (*Reader, error) {
_, logger := logging.SetupLogger(ctx)

dialer, x509Cert, err := TLSDialer()
Expand All @@ -40,20 +42,23 @@ func NewKafkaReader(ctx context.Context, groupID string, topic string) (*KafkaRe
kafkaBrokers := ctx.Value(appctx.KafkaBrokersCTXKey).(string)

kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: strings.Split(kafkaBrokers, ","),
GroupID: groupID,
Topic: topic,
Dialer: dialer,
Logger: kafka.LoggerFunc(logger.Printf), // FIXME
Brokers: strings.Split(kafkaBrokers, ","),
GroupID: groupID,
Topic: topic,
Dialer: dialer,
StartOffset: 0,
RetentionTime: 2 * time.Hour,
Logger: kafka.LoggerFunc(logger.Printf), // FIXME
})

return &KafkaRead{
return &Reader{
kafkaReader: kafkaReader,
kafkaDialer: dialer,
}, nil
}

func (k *KafkaRead) ReadMessage(ctx context.Context) (kafka.Message, error) {
// ReadMessage - reads kafka messages
func (k *Reader) ReadMessage(ctx context.Context) (kafka.Message, error) {
return k.kafkaReader.ReadMessage(ctx)
}

Expand Down

0 comments on commit afc036c

Please sign in to comment.