Skip to content

Commit

Permalink
日志封装 & fix kafka发送消息阻塞
Browse files Browse the repository at this point in the history
  • Loading branch information
hookokoko committed Nov 4, 2023
1 parent 8c03692 commit b0d9bd7
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 48 deletions.
11 changes: 10 additions & 1 deletion channel/email/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package email
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/smtp"

"github.com/ecodeclub/notify-go/pkg/log"

"github.com/ecodeclub/ekit/slice"
"github.com/ecodeclub/notify-go/pkg/notifier"
"github.com/jordan-wright/email"
Expand Down Expand Up @@ -58,7 +61,10 @@ func NewEmailChannel(cfg Config) *ChannelEmailImpl {
}

func (c *ChannelEmailImpl) Execute(ctx context.Context, deli notifier.Delivery) error {
var err error
var (
err error
logger = log.FromContext(ctx)
)
msgContent := c.initEmailContent(deli.Content)

c.email.To = slice.Map[notifier.Receiver, string](deli.Receivers, func(idx int, src notifier.Receiver) string {
Expand All @@ -72,6 +78,7 @@ func (c *ChannelEmailImpl) Execute(ctx context.Context, deli notifier.Delivery)
c.email.HTML = []byte(msgContent.Html)
c.email.Text = []byte(msgContent.Text)

logger.Info("email execute", "params", fmt.Sprintf("to[%v], from[%s]", c.email.To, c.email.From))
ch := make(chan struct{})
go func() {
defer func() {
Expand All @@ -85,9 +92,11 @@ func (c *ChannelEmailImpl) Execute(ctx context.Context, deli notifier.Delivery)
select {
case <-ctx.Done():
err = ctx.Err()
logger.Error("email execute err", "err", err.Error())
case <-ch:
if err != nil {
err = errors.Wrap(err, "failed to send mail")
logger.Error("email execute err", "err", err.Error())
}
}

Expand Down
11 changes: 11 additions & 0 deletions channel/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strconv"
"time"

"github.com/ecodeclub/notify-go/pkg/log"

"github.com/ecodeclub/ekit/slice"
"github.com/ecodeclub/notify-go/pkg/notifier"
"github.com/ecodeclub/notify-go/pkg/ral"
Expand Down Expand Up @@ -81,6 +83,8 @@ func NewPushChannel(c Config, client ral.Client) *ChannelPushImpl {
}

func (pc *ChannelPushImpl) Execute(ctx context.Context, deli notifier.Delivery) error {
var logger = log.FromContext(ctx)

token, err := pc.getToken(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -110,6 +114,7 @@ func (pc *ChannelPushImpl) Execute(ctx context.Context, deli notifier.Delivery)
var resp map[string]any
err = pc.client.Ral(ctx, "Send", req, &resp, map[string]any{})

logger.Auto("res", err, "resp", resp)
return err
}

Expand All @@ -118,7 +123,11 @@ func (pc *ChannelPushImpl) Name() string {
}

func (pc *ChannelPushImpl) getToken(ctx context.Context) (token string, err error) {
var logger = log.FromContext(ctx)

ts, sign := pc.getSign()
logger.Debug("get sign", "timestamp", ts, "sign", sign)

req := ral.Request{
Header: map[string]string{"content-type": "application/json;charset=utf-8"},
Body: map[string]interface{}{
Expand All @@ -140,6 +149,8 @@ func (pc *ChannelPushImpl) getToken(ctx context.Context) (token string, err erro
if !ok {
err = errors.New("[push] 获取token失败")
}

logger.Debug("get token", "token", token)
return
}

Expand Down
70 changes: 64 additions & 6 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,79 @@ package log

import (
"context"
"io"
"log/slog"
"os"

"github.com/pborman/uuid"
)

type ContextLogKey struct{}
type (
ContextLogKey struct{}
LogIDKey struct{}
)

func FromContext(ctx context.Context) *slog.Logger {
if l, ok := ctx.Value(ContextLogKey{}).(*slog.Logger); ok {
func FromContext(ctx context.Context) *Logger {
if l, ok := ctx.Value(ContextLogKey{}).(*Logger); ok {
return l
}
return slog.Default()
return Default()
}

type Logger struct {
*slog.Logger
}

func Default() *Logger {
l := newLogger(os.Stdout, slog.LevelInfo)
l.Logger = l.Logger.With("LOGID", uuid.NewUUID().String())
return l
}

func New() *Logger {
return newLogger(os.Stdout, slog.LevelInfo)
}

func newLogger(w io.Writer, level slog.Level) *Logger {
l := &Logger{
Logger: slog.New(
slog.NewTextHandler(w, &slog.HandlerOptions{
AddSource: false,
Level: level,
}),
),
}
return l
}

func WithContext(ctx context.Context, l *slog.Logger) context.Context {
if _, ok := ctx.Value(ContextLogKey{}).(*slog.Logger); ok {
func (l *Logger) WithContext(ctx context.Context) context.Context {
if _, ok := ctx.Value(ContextLogKey{}).(*Logger); ok {
return ctx
}
return context.WithValue(ctx, ContextLogKey{}, l)
}

func (l *Logger) Auto(msg string, err error, args ...any) {
if err != nil {
l.Error(msg, append(args, "err", err.Error())...)
return
}
l.Info(msg, append(args, "err", nil)...)
}

func (l *Logger) WithFields(args ...any) *Logger {
return &Logger{Logger: l.Logger.With(args...)}
}

func (l *Logger) WithLogID(ctx context.Context) *Logger {
var (
logId string
ok bool
)
logId, ok = ctx.Value(LogIDKey{}).(string)
if !ok {
logId = uuid.NewUUID().String()
}

return &Logger{Logger: l.Logger.With("LOGID", logId)}
}
7 changes: 4 additions & 3 deletions pkg/ral/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"log/slog"
"sync"
"time"

"github.com/ecodeclub/notify-go/pkg/log"
)

type Record struct {
Expand Down Expand Up @@ -89,7 +91,7 @@ func (s *StaticsItem) GetDuration() time.Duration {
return s.StopPoint.Sub(s.StartPoint)
}

func (lr *Record) Flush(l *slog.Logger) {
func (lr *Record) Flush(l *log.Logger) {
field := make([]any, 0, 16)
field = append(field,
"code", lr.RspCode,
Expand All @@ -98,8 +100,7 @@ func (lr *Record) Flush(l *slog.Logger) {
"host", lr.Host,
"retry", lr.retry,
"protocol", lr.Protocol,
"method", lr.Method,
"curl", lr.CurlCmd)
"method", lr.Method)

for name, sItem := range lr.timeCostSpan {
dura := sItem.GetDuration()
Expand Down
45 changes: 14 additions & 31 deletions queue/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@ package kafka
import (
"context"
"encoding/json"
"log/slog"
"sync"
"time"

"github.com/ecodeclub/notify-go/pkg/log"

"github.com/IBM/sarama"
"github.com/ecodeclub/notify-go/pkg/log"
"github.com/ecodeclub/notify-go/pkg/notifier"
)

Expand Down Expand Up @@ -67,42 +63,27 @@ func (k *Kafka) Produce(ctx context.Context, c notifier.IChannel, delivery notif
config.Producer.Return.Successes = true
producer, err := sarama.NewAsyncProducer(k.Config.Hosts, config)
if err != nil {
slog.Error("[mq] 创建生产者出错", "err", err)
logger.Error("创建生产者出错", "err", err)
return err
}
defer producer.AsyncClose()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-producer.Successes()
}()

wg.Add(1)
go func() {
defer wg.Done()
<-producer.Errors()
}()

// 根据channel类型,和路由策略选取发送的topic
topic, err := k.topicBalancer[c.Name()].GetNext()
if err != nil {
logger.Error("[Producer] choose topic fail", "channel", c.Name(), "err", err)
logger.Error("选择发送topic失败", "channel", c.Name(), "err", err)
return err
}

// 序列化data
data, _ := json.Marshal(delivery)
saramaMsg := &sarama.ProducerMessage{Topic: topic.Name, Key: nil, Value: sarama.ByteEncoder(data)}
producer.Input() <- &sarama.ProducerMessage{Topic: topic.Name, Key: nil, Value: sarama.ByteEncoder(data)}

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
select {
case producer.Input() <- saramaMsg:
logger.Info("[mq] 发送消息成功")
case <-ctx.Done():
logger.Warn("[mq] 发送消息超时")
case <-producer.Successes():
case msgErr := <-producer.Errors():
logger.Error("发送kafka消息失败", "msgErr", msgErr)
}
cancel()
wg.Wait()

return nil
}
Expand All @@ -111,7 +92,7 @@ func (k *Kafka) Consume(ctx context.Context, c notifier.IChannel) {
logger := log.FromContext(ctx)
consumer, err := k.newConsumer(c.Name())
if err != nil {
logger.Error("[kafka] 消费者启动失败", "err", err)
logger.Error("消费者启动失败", "err", err)
}

for {
Expand Down Expand Up @@ -165,7 +146,7 @@ func (k *Kafka) WrapSaramaHandler(ctx context.Context, executor notifier.IChanne
}

type ConsumeWrapper struct {
logger *slog.Logger
logger *log.Logger
Executor notifier.IChannel
}

Expand All @@ -176,7 +157,9 @@ func (c *ConsumeWrapper) ConsumeClaim(session sarama.ConsumerGroupSession, claim
if err != nil {
c.logger.Error("[consumer] unmarshal task detail fail", "err", err)
}
err = c.Executor.Execute(context.TODO(), delivery)

ctx := c.logger.WithContext(context.Background())
err = c.Executor.Execute(ctx, delivery)
if err != nil {
c.logger.Error("[consumer] 执行消息发送失败",
"topic", msg.Topic, "partition", msg.Partition, "offset", msg.Offset, "err", err)
Expand Down
13 changes: 7 additions & 6 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package notify_go

import (
"context"
"log/slog"
"time"

"github.com/ecodeclub/notify-go/pkg/log"

"github.com/ecodeclub/ekit/slice"
"github.com/ecodeclub/notify-go/pkg/iterator"
"github.com/gorhill/cronexpr"
Expand Down Expand Up @@ -49,8 +50,8 @@ type CircleTask struct {
EndTime time.Time
HookBefore func()
HookAfter func()
CircleNum uint64
CircleFailNum uint64
circleNum uint64
circleFailNum uint64
*Notification
}

Expand Down Expand Up @@ -128,11 +129,11 @@ func (ct *CircleTask) Send(ctx context.Context) {
break
}
<-time.After(time.Until(triggerPoint))
ct.CircleNum++
ct.circleNum++
err := ct.Notification.Send(context.TODO())
if err != nil {
ct.CircleFailNum++
slog.Error("circle task execute fail", "err", err)
ct.circleFailNum++
log.Default().Error("circle task execute fail", "err", err)
}

Check warning on line 137 in task.go

View check run for this annotation

Codecov / codecov/patch

task.go#L135-L137

Added lines #L135 - L137 were not covered by tests
}
}
Expand Down
2 changes: 1 addition & 1 deletion task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestCircleTask_Send(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
defer cancel()
ct.Send(ctx)
assert.Equal(t, tt.wantCnt, ct.CircleNum)
assert.Equal(t, tt.wantCnt, ct.circleNum)
})
}
}
Expand Down

0 comments on commit b0d9bd7

Please sign in to comment.