diff --git a/error_writer.go b/error_writer.go new file mode 100644 index 0000000..a4f5d69 --- /dev/null +++ b/error_writer.go @@ -0,0 +1,48 @@ +package mq + +import ( + "context" + "encoding/json" + "fmt" + "reflect" +) + +func NewErrorWriter(write func(ctx context.Context, model interface{}) error, modelType *reflect.Type, logError ...func(context.Context, string)) *ErrorWriter { + h := &ErrorWriter{Write: write, ModelType: modelType} + if len(logError) >= 1 { + h.LogError = logError[0] + } + return h +} + +type ErrorWriter struct { + Write func(ctx context.Context, model interface{}) error + ModelType *reflect.Type + LogError func(context.Context, string) +} + +func (w *ErrorWriter) HandleError(ctx context.Context, data []byte, attrs map[string]string) error { + if data == nil { + return nil + } + if w.Write != nil { + if w.ModelType == nil { + return w.Write(ctx, data) + } else { + v := InitModel(*w.ModelType) + err := json.Unmarshal(data, v) + if err != nil { + return err + } + return w.Write(ctx, v) + } + } + if w.LogError != nil { + if attrs == nil || len(attrs) == 0 { + w.LogError(ctx, fmt.Sprintf("Fail to consume message: %s", data)) + } else { + w.LogError(ctx, fmt.Sprintf("Fail to consume message: %s %s", data, attrs)) + } + } + return nil +} diff --git a/handler.go b/handler.go index db5dea5..02c8075 100644 --- a/handler.go +++ b/handler.go @@ -31,22 +31,23 @@ type Handler struct { func NewHandlerByConfig(c HandlerConfig, write func(context.Context, interface{}) error, modelType *reflect.Type, retry func(context.Context, []byte, map[string]string) error, validate func(context.Context, *Message) error, handleError func(context.Context, []byte, map[string]string) error, logs ...func(context.Context, string)) *Handler { return NewHandlerWithRetryService(write, modelType, c.LimitRetry, retry, c.RetryCountName, validate, handleError, c.Goroutines, logs...) } -func NewHandlerWithRetryConfig(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, c *RetryConfig, goroutines bool, logs ...func(context.Context, string)) *Handler { +func NewHandlerWithRetryConfig(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, c *RetryConfig, goroutines bool, handleError func(context.Context, []byte, map[string]string) error, logs ...func(context.Context, string)) *Handler { if c == nil { - return NewHandlerWithRetries(write, modelType, validate, nil, goroutines, logs...) + return NewHandlerWithRetries(write, modelType, validate, nil, handleError, goroutines, logs...) } retries := DurationsFromValue(*c, "Retry", 20) if len(retries) == 0 { - return NewHandlerWithRetries(write, modelType, validate, nil, goroutines, logs...) + return NewHandlerWithRetries(write, modelType, validate, nil, handleError, goroutines, logs...) } - return NewHandlerWithRetries(write, modelType, validate, retries, goroutines, logs...) + return NewHandlerWithRetries(write, modelType, validate, retries, handleError, goroutines, logs...) } -func NewHandlerWithRetries(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, retries []time.Duration, goroutines bool, logs ...func(context.Context, string)) *Handler { +func NewHandlerWithRetries(write func(context.Context, interface{}) error, modelType *reflect.Type, validate func(context.Context, *Message) error, retries []time.Duration, handleError func(context.Context, []byte, map[string]string) error, goroutines bool, logs ...func(context.Context, string)) *Handler { c := &Handler{ ModelType: modelType, Write: write, Validate: validate, Goroutines: goroutines, + Error: handleError, } if retries != nil { c.Retries = retries @@ -144,13 +145,13 @@ func (c *Handler) Handle(ctx context.Context, data []byte, header map[string]str func (c *Handler) write(ctx context.Context, data []byte, header map[string]string, item interface{}) error { // ctx = context.WithValue(ctx, "message", message) if c.Retry == nil && c.Retries != nil && len(c.Retries) > 0 { - return WriteWithRetry(ctx, c.Write, data, item, c.Retries, c.LogError) + return WriteWithRetry(ctx, c.Write, data, item, c.Retries, c.Error, c.LogError) } else { return Write(ctx, c.Write, data, header, item, c.Error, c.Retry, c.LimitRetry, c.RetryCountName, c.LogError, c.LogInfo) } } -func WriteWithRetry(ctx context.Context, write func(context.Context, interface{}) error, data []byte, item interface{}, retries []time.Duration, logs...func(context.Context, string)) error { +func WriteWithRetry(ctx context.Context, write func(context.Context, interface{}) error, data []byte, item interface{}, retries []time.Duration, handleError func(context.Context, []byte, map[string]string) error, logs ...func(context.Context, string)) error { var logError func(context.Context, string) if len(logs) > 0 { logError = logs[0] @@ -165,8 +166,13 @@ func WriteWithRetry(ctx context.Context, write func(context.Context, interface{} } return er2 }, logError) - if err != nil && logError != nil { - logError(ctx, fmt.Sprintf("Failed to write after %d retries: %s. Error: %s.", len(retries), data, er1.Error())) + if err != nil { + if handleError != nil { + handleError(ctx, data, nil) + } + if logError != nil { + logError(ctx, fmt.Sprintf("Failed to write after %d retries: %s. Error: %s.", len(retries), data, er1.Error())) + } } return err }