diff --git a/batch_handler.go b/batch_handler.go index cb88943..2752c5b 100644 --- a/batch_handler.go +++ b/batch_handler.go @@ -1,100 +1,41 @@ package mq -import ( - "context" - "encoding/json" - "fmt" - "reflect" - "time" -) +import "context" -type Message struct { - Id string `json:"id,omitempty" gorm:"column:id;primary_key" bson:"id,omitempty" dynamodbav:"id,omitempty" firestore:"id,omitempty"` - Data []byte `json:"data,omitempty" gorm:"column:data" bson:"data,omitempty" dynamodbav:"data,omitempty" firestore:"data,omitempty"` - Attributes map[string]string `json:"attributes,omitempty" gorm:"column:attributes" bson:"attributes,omitempty" dynamodbav:"attributes,omitempty" firestore:"attributes,omitempty"` - Timestamp *time.Time `json:"timestamp,omitempty" gorm:"column:timestamp" bson:"timestamp,omitempty" dynamodbav:"timestamp,omitempty" firestore:"timestamp,omitempty"` - Raw interface{} `json:"-" bson:"-" dynamodbav:"-" firestore:"-"` - Value interface{} `json:"-" bson:"-" dynamodbav:"-" firestore:"-"` +type Message[T any] struct { + Data []byte `yaml:"data" mapstructure:"data" json:"data,omitempty" gorm:"column:data" bson:"data,omitempty" dynamodbav:"data,omitempty" firestore:"data,omitempty"` + Attributes map[string]string `yaml:"attributes" mapstructure:"attributes" json:"attributes,omitempty" gorm:"column:attributes" bson:"attributes,omitempty" dynamodbav:"attributes,omitempty" firestore:"attributes,omitempty"` + Value T `yaml:"value" mapstructure:"value" json:"value,omitempty" gorm:"column:value" bson:"value,omitempty" dynamodbav:"value,omitempty" firestore:"value,omitempty"` } -type BatchHandler struct { - modelType reflect.Type - modelsType reflect.Type - Write func(ctx context.Context, models interface{}) ([]int, []int, error) // Return: Success indices, Fail indices, Error - Unmarshal func(data []byte, v interface{}) error - Marshal func(v interface{}) ([]byte, error) - LogError func(context.Context, string) - LogInfo func(context.Context, string) +type BatchHandler[T any] struct { + Write func(context.Context, []T) ([]int, error) // Return: Fail indices, Error } -func NewBatchHandler(writeBatch func(context.Context, interface{}) ([]int, []int, error), modelType reflect.Type, unmarshal func(data []byte, v interface{}) error, logs ...func(context.Context, string)) *BatchHandler { - modelsType := reflect.Zero(reflect.SliceOf(modelType)).Type() - if unmarshal == nil { - unmarshal = json.Unmarshal - } - h := &BatchHandler{modelType: modelType, modelsType: modelsType, Write: writeBatch} - if len(logs) >= 1 { - h.LogError = logs[0] - } - if len(logs) >= 2 { - h.LogInfo = logs[1] - } +func NewBatchHandler[T any](writeBatch func(context.Context, []T) ([]int, error)) *BatchHandler[T] { + h := &BatchHandler[T]{Write: writeBatch} return h } -func (h *BatchHandler) Handle(ctx context.Context, data []*Message) ([]*Message, error) { - failMessages := make([]*Message, 0) - - vs := InitModels(h.modelsType) - var v = reflect.Indirect(reflect.ValueOf(vs)) - for _, message := range data { - if message.Value != nil { - vo := reflect.ValueOf(message.Value) - v1 := reflect.Indirect(vo) - v = reflect.Append(v, v1) - } else { - item := InitModel(h.modelType) - err := h.Unmarshal(message.Data, item) - if err != nil { - failMessages = append(failMessages, message) - return failMessages, fmt.Errorf(`cannot unmarshal item: %s. Error: %s`, message.Data, err.Error()) - } - vo := reflect.ValueOf(item) - x := reflect.Indirect(vo).Interface() - v = reflect.Append(v, reflect.ValueOf(x)) +func (h *BatchHandler[T]) Handle(ctx context.Context, data []Message[T]) ([]Message[T], error) { + failMessages := make([]Message[T], 0) + var vs []T + le := len(data) + if le > 0 { + for i := 0; i < le; i++ { + message := data[i] + vs = append(vs, message.Value) } - } - if h.LogInfo != nil { - if h.Marshal != nil { - sv, er0 := h.Marshal(v.Interface()) - if er0 != nil { - h.LogInfo(ctx, fmt.Sprintf(`models: %s`, v)) - } else { - h.LogInfo(ctx, fmt.Sprintf(`models: %s`, sv)) - } - } else { - h.LogInfo(ctx, fmt.Sprintf(`models: %s`, v)) + failIndices, err := h.Write(ctx, vs) + if err != nil { + return failMessages, err } - } - successIndices, failIndices, er1 := h.Write(ctx, v.Interface()) - if h.LogInfo != nil { - h.LogInfo(ctx, fmt.Sprintf(`success indices %v fail indices %v`, successIndices, failIndices)) - } - if successIndices != nil && len(successIndices) > 0 { - if failIndices != nil && len(failIndices) > 0 { - for _, failIndex := range failIndices { - failMessages = append(failMessages, data[failIndex]) + sl := len(failIndices) + if sl > 0 { + for j := 0; j < sl; j++ { + failMessages = append(failMessages, data[failIndices[j]]) } } - return failMessages, nil - } - if er1 != nil && h.LogError != nil { - sv, er2 := json.Marshal(v.Interface()) - if er2 != nil { - h.LogError(ctx, fmt.Sprintf("Cannot write batch: %s Error: %s", v.Interface(), er1.Error())) - } else { - h.LogError(ctx, fmt.Sprintf("Cannot write batch: %s Error: %s", sv, er1.Error())) - } } - return data, er1 + return failMessages, nil } diff --git a/batch_worker.go b/batch_worker.go index 4517ce8..e6abd47 100644 --- a/batch_worker.go +++ b/batch_worker.go @@ -2,6 +2,7 @@ package mq import ( "context" + "encoding/json" "fmt" "strconv" "sync" @@ -10,86 +11,162 @@ import ( const TimeFormat = "15:04:05.000" -type BatchWorker interface { - Handle(ctx context.Context, message *Message) - Run(ctx context.Context) +type BatchConfig struct { + RetryCountName string `yaml:"retry_count_name" mapstructure:"retry_count_name" json:"retryCountName,omitempty" gorm:"column:retrycountname" bson:"retryCountName,omitempty" dynamodbav:"retryCountName,omitempty" firestore:"retryCountName,omitempty"` + LimitRetry int `yaml:"limit_retry" mapstructure:"limit_retry" json:"limitRetry,omitempty" gorm:"column:limitretry" bson:"limitRetry,omitempty" dynamodbav:"limitRetry,omitempty" firestore:"limitRetry,omitempty"` + Goroutines bool `yaml:"goroutines" mapstructure:"goroutines" json:"goroutines,omitempty" gorm:"column:goroutines" bson:"goroutines,omitempty" dynamodbav:"goroutines,omitempty" firestore:"goroutines,omitempty"` + Key string `yaml:"key" mapstructure:"key" json:"key,omitempty" gorm:"column:key" bson:"key,omitempty" dynamodbav:"key,omitempty" firestore:"key,omitempty"` + Timeout int64 `yaml:"timeout" mapstructure:"timeout" json:"timeout,omitempty" gorm:"column:timeout" bson:"timeout,omitempty" dynamodbav:"timeout,omitempty" firestore:"timeout,omitempty"` + BatchSize int `yaml:"batch_size" mapstructure:"batch_size" json:"batchSize,omitempty" gorm:"column:batchsize" bson:"batchSize,omitempty" dynamodbav:"batchSize,omitempty" firestore:"batchSize,omitempty"` } -type BatchWorkerConfig struct { - BatchSize int `mapstructure:"batch_size" json:"batchSize,omitempty" gorm:"column:batchsize" bson:"batchSize,omitempty" dynamodbav:"batchSize,omitempty" firestore:"batchSize,omitempty"` - Timeout int64 `mapstructure:"timeout" json:"timeout,omitempty" gorm:"column:timeout" bson:"timeout,omitempty" dynamodbav:"timeout,omitempty" firestore:"timeout,omitempty"` - LimitRetry int `mapstructure:"limit_retry" json:"limitRetry,omitempty" gorm:"column:limitretry" bson:"limitRetry,omitempty" dynamodbav:"limitRetry,omitempty" firestore:"limitRetry,omitempty"` - Goroutines bool `mapstructure:"goroutines" json:"goroutines,omitempty" gorm:"column:goroutines" bson:"goroutines,omitempty" dynamodbav:"goroutines,omitempty" firestore:"goroutines,omitempty"` -} - -type DefaultBatchWorker struct { +type BatchWorker[T any] struct { batchSize int timeout int64 - limitRetry int - handle func(ctx context.Context, data []*Message) ([]*Message, error) + Unmarshal func(data []byte, v any) error + handle func(ctx context.Context, data []Message[T]) ([]Message[T], error) + Validate func(context.Context, *T) ([]ErrorMessage, error) + Reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string) + HandleError func(context.Context, []byte, map[string]string) Retry func(context.Context, []byte, map[string]string) error + LimitRetry int RetryCountName string - Error func(context.Context, []byte, map[string]string) error Goroutine bool - messages []*Message + messages []Message[T] latestExecutedTime time.Time mux sync.Mutex + Key string LogError func(context.Context, string) LogInfo func(context.Context, string) + LogDebug func(context.Context, string) } -func NewBatchWorkerByConfig(batchConfig BatchWorkerConfig, handle func(context.Context, []*Message) ([]*Message, error), retry func(context.Context, []byte, map[string]string) error, retryCountName string, errorHandler func(context.Context, []byte, map[string]string) error, logs ...func(context.Context, string)) *DefaultBatchWorker { - return NewBatchWorker(batchConfig.BatchSize, batchConfig.Timeout, batchConfig.LimitRetry, handle, retry, retryCountName, errorHandler, batchConfig.Goroutines, logs...) +func NewBatchWorkerByConfig[T any]( + c BatchConfig, + handle func(context.Context, []Message[T]) ([]Message[T], error), + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handleError func(context.Context, []byte, map[string]string), + retry func(context.Context, []byte, map[string]string) error, + logs ...func(context.Context, string)) *BatchWorker[T] { + return NewBatchWorker[T](c.BatchSize, c.Timeout, nil, handle, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...) } - -func NewDefaultBatchWorker(batchConfig BatchWorkerConfig, handle func(context.Context, []*Message) ([]*Message, error), retry func(context.Context, []byte, map[string]string) error, logs ...func(context.Context, string)) *DefaultBatchWorker { - return NewBatchWorker(batchConfig.BatchSize, batchConfig.Timeout, batchConfig.LimitRetry, handle, retry, "", nil, batchConfig.Goroutines, logs...) +func NewBatchWorkerByConfigAndUnmarshal[T any]( + c BatchConfig, + unmarshal func(data []byte, v any) error, + handle func(context.Context, []Message[T]) ([]Message[T], error), + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handleError func(context.Context, []byte, map[string]string), + retry func(context.Context, []byte, map[string]string) error, + logs ...func(context.Context, string)) *BatchWorker[T] { + return NewBatchWorker[T](c.BatchSize, c.Timeout, unmarshal, handle, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...) } - -func NewBatchWorker(batchSize int, timeout int64, limitRetry int, handle func(context.Context, []*Message) ([]*Message, error), retry func(context.Context, []byte, map[string]string) error, retryCountName string, handleError func(context.Context, []byte, map[string]string) error, goroutine bool, logs ...func(context.Context, string)) *DefaultBatchWorker { +func NewBatchWorker[T any]( + batchSize int, timeout int64, + unmarshal func(data []byte, v any) error, + handle func(context.Context, []Message[T]) ([]Message[T], error), + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handleError func(context.Context, []byte, map[string]string), + retry func(context.Context, []byte, map[string]string) error, + limitRetry int, + retryCountName string, + goroutine bool, + key string, + logs ...func(context.Context, string)) *BatchWorker[T] { if len(retryCountName) == 0 { retryCountName = "retryCount" } - if handleError == nil { - e1 := NewErrorHandler(logs...) - handleError = e1.HandleError + if unmarshal == nil { + unmarshal = json.Unmarshal } - w := &DefaultBatchWorker{ + w := &BatchWorker[T]{ batchSize: batchSize, timeout: timeout, - limitRetry: limitRetry, + Unmarshal: unmarshal, handle: handle, + Validate: validate, + Reject: reject, + HandleError: handleError, Retry: retry, + LimitRetry: limitRetry, RetryCountName: retryCountName, - Error: handleError, Goroutine: goroutine, + Key: key, } - if len(logs) >= 1 { + if len(logs) > 0 { w.LogError = logs[0] } - if len(logs) >= 2 { + if len(logs) > 1 { w.LogInfo = logs[1] } + if len(logs) > 2 { + w.LogInfo = logs[2] + } return w } -func (w *DefaultBatchWorker) Handle(ctx context.Context, message *Message) { +func (w *BatchWorker[T]) Handle(ctx context.Context, data []byte, attrs map[string]string) { + if data == nil { + return + } + if w.LogInfo != nil { + key := GetString(ctx, w.Key) + if len(key) > 0 { + w.LogInfo(ctx, fmt.Sprintf("Received message with key %s : %s", key, GetLog(data, attrs))) + } else { + w.LogInfo(ctx, fmt.Sprintf("Received message: %s", GetLog(data, attrs))) + } + } + var v T + er1 := json.Unmarshal(data, &v) + if er1 != nil { + if w.LogError != nil { + w.LogError(ctx, fmt.Sprintf("cannot unmarshal item: %s . Error: %s", GetLog(data, attrs), er1.Error())) + } + return + } + if w.Validate != nil { + errs, err := w.Validate(ctx, &v) + if err != nil { + if w.LogError != nil { + w.LogError(ctx, "Error when validate data: "+err.Error()) + } + return + } + if len(errs) > 0 { + w.Reject(ctx, &v, errs, data, attrs) + return + } + } w.mux.Lock() - if message != nil { - w.messages = append(w.messages, message) + msg := Message[T]{Data: data, Attributes: attrs, Value: v} + w.messages = append(w.messages, msg) + if w.ready(ctx) { + w.execute(ctx) + } + w.mux.Unlock() +} +func (w *BatchWorker[T]) CallByTimer(ctx context.Context) { + w.mux.Lock() + if w.LogDebug != nil { + w.LogDebug(ctx, "Call by timer") } if w.ready(ctx) { w.execute(ctx) } w.mux.Unlock() } - -func (w *DefaultBatchWorker) ready(ctx context.Context) bool { +func (w *BatchWorker[T]) ready(ctx context.Context) bool { isReady := false now := time.Now() batchSize := len(w.messages) t := w.latestExecutedTime.Add(time.Duration(w.timeout) * time.Millisecond) if batchSize > 0 && (batchSize >= w.batchSize || t.Sub(now) < 0) { + if w.LogDebug != nil && batchSize >= w.batchSize { + w.LogDebug(ctx, fmt.Sprintf("Call by batch size %d %d", w.batchSize, batchSize)) + } isReady = true } if isReady && w.LogInfo != nil { @@ -98,7 +175,7 @@ func (w *DefaultBatchWorker) ready(ctx context.Context) bool { return isReady } -func (w *DefaultBatchWorker) execute(ctx context.Context) { +func (w *BatchWorker[T]) execute(ctx context.Context) { lenMessages := len(w.messages) if lenMessages == 0 { w.reset(ctx) @@ -110,13 +187,12 @@ func (w *DefaultBatchWorker) execute(ctx context.Context) { if err != nil && w.LogError != nil { w.LogError(ctx, "Error of batch handling: "+err.Error()) } - if errList != nil && len(errList) > 0 { + if len(errList) > 0 { if w.Retry == nil { if w.LogError != nil { l := len(errList) for i := 0; i < l; i++ { - x := CreateLog(errList[i].Data, errList[i].Attributes, errList[i].Id, errList[i].Timestamp) - w.LogError(ctx, fmt.Sprintf("Error message: %s.", x)) + w.LogError(ctx, fmt.Sprintf("Error message: %s.", GetLog(errList[i].Data, errList[i].Attributes))) } } } else { @@ -133,24 +209,21 @@ func (w *DefaultBatchWorker) execute(ctx context.Context) { } } retryCount++ - if retryCount > w.limitRetry { + if retryCount > w.LimitRetry { if w.LogInfo != nil { - x := CreateLog(errList[i].Data, errList[i].Attributes, errList[i].Id, errList[i].Timestamp) - w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, w.limitRetry, x)) + w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount-1, w.LimitRetry, GetLog(errList[i].Data, errList[i].Attributes))) } - if w.Error != nil { - w.Error(ctx, errList[i].Data, errList[i].Attributes) + if w.HandleError != nil { + w.HandleError(ctx, errList[i].Data, errList[i].Attributes) } continue } else if w.LogInfo != nil { - x := CreateLog(errList[i].Data, errList[i].Attributes, errList[i].Id, errList[i].Timestamp) - w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s.", retryCount, x)) + w.LogInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s", retryCount-1, GetLog(errList[i].Data, errList[i].Attributes))) } errList[i].Attributes[w.RetryCountName] = strconv.Itoa(retryCount) er3 := w.Retry(ctx, errList[i].Data, errList[i].Attributes) if er3 != nil && w.LogError != nil { - x := CreateLog(errList[i].Data, errList[i].Attributes, errList[i].Id, errList[i].Timestamp) - w.LogError(ctx, fmt.Sprintf("Cannot retry %s . Error: %s", x, er3.Error())) + w.LogError(ctx, fmt.Sprintf("Cannot retry %s . Error: %s", GetLog(errList[i].Data, errList[i].Attributes), er3.Error())) } } } @@ -158,19 +231,19 @@ func (w *DefaultBatchWorker) execute(ctx context.Context) { w.reset(ctx) } -func (w *DefaultBatchWorker) reset(ctx context.Context) { +func (w *BatchWorker[T]) reset(ctx context.Context) { w.messages = w.messages[:0] w.latestExecutedTime = time.Now() } -func (w *DefaultBatchWorker) Run(ctx context.Context) { +func (w *BatchWorker[T]) Run(ctx context.Context) { w.reset(ctx) ticker := time.NewTicker(time.Duration(w.timeout) * time.Millisecond) go func() { for { select { case <-ticker.C: - w.Handle(ctx, nil) + w.CallByTimer(ctx) } } }() diff --git a/bytes_writer.go b/bytes_writer.go deleted file mode 100644 index b27c8eb..0000000 --- a/bytes_writer.go +++ /dev/null @@ -1,22 +0,0 @@ -package mq - -import ( - "context" - "github.com/pkg/errors" -) - -type BytesWriter struct { - Send func(ctx context.Context, data []byte, attributes map[string]string) (string, error) -} -func NewBytesWriter(send func(ctx context.Context, data []byte, attributes map[string]string) (string, error)) *BytesWriter { - return &BytesWriter{Send: send} -} -func (w *BytesWriter) Write(ctx context.Context, data interface{}) error { - d, ok := data.([]byte) - if !ok { - return errors.New("data must be byte array ([]byte)") - } else { - _, err := w.Send(ctx, d, nil) - return err - } -} diff --git a/confluent/consumer.go b/confluent/consumer.go index dad2c97..681d669 100644 --- a/confluent/consumer.go +++ b/confluent/consumer.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/core-go/mq" "log" "time" ) @@ -13,7 +12,6 @@ type ( Consumer struct { Consumer *kafka.Consumer Topics []string - Convert func(context.Context, []byte) ([]byte, error) LogError func(context.Context, string) LogInfo func(context.Context, string) } @@ -70,20 +68,12 @@ func NewConsumer(consumer *kafka.Consumer, topics []string, logs ...func(context } return c } -func NewConsumerWithConvert(consumer *kafka.Consumer, topics []string, options ...func(context.Context, []byte) ([]byte, error)) *Consumer { - var convert func(context.Context, []byte) ([]byte, error) - if len(options) > 0 { - convert = options[0] - } - return &Consumer{Consumer: consumer, Topics: topics, Convert: convert} -} -func NewConsumerByConfigAndRetries(c ConsumerConfig, convert func(context.Context, []byte) ([]byte, error), retries ...time.Duration) (*Consumer, error) { +func NewConsumerByConfigAndRetries(c ConsumerConfig, retries ...time.Duration) (*Consumer, error) { if len(retries) == 0 { cs, err := NewConsumerByConfig(c) if err != nil { return cs, err } - cs.Convert = convert return cs, nil } else { return NewConsumerByConfigAndRetryArray(c, retries) @@ -109,24 +99,19 @@ func NewConsumerByConfigAndRetryArray(c ConsumerConfig, retries []time.Duration, if err != nil { log.Println(fmt.Sprintf("Fail in creating new Consumer after %d retries", i)) } - var convert func(context.Context, []byte) ([]byte, error) - if len(options) > 0 { - convert = options[0] - } return &Consumer{ Consumer: consumer, Topics: []string{c.Topic}, - Convert: convert, }, nil } -func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, *mq.Message, error) error) { +func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, []byte, map[string]string)) { defer c.Consumer.Close() err := c.Consumer.SubscribeTopics(c.Topics, nil) if err != nil { if c.LogError != nil { - c.LogError(ctx, fmt.Sprintf("Subscribe Topic err: %v", err)) + c.LogError(ctx, fmt.Sprintf("Consume Topic err: %v", err)) } return } @@ -139,35 +124,79 @@ func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, *mq c.LogInfo(ctx, fmt.Sprintf("Message on %s: %s", e.TopicPartition, string(e.Value))) } h := HeaderToMap(e.Headers) - message := &mq.Message{ - Id: string(e.Key), - Data: e.Value, - Attributes: h, - Timestamp: &e.Timestamp, - Raw: e, + handle(ctx, e.Value, h) + case kafka.PartitionEOF: + if c.LogInfo != nil { + c.LogInfo(ctx, fmt.Sprintf("Reached %v", e)) } - if c.Convert == nil { - handle(ctx, message, nil) - } else { - data, err := c.Convert(ctx, e.Value) - if err != nil { - handle(ctx, message, err) - } else { - message.Data = data - handle(ctx, message, err) - } + case kafka.Error: + if c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("Error: %v", e)) + } + run = false + default: + } + } +} +func (c *Consumer) ConsumeValue(ctx context.Context, handle func(context.Context, []byte)) { + defer c.Consumer.Close() + err := c.Consumer.SubscribeTopics(c.Topics, nil) + if err != nil { + if c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("Consume Topic err: %v", err)) + } + return + } + run := true + for run == true { + ev := c.Consumer.Poll(0) + switch e := ev.(type) { + case *kafka.Message: + if c.LogInfo != nil { + c.LogInfo(ctx, fmt.Sprintf("Message on %s: %s", e.TopicPartition, string(e.Value))) } + handle(ctx, e.Value) case kafka.PartitionEOF: if c.LogInfo != nil { c.LogInfo(ctx, fmt.Sprintf("Reached %v", e)) + } + case kafka.Error: + if c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("Error: %v", e)) + } + run = false + default: + } + } +} +func (c *Consumer) ConsumeMessage(ctx context.Context, handle func(context.Context, *kafka.Message)) { + defer c.Consumer.Close() + err := c.Consumer.SubscribeTopics(c.Topics, nil) + if err != nil { + if c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("Consume Topic err: %v", err)) + } + return + } + run := true + for run == true { + ev := c.Consumer.Poll(0) + switch e := ev.(type) { + case *kafka.Message: + if c.LogInfo != nil { + c.LogInfo(ctx, fmt.Sprintf("Message on %s: %s", e.TopicPartition, string(e.Value))) + } + handle(ctx, e) + case kafka.PartitionEOF: + if c.LogInfo != nil { + c.LogInfo(ctx, fmt.Sprintf("Reached %v", e)) } case kafka.Error: if c.LogError != nil { c.LogError(ctx, fmt.Sprintf("Error: %v", e)) } - handle(ctx, nil, e) run = false default: } diff --git a/confluent/producer.go b/confluent/producer.go index dcc40ca..a5cad5d 100644 --- a/confluent/producer.go +++ b/confluent/producer.go @@ -102,25 +102,25 @@ func NewProducerWithRetryArray(c ProducerConfig, retries []time.Duration, conver } return p, err } -func (p *Producer) Put(ctx context.Context, data []byte, attributes map[string]string) (string, error) { +func (p *Producer) Put(ctx context.Context, data []byte, attributes map[string]string) error { return p.Produce(ctx, data, attributes) } -func (p *Producer) Write(ctx context.Context, data []byte, attributes map[string]string) (string, error) { +func (p *Producer) Write(ctx context.Context, data []byte, attributes map[string]string) error { return p.Produce(ctx, data, attributes) } -func (p *Producer) Publish(ctx context.Context, data []byte, attributes map[string]string) (string, error) { +func (p *Producer) Publish(ctx context.Context, data []byte, attributes map[string]string) error { return p.Produce(ctx, data, attributes) } -func (p *Producer) Send(ctx context.Context, data []byte, attributes map[string]string) (string, error) { +func (p *Producer) Send(ctx context.Context, data []byte, attributes map[string]string) error { return p.Produce(ctx, data, attributes) } -func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes map[string]string) (string, error) { +func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes map[string]string) error { var binary = data var err error if p.Convert != nil { binary, err = p.Convert(ctx, data) if err != nil { - return "", err + return err } } msg := kafka.Message{ @@ -137,7 +137,7 @@ func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes m defer close(deliveryChan) err = p.Producer.Produce(&msg, deliveryChan) if err != nil { - return msg.String(), err + return err } p.Producer.Flush(p.Timeout) e := <-deliveryChan @@ -147,21 +147,24 @@ func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes m if p.Error != nil { err = p.Error(m, err) } - return msg.String(), err + return err } - return msg.String(), m.TopicPartition.Error + return m.TopicPartition.Error case kafka.Error: - return "", m + return m } - return msg.String(), nil + return nil } -func (p *Producer) ProduceWithKey(ctx context.Context, data []byte, key []byte, messageAttributes map[string]string) (string, error) { +func (p *Producer) ProduceValue(ctx context.Context, data []byte) error { + return p.Produce(ctx, data, nil) +} +func (p *Producer) ProduceWithKey(ctx context.Context, key []byte, data []byte, messageAttributes map[string]string) error { var binary = data var err error if p.Convert != nil { binary, err = p.Convert(ctx, data) if err != nil { - return "", err + return err } } msg := kafka.Message{ @@ -177,7 +180,7 @@ func (p *Producer) ProduceWithKey(ctx context.Context, data []byte, key []byte, defer close(deliveryChan) err = p.Producer.Produce(&msg, deliveryChan) if err != nil { - return msg.String(), err + return err } p.Producer.Flush(p.Timeout) e := <-deliveryChan @@ -187,11 +190,11 @@ func (p *Producer) ProduceWithKey(ctx context.Context, data []byte, key []byte, if p.Error != nil { err = p.Error(m, err) } - return msg.String(), err + return err } - return msg.String(), m.TopicPartition.Error + return m.TopicPartition.Error case kafka.Error: - return "", m + return m } - return msg.String(), nil + return nil } diff --git a/confluent/simple_producer.go b/confluent/simple_producer.go index 3749c5d..770f98f 100644 --- a/confluent/simple_producer.go +++ b/confluent/simple_producer.go @@ -8,7 +8,7 @@ import ( ) type ( - SimpleProducer struct { + TopicProducer struct { Producer *kafka.Producer Timeout int Convert func(context.Context, []byte) ([]byte, error) @@ -16,7 +16,7 @@ type ( Error func(*kafka.Message, error) error } ) -func NewSimpleProducerByConfigMap(c kafka.ConfigMap, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*SimpleProducer, error) { +func NewTopicProducerByConfigMap(c kafka.ConfigMap, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*TopicProducer, error) { p, err := kafka.NewProducer(&c) if err != nil { fmt.Printf("Failed to create Producer: %s\n", err) @@ -29,7 +29,7 @@ func NewSimpleProducerByConfigMap(c kafka.ConfigMap, timeout int, convert func(c if timeout <= 0 { timeout = 100 } - pd := &SimpleProducer{ + pd := &TopicProducer{ Producer: p, Timeout: timeout, Convert: convert, @@ -37,7 +37,7 @@ func NewSimpleProducerByConfigMap(c kafka.ConfigMap, timeout int, convert func(c } return pd, nil } -func NewSimpleProducerByConfig(c ProducerConfig, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*SimpleProducer, error) { +func NewTopicProducerByConfig(c ProducerConfig, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) (*TopicProducer, error) { p, err := NewKafkaProducerByConfig(c) if err != nil { fmt.Printf("Failed to create Producer: %s\n", err) @@ -50,7 +50,7 @@ func NewSimpleProducerByConfig(c ProducerConfig, timeout int, convert func(conte if timeout <= 0 { timeout = 100 } - pd := &SimpleProducer{ + pd := &TopicProducer{ Producer: p, Timeout: timeout, Convert: convert, @@ -58,7 +58,7 @@ func NewSimpleProducerByConfig(c ProducerConfig, timeout int, convert func(conte } return pd, nil } -func NewSimpleProducer(producer *kafka.Producer, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) *SimpleProducer { +func NewSimpleProducer(producer *kafka.Producer, timeout int, convert func(context.Context, []byte) ([]byte, error), options ...func() string) *TopicProducer { var generate func() string if len(options) > 0 { generate = options[0] @@ -66,7 +66,7 @@ func NewSimpleProducer(producer *kafka.Producer, timeout int, convert func(conte if timeout <= 0 { timeout = 100 } - return &SimpleProducer{Producer: producer, Timeout: timeout, Convert: convert, Generate: generate} + return &TopicProducer{Producer: producer, Timeout: timeout, Convert: convert, Generate: generate} } func NewKafkaProducerByConfig(c ProducerConfig) (*kafka.Producer, error) { conf := kafka.ConfigMap{ @@ -100,25 +100,13 @@ func NewKafkaProducerByConfig(c ProducerConfig) (*kafka.Producer, error) { return kafka.NewProducer(&conf) } -func (p *SimpleProducer) Put(ctx context.Context, topic string, data []byte, attributes map[string]string) (string, error) { - return p.Produce(ctx, topic, data, attributes) -} -func (p *SimpleProducer) Send(ctx context.Context, topic string, data []byte, attributes map[string]string) (string, error) { - return p.Produce(ctx, topic, data, attributes) -} -func (p *SimpleProducer) Write(ctx context.Context, topic string, data []byte, attributes map[string]string) (string, error) { - return p.Produce(ctx, topic, data, attributes) -} -func (p *SimpleProducer) Publish(ctx context.Context, topic string, data []byte, attributes map[string]string) (string, error) { - return p.Produce(ctx, topic, data, attributes) -} -func (p *SimpleProducer) Produce(ctx context.Context, topic string, data []byte, messageAttributes map[string]string) (string, error) { +func (p *TopicProducer) Produce(ctx context.Context, topic string, data []byte, messageAttributes map[string]string) error { var binary = data var err error if p.Convert != nil { binary, err = p.Convert(ctx, data) if err != nil { - return "", err + return err } } msg := kafka.Message{ @@ -135,7 +123,7 @@ func (p *SimpleProducer) Produce(ctx context.Context, topic string, data []byte, defer close(deliveryChan) err = p.Producer.Produce(&msg, deliveryChan) if err != nil { - return msg.String(), err + return err } p.Producer.Flush(p.Timeout) e := <-deliveryChan @@ -145,21 +133,21 @@ func (p *SimpleProducer) Produce(ctx context.Context, topic string, data []byte, if p.Error != nil { err = p.Error(m, err) } - return msg.String(), err + return err } - return msg.String(), m.TopicPartition.Error + return m.TopicPartition.Error case kafka.Error: - return "", m + return m } - return msg.String(), nil + return nil } -func (p *SimpleProducer) ProduceWithKey(ctx context.Context, topic string, data []byte, key []byte, messageAttributes map[string]string) (string, error) { +func (p *TopicProducer) ProduceWithKey(ctx context.Context, topic string, key []byte, data []byte, messageAttributes map[string]string) error { var binary = data var err error if p.Convert != nil { binary, err = p.Convert(ctx, data) if err != nil { - return "", err + return err } } msg := kafka.Message{ @@ -175,7 +163,7 @@ func (p *SimpleProducer) ProduceWithKey(ctx context.Context, topic string, data defer close(deliveryChan) err = p.Producer.Produce(&msg, deliveryChan) if err != nil { - return msg.String(), err + return err } p.Producer.Flush(p.Timeout) e := <-deliveryChan @@ -185,13 +173,13 @@ func (p *SimpleProducer) ProduceWithKey(ctx context.Context, topic string, data if p.Error != nil { err = p.Error(m, err) } - return msg.String(), err + return err } - return msg.String(), m.TopicPartition.Error + return m.TopicPartition.Error case kafka.Error: - return "", m + return m } - return msg.String(), nil + return nil } func MapToHeader(messageAttributes map[string]string) []kafka.Header { headers := make([]kafka.Header, 0) diff --git a/error_handler.go b/error_handler.go index 03768c7..f9017dd 100644 --- a/error_handler.go +++ b/error_handler.go @@ -5,25 +5,64 @@ import ( "fmt" ) -func NewErrorHandler(logError ...func(context.Context, string)) *ErrorHandler { - h := &ErrorHandler{} +type ErrorMessage struct { + Field string `yaml:"field" mapstructure:"field" json:"field,omitempty" gorm:"column:field" bson:"field,omitempty" dynamodbav:"field,omitempty" firestore:"field,omitempty"` + Code string `yaml:"code" mapstructure:"code" json:"code,omitempty" gorm:"column:code" bson:"code,omitempty" dynamodbav:"code,omitempty" firestore:"code,omitempty"` + Param string `yaml:"param" mapstructure:"param" json:"param,omitempty" gorm:"column:param" bson:"param,omitempty" dynamodbav:"param,omitempty" firestore:"param,omitempty"` + Message string `yaml:"message" mapstructure:"message" json:"message,omitempty" gorm:"column:message" bson:"message,omitempty" dynamodbav:"message,omitempty" firestore:"message,omitempty"` +} + +func NewErrorHandler[T any](logError ...func(context.Context, string)) *ErrorHandler[T] { + h := &ErrorHandler[T]{} if len(logError) >= 1 { h.LogError = logError[0] } return h } -type ErrorHandler struct { +type ErrorHandler[T any] struct { LogError func(context.Context, string) } -func (w *ErrorHandler) HandleError(ctx context.Context, data []byte, attrs map[string]string) error { +func (w *ErrorHandler[T]) Reject(ctx context.Context, res T, err []ErrorMessage, data []byte) { + if w.LogError != nil && data != nil { + w.LogError(ctx, fmt.Sprintf("Message is invalid %s Error: %+v", data, err)) + } +} +func (w *ErrorHandler[T]) RejectWithMap(ctx context.Context, res T, err []ErrorMessage, data []byte, attrs map[string]string) { + if w.LogError != nil && data != nil { + if len(attrs) > 0 { + w.LogError(ctx, fmt.Sprintf("Message is invalid %s Attributes: %+v Error: %+v", data, attrs, err)) + } else { + w.LogError(ctx, fmt.Sprintf("Message is invalid %s Error: %+v", data, err)) + } + } +} +func (w *ErrorHandler[T]) HandleError(ctx context.Context, data []byte) { + if w.LogError != nil && data != nil { + w.LogError(ctx, fmt.Sprintf("Message is invalid %s", data)) + } +} +func (w *ErrorHandler[T]) HandleErrorWithMap(ctx context.Context, data []byte, attrs map[string]string) { if w.LogError != nil && data != nil { - if attrs == nil || len(attrs) == 0 { - w.LogError(ctx, fmt.Sprintf("Fail to consume message: %s", data)) + if len(attrs) > 0 { + w.LogError(ctx, fmt.Sprintf("Message is invalid %s Attributes: %+v", data, attrs)) } else { - w.LogError(ctx, fmt.Sprintf("Fail to consume message: %s %s", data, attrs)) + w.LogError(ctx, fmt.Sprintf("Message is invalid %s", data)) + } + } +} +func GetString(ctx context.Context, key string) string { + if len(key) > 0 { + u := ctx.Value(key) + if u != nil { + s, ok := u.(string) + if ok { + return s + } else { + return "" + } } } - return nil + return "" } diff --git a/error_writer.go b/error_writer.go deleted file mode 100644 index b154eb2..0000000 --- a/error_writer.go +++ /dev/null @@ -1,54 +0,0 @@ -package mq - -import ( - "context" - "encoding/json" - "fmt" - "reflect" -) - -func NewErrorWriter(write func(ctx context.Context, model interface{}) error, modelType *reflect.Type, logError ...func(context.Context, string)) *ErrorWriter { - return NewErrorWriterWithUnmarshal(write, modelType, json.Unmarshal, logError...) -} -func NewErrorWriterWithUnmarshal(write func(ctx context.Context, model interface{}) error, modelType *reflect.Type, unmarshal func(data []byte, v interface{}) error, logError ...func(context.Context, string)) *ErrorWriter { - if unmarshal == nil { - unmarshal = json.Unmarshal - } - h := &ErrorWriter{Write: write, ModelType: modelType, Unmarshal: unmarshal} - if len(logError) >= 1 { - h.LogError = logError[0] - } - return h -} -type ErrorWriter struct { - Write func(ctx context.Context, model interface{}) error - ModelType *reflect.Type - Unmarshal func(data []byte, v interface{}) error - LogError func(context.Context, string) -} - -func (w *ErrorWriter) HandleError(ctx context.Context, data []byte, attrs map[string]string) error { - if data == nil { - return nil - } - if w.Write != nil { - if w.ModelType == nil { - return w.Write(ctx, data) - } else { - v := InitModel(*w.ModelType) - err := w.Unmarshal(data, v) - if err != nil { - return err - } - return w.Write(ctx, v) - } - } - if w.LogError != nil { - if attrs == nil || len(attrs) == 0 { - w.LogError(ctx, fmt.Sprintf("Fail to consume message: %s", data)) - } else { - w.LogError(ctx, fmt.Sprintf("Fail to consume message: %s %s", data, attrs)) - } - } - return nil -} diff --git a/func.go b/func.go deleted file mode 100644 index 20f03a0..0000000 --- a/func.go +++ /dev/null @@ -1,11 +0,0 @@ -package mq - -import "reflect" - -func InitModel(modelType reflect.Type) interface{} { - return reflect.New(modelType).Interface() -} - -func InitModels(modelsType reflect.Type) interface{} { - return reflect.New(modelsType).Interface() -} diff --git a/handler.go b/handler.go index 25c46a0..3fc6832 100644 --- a/handler.go +++ b/handler.go @@ -4,58 +4,69 @@ import ( "context" "encoding/json" "fmt" - "reflect" - "strconv" "time" ) type HandlerConfig struct { - RetryCountName string `mapstructure:"retry_count_name" json:"retryCountName,omitempty" gorm:"column:retrycountname" bson:"retryCountName,omitempty" dynamodbav:"retryCountName,omitempty" firestore:"retryCountName,omitempty"` - LimitRetry int `mapstructure:"limit_retry" json:"limitRetry,omitempty" gorm:"column:limitretry" bson:"limitRetry,omitempty" dynamodbav:"limitRetry,omitempty" firestore:"limitRetry,omitempty"` - Goroutines bool `mapstructure:"goroutines" json:"goroutines,omitempty" gorm:"column:goroutines" bson:"goroutines,omitempty" dynamodbav:"goroutines,omitempty" firestore:"goroutines,omitempty"` + Retry *RetryConfig `yaml:"retry" mapstructure:"retry" json:"retry,omitempty" gorm:"column:retry" bson:"retry,omitempty" dynamodbav:"retry,omitempty" firestore:"retry,omitempty"` + Goroutines bool `yaml:"goroutines" mapstructure:"goroutines" json:"goroutines,omitempty" gorm:"column:goroutines" bson:"goroutines,omitempty" dynamodbav:"goroutines,omitempty" firestore:"goroutines,omitempty"` + Key string `yaml:"key" mapstructure:"key" json:"key,omitempty" gorm:"column:key" bson:"key,omitempty" dynamodbav:"key,omitempty" firestore:"key,omitempty"` } -type Handler struct { - Write func(ctx context.Context, model interface{}) error - ModelType *reflect.Type - Validate func(ctx context.Context, message *Message) error - LimitRetry int - Retry func(context.Context, []byte, map[string]string) error - RetryCountName string - Error func(context.Context, []byte, map[string]string) error - Unmarshal func([]byte, interface{}) error - Retries []time.Duration - Goroutines bool - LogError func(context.Context, string) - LogInfo func(context.Context, string) +type Handler[T any] struct { + Unmarshal func(data []byte, v any) error + Write func(ctx context.Context, model *T) error + Validate func(context.Context, *T) ([]ErrorMessage, error) + Reject func(context.Context, *T, []ErrorMessage, []byte) + HandleError func(context.Context, []byte) + Retries []time.Duration + Goroutines bool + Key string + LogError func(context.Context, string) + LogInfo func(context.Context, string) } -func NewHandlerByConfig(c HandlerConfig, write func(context.Context, interface{}) error, modelType *reflect.Type, retry func(context.Context, []byte, map[string]string) error, validate func(context.Context, *Message) error, handleError func(context.Context, []byte, map[string]string) error, unmarshal func([]byte, interface{}) error, logs ...func(context.Context, string)) *Handler { - return NewHandlerWithRetryService(write, modelType, c.LimitRetry, retry, c.RetryCountName, validate, handleError, c.Goroutines, unmarshal, logs...) +func NewHandlerByConfig[T any](c HandlerConfig, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte), + handleError func(context.Context, []byte), + goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] { + return NewHandlerByConfigAndUnmarshal[T](c, nil, write, validate, reject, handleError, goroutines, key, logs...) } -func NewHandlerWithRetryConfig(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, c *RetryConfig, goroutines bool, handleError func(context.Context, []byte, map[string]string) error, unmarshal func([]byte, interface{}) error, logs ...func(context.Context, string)) *Handler { - if c == nil { - return NewHandlerWithRetries(write, modelType, validate, nil, handleError, goroutines, unmarshal, logs...) - } - retries := DurationsFromValue(*c, "Retry", 20) - if len(retries) == 0 { - return NewHandlerWithRetries(write, modelType, validate, nil, handleError, goroutines, unmarshal, logs...) +func NewHandlerByConfigAndUnmarshal[T any](c HandlerConfig, + unmarshal func(data []byte, v any) error, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte), + handleError func(context.Context, []byte), + goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] { + if c.Retry == nil { + return NewHandlerWithKey[T](unmarshal, write, validate, reject, handleError, nil, c.Goroutines, c.Key, logs...) + } else { + retries := DurationsFromValue(c.Retry, "Retry", 20) + return NewHandlerWithKey[T](unmarshal, write, validate, reject, handleError, retries, c.Goroutines, c.Key, logs...) } - return NewHandlerWithRetries(write, modelType, validate, retries, handleError, goroutines, unmarshal, logs...) } -func NewHandlerWithRetries(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, retries []time.Duration, handleError func(context.Context, []byte, map[string]string) error, goroutines bool, unmarshal func([]byte, interface{}) error, logs ...func(context.Context, string)) *Handler { +func NewHandlerWithKey[T any]( + unmarshal func(data []byte, v any) error, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte), + handleError func(context.Context, []byte), + retries []time.Duration, + goroutines bool, key string, logs ...func(context.Context, string)) *Handler[T] { if unmarshal == nil { unmarshal = json.Unmarshal } - c := &Handler{ - ModelType: modelType, - Write: write, - Validate: validate, - Unmarshal: unmarshal, - Goroutines: goroutines, - Error: handleError, - } - if retries != nil { - c.Retries = retries + c := &Handler[T]{ + Write: write, + Unmarshal: unmarshal, + Validate: validate, + Reject: reject, + HandleError: handleError, + Retries: retries, + Goroutines: goroutines, + Key: key, } if len(logs) >= 1 { c.LogError = logs[0] @@ -65,193 +76,96 @@ func NewHandlerWithRetries(write func(context.Context, interface{}) error, model } return c } -func NewHandler(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, goroutines bool, unmarshal func([]byte, interface{}) error, logs ...func(context.Context, string)) *Handler { - return NewHandlerWithRetryService(write, modelType, -1, nil, "", validate, nil, goroutines, unmarshal, logs...) -} -func NewHandlerWithRetryService(write func(context.Context, interface{}) error, modelType *reflect.Type, limitRetry int, retry func(context.Context, []byte, map[string]string) error, retryCountName string, validate func(context.Context, *Message) error, - handleError func(context.Context, []byte, map[string]string) error, - goroutines bool, - unmarshal func([]byte, interface{}) error, - logs ...func(context.Context, string)) *Handler { - if len(retryCountName) == 0 { - retryCountName = "retryCount" - } - if retry != nil && handleError == nil { - e1 := NewErrorHandler(logs...) - handleError = e1.HandleError - } - if unmarshal == nil { - unmarshal = json.Unmarshal - } - c := &Handler{ - ModelType: modelType, - Write: write, - Validate: validate, - LimitRetry: limitRetry, - Retry: retry, - RetryCountName: retryCountName, - Error: handleError, - Unmarshal: unmarshal, - Goroutines: goroutines, - } - if len(logs) >= 1 { - c.LogError = logs[0] + +func (c *Handler[T]) Handle(ctx context.Context, data []byte) { + if data == nil { + return } - if len(logs) >= 2 { - c.LogInfo = logs[1] + if c.LogInfo != nil { + key := GetString(ctx, c.Key) + if len(key) > 0 { + c.LogInfo(ctx, fmt.Sprintf("Received message with key %s : %s", key, data)) + } else { + c.LogInfo(ctx, fmt.Sprintf("Received message: %s", data)) + } } - return c -} - -func (c *Handler) Handle(ctx context.Context, data []byte, header map[string]string, err error) error { - if err != nil { + var v T + er1 := json.Unmarshal(data, &v) + if er1 != nil { if c.LogError != nil { - c.LogError(ctx, "Processing message error: "+err.Error()) + c.LogError(ctx, fmt.Sprintf("cannot unmarshal item: %s. Error: %s", data, er1.Error())) } - return err - } else if data == nil { - return nil - } - if c.LogInfo != nil { - c.LogInfo(ctx, fmt.Sprintf("Received message: %s", data)) + return } - message := &Message{Data: data, Attributes: header} if c.Validate != nil { - er2 := c.Validate(ctx, message) - if er2 != nil { + errs, err := c.Validate(ctx, &v) + if err != nil { if c.LogError != nil { - if header == nil || len(header) == 0 { - c.LogError(ctx, fmt.Sprintf("Message is invalid: %s . Error: %s", data, er2.Error())) - } else { - c.LogError(ctx, fmt.Sprintf("Message is invalid: %s %s . Error: %s", data, header, er2.Error())) - } + c.LogError(ctx, "Error when validate data: "+err.Error()) } - return er2 + return } - } - var item interface{} - if message.Value != nil { - item = message.Value - } - if c.ModelType != nil && item == nil { - v := InitModel(*c.ModelType) - er1 := c.Unmarshal(message.Data, v) - if er1 != nil { - if c.LogError != nil { - c.LogError(ctx, fmt.Sprintf(`cannot unmarshal item: %s. Error: %s`, message.Data, er1.Error())) - } - return nil + if len(errs) > 0 { + c.Reject(ctx, &v, errs, data) } - item = v } if c.Goroutines { - go c.write(ctx, data, header, item) - return nil + go c.write(ctx, data, &v) } else { - return c.write(ctx, data, header, item) - } -} -func (c *Handler) write(ctx context.Context, data []byte, header map[string]string, item interface{}) error { - // ctx = context.WithValue(ctx, "message", message) - if c.Retry == nil && c.Retries != nil && len(c.Retries) > 0 { - return WriteWithRetry(ctx, c.Write, data, item, c.Retries, c.Error, c.LogError) - } else { - return Write(ctx, c.Write, data, header, item, c.Error, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo) + c.write(ctx, data, &v) } } -func WriteWithRetry(ctx context.Context, write func(context.Context, interface{}) error, data []byte, item interface{}, retries []time.Duration, handleError func(context.Context, []byte, map[string]string) error, logs ...func(context.Context, string)) error { - var logError func(context.Context, string) - if len(logs) > 0 { - logError = logs[0] +func (c *Handler[T]) write(ctx context.Context, data []byte, item *T) error { + er3 := c.Write(ctx, item) + if er3 == nil { + return er3 } - if er1 := write(ctx, item); er1 != nil { + if c.Retries != nil && len(c.Retries) > 0 { i := 0 - err := Retry(ctx, retries, func() (err error) { + err := Retry(ctx, c.Retries, func() (err error) { i = i + 1 - er2 := write(ctx, item) - if er2 == nil && logError != nil { - logError(ctx, fmt.Sprintf("Write successfully after %d retries %s", i, data)) + er2 := c.Write(ctx, item) + if er2 == nil { + if c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("Write successfully after %d retries %s", i, data)) + } } return er2 - }, logError) - if err != nil { - if handleError != nil { - handleError(ctx, data, nil) - } - if logError != nil { - logError(ctx, fmt.Sprintf("Failed to write after %d retries: %s. Error: %s.", len(retries), data, er1.Error())) + }, c.LogError) + if err != nil && c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("Failed to write after %d retries: %s. Error: %s.", len(c.Retries), data, err.Error())) + if c.HandleError != nil { + c.HandleError(ctx, data) } } - return err - } - return nil -} -func Write(ctx context.Context, write func(context.Context, interface{}) error, data []byte, attrs map[string]string, item interface{}, handleError func(context.Context, []byte, map[string]string) error, retry func(context.Context, []byte, map[string]string) error, limitRetry int, retryCountName string, logs ...func(context.Context, string)) error { - var logError func(context.Context, string) - var logInfo func(context.Context, string) - if len(logs) > 0 { - logError = logs[0] - } - if len(logs) > 1 { - logInfo = logs[1] - } - er3 := write(ctx, item) - if er3 == nil { - return er3 - } - if logError != nil { - logError(ctx, fmt.Sprintf("Fail to write %s . Error: %s", data, er3.Error())) - } - - if retry == nil { - if handleError != nil { - handleError(ctx, data, attrs) - } - return er3 - } - retryCount := 0 - if attrs == nil { - attrs = make(map[string]string) + return nil } else { - var er4 error - retryCount, er4 = strconv.Atoi(attrs[retryCountName]) - if er4 != nil { - retryCount = 0 + if c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("Failed to write %s . Error: %s", data, er3.Error())) } + if c.HandleError != nil { + c.HandleError(ctx, data) + } + return er3 } - retryCount++ - if retryCount > limitRetry { - if logInfo != nil { - if attrs == nil || len(attrs) == 0 { - logInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount, limitRetry, data)) - } else { - logInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s %s.", retryCount, limitRetry, data, attrs)) - } +} +// Retry Copy this code from https://stackoverflow.com/questions/47606761/repeat-code-if-an-error-occured +func Retry(ctx context.Context, sleeps []time.Duration, f func() error, log func(context.Context, string)) (err error) { + attempts := len(sleeps) + for i := 0; ; i++ { + time.Sleep(sleeps[i]) + err = f() + if err == nil { + return } - if handleError != nil { - handleError(ctx, data, attrs) + if i >= (attempts - 1) { + break } - } else { - if logInfo != nil { - if attrs == nil || len(attrs) == 0 { - logInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s.", retryCount, data)) - } else { - logInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s %s.", retryCount, data, attrs)) - } - } - attrs[retryCountName] = strconv.Itoa(retryCount) - er2 := retry(ctx, data, attrs) - if er2 != nil { - if logError != nil { - if attrs == nil || len(attrs) == 0 { - logError(ctx, fmt.Sprintf("Cannot retry %s . Error: %s", data, er2.Error())) - } else { - logError(ctx, fmt.Sprintf("Cannot retry %s %s. Error: %s", data, attrs, er2.Error())) - } - } + if log != nil { + log(ctx, fmt.Sprintf("Retrying %d of %d after error: %s", i+1, attempts, err.Error())) } } - return nil + return fmt.Errorf("after %d attempts, last error: %s", attempts, err) } diff --git a/kafka/reader.go b/kafka/reader.go index 0ef1c77..bd708e3 100644 --- a/kafka/reader.go +++ b/kafka/reader.go @@ -3,7 +3,6 @@ package kafka import ( "context" "crypto/tls" - "github.com/core-go/mq" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl/scram" "time" @@ -11,55 +10,71 @@ import ( type Reader struct { Reader *kafka.Reader + LogError func(ctx context.Context, msg string) AckOnConsume bool - Convert func(context.Context, []byte) ([]byte, error) + Key string } -func NewReader(reader *kafka.Reader, ackOnConsume bool, options...func(context.Context, []byte)([]byte, error)) (*Reader, error) { - var convert func(context.Context, []byte)([]byte, error) - if len(options) > 0 { - convert = options[0] - } - return &Reader{Reader: reader, AckOnConsume: ackOnConsume, Convert: convert}, nil +func NewReader(reader *kafka.Reader, logError func(ctx context.Context, msg string), ackOnConsume bool, key string) (*Reader, error) { + return &Reader{Reader: reader, LogError: logError, AckOnConsume: ackOnConsume, Key: key}, nil } -func NewReaderByConfig(c ReaderConfig, ackOnConsume bool, options...func(context.Context, []byte)([]byte, error)) (*Reader, error) { +func NewReaderByConfig(c ReaderConfig, logError func(ctx context.Context, msg string), ackOnConsume bool) (*Reader, error) { dialer := GetDialer(c.Client.Username, c.Client.Password, scram.SHA512, &kafka.Dialer{ Timeout: 30 * time.Second, DualStack: true, TLS: &tls.Config{}, }) reader := NewKafkaReader(c, dialer) - return NewReader(reader, ackOnConsume, options...) + return NewReader(reader, logError, ackOnConsume, c.Key) } -func (c *Reader) Read(ctx context.Context, handle func(context.Context, *mq.Message, error) error) { +func (c *Reader) Read(ctx context.Context, handle func(context.Context, []byte, map[string]string)) { for { msg, err := c.Reader.FetchMessage(ctx) if err != nil { - handle(ctx, nil, err) + c.LogError(ctx, "Error when read: "+err.Error()) } else { attributes := HeaderToMap(msg.Headers) - message := mq.Message{ - Id: string(msg.Key), - Data: msg.Value, - Attributes: attributes, - Timestamp: &msg.Time, - Raw: msg, + if len(c.Key) > 0 && msg.Key != nil { + ctx = context.WithValue(ctx, c.Key, string(msg.Key)) } if c.AckOnConsume { c.Reader.CommitMessages(ctx, msg) } - if c.Convert == nil { - handle(ctx, &message, nil) - } else { - data, err := c.Convert(ctx, msg.Value) - if err == nil { - message.Data = data - } - handle(ctx, &message, err) + handle(ctx, msg.Value, attributes) + } + } +} +func (c *Reader) ReadValue(ctx context.Context, handle func(context.Context, []byte)) { + for { + msg, err := c.Reader.FetchMessage(ctx) + if err != nil { + c.LogError(ctx, "Error when read: "+err.Error()) + } else { + if len(c.Key) > 0 && msg.Key != nil { + ctx = context.WithValue(ctx, c.Key, string(msg.Key)) + } + if c.AckOnConsume { + c.Reader.CommitMessages(ctx, msg) } - + handle(ctx, msg.Value) + } + } +} +func (c *Reader) ReadMessage(ctx context.Context, handle func(context.Context, kafka.Message)) { + for { + msg, err := c.Reader.FetchMessage(ctx) + if err != nil { + c.LogError(ctx, "Error when read: "+err.Error()) + } else { + if len(c.Key) > 0 && msg.Key != nil { + ctx = context.WithValue(ctx, c.Key, string(msg.Key)) + } + if c.AckOnConsume { + c.Reader.CommitMessages(ctx, msg) + } + handle(ctx, msg) } } } diff --git a/kafka/reader_config.go b/kafka/reader_config.go index 17c45c9..5fab440 100644 --- a/kafka/reader_config.go +++ b/kafka/reader_config.go @@ -1,11 +1,12 @@ package kafka type ReaderConfig struct { - Brokers []string `mapstructure:"brokers" json:"brokers,omitempty" gorm:"column:brokers" bson:"brokers,omitempty" dynamodbav:"brokers,omitempty" firestore:"brokers,omitempty"` - GroupID string `mapstructure:"group_id" json:"groupID,omitempty" gorm:"column:groupid" bson:"groupID,omitempty" dynamodbav:"groupID,omitempty" firestore:"groupID,omitempty"` - Topic string `mapstructure:"topic" json:"topic,omitempty" gorm:"column:topic" bson:"topic,omitempty" dynamodbav:"topic,omitempty" firestore:"topic,omitempty"` - Client ClientConfig `mapstructure:"client" json:"client,omitempty" gorm:"column:client" bson:"client,omitempty" dynamodbav:"client,omitempty" firestore:"client,omitempty"` - MinBytes *int `mapstructure:"min_bytes" json:"minBytes,omitempty" gorm:"column:minbytes" bson:"minBytes,omitempty" dynamodbav:"minBytes,omitempty" firestore:"minBytes,omitempty"` - MaxBytes int `mapstructure:"max_bytes" json:"maxBytes,omitempty" gorm:"column:maxbytes" bson:"maxBytes,omitempty" dynamodbav:"maxBytes,omitempty" firestore:"maxBytes,omitempty"` - CommitInterval *int64 `mapstructure:"commit_interval" json:"commitInterval,omitempty" gorm:"column:commitinterval" bson:"commitInterval,omitempty" dynamodbav:"commitInterval,omitempty" firestore:"commitInterval,omitempty"` + Brokers []string `yaml:"brokers" mapstructure:"brokers" json:"brokers,omitempty" gorm:"column:brokers" bson:"brokers,omitempty" dynamodbav:"brokers,omitempty" firestore:"brokers,omitempty"` + GroupID string `yaml:"group_id" mapstructure:"group_id" json:"groupID,omitempty" gorm:"column:groupid" bson:"groupID,omitempty" dynamodbav:"groupID,omitempty" firestore:"groupID,omitempty"` + Topic string `yaml:"topic" mapstructure:"topic" json:"topic,omitempty" gorm:"column:topic" bson:"topic,omitempty" dynamodbav:"topic,omitempty" firestore:"topic,omitempty"` + Client ClientConfig `yaml:"client" mapstructure:"client" json:"client,omitempty" gorm:"column:client" bson:"client,omitempty" dynamodbav:"client,omitempty" firestore:"client,omitempty"` + MinBytes *int `yaml:"min_bytes" mapstructure:"min_bytes" json:"minBytes,omitempty" gorm:"column:minbytes" bson:"minBytes,omitempty" dynamodbav:"minBytes,omitempty" firestore:"minBytes,omitempty"` + MaxBytes int `yaml:"max_bytes" mapstructure:"max_bytes" json:"maxBytes,omitempty" gorm:"column:maxbytes" bson:"maxBytes,omitempty" dynamodbav:"maxBytes,omitempty" firestore:"maxBytes,omitempty"` + CommitInterval *int64 `yaml:"commit_interval" mapstructure:"commit_interval" json:"commitInterval,omitempty" gorm:"column:commitinterval" bson:"commitInterval,omitempty" dynamodbav:"commitInterval,omitempty" firestore:"commitInterval,omitempty"` + Key string `yaml:"key" mapstructure:"key" json:"key,omitempty" gorm:"column:key" bson:"key,omitempty" dynamodbav:"key,omitempty" firestore:"key,omitempty"` } diff --git a/kafka/simple_reader.go b/kafka/simple_reader.go deleted file mode 100644 index efec3f5..0000000 --- a/kafka/simple_reader.go +++ /dev/null @@ -1,53 +0,0 @@ -package kafka - -import ( - "context" - "crypto/tls" - "github.com/segmentio/kafka-go" - "github.com/segmentio/kafka-go/sasl/scram" - "time" -) - -type SimpleReader struct { - SimpleReader *kafka.Reader - AckOnConsume bool - Convert func(context.Context, []byte)([]byte, error) -} - -func NewSimpleReader(reader *kafka.Reader, ackOnConsume bool, options...func(context.Context, []byte)([]byte, error)) (*SimpleReader, error) { - var convert func(context.Context, []byte)([]byte, error) - if len(options) > 0 { - convert = options[0] - } - return &SimpleReader{SimpleReader: reader, AckOnConsume: ackOnConsume, Convert: convert}, nil -} - -func NewSimpleReaderByConfig(c ReaderConfig, ackOnConsume bool, options...func(context.Context, []byte)([]byte, error)) (*SimpleReader, error) { - dialer := GetDialer(c.Client.Username, c.Client.Password, scram.SHA512, &kafka.Dialer{ - Timeout: 30 * time.Second, - DualStack: true, - TLS: &tls.Config{}, - }) - reader := NewKafkaReader(c, dialer) - return NewSimpleReader(reader, ackOnConsume, options...) -} - -func (c *SimpleReader) Read(ctx context.Context, handle func(context.Context, []byte, map[string]string, error) error) { - for { - msg, err := c.SimpleReader.FetchMessage(ctx) - if err != nil { - handle(ctx, nil, nil, err) - } else { - attributes := HeaderToMap(msg.Headers) - if c.AckOnConsume { - c.SimpleReader.CommitMessages(ctx, msg) - } - if c.Convert == nil { - handle(ctx, msg.Value, attributes, nil) - } else { - data, err := c.Convert(ctx, msg.Value) - handle(ctx, data, attributes, err) - } - } - } -} diff --git a/kafka/simple_writer.go b/kafka/simple_writer.go index 28ecc7a..4382ac6 100644 --- a/kafka/simple_writer.go +++ b/kafka/simple_writer.go @@ -8,51 +8,42 @@ import ( "time" ) -type SimpleWriter struct { +type TopicWriter struct { Writer *kafka.Writer - Convert func(context.Context, []byte)([]byte, error) Generate func()string } -func NewSimpleWriter(writer *kafka.Writer, convert func(context.Context, []byte)([]byte, error), options...func()string) (*SimpleWriter, error) { +func NewTopicWriter(writer *kafka.Writer, options...func()string) (*TopicWriter, error) { var generate func()string if len(options) > 0 { generate = options[0] } - return &SimpleWriter{Writer: writer, Convert: convert, Generate: generate}, nil + return &TopicWriter{Writer: writer, Generate: generate}, nil } -func NewSimpleWriterByConfig(c WriterConfig, convert func(context.Context, []byte)([]byte, error), options...func()string) (*SimpleWriter, error) { +func NewTopicWriterByConfig(c WriterConfig, options...func()string) (*TopicWriter, error) { dialer := GetDialer(c.Client.Username, c.Client.Password, scram.SHA512, &kafka.Dialer{ Timeout: 30 * time.Second, DualStack: true, TLS: &tls.Config{}, }) writer := NewKafkaWriter(c.Topic, c.Brokers, dialer) - return NewSimpleWriter(writer, convert, options...) + return NewTopicWriter(writer, options...) } -func (p *SimpleWriter) Publish(ctx context.Context, topic string, data []byte, attributes map[string]string) (string, error) { +func (p *TopicWriter) Publish(ctx context.Context, topic string, data []byte, attributes map[string]string) error { return p.Write(ctx, topic, data, attributes) } -func (p *SimpleWriter) Send(ctx context.Context, topic string, data []byte, attributes map[string]string) (string, error) { +func (p *TopicWriter) Send(ctx context.Context, topic string, data []byte, attributes map[string]string) error { return p.Write(ctx, topic, data, attributes) } -func (p *SimpleWriter) Put(ctx context.Context, topic string, data []byte, attributes map[string]string) (string, error) { +func (p *TopicWriter) Put(ctx context.Context, topic string, data []byte, attributes map[string]string) error { return p.Write(ctx, topic, data, attributes) } -func (p *SimpleWriter) Produce(ctx context.Context, topic string, data []byte, attributes map[string]string) (string, error) { +func (p *TopicWriter) Produce(ctx context.Context, topic string, data []byte, attributes map[string]string) error { return p.Write(ctx, topic, data, attributes) } -func (p *SimpleWriter) Write(ctx context.Context, topic string, data []byte, attributes map[string]string) (string, error) { - var binary = data - var err error - if p.Convert != nil { - binary, err = p.Convert(ctx, binary) - if err != nil { - return "", err - } - } - msg := kafka.Message{Value: binary} +func (p *TopicWriter) Write(ctx context.Context, topic string, data []byte, attributes map[string]string) error { + msg := kafka.Message{Value: data} if attributes != nil { msg.Headers = MapToHeader(attributes) } @@ -61,22 +52,19 @@ func (p *SimpleWriter) Write(ctx context.Context, topic string, data []byte, att msg.Key = []byte(id) p.Writer.Topic = topic err := p.Writer.WriteMessages(ctx, msg) - return id, err + return err } else { p.Writer.Topic = topic err := p.Writer.WriteMessages(ctx, msg) - return "", err + return err } } -func (p *SimpleWriter) WriteWithKey(ctx context.Context, topic string, data []byte, key []byte, attributes map[string]string) (string, error) { +func (p *TopicWriter) WriteValue(ctx context.Context, topic string, data []byte) error { + return p.Write(ctx, topic, data, nil) +} +func (p *TopicWriter) WriteWithKey(ctx context.Context, topic string, data []byte, key []byte, attributes map[string]string) (string, error) { var binary = data var err error - if p.Convert != nil { - binary, err = p.Convert(ctx, binary) - if err != nil { - return "", err - } - } msg := kafka.Message{Value: binary} if attributes != nil { msg.Headers = MapToHeader(attributes) @@ -84,6 +72,7 @@ func (p *SimpleWriter) WriteWithKey(ctx context.Context, topic string, data []by if key != nil { msg.Key = key } + p.Writer.Topic = topic err = p.Writer.WriteMessages(ctx, msg) return "", err } diff --git a/kafka/writer.go b/kafka/writer.go index 4117210..03dc46e 100644 --- a/kafka/writer.go +++ b/kafka/writer.go @@ -10,49 +10,41 @@ import ( type Writer struct { Writer *kafka.Writer - Convert func(context.Context, []byte)([]byte, error) Generate func()string } -func NewWriter(writer *kafka.Writer, convert func(context.Context, []byte)([]byte, error), options...func()string) (*Writer, error) { +func NewWriter(writer *kafka.Writer, options...func()string) (*Writer, error) { var generate func()string if len(options) > 0 { generate = options[0] } - return &Writer{Writer: writer, Convert: convert, Generate: generate}, nil + return &Writer{Writer: writer, Generate: generate}, nil } -func NewWriterByConfig(c WriterConfig, convert func(context.Context, []byte)([]byte, error), options...func()string) (*Writer, error) { +func NewWriterByConfig(c WriterConfig, options...func()string) (*Writer, error) { dialer := GetDialer(c.Client.Username, c.Client.Password, scram.SHA512, &kafka.Dialer{ Timeout: 30 * time.Second, DualStack: true, TLS: &tls.Config{}, }) writer := NewKafkaWriter(c.Topic, c.Brokers, dialer) - return NewWriter(writer, convert, options...) + return NewWriter(writer, options...) } -func (p *Writer) Put(ctx context.Context, data []byte, attributes map[string]string) (string, error) { +func (p *Writer) Put(ctx context.Context, data []byte, attributes map[string]string) error { return p.Write(ctx, data, attributes) } -func (p *Writer) Send(ctx context.Context, data []byte, attributes map[string]string) (string, error) { +func (p *Writer) Send(ctx context.Context, data []byte, attributes map[string]string) error { return p.Write(ctx, data, attributes) } -func (p *Writer) Produce(ctx context.Context, data []byte, attributes map[string]string) (string, error) { +func (p *Writer) Produce(ctx context.Context, data []byte, attributes map[string]string) error { return p.Write(ctx, data, attributes) } -func (p *Writer) Publish(ctx context.Context, data []byte, attributes map[string]string) (string, error) { +func (p *Writer) Publish(ctx context.Context, data []byte, attributes map[string]string) error { return p.Write(ctx, data, attributes) } -func (p *Writer) Write(ctx context.Context, data []byte, attributes map[string]string) (string, error) { - var binary = data +func (p *Writer) Write(ctx context.Context, data []byte, attributes map[string]string) error { var err error - if p.Convert != nil { - binary, err = p.Convert(ctx, data) - if err != nil { - return "", err - } - } - msg := kafka.Message{Value: binary} + msg := kafka.Message{Value: data} if attributes != nil { msg.Headers = MapToHeader(attributes) } @@ -61,22 +53,28 @@ func (p *Writer) Write(ctx context.Context, data []byte, attributes map[string]s msg.Key = []byte(id) err = p.Writer.WriteMessages(ctx, msg) - return id, err + return err } else { err = p.Writer.WriteMessages(ctx, msg) - return "", err + return err } } -func (p *Writer) WriteWithKey(ctx context.Context, data []byte, key []byte, attributes map[string]string) (string, error) { - var binary = data +func (p *Writer) WriteValue(ctx context.Context, data []byte) error { var err error - if p.Convert != nil { - binary, err = p.Convert(ctx, data) - if err != nil { - return "", err - } + msg := kafka.Message{Value: data} + if p.Generate != nil { + id := p.Generate() + msg.Key = []byte(id) + err = p.Writer.WriteMessages(ctx, msg) + return err + } else { + err = p.Writer.WriteMessages(ctx, msg) + return err } - msg := kafka.Message{Value: binary} +} +func (p *Writer) WriteWithKey(ctx context.Context, data []byte, key []byte, attributes map[string]string) error { + var err error + msg := kafka.Message{Value: data} if attributes != nil { msg.Headers = MapToHeader(attributes) } @@ -84,5 +82,5 @@ func (p *Writer) WriteWithKey(ctx context.Context, data []byte, key []byte, attr msg.Key = key } err = p.Writer.WriteMessages(ctx, msg) - return "", err + return err } diff --git a/log.go b/log.go deleted file mode 100644 index 36e917b..0000000 --- a/log.go +++ /dev/null @@ -1,21 +0,0 @@ -package mq - -import "time" - -func CreateLog(data []byte, header map[string]string, id string, timestamp *time.Time) interface{} { - if len(id) == 0 && timestamp == nil && (header == nil || len(header) == 0) { - return data - } - m := make(map[string]interface{}) - if len(id) > 0 { - m["id"] = id - } - m["data"] = data - if header != nil && len(header) > 0 { - m["attributes"] = header - } - if timestamp != nil { - m["timestamp"] = timestamp - } - return m -} diff --git a/map_batch_handler.go b/map_batch_handler.go deleted file mode 100644 index 68731da..0000000 --- a/map_batch_handler.go +++ /dev/null @@ -1,172 +0,0 @@ -package mq - -import ( - "context" - "encoding/json" - "fmt" - "reflect" - "strings" -) - -type TagName struct { - Index int - Bson string - Json string -} - -type MapBatchHandler struct { - LogError func(context.Context, string) - LogInfo func(context.Context, string) - modelType reflect.Type - modelsType reflect.Type - Write func(ctx context.Context, models []map[string]interface{}) ([]int, []int, error) // Return: Success indices, Fail indices, Error - mapJsonIndex map[string]TagName -} - -func NewMapBatchHandler(modelType reflect.Type, writeBatch func(context.Context, []map[string]interface{}) ([]int, []int, error), logs ...func(context.Context, string)) *MapBatchHandler { - modelsType := reflect.Zero(reflect.SliceOf(modelType)).Type() - typesTag := []string{"json", "bson"} - mapJsonIndex := BuildMapField(modelType, typesTag, "json") - h := &MapBatchHandler{modelType: modelType, modelsType: modelsType, Write: writeBatch, mapJsonIndex: mapJsonIndex} - if len(logs) >= 1 { - h.LogError = logs[0] - } - if len(logs) >= 2 { - h.LogInfo = logs[1] - } - return h -} - -func (h *MapBatchHandler) Handle(ctx context.Context, data []*Message) ([]*Message, error) { - failMessages := make([]*Message, 0) - - var v = reflect.Indirect(reflect.ValueOf(h.initModels())) - var messagesByteData = make([][]byte, 0) - for _, message := range data { - if message.Data != nil { - messagesByteData = append(messagesByteData, message.Data) - } - } - if h.LogInfo != nil { - sv, er0 := json.Marshal(v.Interface()) - if er0 != nil { - h.LogInfo(ctx, fmt.Sprintf(`models: %s`, v)) - } else { - h.LogInfo(ctx, fmt.Sprintf(`models: %s`, sv)) - } - } - modelMaps, er1 := h.ConvertToMaps(messagesByteData) - if er1 != nil { - if h.LogError != nil { - h.LogError(ctx, "error when converting to map: "+er1.Error()) - } - } - successIndices, failIndices, er2 := h.Write(ctx, modelMaps) - if h.LogInfo != nil { - h.LogInfo(ctx, fmt.Sprintf(`success indices %v fail indices %v`, successIndices, failIndices)) - } - if er2 != nil { - if h.LogError != nil { - h.LogError(ctx, fmt.Sprintf("Cannot write batch: %s Error: %s", v.Interface(), er2.Error())) - } - return data, er2 - } - for _, failIndex := range failIndices { - failMessages = append(failMessages, data[failIndex]) - } - - return failMessages, nil -} - -func (h *MapBatchHandler) ConvertToMaps(v interface{}) ([]map[string]interface{}, error) { - modelMaps := make([]map[string]interface{}, 0) - switch reflect.TypeOf(v).Kind() { - case reflect.Slice: - models := reflect.Indirect(reflect.ValueOf(v)) - for i := 0; i < models.Len(); i++ { - model, errToMap := h.StructToMap(models.Index(i).Interface()) - if errToMap == nil { - modelMaps = append(modelMaps, model) - } - } - } - return modelMaps, nil -} - -func (h *MapBatchHandler) StructToMap(bytes interface{}) (map[string]interface{}, error) { - maps := make(map[string]interface{}) - if bytes != nil { - b, ok := bytes.([]byte) - if ok { - s := string(b) - body := make(map[string]interface{}) - er1 := json.NewDecoder(strings.NewReader(s)).Decode(&body) - if er1 != nil { - return maps, er1 - } - bodyStruct := reflect.New(h.modelType).Interface() - err2 := json.NewDecoder(strings.NewReader(s)).Decode(&bodyStruct) - if err2 != nil { - return maps, err2 - } - for keyJsonName, _ := range body { - if tag, ok := h.mapJsonIndex[keyJsonName]; ok { - if tag.Index >= 0 { - v, _, errv := GetValue(bodyStruct, tag.Index) - if errv == nil { - if tag.Bson != "" { - maps[tag.Bson] = v - } else { - maps[tag.Json] = v - } - } - } - } - } - return maps, nil - } else { - return maps, fmt.Errorf("must is byte") - } - } - return maps, fmt.Errorf("ERROR StructToMap with value Nil") -} - -func GetValue(model interface{}, index int) (interface{}, string, error) { - valueObject := reflect.Indirect(reflect.ValueOf(model)) - return valueObject.Field(index).Interface(), valueObject.Type().Field(index).Name, nil -} - -func BuildMapField(modelType reflect.Type, tagTypes []string, tagType string) map[string]TagName { - model := reflect.New(modelType).Interface() - val := reflect.Indirect(reflect.ValueOf(model)) - m := make(map[string]TagName) - for i := 0; i < val.Type().NumField(); i++ { - field := val.Type().Field(i) - tagName := TagName{Index: i} - keyTag := "" - for _, tagItem := range tagTypes { - tagStr, ok := field.Tag.Lookup(tagItem) - if ok { - keyOfTagNameItem := strings.Split(tagStr, ",")[0] - if tagItem == "bson" { - tagName.Bson = keyOfTagNameItem - } else if tagItem == "json" { - tagName.Json = keyOfTagNameItem - } - if tagItem == tagType { - keyTag = keyOfTagNameItem - } - } - } - if keyTag != "" { - m[keyTag] = tagName - } else { - m[field.Name] = tagName - } - } - return m -} - -func (h *MapBatchHandler) initModels() interface{} { - return reflect.New(h.modelsType).Interface() -} diff --git a/maps_writer.go b/maps_writer.go deleted file mode 100644 index 1b5447f..0000000 --- a/maps_writer.go +++ /dev/null @@ -1,7 +0,0 @@ -package mq - -import "context" - -type MapsWriter interface { - WriteBatch(ctx context.Context, models []map[string]interface{}) ([]int, []int, error) // Return: Success indices, Fail indices, Error -} diff --git a/message_queue_writer.go b/message_queue_writer.go deleted file mode 100644 index dc34934..0000000 --- a/message_queue_writer.go +++ /dev/null @@ -1,53 +0,0 @@ -package mq - -import ( - "context" - "encoding/json" -) - -type MessageQueueWriter struct { - send func(ctx context.Context, data []byte, attributes map[string]string) (string, error) -} - -func NewMessageQueueWriter(send func(context.Context, []byte, map[string]string) (string, error)) *MessageQueueWriter { - return &MessageQueueWriter{send: send} -} - -func (w *MessageQueueWriter) Write(ctx context.Context, model interface{}) error { - if model == nil { - return nil - } - data, er1 := Marshal(model) - if er1 != nil { - return er1 - } - msg := GetMessageFromContext(ctx) - if msg != nil && len(msg.Attributes) > 0 { - _, er2 := w.send(ctx, data, msg.Attributes) - return er2 - } else { - _, er2 := w.send(ctx, data, nil) - return er2 - } -} -func Marshal(v interface{}) ([]byte, error) { - b, ok1 := v.([]byte) - if ok1 { - return b, nil - } - s, ok2 := v.(string) - if ok2 { - return []byte(s), nil - } - return json.Marshal(v) -} -func GetMessageFromContext(ctx context.Context) *Message { - msg := ctx.Value("message") - if msg != nil { - k, ok := msg.(*Message) - if ok { - return k - } - } - return nil -} diff --git a/retry.go b/retry.go index b7fe6f3..16e1b50 100644 --- a/retry.go +++ b/retry.go @@ -1,35 +1,34 @@ package mq import ( - "context" - "fmt" "reflect" "strconv" "time" ) type RetryConfig struct { - Retry1 int64 `mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"` - Retry2 int64 `mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"` - Retry3 int64 `mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"` - Retry4 int64 `mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"` - Retry5 int64 `mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"` - Retry6 int64 `mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"` - Retry7 int64 `mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"` - Retry8 int64 `mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"` - Retry9 int64 `mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"` - Retry10 int64 `mapstructure:"10" json:"retry10,omitempty" gorm:"column:retry10" bson:"retry10,omitempty" dynamodbav:"retry10,omitempty" firestore:"retry10,omitempty"` - Retry11 int64 `mapstructure:"11" json:"retry11,omitempty" gorm:"column:retry11" bson:"retry11,omitempty" dynamodbav:"retry11,omitempty" firestore:"retry11,omitempty"` - Retry12 int64 `mapstructure:"12" json:"retry12,omitempty" gorm:"column:retry12" bson:"retry12,omitempty" dynamodbav:"retry12,omitempty" firestore:"retry12,omitempty"` - Retry13 int64 `mapstructure:"13" json:"retry13,omitempty" gorm:"column:retry13" bson:"retry13,omitempty" dynamodbav:"retry13,omitempty" firestore:"retry13,omitempty"` - Retry14 int64 `mapstructure:"14" json:"retry14,omitempty" gorm:"column:retry14" bson:"retry14,omitempty" dynamodbav:"retry14,omitempty" firestore:"retry14,omitempty"` - Retry15 int64 `mapstructure:"15" json:"retry15,omitempty" gorm:"column:retry15" bson:"retry15,omitempty" dynamodbav:"retry15,omitempty" firestore:"retry15,omitempty"` - Retry16 int64 `mapstructure:"16" json:"retry16,omitempty" gorm:"column:retry16" bson:"retry16,omitempty" dynamodbav:"retry16,omitempty" firestore:"retry16,omitempty"` - Retry17 int64 `mapstructure:"17" json:"retry17,omitempty" gorm:"column:retry17" bson:"retry17,omitempty" dynamodbav:"retry17,omitempty" firestore:"retry17,omitempty"` - Retry18 int64 `mapstructure:"18" json:"retry18,omitempty" gorm:"column:retry18" bson:"retry18,omitempty" dynamodbav:"retry18,omitempty" firestore:"retry18,omitempty"` - Retry19 int64 `mapstructure:"19" json:"retry19,omitempty" gorm:"column:retry19" bson:"retry19,omitempty" dynamodbav:"retry19,omitempty" firestore:"retry19,omitempty"` - Retry20 int64 `mapstructure:"20" json:"retry20,omitempty" gorm:"column:retry20" bson:"retry20,omitempty" dynamodbav:"retry20,omitempty" firestore:"retry20,omitempty"` + Retry1 int64 `yaml:"1" mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"` + Retry2 int64 `yaml:"2" mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"` + Retry3 int64 `yaml:"3" mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"` + Retry4 int64 `yaml:"4" mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"` + Retry5 int64 `yaml:"5" mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"` + Retry6 int64 `yaml:"6" mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"` + Retry7 int64 `yaml:"7" mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"` + Retry8 int64 `yaml:"8" mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"` + Retry9 int64 `yaml:"9" mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"` + Retry10 int64 `yaml:"10" mapstructure:"10" json:"retry10,omitempty" gorm:"column:retry10" bson:"retry10,omitempty" dynamodbav:"retry10,omitempty" firestore:"retry10,omitempty"` + Retry11 int64 `yaml:"11" mapstructure:"11" json:"retry11,omitempty" gorm:"column:retry11" bson:"retry11,omitempty" dynamodbav:"retry11,omitempty" firestore:"retry11,omitempty"` + Retry12 int64 `yaml:"12" mapstructure:"12" json:"retry12,omitempty" gorm:"column:retry12" bson:"retry12,omitempty" dynamodbav:"retry12,omitempty" firestore:"retry12,omitempty"` + Retry13 int64 `yaml:"13" mapstructure:"13" json:"retry13,omitempty" gorm:"column:retry13" bson:"retry13,omitempty" dynamodbav:"retry13,omitempty" firestore:"retry13,omitempty"` + Retry14 int64 `yaml:"14" mapstructure:"14" json:"retry14,omitempty" gorm:"column:retry14" bson:"retry14,omitempty" dynamodbav:"retry14,omitempty" firestore:"retry14,omitempty"` + Retry15 int64 `yaml:"15" mapstructure:"15" json:"retry15,omitempty" gorm:"column:retry15" bson:"retry15,omitempty" dynamodbav:"retry15,omitempty" firestore:"retry15,omitempty"` + Retry16 int64 `yaml:"16" mapstructure:"16" json:"retry16,omitempty" gorm:"column:retry16" bson:"retry16,omitempty" dynamodbav:"retry16,omitempty" firestore:"retry16,omitempty"` + Retry17 int64 `yaml:"17" mapstructure:"17" json:"retry17,omitempty" gorm:"column:retry17" bson:"retry17,omitempty" dynamodbav:"retry17,omitempty" firestore:"retry17,omitempty"` + Retry18 int64 `yaml:"18" mapstructure:"18" json:"retry18,omitempty" gorm:"column:retry18" bson:"retry18,omitempty" dynamodbav:"retry18,omitempty" firestore:"retry18,omitempty"` + Retry19 int64 `yaml:"19" mapstructure:"19" json:"retry19,omitempty" gorm:"column:retry19" bson:"retry19,omitempty" dynamodbav:"retry19,omitempty" firestore:"retry19,omitempty"` + Retry20 int64 `yaml:"20" mapstructure:"20" json:"retry20,omitempty" gorm:"column:retry20" bson:"retry20,omitempty" dynamodbav:"retry20,omitempty" firestore:"retry20,omitempty"` } + func DurationsFromValue(v interface{}, prefix string, max int) []time.Duration { arr := MakeArray(v, prefix, max) return MakeDurations(arr) @@ -56,22 +55,3 @@ func MakeArray(v interface{}, prefix string, max int) []int64 { } return ar } - -// Retry Copy this code from https://stackoverflow.com/questions/47606761/repeat-code-if-an-error-occured -func Retry(ctx context.Context, sleeps []time.Duration, f func() error, log func(context.Context, string)) (err error) { - attempts := len(sleeps) - for i := 0; ; i++ { - err = f() - if err == nil { - return - } - if i >= (attempts - 1) { - break - } - if log != nil { - log(ctx, fmt.Sprintf("Retrying %d of %d after error: %s", i+1, attempts, err.Error())) - } - time.Sleep(sleeps[i]) - } - return fmt.Errorf("after %d attempts, last error: %s", attempts, err) -} diff --git a/retry_handler.go b/retry_handler.go new file mode 100644 index 0000000..5c5dac2 --- /dev/null +++ b/retry_handler.go @@ -0,0 +1,190 @@ +package mq + +import ( + "context" + "encoding/json" + "fmt" + "strconv" +) + +type RetryHandlerConfig struct { + RetryCountName string `yaml:"retry_count_name" mapstructure:"retry_count_name" json:"retryCountName,omitempty" gorm:"column:retrycountname" bson:"retryCountName,omitempty" dynamodbav:"retryCountName,omitempty" firestore:"retryCountName,omitempty"` + LimitRetry int `yaml:"limit_retry" mapstructure:"limit_retry" json:"limitRetry,omitempty" gorm:"column:limitretry" bson:"limitRetry,omitempty" dynamodbav:"limitRetry,omitempty" firestore:"limitRetry,omitempty"` + Goroutines bool `yaml:"goroutines" mapstructure:"goroutines" json:"goroutines,omitempty" gorm:"column:goroutines" bson:"goroutines,omitempty" dynamodbav:"goroutines,omitempty" firestore:"goroutines,omitempty"` + Key string `yaml:"key" mapstructure:"key" json:"key,omitempty" gorm:"column:key" bson:"key,omitempty" dynamodbav:"key,omitempty" firestore:"key,omitempty"` +} + +type RetryHandler[T any] struct { + Unmarshal func(data []byte, v any) error + Write func(context.Context, *T) error + Validate func(context.Context, *T) ([]ErrorMessage, error) + Reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string) + HandleError func(context.Context, []byte, map[string]string) + Retry func(context.Context, []byte, map[string]string) error + LimitRetry int + RetryCountName string + Goroutines bool + LogError func(context.Context, string) + LogInfo func(context.Context, string) + Key string +} + +func NewRetryHandlerByConfig[T any]( + c RetryHandlerConfig, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handleError func(context.Context, []byte, map[string]string), + retry func(context.Context, []byte, map[string]string) error, + logs ...func(context.Context, string)) *RetryHandler[T] { + return NewRetryHandler[T](nil, write, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...) +} +func NewRetryHandlerByConfigAndUnmarshal[T any]( + c RetryHandlerConfig, + unmarshal func(data []byte, v any) error, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handleError func(context.Context, []byte, map[string]string), + retry func(context.Context, []byte, map[string]string) error, + logs ...func(context.Context, string)) *RetryHandler[T] { + return NewRetryHandler[T](unmarshal, write, validate, reject, handleError, retry, c.LimitRetry, c.RetryCountName, c.Goroutines, c.Key, logs...) +} +func NewRetryHandler[T any]( + unmarshal func(data []byte, v any) error, + write func(context.Context, *T) error, + validate func(context.Context, *T) ([]ErrorMessage, error), + reject func(context.Context, *T, []ErrorMessage, []byte, map[string]string), + handleError func(context.Context, []byte, map[string]string), + retry func(context.Context, []byte, map[string]string) error, + limitRetry int, + retryCountName string, + goroutines bool, key string, logs ...func(context.Context, string)) *RetryHandler[T] { + if len(retryCountName) == 0 { + retryCountName = "retry" + } + if unmarshal == nil { + unmarshal = json.Unmarshal + } + c := &RetryHandler[T]{ + Unmarshal: unmarshal, + Write: write, + Validate: validate, + Reject: reject, + HandleError: handleError, + Retry: retry, + LimitRetry: limitRetry, + RetryCountName: retryCountName, + Goroutines: goroutines, + Key: key, + } + if len(logs) >= 1 { + c.LogError = logs[0] + } + if len(logs) >= 2 { + c.LogInfo = logs[1] + } + return c +} + +func (c *RetryHandler[T]) Handle(ctx context.Context, data []byte, attrs map[string]string) { + if data == nil { + return + } + if c.LogInfo != nil { + key := GetString(ctx, c.Key) + if len(key) > 0 { + c.LogInfo(ctx, fmt.Sprintf("Received message with key %s : %s", key, GetLog(data, attrs))) + } else { + c.LogInfo(ctx, fmt.Sprintf("Received message: %s", GetLog(data, attrs))) + } + } + var v T + er1 := c.Unmarshal(data, &v) + if er1 != nil { + if c.LogError != nil { + c.LogError(ctx, fmt.Sprintf("cannot unmarshal item: %s. Error: %s", GetLog(data, attrs), er1.Error())) + } + return + } + if c.Validate != nil { + errs, err := c.Validate(ctx, &v) + if err != nil { + if c.LogError != nil { + c.LogError(ctx, "Error when validate data: "+err.Error()) + } + return + } + if len(errs) > 0 { + c.Reject(ctx, &v, errs, data, attrs) + } + } + if c.Goroutines { + go Write[*T](ctx, c.Write, &v, data, attrs, c.HandleError, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo) + } else { + Write[*T](ctx, c.Write, &v, data, attrs, c.HandleError, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo) + } +} + +func Write[T any](ctx context.Context, write func(context.Context, T) error, item T, data []byte, attrs map[string]string, handleError func(context.Context, []byte, map[string]string), retry func(context.Context, []byte, map[string]string) error, limitRetry int, retryCountName string, logs ...func(context.Context, string)) { + var logError func(context.Context, string) + var logInfo func(context.Context, string) + if len(logs) > 0 { + logError = logs[0] + } + if len(logs) > 1 { + logInfo = logs[1] + } + er3 := write(ctx, item) + if er3 == nil { + return + } + if logError != nil { + logError(ctx, fmt.Sprintf("Fail to write %s . Error: %s", GetLog(data, attrs), er3.Error())) + } + + if retry == nil { + if handleError != nil { + handleError(ctx, data, attrs) + } + return + } + retryCount := 0 + if attrs == nil { + attrs = make(map[string]string) + } else { + var er4 error + retryCount, er4 = strconv.Atoi(attrs[retryCountName]) + if er4 != nil { + retryCount = 0 + } + } + retryCount++ + if retryCount > limitRetry { + if logInfo != nil { + logInfo(ctx, fmt.Sprintf("Retry: %d . Retry limitation: %d . Message: %s.", retryCount-1, limitRetry, GetLog(data, attrs))) + } + if handleError != nil { + handleError(ctx, data, attrs) + } + } else { + if logInfo != nil { + logInfo(ctx, fmt.Sprintf("Retry: %d . Message: %s", retryCount-1, GetLog(data, attrs))) + } + attrs[retryCountName] = strconv.Itoa(retryCount) + er2 := retry(ctx, data, attrs) + if er2 != nil { + if logError != nil { + logError(ctx, fmt.Sprintf("Cannot retry %s . Error: %s", GetLog(data, attrs), er2.Error())) + } + } + } +} + +func GetLog(data []byte, attrs map[string]string) string { + if len(attrs) == 0 { + return fmt.Sprintf("%s %+v", data, attrs) + } else { + return fmt.Sprintf("%s", data) + } +} diff --git a/retry_sender.go b/retry_sender.go deleted file mode 100644 index 80f420b..0000000 --- a/retry_sender.go +++ /dev/null @@ -1,83 +0,0 @@ -package mq - -import ( - "context" - "fmt" - "time" -) - -type RetrySender struct { - send func(ctx context.Context, data []byte, attributes map[string]string) (string, error) - Retries []time.Duration - Log func(context.Context, string) - Error func(ctx context.Context, data []byte, attrs map[string]string) error - Goroutines bool -} - -func NewSenderByConfig(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), c *RetryConfig, options... func(context.Context, []byte, map[string]string) error) *RetrySender { - var handlerError func(context.Context, []byte, map[string]string) error - if len(options) > 0 { - handlerError = options[0] - } - if c == nil { - return &RetrySender{send: send, Log: log, Goroutines: goroutines, Error: handlerError} - } else { - retries := DurationsFromValue(*c, "Retry", 20) - if len(retries) == 0 { - return &RetrySender{send: send, Log: log, Goroutines: goroutines} - } - return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines, Error: handlerError} - } -} -func NewSender(send func(context.Context, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), retries []time.Duration, options... func(context.Context, []byte, map[string]string) error) *RetrySender { - var handlerError func(context.Context, []byte, map[string]string) error - if len(options) > 0 { - handlerError = options[0] - } - return &RetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines, Error: handlerError} -} -func (c *RetrySender) Send(ctx context.Context, data []byte, attributes map[string]string) (string, error) { - if !c.Goroutines { - return Send(ctx, c.send, data, attributes, c.Log, c.Error, c.Retries...) - } else { - go Send(ctx, c.send, data, attributes, c.Log, c.Error, c.Retries...) - return "", nil - } -} -func Send(ctx context.Context, send func(context.Context, []byte, map[string]string) (string, error), data []byte, attributes map[string]string, log func(context.Context, string), handlerError func(context.Context, []byte, map[string]string) error, retries ...time.Duration) (string, error) { - l := len(retries) - if l == 0 { - r, err := send(ctx, data, attributes) - if err != nil && handlerError != nil { - handlerError(ctx, data, attributes) - } - return r, err - } else { - r, err := SendWithRetries(ctx, send, data, attributes, retries, log) - if err != nil && handlerError != nil { - handlerError(ctx, data, attributes) - } - return r, err - } -} - -func SendWithRetries(ctx context.Context, send func(context.Context, []byte, map[string]string) (string, error), data []byte, attributes map[string]string, retries []time.Duration, log func(context.Context, string)) (string, error) { - id, er1 := send(ctx, data, attributes) - if er1 == nil { - return id, er1 - } - i := 0 - err := Retry(ctx, retries, func() (err error) { - i = i + 1 - id2, er2 := send(ctx, data, attributes) - id = id2 - if er2 == nil && log != nil { - log(ctx, fmt.Sprintf("Send successfully after %d retries %s", i, data)) - } - return er2 - }, log) - if err != nil && log != nil { - log(ctx, fmt.Sprintf("Failed to send after %d retries: %s. Error: %s.", len(retries), data, err.Error())) - } - return id, err -} diff --git a/retry_service.go b/retry_service.go deleted file mode 100644 index a16578f..0000000 --- a/retry_service.go +++ /dev/null @@ -1,32 +0,0 @@ -package mq - -import "context" - -type RetryService struct { - send func(ctx context.Context, data []byte, attributes map[string]string) (string, error) - LogError func(context.Context, string) - LogInfo func(context.Context, string) -} - -func NewRetryService(send func(context.Context, []byte, map[string]string) (string, error), logs ...func(context.Context, string)) *RetryService { - s := &RetryService{send: send} - if len(logs) >= 1 { - s.LogError = logs[0] - } - if len(logs) >= 2 { - s.LogInfo = logs[1] - } - return s -} - -func (s *RetryService) Retry(ctx context.Context, data []byte, header map[string]string) error { - _, err := s.send(ctx, data, header) - if err != nil { - if s.LogError != nil { - s.LogError(ctx, `Retry put to mq error: `+err.Error()) - } - } else if s.LogInfo != nil { - s.LogInfo(ctx, `Retry put to mq success.`) - } - return err -} diff --git a/retry_writer.go b/retry_writer.go deleted file mode 100644 index 670a447..0000000 --- a/retry_writer.go +++ /dev/null @@ -1,83 +0,0 @@ -package mq - -import ( - "context" - "fmt" - "time" -) - -type RetryWriter struct { - write func(ctx context.Context, model interface{}) error - Retries []time.Duration - Log func(context.Context, string) - WriteError func(ctx context.Context, model interface{}) error - Goroutines bool -} - -func NewWriterByConfig(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), c *RetryConfig, options ...func(context.Context, interface{}) error) *RetryWriter { - var writeError func(context.Context, interface{}) error - if len(options) > 0 { - writeError = options[0] - } - if c == nil { - return &RetryWriter{write: write, Log: log, Goroutines: goroutines} - } else { - retries := DurationsFromValue(*c, "Retry", 20) - if len(retries) == 0 { - return &RetryWriter{write: write, Log: log, Goroutines: goroutines, WriteError: writeError} - } - return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines, WriteError: writeError} - } -} -func NewWriter(write func(context.Context, interface{}) error, goroutines bool, log func(context.Context, string), retries []time.Duration, options ...func(context.Context, interface{}) error) *RetryWriter { - var writeError func(context.Context, interface{}) error - if len(options) > 0 { - writeError = options[0] - } - return &RetryWriter{write: write, Log: log, Retries: retries, Goroutines: goroutines, WriteError: writeError} -} -func (c *RetryWriter) Write(ctx context.Context, model interface{}) error { - if !c.Goroutines { - return WriteTo(ctx, c.write, model, c.Log, c.WriteError, c.Retries...) - } else { - go WriteTo(ctx, c.write, model, c.Log, c.WriteError, c.Retries...) - return nil - } -} -func WriteTo(ctx context.Context, write func(context.Context, interface{}) error, model interface{}, log func(context.Context, string), writeError func(context.Context, interface{}) error, retries ...time.Duration) error { - l := len(retries) - if l == 0 { - err := write(ctx, model) - if err != nil && writeError != nil { - writeError(ctx, model) - } - return err - } else { - err := WriteWithRetries(ctx, write, model, retries, log) - if err != nil && writeError != nil { - writeError(ctx, model) - } - return err - } -} - -func WriteWithRetries(ctx context.Context, write func(context.Context, interface{}) error, model interface{}, retries []time.Duration, log func(context.Context, string)) error { - er1 := write(ctx, model) - if er1 == nil { - return er1 - } - i := 0 - err := Retry(ctx, retries, func() (err error) { - i = i + 1 - er2 := write(ctx, model) - - if er2 == nil && log != nil { - log(ctx, fmt.Sprintf("Write successfully after %d retries %s", i, model)) - } - return er2 - }, log) - if err != nil && log != nil { - log(ctx, fmt.Sprintf("Failed to write after %d retries: %s. Error: %s.", len(retries), model, err.Error())) - } - return err -} diff --git a/simple_retry_sender.go b/simple_retry_sender.go deleted file mode 100644 index cc4c7f8..0000000 --- a/simple_retry_sender.go +++ /dev/null @@ -1,83 +0,0 @@ -package mq - -import ( - "context" - "fmt" - "time" -) - -type SimpleRetrySender struct { - send func(ctx context.Context, topic string, data []byte, attributes map[string]string) (string, error) - Retries []time.Duration - Log func(context.Context, string) - Error func(ctx context.Context, data []byte, attrs map[string]string) error - Goroutines bool -} - -func NewSimpleSenderByConfig(send func(context.Context, string, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), c *RetryConfig, options... func(context.Context, []byte, map[string]string) error) *SimpleRetrySender { - var handlerError func(context.Context, []byte, map[string]string) error - if len(options) > 0 { - handlerError = options[0] - } - if c == nil { - return &SimpleRetrySender{send: send, Log: log, Goroutines: goroutines, Error: handlerError} - } else { - retries := DurationsFromValue(*c, "Retry", 20) - if len(retries) == 0 { - return &SimpleRetrySender{send: send, Log: log, Goroutines: goroutines} - } - return &SimpleRetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines, Error: handlerError} - } -} -func NewSimpleSender(send func(context.Context, string, []byte, map[string]string) (string, error), goroutines bool, log func(context.Context, string), retries []time.Duration, options... func(context.Context, []byte, map[string]string) error) *SimpleRetrySender { - var handlerError func(context.Context, []byte, map[string]string) error - if len(options) > 0 { - handlerError = options[0] - } - return &SimpleRetrySender{send: send, Log: log, Retries: retries, Goroutines: goroutines, Error: handlerError} -} -func (c *SimpleRetrySender) Send(ctx context.Context, topic string, data []byte, attributes map[string]string) (string, error) { - if !c.Goroutines { - return SendTo(ctx, c.send, topic, data, attributes, c.Log, c.Error, c.Retries...) - } else { - go SendTo(ctx, c.send, topic, data, attributes, c.Log, c.Error, c.Retries...) - return "", nil - } -} -func SendTo(ctx context.Context, send func(context.Context, string, []byte, map[string]string) (string, error), to string, data []byte, attributes map[string]string, log func(context.Context, string), handlerError func(context.Context, []byte, map[string]string) error, retries ...time.Duration) (string, error) { - l := len(retries) - if l == 0 { - r, err := send(ctx, to, data, attributes) - if err != nil && handlerError != nil { - handlerError(ctx, data, attributes) - } - return r, err - } else { - r, err := SendToWithRetries(ctx, send, to, data, attributes, retries, log) - if err != nil && handlerError != nil { - handlerError(ctx, data, attributes) - } - return r, err - } -} - -func SendToWithRetries(ctx context.Context, send func(context.Context, string, []byte, map[string]string) (string, error), to string, data []byte, attributes map[string]string, retries []time.Duration, log func(context.Context, string)) (string, error) { - id, er1 := send(ctx, to, data, attributes) - if er1 == nil { - return id, er1 - } - i := 0 - err := Retry(ctx, retries, func() (err error) { - i = i + 1 - id2, er2 := send(ctx, to, data, attributes) - id = id2 - if er2 == nil && log != nil { - log(ctx, fmt.Sprintf("Send successfully after %d retries %s", i, data)) - } - return er2 - }, log) - if err != nil && log != nil { - log(ctx, fmt.Sprintf("Failed to send after %d retries: %s. Error: %s.", len(retries), data, err.Error())) - } - return id, err -} diff --git a/subscription.go b/subscription.go deleted file mode 100644 index ab14872..0000000 --- a/subscription.go +++ /dev/null @@ -1,50 +0,0 @@ -package mq - -import ( - "context" - "fmt" -) - -type Subscription struct { - receive func(ctx context.Context, message *Message) - Validate func(ctx context.Context, message *Message) error - LogError func(context.Context, string) - LogInfo func(context.Context, string) -} - -func NewSubscription(receive func(context.Context, *Message), validate func(context.Context, *Message) error, logs ...func(context.Context, string)) *Subscription { - b := Subscription{receive: receive, Validate: validate} - if len(logs) >= 1 { - b.LogError = logs[0] - } - if len(logs) >= 2 { - b.LogInfo = logs[1] - } - return &b -} - -func (c *Subscription) Receive(ctx context.Context, message *Message, err error) error { - if err != nil { - if c.LogError != nil { - c.LogError(ctx, fmt.Sprintf("Processing message error: %s", err.Error())) - } - return err - } else if message == nil { - return nil - } - if c.LogInfo != nil { - c.LogInfo(ctx, fmt.Sprintf("Received message: %s", message.Data)) - } - if c.Validate != nil { - er2 := c.Validate(ctx, message) - if er2 != nil { - if c.LogError != nil { - x := CreateLog(message.Data, message.Attributes, message.Id, message.Timestamp) - c.LogError(ctx, fmt.Sprintf("Message is invalid: %s . Error: %s", x, er2.Error())) - } - return er2 - } - } - c.receive(ctx, message) - return nil -} diff --git a/validator.go b/validator.go deleted file mode 100644 index 00a8167..0000000 --- a/validator.go +++ /dev/null @@ -1,35 +0,0 @@ -package mq - -import ( - "context" - "encoding/json" - "fmt" - "reflect" -) - -type Validator struct { - modelType reflect.Type - Check func(ctx context.Context, model interface{}) error - Unmarshal func([]byte, interface{}) error -} - -func NewValidator(modelType reflect.Type, check func(context.Context, interface{}) error, opts...func([]byte, interface{}) error) *Validator { - var unmarshal func([]byte, interface{}) error - if len(opts) > 0 && opts[0] != nil { - unmarshal = opts[0] - } else { - unmarshal = json.Unmarshal - } - v := &Validator{modelType: modelType, Check: check, Unmarshal: unmarshal} - return v -} - -func (v *Validator) Validate(ctx context.Context, message *Message) error { - item := InitModel(v.modelType) - err := v.Unmarshal(message.Data, item) - if err != nil { - return fmt.Errorf(`cannot unmarshal item: %s. Error: %s`, message.Data, err.Error()) - } - message.Value = item - return v.Check(ctx, message.Value) -} diff --git a/validator/custom.go b/validator/custom.go index d712a0f..a9a6856 100644 --- a/validator/custom.go +++ b/validator/custom.go @@ -1,6 +1,12 @@ package validator -import "github.com/go-playground/validator/v10" +import ( + "github.com/go-playground/validator/v10" + "strings" + "time" + + s "github.com/core-go/mq" +) type CustomValidate struct { Fn validator.Func @@ -13,6 +19,27 @@ var PatternMap = map[string]string{ "code": "^\\w*\\d*$", } +var translations = map[string]string{ + "email": "{0} must be a valid email address", + "url": "{0} must be a valid URL", + "uri": "{0} must be a valid URI", + "fax": "{0} must be a valid fax number", + "phone": "{0} must be a valid phone number", + "ip": "{0} must be a valid IP address", + "ipv4": "{0} must be a valid IPv4 address", + "ipv6": "{0} must be a valid IPv6 address", + "digit": "{0} must contain only digits", + "pin": "{0} must contain only digits", + "abc": "{0} must contain only letters", + "id": "{0} must be a valid ID", + "code": "{0} must be a valid code", + "country_code": "{0} must be a valid country code", + "username": "{0} must be a valid username", + "regex": "{0} must match the provided regex pattern", + "after_now": "{0} must be after now", + "now_or_after": "{0} must be now or after", +} + func GetCustomValidateList() (list []CustomValidate) { list = append(list, CustomValidate{Fn: CheckEmail, Tag: "email"}) list = append(list, CustomValidate{Fn: CheckUrl, Tag: "url"}) @@ -29,6 +56,8 @@ func GetCustomValidateList() (list []CustomValidate) { list = append(list, CustomValidate{Fn: CheckCountryCode, Tag: "country_code"}) list = append(list, CustomValidate{Fn: CheckUsername, Tag: "username"}) list = append(list, CustomValidate{Fn: CheckPattern, Tag: "regex"}) + list = append(list, CustomValidate{Fn: CheckAfterNow, Tag: "after_now"}) + list = append(list, CustomValidate{Fn: CheckNowOrAfter, Tag: "now_or_after"}) return } func CheckString(fl validator.FieldLevel, fn func(string) bool) bool { @@ -88,3 +117,75 @@ func CheckPattern(fl validator.FieldLevel) bool { panic("invalid pattern") } } + +// CheckAfterNow validates if the given time is greater than the current time +func CheckAfterNow(fl validator.FieldLevel) bool { + var inputTime time.Time + + switch t := fl.Field().Interface().(type) { + case string: + parsedTime, err := time.Parse(time.RFC3339, t) + if err != nil { + return false + } + inputTime = parsedTime + case time.Time: + inputTime = t + case *time.Time: + inputTime = *t + default: + return false + } + + return inputTime.UTC().After(time.Now().UTC()) +} + +// CheckNowOrAfter validates if the given time is greater or equal than the current time +func CheckNowOrAfter(fl validator.FieldLevel) bool { + var inputTime time.Time + + switch t := fl.Field().Interface().(type) { + case string: + parsedTime, err := time.Parse(time.RFC3339, t) + if err != nil { + return false + } + inputTime = parsedTime + case time.Time: + inputTime = t + case *time.Time: + inputTime = *t + default: + return false + } + + return inputTime.UTC().After(time.Now().UTC()) || inputTime.UTC().Equal(time.Now().UTC()) +} +func RemoveRequiredError(errors []s.ErrorMessage) []s.ErrorMessage { + if errors == nil || len(errors) == 0 { + return errors + } + errs := make([]s.ErrorMessage, 0) + for _, s := range errors { + if s.Code != "required" && !strings.HasPrefix(s.Code, "minlength") { + errs = append(errs, s) + } else if strings.Index(s.Field, ".") >= 0 { + errs = append(errs, s) + } + } + return errs +} +func FormatErrorField(s string) string { + splitField := strings.Split(s, ".") + length := len(splitField) + if length == 1 { + return lcFirstChar(splitField[0]) + } else if length > 1 { + var tmp []string + for _, v := range splitField[1:] { + tmp = append(tmp, lcFirstChar(v)) + } + return strings.Join(tmp, ".") + } + return s +} diff --git a/validator/default_validator.go b/validator/default_validator.go deleted file mode 100644 index 5e0f9fe..0000000 --- a/validator/default_validator.go +++ /dev/null @@ -1,129 +0,0 @@ -package validator - -import ( - "context" - "fmt" - "github.com/go-playground/validator/v10" - "reflect" - "strings" - "unicode" -) - -const ( - method = "method" - patch = "patch" -) - -type DefaultValidator struct { - validate *validator.Validate - CustomValidateList []CustomValidate -} - -func NewDefaultValidator() *DefaultValidator { - list := GetCustomValidateList() - return &DefaultValidator{CustomValidateList: list} -} - -func (p *DefaultValidator) Validate(ctx context.Context, model interface{}) ([]ErrorMessage, error) { - errors := make([]ErrorMessage, 0) - if p.validate == nil { - validate := validator.New() - validate = p.RegisterCustomValidate(validate) - p.validate = validate - } - err := p.validate.Struct(model) - - if err != nil { - errors, err = MapErrors(err) - } - v := ctx.Value(method) - if v != nil { - v2, ok := v.(string) - if ok { - if v2 == patch { - errs := RemoveRequiredError(errors) - return errs, nil - } - } - } - return errors, err -} - -var alias = map[string]string{ - "max": "maxlength", - "min": "minlength", - "gtefield": "minfield", - "ltefield": "maxfield", -} - -func MapErrors(err error) (list []ErrorMessage, err1 error) { - if _, ok := err.(*validator.InvalidValidationError); ok { - err1 = fmt.Errorf("InvalidValidationError") - return - } - for _, err := range err.(validator.ValidationErrors) { - code := formatCodeMsg(err) - list = append(list, ErrorMessage{Field: formatErrField(err.Namespace()), Code: code}) - } - return -} - -func formatCodeMsg(err validator.FieldError) string { - var code string - if aliasTag, ok := alias[err.Tag()]; ok { - if (err.Tag() == "max" || err.Tag() == "min") && err.Kind() != reflect.String { - code = err.Tag() - } else { - code = aliasTag - } - } else { - code = err.Tag() - } - if err.Param() != "" { - code += ":" + lcFirstChar(err.Param()) - } - return code -} - -func formatErrField(s string) string { - splitField := strings.Split(s, ".") - length := len(splitField) - if length == 1 { - return lcFirstChar(splitField[0]) - } else if length > 1 { - var tmp []string - for _, v := range splitField[1:] { - tmp = append(tmp, lcFirstChar(v)) - } - return strings.Join(tmp, ".") - } - return s -} -func lcFirstChar(s string) string { - if len(s) > 0 { - runes := []rune(s) - runes[0] = unicode.ToLower(runes[0]) - return string(runes) - } - return s -} -func (p *DefaultValidator) RegisterCustomValidate(validate *validator.Validate) *validator.Validate { - for _, v := range p.CustomValidateList { - validate.RegisterValidation(v.Tag, v.Fn) - } - return validate -} -func RemoveRequiredError(errors []ErrorMessage) []ErrorMessage { - if errors == nil || len(errors) == 0 { - return errors - } - errs := make([]ErrorMessage, 0) - for _, s := range errors { - if s.Code != "required" && !strings.HasPrefix(s.Code, "minlength") { - errs = append(errs, s) - } else if strings.Index(s.Field, ".") >= 0 { - errs = append(errs, s) - } - } - return errs -} diff --git a/validator/error_checker.go b/validator/error_checker.go deleted file mode 100644 index d968de2..0000000 --- a/validator/error_checker.go +++ /dev/null @@ -1,31 +0,0 @@ -package validator - -import ( - "context" - "fmt" -) - -type ErrorChecker struct { - validate func(ctx context.Context, model interface{}) ([]ErrorMessage, error) -} - -func NewErrorChecker(validate func(context.Context, interface{}) ([]ErrorMessage, error)) *ErrorChecker { - return &ErrorChecker{validate: validate} -} - -func (v *ErrorChecker) Check(ctx context.Context, model interface{}) error { - errors, err := v.validate(ctx, model) - if err != nil { - return err - } - if errors != nil && len(errors) > 0 { - m := fmt.Sprintf("%s", errors) - return fmt.Errorf(m) - } - return nil -} - -func NewDefaultErrorChecker() *ErrorChecker { - v := NewDefaultValidator() - return NewErrorChecker(v.Validate) -} diff --git a/validator/error_message.go b/validator/error_message.go deleted file mode 100644 index f927600..0000000 --- a/validator/error_message.go +++ /dev/null @@ -1,8 +0,0 @@ -package validator - -type ErrorMessage struct { - Field string `mapstructure:"field" json:"field,omitempty" gorm:"column:field" bson:"field,omitempty" dynamodbav:"field,omitempty" firestore:"field,omitempty"` - Code string `mapstructure:"code" json:"code,omitempty" gorm:"column:code" bson:"code,omitempty" dynamodbav:"code,omitempty" firestore:"code,omitempty"` - Param string `mapstructure:"param" json:"param,omitempty" gorm:"column:param" bson:"param,omitempty" dynamodbav:"param,omitempty" firestore:"param,omitempty"` - Message string `mapstructure:"message" json:"message,omitempty" gorm:"column:message" bson:"message,omitempty" dynamodbav:"message,omitempty" firestore:"message,omitempty"` -} diff --git a/validator/map_validator.go b/validator/map_validator.go deleted file mode 100644 index ca5f110..0000000 --- a/validator/map_validator.go +++ /dev/null @@ -1,7 +0,0 @@ -package validator - -import "context" - -type MapValidator interface { - Validate(ctx context.Context, model map[string]interface{}) ([]ErrorMessage, error) -} diff --git a/validator/translator.go b/validator/translator.go new file mode 100644 index 0000000..a8c91d7 --- /dev/null +++ b/validator/translator.go @@ -0,0 +1,52 @@ +package validator + +import ( + "github.com/go-playground/locales/en_US" + ut "github.com/go-playground/universal-translator" + "github.com/go-playground/validator/v10" + tr "github.com/go-playground/validator/v10/translations/en" + "log" +) + +var trans *ut.Translator + +func RegisterGlobalTranslator(validate *validator.Validate) { + t, err := RegisterTranslatorEn(validate) + if err != nil { + panic(err) + } + trans = &t +} + +func RegisterTranslatorEn(validate *validator.Validate) (ut.Translator, error) { + en := en_US.New() + uni := ut.New(en, en) + transEn, _ := uni.GetTranslator("en_US") + err := tr.RegisterDefaultTranslations(validate, transEn) + if err != nil { + return nil, err + } + return transEn, nil +} + +func AddMessage(v *validator.Validate, trans ut.Translator, tag string, translation string, override bool) error { + return v.RegisterTranslation(tag, trans, RegistrationFunc(tag, translation, override), TranslateFunc) +} +func RegistrationFunc(tag string, translation string, override bool) validator.RegisterTranslationsFunc { + return func(ut ut.Translator) (err error) { + if err = ut.Add(tag, translation, override); err != nil { + return + } + return + } +} + +func TranslateFunc(ut ut.Translator, fe validator.FieldError) string { + t, err := ut.T(fe.Tag(), fe.Field()) + if err != nil { + log.Printf("warning: error translating FieldError: %#v", fe) + return fe.(error).Error() + } + + return t +} diff --git a/validator/validator.go b/validator/validator.go index 4ff9529..2ecf291 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -1,7 +1,155 @@ package validator -import "context" +import ( + "context" + "fmt" + "reflect" + "unicode" -type Validator interface { - Validate(ctx context.Context, model interface{}) ([]ErrorMessage, error) + ut "github.com/go-playground/universal-translator" + "github.com/go-playground/validator/v10" + + s "github.com/core-go/mq" +) + +type Validator[T any] struct { + validate *validator.Validate + Trans *ut.Translator + CustomValidateList []CustomValidate + IgnoreField bool + Map map[string]string +} + +func NewValidator[T any](opts ...bool) (*Validator[T], error) { + return NewValidatorWithMap[T](nil, opts...) +} +func NewValidatorWithMap[T any](mp map[string]string, opts ...bool) (*Validator[T], error) { + register := true + if len(opts) > 0 { + register = opts[0] + } + ignoreField := false + if len(opts) > 1 { + ignoreField = opts[1] + } + uValidate, uTranslator, err := NewDefaultValidator() + if err != nil { + return nil, err + } + list := GetCustomValidateList() + validator := &Validator[T]{Map: mp, validate: uValidate, Trans: &uTranslator, CustomValidateList: list, IgnoreField: ignoreField} + if register { + err2 := validator.RegisterCustomValidate() + if err2 != nil { + return validator, err2 + } + } + return validator, nil +} +func NewDefaultChecker() (*validator.Validate, ut.Translator, error) { + return NewDefaultValidator() +} +func NewDefaultValidator() (*validator.Validate, ut.Translator, error) { + validate := validator.New() + var transl ut.Translator + if trans != nil { + transl = *trans + } else { + list := GetCustomValidateList() + for _, v := range list { + err := validate.RegisterValidation(v.Tag, v.Fn) + if err != nil { + return nil, nil, err + } + } + ptr, err := RegisterTranslatorEn(validate) + if err != nil { + return nil, nil, err + } + transl = ptr + } + return validate, transl, nil +} +func (p *Validator[T]) Validate(ctx context.Context, model T) ([]s.ErrorMessage, error) { + errors := make([]s.ErrorMessage, 0) + err := p.validate.Struct(model) + + if err != nil { + errors, err = p.MapErrors(err) + } + if p.Map != nil { + l := len(errors) + for i := 0; i < l; i++ { + nv, ok := p.Map[errors[i].Code] + if ok { + errors[i].Code = nv + } + } + } + return errors, err +} + +var alias = map[string]string{ + "max": "maxlength", + "min": "minlength", + "gtefield": "minfield", + "ltefield": "maxfield", +} + +func getTagName(err validator.FieldError) string { + var code string + if aliasTag, ok := alias[err.Tag()]; ok { + if (err.Tag() == "max" || err.Tag() == "min") && err.Kind() != reflect.String { + code = err.Tag() + } else { + code = aliasTag + } + } else { + code = err.Tag() + } + if err.Param() != "" { + code += ":" + lcFirstChar(err.Param()) + } + return code +} +func lcFirstChar(s string) string { + if len(s) > 0 { + runes := []rune(s) + runes[0] = unicode.ToLower(runes[0]) + return string(runes) + } + return s +} +func (p *Validator[T]) RegisterCustomValidate() error { + for _, v := range p.CustomValidateList { + err := p.validate.RegisterValidation(v.Tag, v.Fn) + if err != nil { + return err + } + } + if p.Trans != nil && p.validate != nil { + // register default translate + for _, validate := range p.CustomValidateList { + if text, ok := translations[validate.Tag]; ok { + err := AddMessage(p.validate, *p.Trans, validate.Tag, text, true) + if err != nil { + return err + } + } + } + } + return nil +} + +func (p *Validator[T]) MapErrors(err error) (list []s.ErrorMessage, err1 error) { + if _, ok := err.(*validator.InvalidValidationError); ok { + err1 = fmt.Errorf("InvalidValidationError") + return + } + tr := *p.Trans + for _, err := range err.(validator.ValidationErrors) { + code := getTagName(err) + list = append(list, s.ErrorMessage{Field: FormatErrorField(err.Namespace()), Code: code, Message: err.Translate(tr), Param: err.Param()}) + } + return }