Skip to content

Commit

Permalink
Support jsonline protocol
Browse files Browse the repository at this point in the history
* support jsonline protocol
* add UTs
* support field rename and protocol rename
* reuse bytebuf
* use jsoniter instead of encoding/json
  • Loading branch information
lujiajing1126 committed Dec 15, 2023
1 parent a1e3ce5 commit 9f0b7b7
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pkg/protocol/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
ProtocolCustomSingleFlatten = "custom_single_flatten"
ProtocolOtlpV1 = "otlp_v1"
ProtocolInfluxdb = "influxdb"
ProtocolJsonline = "jsonline"
ProtocolRaw = "raw"
)

Expand Down Expand Up @@ -110,6 +111,9 @@ var supportedEncodingMap = map[string]map[string]bool{
ProtocolInfluxdb: {
EncodingCustom: true,
},
ProtocolJsonline: {
EncodingJSON: true,
},
ProtocolRaw: {
EncodingCustom: true,
},
Expand Down Expand Up @@ -181,6 +185,8 @@ func (c *Converter) ToByteStreamWithSelectedFields(logGroup *protocol.LogGroup,
return c.ConvertToSingleProtocolStreamFlatten(logGroup, targetFields)
case ProtocolInfluxdb:
return c.ConvertToInfluxdbProtocolStream(logGroup, targetFields)
case ProtocolJsonline:
return c.ConvertToJsonlineProtocolStreamFlatten(logGroup)
default:
return nil, nil, fmt.Errorf("unsupported protocol: %s", c.Protocol)
}
Expand Down
57 changes: 57 additions & 0 deletions pkg/protocol/converter/jsonline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package protocol

import (
"bytes"
"fmt"

jsoniter "github.com/json-iterator/go"

"github.com/alibaba/ilogtail/pkg/protocol"
)

var (
sep = []byte("\n")
)

// ConvertToJsonlineProtocolStreamFlatten serializes every log of logGroup as a
// flattened JSON object and joins the objects with a single '\n' separator,
// without a trailing newline after the last object.
//
// The flattened logs come from ConvertToSingleProtocolLogsFlatten; no target
// fields are requested, so the second return value is always nil.
//
// The returned slice is built on top of the buffer taken from
// GetPooledByteBuf. NOTE(review): presumably the caller is expected to hand it
// back with PutPooledByteBuf once the payload has been sent — confirm against
// the flusher that consumes it.
//
// On any error the pooled buffer is released before returning and the returned
// byte slice is nil.
func (c *Converter) ConvertToJsonlineProtocolStreamFlatten(logGroup *protocol.LogGroup) ([]byte, []map[string]string, error) {
	convertedLogs, _, err := c.ConvertToSingleProtocolLogsFlatten(logGroup, nil)
	if err != nil {
		return nil, nil, err
	}
	joinedStream := *GetPooledByteBuf()
	lastIdx := len(convertedLogs) - 1
	for i, log := range convertedLogs {
		switch c.Encoding {
		case EncodingJSON:
			// Keep the result in a separate variable: the helper returns nil on
			// failure, and overwriting joinedStream would make us return a nil
			// slice to the pool instead of the buffer we actually borrowed.
			encoded, marshalErr := marshalWithoutHTMLEscapedWithoutAlloc(log, bytes.NewBuffer(joinedStream))
			if marshalErr != nil {
				// release byte buffer
				PutPooledByteBuf(&joinedStream)
				return nil, nil, fmt.Errorf("unable to marshal log: %v: %w", log, marshalErr)
			}
			// The encoder terminates each value with '\n'; strip it and re-add
			// the separator only between records.
			joinedStream = trimRightByte(encoded, sep[0])
			if i < lastIdx {
				joinedStream = append(joinedStream, sep[0])
			}
		default:
			// release byte buffer on this error path as well
			PutPooledByteBuf(&joinedStream)
			return nil, nil, fmt.Errorf("unsupported encoding format: %s", c.Encoding)
		}
	}
	return joinedStream, nil, nil
}

// marshalWithoutHTMLEscapedWithoutAlloc encodes data as JSON into bf using
// jsoniter's ConfigFastest encoder and returns the buffer's full contents
// (whatever bf already held followed by the newly encoded value).
// It returns a nil slice together with the encoder's error when encoding fails.
func marshalWithoutHTMLEscapedWithoutAlloc(data interface{}, bf *bytes.Buffer) ([]byte, error) {
	encodeErr := jsoniter.ConfigFastest.NewEncoder(bf).Encode(data)
	if encodeErr != nil {
		return nil, encodeErr
	}
	return bf.Bytes(), nil
}

// trimRightByte returns s with every trailing occurrence of the byte c removed.
// The result shares s's backing array; s itself is returned unchanged when it
// does not end in c.
func trimRightByte(s []byte, c byte) []byte {
	end := len(s)
	for ; end > 0; end-- {
		if s[end-1] != c {
			break
		}
	}
	return s[:end]
}
246 changes: 246 additions & 0 deletions pkg/protocol/converter/jsonline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package protocol

import (
"bytes"
"encoding/json"
"testing"

. "github.com/smartystreets/goconvey/convey"

"github.com/alibaba/ilogtail/pkg/flags"
"github.com/alibaba/ilogtail/pkg/protocol"
)

// TestNewConvertToJsonlineLogs exercises the jsonline converter end to end:
//   - constructing it with an encoding other than JSON must fail;
//   - with tag and protocol key renames configured, every output line must be
//     a JSON object carrying the renamed keys;
//   - with rename targets set to the empty string, every output line must
//     still carry the default protocol keys.
func TestNewConvertToJsonlineLogs(t *testing.T) {
// EncodingNone is not a supported encoding for ProtocolJsonline, so the
// constructor is expected to return an error.
Convey("When constructing converter with unsupported encoding", t, func() {
_, err := NewConverter(ProtocolJsonline, EncodingNone, nil, nil)
So(err, ShouldNotBeNil)
})

// NOTE(review): the description says "protocol: single", but the converter
// built below uses ProtocolJsonline.
Convey("Given a converter with protocol: single, encoding: json, with tag rename and protocol key rename", t, func() {
// Tag key renames; the assertions further down look for the renamed keys
// "ip", "hostname", "tag" and "env_tag".
keyRenameMap := map[string]string{
"k8s.node.ip": "ip",
"host.name": "hostname",
"label": "tag",
"env": "env_tag",
}
// Protocol key rename; the assertions further down look for "@timestamp".
protocolKeyRenameMap := map[string]string{
"time": "@timestamp",
}
c, err := NewConverter(ProtocolJsonline, EncodingJSON, keyRenameMap, protocolKeyRenameMap)
So(err, ShouldBeNil)

Convey("When the logGroup is generated from files and from k8s daemonset environment", func() {
// Sets the package-level K8s flag; it is not restored afterwards.
*flags.K8sFlag = true
time := []uint32{1662434209, 1662434487}
method := []string{"PUT", "GET"}
status := []string{"200", "404"}
// Two logs that differ only in time, method and status; the tags are
// carried inside Contents under "__tag__:"-prefixed keys.
logs := make([]*protocol.Log, 2)
for i := 0; i < 2; i++ {
logs[i] = &protocol.Log{
Time: time[i],
Contents: []*protocol.Log_Content{
{Key: "method", Value: method[i]},
{Key: "status", Value: status[i]},
{Key: "__tag__:__user_defined_id__", Value: "machine"},
{Key: "__tag__:__path__", Value: "/root/test/origin/example.log"},
{Key: "__tag__:_node_name_", Value: "node"},
{Key: "__tag__:_node_ip_", Value: "172.10.1.19"},
{Key: "__tag__:_namespace_", Value: "default"},
{Key: "__tag__:_pod_name_", Value: "container"},
{Key: "__tag__:_pod_uid_", Value: "12AFERR234SG-SBH6D67HJ9-AAD-VF34"},
{Key: "__tag__:_container_name_", Value: "container"},
{Key: "__tag__:_container_ip_", Value: "172.10.0.45"},
{Key: "__tag__:_image_name_", Value: "image"},
{Key: "__tag__:label", Value: "tag"},
{Key: "__log_topic__", Value: "file"},
},
}
}
tags := []*protocol.LogTag{
{Key: "__hostname__", Value: "alje834hgf"},
{Key: "__pack_id__", Value: "AEDCFGHNJUIOPLMN-1E"},
{Key: "env", Value: "K8S"},
}
logGroup := &protocol.LogGroup{
Logs: logs,
Category: "test",
Topic: "file",
Source: "172.10.0.56",
LogTags: tags,
}

Convey("Then the converted log should be valid", func() {
b, err := c.ToByteStream(logGroup)
So(err, ShouldBeNil)

// The stream is split on "\n"; every piece must parse as a JSON object
// and contain the renamed and default keys checked below.
for _, s := range bytes.Split(b.([]byte), []byte("\n")) {
unmarshaledLog := make(map[string]interface{})
err = json.Unmarshal(s, &unmarshaledLog)
So(err, ShouldBeNil)
So(unmarshaledLog, ShouldContainKey, "method")
So(unmarshaledLog, ShouldContainKey, "@timestamp")
So(unmarshaledLog, ShouldContainKey, "log.file.path")
So(unmarshaledLog, ShouldContainKey, "hostname")
So(unmarshaledLog, ShouldContainKey, "host.ip")
So(unmarshaledLog, ShouldContainKey, "log.topic")
So(unmarshaledLog, ShouldContainKey, "ip")
So(unmarshaledLog, ShouldContainKey, "k8s.node.name")
So(unmarshaledLog, ShouldContainKey, "k8s.namespace.name")
So(unmarshaledLog, ShouldContainKey, "k8s.pod.name")
So(unmarshaledLog, ShouldContainKey, "k8s.pod.uid")
So(unmarshaledLog, ShouldContainKey, "k8s.container.name")
So(unmarshaledLog, ShouldContainKey, "k8s.container.ip")
So(unmarshaledLog, ShouldContainKey, "k8s.container.image.name")
So(unmarshaledLog, ShouldContainKey, "tag")
So(unmarshaledLog, ShouldContainKey, "env_tag")
}
})

// The jsonline conversion does not extract selected field values, so the
// returned values slice is expected to be empty even though target
// fields are requested.
Convey("Then the corresponding value of the required fields are returned correctly", func() {
_, values, err := c.ToByteStreamWithSelectedFields(logGroup, []string{"content.method", "tag.host.name", "tag.ip"})
So(err, ShouldBeNil)
So(values, ShouldHaveLength, 0)
})
})
})

// NOTE(review): as above, the description says "protocol: single" while the
// converter is built with ProtocolJsonline.
Convey("Given a converter with protocol: single, encoding: json, with null tag rename", t, func() {
// Every rename target is the empty string. The assertions below do not
// look for these four keys — presumably an empty target drops the tag;
// confirm against the converter implementation.
keyRenameMap := map[string]string{
"k8s.node.ip": "",
"host.name": "",
"label": "",
"env": "",
}
c, err := NewConverter(ProtocolJsonline, EncodingJSON, keyRenameMap, nil)
So(err, ShouldBeNil)

Convey("When the logGroup is generated from files and from k8s daemonset environment", func() {
// Sets the package-level K8s flag; it is not restored afterwards.
*flags.K8sFlag = true
time := []uint32{1662434209, 1662434487}
method := []string{"PUT", "GET"}
status := []string{"200", "404"}
// Same fixture shape as the first scenario: tags live inside Contents
// under "__tag__:"-prefixed keys.
logs := make([]*protocol.Log, 2)
for i := 0; i < 2; i++ {
logs[i] = &protocol.Log{
Time: time[i],
Contents: []*protocol.Log_Content{
{Key: "method", Value: method[i]},
{Key: "status", Value: status[i]},
{Key: "__tag__:__user_defined_id__", Value: "machine"},
{Key: "__tag__:__path__", Value: "/root/test/origin/example.log"},
{Key: "__tag__:_node_name_", Value: "node"},
{Key: "__tag__:_node_ip_", Value: "172.10.1.19"},
{Key: "__tag__:_namespace_", Value: "default"},
{Key: "__tag__:_pod_name_", Value: "container"},
{Key: "__tag__:_pod_uid_", Value: "12AFERR234SG-SBH6D67HJ9-AAD-VF34"},
{Key: "__tag__:_container_name_", Value: "container"},
{Key: "__tag__:_container_ip_", Value: "172.10.0.45"},
{Key: "__tag__:_image_name_", Value: "image"},
{Key: "__tag__:label", Value: "tag"},
{Key: "__log_topic__", Value: "file"},
},
}
}
tags := []*protocol.LogTag{
{Key: "__hostname__", Value: "alje834hgf"},
{Key: "__pack_id__", Value: "AEDCFGHNJUIOPLMN-1E"},
{Key: "env", Value: "K8S"},
}
logGroup := &protocol.LogGroup{
Logs: logs,
Category: "test",
Topic: "file",
Source: "172.10.0.56",
LogTags: tags,
}

Convey("Then the converted log should be valid", func() {
b, err := c.ToByteStream(logGroup)
So(err, ShouldBeNil)

// No protocol key rename is configured here, so "time" is expected
// under its default name.
for _, s := range bytes.Split(b.([]byte), []byte("\n")) {
unmarshaledLog := make(map[string]interface{})
err = json.Unmarshal(s, &unmarshaledLog)
So(err, ShouldBeNil)
So(unmarshaledLog, ShouldContainKey, "time")
So(unmarshaledLog, ShouldContainKey, "method")
So(unmarshaledLog, ShouldContainKey, "status")
So(unmarshaledLog, ShouldContainKey, "log.file.path")
So(unmarshaledLog, ShouldContainKey, "host.ip")
So(unmarshaledLog, ShouldContainKey, "log.topic")
So(unmarshaledLog, ShouldContainKey, "k8s.node.name")
So(unmarshaledLog, ShouldContainKey, "k8s.namespace.name")
So(unmarshaledLog, ShouldContainKey, "k8s.pod.name")
So(unmarshaledLog, ShouldContainKey, "k8s.pod.uid")
So(unmarshaledLog, ShouldContainKey, "k8s.container.name")
So(unmarshaledLog, ShouldContainKey, "k8s.container.ip")
So(unmarshaledLog, ShouldContainKey, "k8s.container.image.name")
}
})
})

Convey("When the log is standardized", func() {
// Sets the package-level K8s flag; it is not restored afterwards.
*flags.K8sFlag = true
time := []uint32{1662434209, 1662434487}
method := []string{"PUT", "GET"}
status := []string{"200", "404"}
// In this variant the logs carry only method and status; all tags are
// supplied through LogGroup.LogTags without the "__tag__:" prefix.
logs := make([]*protocol.Log, 2)
for i := 0; i < 2; i++ {
logs[i] = &protocol.Log{
Time: time[i],
Contents: []*protocol.Log_Content{
{Key: "method", Value: method[i]},
{Key: "status", Value: status[i]},
},
}
}
tags := []*protocol.LogTag{
{Key: "__user_defined_id__", Value: "machine"},
{Key: "__hostname__", Value: "alje834hgf"},
{Key: "__pack_id__", Value: "AEDCFGHNJUIOPLMN-1E"},
{Key: "__path__", Value: "/root/test/origin/example.log"},
{Key: "_node_name_", Value: "node"},
{Key: "_node_ip_", Value: "172.10.1.19"},
{Key: "_namespace_", Value: "default"},
{Key: "_pod_name_", Value: "container"},
{Key: "_pod_uid_", Value: "12AFERR234SG-SBH6D67HJ9-AAD-VF34"},
{Key: "_container_name_", Value: "container"},
{Key: "_container_ip_", Value: "172.10.0.45"},
{Key: "_image_name_", Value: "image"},
{Key: "label", Value: "tag"},
}
logGroup := &protocol.LogGroup{
Logs: logs,
Category: "test",
Topic: "topic",
Source: "172.10.0.56",
LogTags: tags,
}

Convey("Then the converted log should be valid", func() {
b, err := c.ToByteStream(logGroup)
So(err, ShouldBeNil)

// Same key expectations as the previous scenario, now with the tags
// coming from LogTags.
for _, s := range bytes.Split(b.([]byte), []byte("\n")) {
unmarshaledLog := make(map[string]interface{})
err = json.Unmarshal(s, &unmarshaledLog)
So(err, ShouldBeNil)
So(unmarshaledLog, ShouldContainKey, "time")
So(unmarshaledLog, ShouldContainKey, "method")
So(unmarshaledLog, ShouldContainKey, "status")
So(unmarshaledLog, ShouldContainKey, "log.file.path")
So(unmarshaledLog, ShouldContainKey, "host.ip")
So(unmarshaledLog, ShouldContainKey, "log.topic")
So(unmarshaledLog, ShouldContainKey, "k8s.node.name")
So(unmarshaledLog, ShouldContainKey, "k8s.namespace.name")
So(unmarshaledLog, ShouldContainKey, "k8s.pod.name")
So(unmarshaledLog, ShouldContainKey, "k8s.pod.uid")
So(unmarshaledLog, ShouldContainKey, "k8s.container.name")
So(unmarshaledLog, ShouldContainKey, "k8s.container.ip")
So(unmarshaledLog, ShouldContainKey, "k8s.container.image.name")
}
})
})
})
}
2 changes: 1 addition & 1 deletion plugins/flusher/http/flusher_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (f *FlusherHTTP) initRequestInterceptors(transport http.RoundTripper) (http
}

// getConverter builds the converter for this flusher from its Convert
// settings: protocol, encoding, separator and the ignore-unexpected-data
// switch, plus the configured tag and protocol field rename maps so that
// renames set on the flusher actually reach the converter.
func (f *FlusherHTTP) getConverter() (*converter.Converter, error) {
	return converter.NewConverterWithSep(f.Convert.Protocol, f.Convert.Encoding, f.Convert.Separator, f.Convert.IgnoreUnExpectedData, f.Convert.TagFieldsRename, f.Convert.ProtocolFieldsRename)
}

func (f *FlusherHTTP) addTask(log interface{}) {
Expand Down

0 comments on commit 9f0b7b7

Please sign in to comment.