From 86eefc18ca193a07b9d4547e5132cb0745b33ad0 Mon Sep 17 00:00:00 2001 From: hookokoko Date: Sun, 5 Nov 2023 00:00:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97=E5=B0=81=E8=A3=85=20&=20fix?= =?UTF-8?q?=20kafka=E5=8F=91=E9=80=81=E6=B6=88=E6=81=AF=E9=98=BB=E5=A1=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- channel/email/email.go | 10 +++++- channel/push/push.go | 10 ++++++ pkg/log/log.go | 69 ++++++++++++++++++++++++++++++++++++++---- pkg/ral/record.go | 6 ++-- queue/kafka/kafka.go | 46 +++++++++------------------- task.go | 12 ++++---- task_test.go | 2 +- 7 files changed, 106 insertions(+), 49 deletions(-) diff --git a/channel/email/email.go b/channel/email/email.go index ba1ebe7..5511473 100644 --- a/channel/email/email.go +++ b/channel/email/email.go @@ -17,6 +17,8 @@ package email import ( "context" "crypto/tls" + "fmt" + "github.com/ecodeclub/notify-go/pkg/log" "net" "net/smtp" @@ -58,7 +60,10 @@ func NewEmailChannel(cfg Config) *ChannelEmailImpl { } func (c *ChannelEmailImpl) Execute(ctx context.Context, deli notifier.Delivery) error { - var err error + var ( + err error + logger = log.FromContext(ctx) + ) msgContent := c.initEmailContent(deli.Content) c.email.To = slice.Map[notifier.Receiver, string](deli.Receivers, func(idx int, src notifier.Receiver) string { @@ -72,6 +77,7 @@ func (c *ChannelEmailImpl) Execute(ctx context.Context, deli notifier.Delivery) c.email.HTML = []byte(msgContent.Html) c.email.Text = []byte(msgContent.Text) + logger.Info("email execute", "params", fmt.Sprintf("to[%v], from[%s]", c.email.To, c.email.From)) ch := make(chan struct{}) go func() { defer func() { @@ -85,9 +91,11 @@ func (c *ChannelEmailImpl) Execute(ctx context.Context, deli notifier.Delivery) select { case <-ctx.Done(): err = ctx.Err() + logger.Error("email execute err", "err", err.Error()) case <-ch: if err != nil { err = errors.Wrap(err, "failed to send mail") + logger.Error("email execute err", "err", err.Error()) } } diff --git a/channel/push/push.go b/channel/push/push.go index b1ccba6..ddf8eb8 100644 --- a/channel/push/push.go +++ b/channel/push/push.go @@ -18,6 +18,7 @@ import ( "context" "crypto/sha256" "fmt" + "github.com/ecodeclub/notify-go/pkg/log" "strconv" "time" @@ -81,6 +82,8 @@ func NewPushChannel(c Config, client ral.Client) *ChannelPushImpl { } func (pc *ChannelPushImpl) Execute(ctx context.Context, deli notifier.Delivery) error { + var logger = log.FromContext(ctx) + token, err := pc.getToken(ctx) if err != nil { return err @@ -110,6 +113,7 @@ func (pc *ChannelPushImpl) Execute(ctx context.Context, deli notifier.Delivery) var resp map[string]any err = pc.client.Ral(ctx, "Send", req, &resp, map[string]any{}) + logger.Auto("res", err, "resp", resp) return err } @@ -118,7 +122,11 @@ func (pc *ChannelPushImpl) Name() string { } func (pc *ChannelPushImpl) getToken(ctx context.Context) (token string, err error) { + var logger = log.FromContext(ctx) + ts, sign := pc.getSign() + logger.Debug("get sign", "timestamp", ts, "sign", sign) + req := ral.Request{ Header: map[string]string{"content-type": "application/json;charset=utf-8"}, Body: map[string]interface{}{ @@ -140,6 +148,8 @@ func (pc *ChannelPushImpl) getToken(ctx context.Context) (token string, err erro if !ok { err = errors.New("[push] 获取token失败") } + + logger.Debug("get token", "token", token) return } diff --git a/pkg/log/log.go b/pkg/log/log.go index 122f782..0950ba7 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -16,21 +16,78 @@ package log import ( "context" + "github.com/pborman/uuid" + "io" "log/slog" + "os" ) -type ContextLogKey struct{} +type ( + ContextLogKey struct{} + LogIDKey struct{} +) -func FromContext(ctx context.Context) *slog.Logger { - if l, ok := ctx.Value(ContextLogKey{}).(*slog.Logger); ok { +func FromContext(ctx context.Context) *Logger { + if l, ok := ctx.Value(ContextLogKey{}).(*Logger); ok { return l } - return slog.Default() + return Default() +} + +type Logger struct { + *slog.Logger +} + +func Default() *Logger { + l := newLogger(os.Stdout, slog.LevelInfo) + l.Logger = l.Logger.With("LOGID", uuid.NewUUID().String()) + return l +} + +func New() *Logger { + return newLogger(os.Stdout, slog.LevelInfo) +} + +func newLogger(w io.Writer, level slog.Level) *Logger { + l := &Logger{ + Logger: slog.New( + slog.NewTextHandler(w, &slog.HandlerOptions{ + AddSource: false, + Level: level, + }), + ), + } + return l } -func WithContext(ctx context.Context, l *slog.Logger) context.Context { - if _, ok := ctx.Value(ContextLogKey{}).(*slog.Logger); ok { +func (l *Logger) WithContext(ctx context.Context) context.Context { + if _, ok := ctx.Value(ContextLogKey{}).(*Logger); ok { return ctx } return context.WithValue(ctx, ContextLogKey{}, l) } + +func (l *Logger) Auto(msg string, err error, args ...any) { + if err != nil { + l.Error(msg, append(args, "err", err.Error())...) + return + } + l.Info(msg, append(args, "err", nil)...) +} + +func (l *Logger) WithFields(args ...any) *Logger { + return &Logger{Logger: l.Logger.With(args...)} +} + +func (l *Logger) WithLogID(ctx context.Context) *Logger { + var ( + logId string + ok bool + ) + logId, ok = ctx.Value(LogIDKey{}).(string) + if !ok { + logId = uuid.NewUUID().String() + } + + return &Logger{Logger: l.Logger.With("LOGID", logId)} +} diff --git a/pkg/ral/record.go b/pkg/ral/record.go index 1cbe979..a1684bb 100644 --- a/pkg/ral/record.go +++ b/pkg/ral/record.go @@ -16,6 +16,7 @@ package ral import ( "fmt" + "github.com/ecodeclub/notify-go/pkg/log" "log/slog" "sync" "time" @@ -89,7 +90,7 @@ func (s *StaticsItem) GetDuration() time.Duration { return s.StopPoint.Sub(s.StartPoint) } -func (lr *Record) Flush(l *slog.Logger) { +func (lr *Record) Flush(l *log.Logger) { field := make([]any, 0, 16) field = append(field, "code", lr.RspCode, @@ -98,8 +99,7 @@ func (lr *Record) Flush(l *slog.Logger) { "host", lr.Host, "retry", lr.retry, "protocol", lr.Protocol, - "method", lr.Method, - "curl", lr.CurlCmd) + "method", lr.Method) for name, sItem := range lr.timeCostSpan { dura := sItem.GetDuration() diff --git a/queue/kafka/kafka.go b/queue/kafka/kafka.go index ca6c292..2a59b8e 100644 --- a/queue/kafka/kafka.go +++ b/queue/kafka/kafka.go @@ -17,13 +17,8 @@ package kafka import ( "context" "encoding/json" - "log/slog" - "sync" - "time" - - "github.com/ecodeclub/notify-go/pkg/log" - "github.com/IBM/sarama" + "github.com/ecodeclub/notify-go/pkg/log" "github.com/ecodeclub/notify-go/pkg/notifier" ) @@ -67,42 +62,27 @@ func (k *Kafka) Produce(ctx context.Context, c notifier.IChannel, delivery notif config.Producer.Return.Successes = true producer, err := sarama.NewAsyncProducer(k.Config.Hosts, config) if err != nil { - slog.Error("[mq] 创建生产者出错", "err", err) + logger.Error("创建生产者出错", "err", err) + return err } defer producer.AsyncClose() - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - <-producer.Successes() - }() - - wg.Add(1) - go func() { - defer wg.Done() - <-producer.Errors() - }() - // 根据channel类型,和路由策略选取发送的topic topic, err := k.topicBalancer[c.Name()].GetNext() if err != nil { - logger.Error("[Producer] choose topic fail", "channel", c.Name(), "err", err) + logger.Error("选择发送topic失败", "channel", c.Name(), "err", err) + return err } // 序列化data data, _ := json.Marshal(delivery) - saramaMsg := &sarama.ProducerMessage{Topic: topic.Name, Key: nil, Value: sarama.ByteEncoder(data)} + producer.Input() <- &sarama.ProducerMessage{Topic: topic.Name, Key: nil, Value: sarama.ByteEncoder(data)} - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) select { - case producer.Input() <- saramaMsg: - logger.Info("[mq] 发送消息成功") - case <-ctx.Done(): - logger.Warn("[mq] 发送消息超时") + case <-producer.Successes(): + case msgErr := <-producer.Errors(): + logger.Error("发送kafka消息失败", "msgErr", msgErr) } - cancel() - wg.Wait() return nil } @@ -111,7 +91,7 @@ func (k *Kafka) Consume(ctx context.Context, c notifier.IChannel) { logger := log.FromContext(ctx) consumer, err := k.newConsumer(c.Name()) if err != nil { - logger.Error("[kafka] 消费者启动失败", "err", err) + logger.Error("消费者启动失败", "err", err) } for { @@ -165,7 +145,7 @@ func (k *Kafka) WrapSaramaHandler(ctx context.Context, executor notifier.IChanne } type ConsumeWrapper struct { - logger *slog.Logger + logger *log.Logger Executor notifier.IChannel } @@ -176,7 +156,9 @@ func (c *ConsumeWrapper) ConsumeClaim(session sarama.ConsumerGroupSession, claim if err != nil { c.logger.Error("[consumer] unmarshal task detail fail", "err", err) } - err = c.Executor.Execute(context.TODO(), delivery) + + ctx := c.logger.WithContext(context.Background()) + err = c.Executor.Execute(ctx, delivery) if err != nil { c.logger.Error("[consumer] 执行消息发送失败", "topic", msg.Topic, "partition", msg.Partition, "offset", msg.Offset, "err", err) diff --git a/task.go b/task.go index b900fea..b7cab1c 100644 --- a/task.go +++ b/task.go @@ -16,7 +16,7 @@ package notify_go import ( "context" - "log/slog" + "github.com/ecodeclub/notify-go/pkg/log" "time" "github.com/ecodeclub/ekit/slice" @@ -49,8 +49,8 @@ type CircleTask struct { EndTime time.Time HookBefore func() HookAfter func() - CircleNum uint64 - CircleFailNum uint64 + circleNum uint64 + circleFailNum uint64 *Notification } @@ -128,11 +128,11 @@ func (ct *CircleTask) Send(ctx context.Context) { break } <-time.After(time.Until(triggerPoint)) - ct.CircleNum++ + ct.circleNum++ err := ct.Notification.Send(context.TODO()) if err != nil { - ct.CircleFailNum++ - slog.Error("circle task execute fail", "err", err) + ct.circleFailNum++ + log.Default().Error("circle task execute fail", "err", err) } } } diff --git a/task_test.go b/task_test.go index b41a673..882479a 100644 --- a/task_test.go +++ b/task_test.go @@ -138,7 +138,7 @@ func TestCircleTask_Send(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second) defer cancel() ct.Send(ctx) - assert.Equal(t, tt.wantCnt, ct.CircleNum) + assert.Equal(t, tt.wantCnt, ct.circleNum) }) } }