Skip to content

Commit

Permalink
push handler error_writer files
Browse files Browse the repository at this point in the history
  • Loading branch information
vanhop993 committed Oct 5, 2021
1 parent 9c02ed7 commit a59976c
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 9 deletions.
48 changes: 48 additions & 0 deletions error_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package mq

import (
"context"
"encoding/json"
"fmt"
"reflect"
)

func NewErrorWriter(write func(ctx context.Context, model interface{}) error, modelType *reflect.Type, logError ...func(context.Context, string)) *ErrorWriter {
h := &ErrorWriter{Write: write, ModelType: modelType}
if len(logError) >= 1 {
h.LogError = logError[0]
}
return h
}

type ErrorWriter struct {
Write func(ctx context.Context, model interface{}) error
ModelType *reflect.Type
LogError func(context.Context, string)
}

func (w *ErrorWriter) HandleError(ctx context.Context, data []byte, attrs map[string]string) error {
if data == nil {
return nil
}
if w.Write != nil {
if w.ModelType == nil {
return w.Write(ctx, data)
} else {
v := InitModel(*w.ModelType)
err := json.Unmarshal(data, v)
if err != nil {
return err
}
return w.Write(ctx, v)
}
}
if w.LogError != nil {
if attrs == nil || len(attrs) == 0 {
w.LogError(ctx, fmt.Sprintf("Fail to consume message: %s", data))
} else {
w.LogError(ctx, fmt.Sprintf("Fail to consume message: %s %s", data, attrs))
}
}
return nil
}
24 changes: 15 additions & 9 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,23 @@ type Handler struct {
func NewHandlerByConfig(c HandlerConfig, write func(context.Context, interface{}) error, modelType *reflect.Type, retry func(context.Context, []byte, map[string]string) error, validate func(context.Context, *Message) error, handleError func(context.Context, []byte, map[string]string) error, logs ...func(context.Context, string)) *Handler {
return NewHandlerWithRetryService(write, modelType, c.LimitRetry, retry, c.RetryCountName, validate, handleError, c.Goroutines, logs...)
}
func NewHandlerWithRetryConfig(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, c *RetryConfig, goroutines bool, logs ...func(context.Context, string)) *Handler {
func NewHandlerWithRetryConfig(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, c *RetryConfig, goroutines bool, handleError func(context.Context, []byte, map[string]string) error, logs ...func(context.Context, string)) *Handler {
if c == nil {
return NewHandlerWithRetries(write, modelType, validate, nil, goroutines, logs...)
return NewHandlerWithRetries(write, modelType, validate, nil, handleError, goroutines, logs...)
}
retries := DurationsFromValue(*c, "Retry", 20)
if len(retries) == 0 {
return NewHandlerWithRetries(write, modelType, validate, nil, goroutines, logs...)
return NewHandlerWithRetries(write, modelType, validate, nil, handleError, goroutines, logs...)
}
return NewHandlerWithRetries(write, modelType, validate, retries, goroutines, logs...)
return NewHandlerWithRetries(write, modelType, validate, retries, handleError, goroutines, logs...)
}
func NewHandlerWithRetries(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, retries []time.Duration, goroutines bool, logs ...func(context.Context, string)) *Handler {
func NewHandlerWithRetries(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, retries []time.Duration, handleError func(context.Context, []byte, map[string]string) error, goroutines bool, logs ...func(context.Context, string)) *Handler {
c := &Handler{
ModelType: modelType,
Write: write,
Validate: validate,
Goroutines: goroutines,
Error: handleError,
}
if retries != nil {
c.Retries = retries
Expand Down Expand Up @@ -144,13 +145,13 @@ func (c *Handler) Handle(ctx context.Context, data []byte, header map[string]str
func (c *Handler) write(ctx context.Context, data []byte, header map[string]string, item interface{}) error {
// ctx = context.WithValue(ctx, "message", message)
if c.Retry == nil && c.Retries != nil && len(c.Retries) > 0 {
return WriteWithRetry(ctx, c.Write, data, item, c.Retries, c.LogError)
return WriteWithRetry(ctx, c.Write, data, item, c.Retries, c.Error, c.LogError)
} else {
return Write(ctx, c.Write, data, header, item, c.Error, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo)
}
}

func WriteWithRetry(ctx context.Context, write func(context.Context, interface{}) error, data []byte, item interface{}, retries []time.Duration, logs...func(context.Context, string)) error {
func WriteWithRetry(ctx context.Context, write func(context.Context, interface{}) error, data []byte, item interface{}, retries []time.Duration, handleError func(context.Context, []byte, map[string]string) error, logs ...func(context.Context, string)) error {
var logError func(context.Context, string)
if len(logs) > 0 {
logError = logs[0]
Expand All @@ -165,8 +166,13 @@ func WriteWithRetry(ctx context.Context, write func(context.Context, interface{}
}
return er2
}, logError)
if err != nil && logError != nil {
logError(ctx, fmt.Sprintf("Failed to write after %d retries: %s. Error: %s.", len(retries), data, er1.Error()))
if err != nil {
if handleError != nil {
handleError(ctx, data, nil)
}
if logError != nil {
logError(ctx, fmt.Sprintf("Failed to write after %d retries: %s. Error: %s.", len(retries), data, er1.Error()))
}
}
return err
}
Expand Down

0 comments on commit a59976c

Please sign in to comment.