Skip to content

Commit

Permalink
日志封装 & fix kafka发送消息阻塞
Browse files Browse the repository at this point in the history
  • Loading branch information
hookokoko committed Nov 4, 2023
1 parent 8c03692 commit 86eefc1
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 49 deletions.
10 changes: 9 additions & 1 deletion channel/email/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package email
import (
"context"
"crypto/tls"
"fmt"
"github.com/ecodeclub/notify-go/pkg/log"
"net"
"net/smtp"

Expand Down Expand Up @@ -58,7 +60,10 @@ func NewEmailChannel(cfg Config) *ChannelEmailImpl {
}

func (c *ChannelEmailImpl) Execute(ctx context.Context, deli notifier.Delivery) error {
var err error
var (
err error
logger = log.FromContext(ctx)
)
msgContent := c.initEmailContent(deli.Content)

c.email.To = slice.Map[notifier.Receiver, string](deli.Receivers, func(idx int, src notifier.Receiver) string {
Expand All @@ -72,6 +77,7 @@ func (c *ChannelEmailImpl) Execute(ctx context.Context, deli notifier.Delivery)
c.email.HTML = []byte(msgContent.Html)
c.email.Text = []byte(msgContent.Text)

logger.Info("email execute", "params", fmt.Sprintf("to[%v], from[%s]", c.email.To, c.email.From))
ch := make(chan struct{})
go func() {
defer func() {
Expand All @@ -85,9 +91,11 @@ func (c *ChannelEmailImpl) Execute(ctx context.Context, deli notifier.Delivery)
select {
case <-ctx.Done():
err = ctx.Err()
logger.Error("email execute err", "err", err.Error())
case <-ch:
if err != nil {
err = errors.Wrap(err, "failed to send mail")
logger.Error("email execute err", "err", err.Error())
}
}

Expand Down
10 changes: 10 additions & 0 deletions channel/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"crypto/sha256"
"fmt"
"github.com/ecodeclub/notify-go/pkg/log"
"strconv"
"time"

Expand Down Expand Up @@ -81,6 +82,8 @@ func NewPushChannel(c Config, client ral.Client) *ChannelPushImpl {
}

func (pc *ChannelPushImpl) Execute(ctx context.Context, deli notifier.Delivery) error {
var logger = log.FromContext(ctx)

token, err := pc.getToken(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -110,6 +113,7 @@ func (pc *ChannelPushImpl) Execute(ctx context.Context, deli notifier.Delivery)
var resp map[string]any
err = pc.client.Ral(ctx, "Send", req, &resp, map[string]any{})

logger.Auto("res", err, "resp", resp)
return err
}

Expand All @@ -118,7 +122,11 @@ func (pc *ChannelPushImpl) Name() string {
}

func (pc *ChannelPushImpl) getToken(ctx context.Context) (token string, err error) {
var logger = log.FromContext(ctx)

ts, sign := pc.getSign()
logger.Debug("get sign", "timestamp", ts, "sign", sign)

req := ral.Request{
Header: map[string]string{"content-type": "application/json;charset=utf-8"},
Body: map[string]interface{}{
Expand All @@ -140,6 +148,8 @@ func (pc *ChannelPushImpl) getToken(ctx context.Context) (token string, err erro
if !ok {
err = errors.New("[push] 获取token失败")
}

logger.Debug("get token", "token", token)
return
}

Expand Down
69 changes: 63 additions & 6 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,78 @@ package log

import (
"context"
"github.com/pborman/uuid"
"io"
"log/slog"
"os"
)

type ContextLogKey struct{}
type (
ContextLogKey struct{}
LogIDKey struct{}
)

func FromContext(ctx context.Context) *slog.Logger {
if l, ok := ctx.Value(ContextLogKey{}).(*slog.Logger); ok {
func FromContext(ctx context.Context) *Logger {
if l, ok := ctx.Value(ContextLogKey{}).(*Logger); ok {
return l
}
return slog.Default()
return Default()
}

type Logger struct {
*slog.Logger
}

func Default() *Logger {
l := newLogger(os.Stdout, slog.LevelInfo)
l.Logger = l.Logger.With("LOGID", uuid.NewUUID().String())
return l
}

func New() *Logger {
return newLogger(os.Stdout, slog.LevelInfo)
}

func newLogger(w io.Writer, level slog.Level) *Logger {
l := &Logger{
Logger: slog.New(
slog.NewTextHandler(w, &slog.HandlerOptions{
AddSource: false,
Level: level,
}),
),
}
return l
}

func WithContext(ctx context.Context, l *slog.Logger) context.Context {
if _, ok := ctx.Value(ContextLogKey{}).(*slog.Logger); ok {
func (l *Logger) WithContext(ctx context.Context) context.Context {
if _, ok := ctx.Value(ContextLogKey{}).(*Logger); ok {
return ctx
}
return context.WithValue(ctx, ContextLogKey{}, l)
}

func (l *Logger) Auto(msg string, err error, args ...any) {
if err != nil {
l.Error(msg, append(args, "err", err.Error())...)
return
}
l.Info(msg, append(args, "err", nil)...)
}

func (l *Logger) WithFields(args ...any) *Logger {
return &Logger{Logger: l.Logger.With(args...)}
}

func (l *Logger) WithLogID(ctx context.Context) *Logger {
var (
logId string
ok bool
)
logId, ok = ctx.Value(LogIDKey{}).(string)
if !ok {
logId = uuid.NewUUID().String()
}

return &Logger{Logger: l.Logger.With("LOGID", logId)}
}
6 changes: 3 additions & 3 deletions pkg/ral/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ral

import (
"fmt"
"github.com/ecodeclub/notify-go/pkg/log"
"log/slog"
"sync"
"time"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (s *StaticsItem) GetDuration() time.Duration {
return s.StopPoint.Sub(s.StartPoint)
}

func (lr *Record) Flush(l *slog.Logger) {
func (lr *Record) Flush(l *log.Logger) {
field := make([]any, 0, 16)
field = append(field,
"code", lr.RspCode,
Expand All @@ -98,8 +99,7 @@ func (lr *Record) Flush(l *slog.Logger) {
"host", lr.Host,
"retry", lr.retry,
"protocol", lr.Protocol,
"method", lr.Method,
"curl", lr.CurlCmd)
"method", lr.Method)

for name, sItem := range lr.timeCostSpan {
dura := sItem.GetDuration()
Expand Down
46 changes: 14 additions & 32 deletions queue/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@ package kafka
import (
"context"
"encoding/json"
"log/slog"
"sync"
"time"

"github.com/ecodeclub/notify-go/pkg/log"

"github.com/IBM/sarama"
"github.com/ecodeclub/notify-go/pkg/log"
"github.com/ecodeclub/notify-go/pkg/notifier"
)

Expand Down Expand Up @@ -67,42 +62,27 @@ func (k *Kafka) Produce(ctx context.Context, c notifier.IChannel, delivery notif
config.Producer.Return.Successes = true
producer, err := sarama.NewAsyncProducer(k.Config.Hosts, config)
if err != nil {
slog.Error("[mq] 创建生产者出错", "err", err)
logger.Error("创建生产者出错", "err", err)
return err
}
defer producer.AsyncClose()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-producer.Successes()
}()

wg.Add(1)
go func() {
defer wg.Done()
<-producer.Errors()
}()

// 根据channel类型,和路由策略选取发送的topic
topic, err := k.topicBalancer[c.Name()].GetNext()
if err != nil {
logger.Error("[Producer] choose topic fail", "channel", c.Name(), "err", err)
logger.Error("选择发送topic失败", "channel", c.Name(), "err", err)
return err
}

// 序列化data
data, _ := json.Marshal(delivery)
saramaMsg := &sarama.ProducerMessage{Topic: topic.Name, Key: nil, Value: sarama.ByteEncoder(data)}
producer.Input() <- &sarama.ProducerMessage{Topic: topic.Name, Key: nil, Value: sarama.ByteEncoder(data)}

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
select {
case producer.Input() <- saramaMsg:
logger.Info("[mq] 发送消息成功")
case <-ctx.Done():
logger.Warn("[mq] 发送消息超时")
case <-producer.Successes():
case msgErr := <-producer.Errors():
logger.Error("发送kafka消息失败", "msgErr", msgErr)
}
cancel()
wg.Wait()

return nil
}
Expand All @@ -111,7 +91,7 @@ func (k *Kafka) Consume(ctx context.Context, c notifier.IChannel) {
logger := log.FromContext(ctx)
consumer, err := k.newConsumer(c.Name())
if err != nil {
logger.Error("[kafka] 消费者启动失败", "err", err)
logger.Error("消费者启动失败", "err", err)
}

for {
Expand Down Expand Up @@ -165,7 +145,7 @@ func (k *Kafka) WrapSaramaHandler(ctx context.Context, executor notifier.IChanne
}

type ConsumeWrapper struct {
logger *slog.Logger
logger *log.Logger
Executor notifier.IChannel
}

Expand All @@ -176,7 +156,9 @@ func (c *ConsumeWrapper) ConsumeClaim(session sarama.ConsumerGroupSession, claim
if err != nil {
c.logger.Error("[consumer] unmarshal task detail fail", "err", err)
}
err = c.Executor.Execute(context.TODO(), delivery)

ctx := c.logger.WithContext(context.Background())
err = c.Executor.Execute(ctx, delivery)
if err != nil {
c.logger.Error("[consumer] 执行消息发送失败",
"topic", msg.Topic, "partition", msg.Partition, "offset", msg.Offset, "err", err)
Expand Down
12 changes: 6 additions & 6 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package notify_go

import (
"context"
"log/slog"
"github.com/ecodeclub/notify-go/pkg/log"
"time"

"github.com/ecodeclub/ekit/slice"
Expand Down Expand Up @@ -49,8 +49,8 @@ type CircleTask struct {
EndTime time.Time
HookBefore func()
HookAfter func()
CircleNum uint64
CircleFailNum uint64
circleNum uint64
circleFailNum uint64
*Notification
}

Expand Down Expand Up @@ -128,11 +128,11 @@ func (ct *CircleTask) Send(ctx context.Context) {
break
}
<-time.After(time.Until(triggerPoint))
ct.CircleNum++
ct.circleNum++
err := ct.Notification.Send(context.TODO())
if err != nil {
ct.CircleFailNum++
slog.Error("circle task execute fail", "err", err)
ct.circleFailNum++
log.Default().Error("circle task execute fail", "err", err)
}

Check warning on line 136 in task.go

View check run for this annotation

Codecov / codecov/patch

task.go#L134-L136

Added lines #L134 - L136 were not covered by tests
}
}
Expand Down
2 changes: 1 addition & 1 deletion task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestCircleTask_Send(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
defer cancel()
ct.Send(ctx)
assert.Equal(t, tt.wantCnt, ct.CircleNum)
assert.Equal(t, tt.wantCnt, ct.circleNum)
})
}
}
Expand Down

0 comments on commit 86eefc1

Please sign in to comment.