Refactor pubsub, kafka
minhduc140583 committed Jun 2, 2024
1 parent 895611d commit 8e7316f
Showing 34 changed files with 1,024 additions and 1,497 deletions.
109 changes: 25 additions & 84 deletions batch_handler.go
@@ -1,100 +1,41 @@
 package mq
 
-import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"reflect"
-	"time"
-)
+import "context"
 
-type Message struct {
-	Id         string            `json:"id,omitempty" gorm:"column:id;primary_key" bson:"id,omitempty" dynamodbav:"id,omitempty" firestore:"id,omitempty"`
-	Data       []byte            `json:"data,omitempty" gorm:"column:data" bson:"data,omitempty" dynamodbav:"data,omitempty" firestore:"data,omitempty"`
-	Attributes map[string]string `json:"attributes,omitempty" gorm:"column:attributes" bson:"attributes,omitempty" dynamodbav:"attributes,omitempty" firestore:"attributes,omitempty"`
-	Timestamp  *time.Time        `json:"timestamp,omitempty" gorm:"column:timestamp" bson:"timestamp,omitempty" dynamodbav:"timestamp,omitempty" firestore:"timestamp,omitempty"`
-	Raw        interface{}       `json:"-" bson:"-" dynamodbav:"-" firestore:"-"`
-	Value      interface{}       `json:"-" bson:"-" dynamodbav:"-" firestore:"-"`
+type Message[T any] struct {
+	Data       []byte            `yaml:"data" mapstructure:"data" json:"data,omitempty" gorm:"column:data" bson:"data,omitempty" dynamodbav:"data,omitempty" firestore:"data,omitempty"`
+	Attributes map[string]string `yaml:"attributes" mapstructure:"attributes" json:"attributes,omitempty" gorm:"column:attributes" bson:"attributes,omitempty" dynamodbav:"attributes,omitempty" firestore:"attributes,omitempty"`
+	Value      T                 `yaml:"value" mapstructure:"value" json:"value,omitempty" gorm:"column:value" bson:"value,omitempty" dynamodbav:"value,omitempty" firestore:"value,omitempty"`
 }
 
-type BatchHandler struct {
-	modelType  reflect.Type
-	modelsType reflect.Type
-	Write      func(ctx context.Context, models interface{}) ([]int, []int, error) // Return: Success indices, Fail indices, Error
-	Unmarshal  func(data []byte, v interface{}) error
-	Marshal    func(v interface{}) ([]byte, error)
-	LogError   func(context.Context, string)
-	LogInfo    func(context.Context, string)
+type BatchHandler[T any] struct {
+	Write func(context.Context, []T) ([]int, error) // Return: Fail indices, Error
 }
 
-func NewBatchHandler(writeBatch func(context.Context, interface{}) ([]int, []int, error), modelType reflect.Type, unmarshal func(data []byte, v interface{}) error, logs ...func(context.Context, string)) *BatchHandler {
-	modelsType := reflect.Zero(reflect.SliceOf(modelType)).Type()
-	if unmarshal == nil {
-		unmarshal = json.Unmarshal
-	}
-	h := &BatchHandler{modelType: modelType, modelsType: modelsType, Write: writeBatch}
-	if len(logs) >= 1 {
-		h.LogError = logs[0]
-	}
-	if len(logs) >= 2 {
-		h.LogInfo = logs[1]
-	}
+func NewBatchHandler[T any](writeBatch func(context.Context, []T) ([]int, error)) *BatchHandler[T] {
+	h := &BatchHandler[T]{Write: writeBatch}
 	return h
 }
 
-func (h *BatchHandler) Handle(ctx context.Context, data []*Message) ([]*Message, error) {
-	failMessages := make([]*Message, 0)
-
-	vs := InitModels(h.modelsType)
-	var v = reflect.Indirect(reflect.ValueOf(vs))
-	for _, message := range data {
-		if message.Value != nil {
-			vo := reflect.ValueOf(message.Value)
-			v1 := reflect.Indirect(vo)
-			v = reflect.Append(v, v1)
-		} else {
-			item := InitModel(h.modelType)
-			err := h.Unmarshal(message.Data, item)
-			if err != nil {
-				failMessages = append(failMessages, message)
-				return failMessages, fmt.Errorf(`cannot unmarshal item: %s. Error: %s`, message.Data, err.Error())
-			}
-			vo := reflect.ValueOf(item)
-			x := reflect.Indirect(vo).Interface()
-			v = reflect.Append(v, reflect.ValueOf(x))
+func (h *BatchHandler[T]) Handle(ctx context.Context, data []Message[T]) ([]Message[T], error) {
+	failMessages := make([]Message[T], 0)
+	var vs []T
+	le := len(data)
+	if le > 0 {
+		for i := 0; i < le; i++ {
+			message := data[i]
+			vs = append(vs, message.Value)
 		}
 	}
-	if h.LogInfo != nil {
-		if h.Marshal != nil {
-			sv, er0 := h.Marshal(v.Interface())
-			if er0 != nil {
-				h.LogInfo(ctx, fmt.Sprintf(`models: %s`, v))
-			} else {
-				h.LogInfo(ctx, fmt.Sprintf(`models: %s`, sv))
-			}
-		} else {
-			h.LogInfo(ctx, fmt.Sprintf(`models: %s`, v))
-		}
-	}
-	successIndices, failIndices, er1 := h.Write(ctx, v.Interface())
-	if h.LogInfo != nil {
-		h.LogInfo(ctx, fmt.Sprintf(`success indices %v fail indices %v`, successIndices, failIndices))
+	failIndices, err := h.Write(ctx, vs)
+	if err != nil {
+		return failMessages, err
 	}
-	if successIndices != nil && len(successIndices) > 0 {
-		if failIndices != nil && len(failIndices) > 0 {
-			for _, failIndex := range failIndices {
-				failMessages = append(failMessages, data[failIndex])
-			}
+	sl := len(failIndices)
+	if sl > 0 {
+		for j := 0; j < sl; j++ {
+			failMessages = append(failMessages, data[failIndices[j]])
 		}
-		return failMessages, nil
 	}
-	if er1 != nil && h.LogError != nil {
-		sv, er2 := json.Marshal(v.Interface())
-		if er2 != nil {
-			h.LogError(ctx, fmt.Sprintf("Cannot write batch: %s Error: %s", v.Interface(), er1.Error()))
-		} else {
-			h.LogError(ctx, fmt.Sprintf("Cannot write batch: %s Error: %s", sv, er1.Error()))
-		}
-	}
-	return data, er1
+	return failMessages, nil
 }
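
For context, here is a minimal sketch of how the refactored generic API might be used after this commit. Only Message[T], BatchHandler[T], NewBatchHandler, and Handle come from the diff above; the Order type, the saveOrders batch writer, and the import path are hypothetical placeholders and should be adjusted to the real module.

package main

import (
	"context"
	"fmt"

	"github.com/core-go/mq" // assumed module path; replace with the actual repository path
)

// Order is a hypothetical payload type used only for illustration.
type Order struct {
	Id     string  `json:"id"`
	Amount float64 `json:"amount"`
}

// saveOrders is a hypothetical batch writer. Per the new Write contract,
// it returns the indices of the items that failed plus an overall error.
func saveOrders(ctx context.Context, orders []Order) ([]int, error) {
	var failed []int
	for i, o := range orders {
		if o.Id == "" { // pretend an empty id means the write failed
			failed = append(failed, i)
		}
	}
	return failed, nil
}

func main() {
	handler := mq.NewBatchHandler[Order](saveOrders)

	batch := []mq.Message[Order]{
		{Value: Order{Id: "o-1", Amount: 10}},
		{Value: Order{}}, // reported back as a failed message
	}

	// Handle collects the Value of each message, calls Write once,
	// and returns the messages at the failed indices for retry.
	failed, err := handler.Handle(context.Background(), batch)
	fmt.Println(len(failed), err) // expected: 1 <nil>
}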
