Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement communication between dubbo-go cli and kitex srv #26

Merged
merged 7 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@ github.com/bytedance/gopkg v0.0.0-20230531144706-a12972768317 h1:SReMVmTCeJ5Nf0h
github.com/bytedance/gopkg v0.0.0-20230531144706-a12972768317/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ=
github.com/bytedance/mockey v1.2.0/go.mod h1:+Jm/fzWZAuhEDrPXVjDf/jLM2BlLXJkwk94zf2JZ3X4=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.8.8 h1:Kj4AYbZSeENfyXicsYppYKO0K2YWab+i2UTSY7Ukz9Q=
github.com/bytedance/sonic v1.8.8/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
github.com/chenzhuoyu/iasm v0.0.0-20220818063314-28c361dae733/go.mod h1:wOQ0nsbeOLa2awv8bUYFW/EHXbjQMlZ10fAlXDB2sz8=
github.com/chenzhuoyu/iasm v0.0.0-20230222070914-0b1b64b0e762/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog=
Expand All @@ -93,6 +95,7 @@ github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86c
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudwego/configmanager v0.2.0 h1:niVpVg+wQ+npNqnH3dup96SMbR02Pk+tNErubYCJqKo=
github.com/cloudwego/configmanager v0.2.0/go.mod h1:FLIQTjxsZRGjnmDhTttWQTy6f6DghPTatfBVOs2gQLk=
github.com/cloudwego/dynamicgo v0.1.0/go.mod h1:Mdsz0XGsIImi15vxhZaHZpspNChEmBMIiWkUfD6JDKg=
github.com/cloudwego/fastpb v0.0.3/go.mod h1:/V13XFTq2TUkxj2qWReV8MwfPC4NnPcy6FsrojnsSG0=
Expand All @@ -110,6 +113,7 @@ github.com/cloudwego/netpoll v0.4.0/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY5
github.com/cloudwego/thriftgo v0.1.2/go.mod h1:LzeafuLSiHA9JTiWC8TIMIq64iadeObgRUhmVG1OC/w=
github.com/cloudwego/thriftgo v0.2.4/go.mod h1:8i9AF5uDdWHGqzUhXDlubCjx4MEfKvWXGQlMWyR0tM4=
github.com/cloudwego/thriftgo v0.2.7/go.mod h1:8i9AF5uDdWHGqzUhXDlubCjx4MEfKvWXGQlMWyR0tM4=
github.com/cloudwego/thriftgo v0.2.11 h1:uwFyTMBwmBJKpwxRdBvn46aHEVJJSgxkHo93RN0r3fw=
github.com/cloudwego/thriftgo v0.2.11/go.mod h1:dAyXHEmKXo0LfMCrblVEY3mUZsdeuA5+i0vF5f09j7E=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down Expand Up @@ -300,6 +304,7 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.1.0/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -436,6 +441,7 @@ github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcy
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/v2pro/plz v0.0.0-20221028024117-e5f9aec5b631/go.mod h1:3gacX+hQo+xvl0vtLqCMufzxuNCwt4geAVOMt2LQYfE=
github.com/v2pro/quokka v0.0.0-20171201153428-382cb39c6ee6/go.mod h1:0VP5W9AFNVWU8C1QLNeVg8TvzoEkIHWZ4vxtxEVFWUY=
Expand Down Expand Up @@ -479,6 +485,7 @@ go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
golang.org/x/arch v0.0.0-20201008161808-52c3e6f60cff/go.mod h1:flIaEI6LNU6xOCD5PaJvn9wGP0agmIOqjrtsKGRguv4=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.0.0-20220722155209-00200b7164a7/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.2.0 h1:W1sUEHXiJTfjaFJ5SLo0N6lZn+0eO5gWD1MFeTGqQEY=
golang.org/x/arch v0.2.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
194 changes: 185 additions & 9 deletions pkg/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ package hessian2

import (
"context"
"errors"
"fmt"

hessian "github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go-hessian2/java_exception"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/remote/codec"
"github.com/kitex-contrib/codec-hessian2/pkg/dubbo"
"github.com/kitex-contrib/codec-hessian2/pkg/iface"
)
Expand All @@ -46,12 +49,29 @@ func (m *Hessian2Codec) Name() string {

// Marshal encode method
func (m *Hessian2Codec) Encode(ctx context.Context, message remote.Message, out remote.ByteBuffer) error {
payload, err := m.buildPayload(ctx, message)
var payload []byte
var err error
var status dubbo.StatusCode
msgType := message.MessageType()
switch msgType {
case remote.Call, remote.Oneway:
payload, err = m.encodeRequestPayload(ctx, message)
case remote.Exception:
payload, err = m.encodeExceptionPayload(ctx, message)
// use StatusOK by default, regardless of whether it is Reply or Exception
status = dubbo.StatusOK
case remote.Reply:
payload, err = m.encodeResponsePayload(ctx, message)
status = dubbo.StatusOK
default:
return fmt.Errorf("unsupported MessageType: %v", msgType)
}

if err != nil {
return err
}

header := m.buildDubboHeader(message, len(payload))
header := m.buildDubboHeader(message, status, len(payload))

// write header
if err := header.Encode(out); err != nil {
Expand All @@ -65,7 +85,7 @@ func (m *Hessian2Codec) Encode(ctx context.Context, message remote.Message, out
return nil
}

func (m *Hessian2Codec) buildPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
func (m *Hessian2Codec) encodeRequestPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
encoder := hessian.NewEncoder()

service := &dubbo.Service{
Expand Down Expand Up @@ -94,13 +114,85 @@ func (m *Hessian2Codec) buildPayload(ctx context.Context, message remote.Message
return encoder.Buffer(), nil
}

func (m *Hessian2Codec) buildDubboHeader(message remote.Message, size int) *dubbo.DubboHeader {
func (m *Hessian2Codec) encodeResponsePayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
encoder := hessian.NewEncoder()
var payloadType dubbo.PayloadType
if len(message.Tags()) != 0 {
payloadType = dubbo.RESPONSE_VALUE_WITH_ATTACHMENTS
} else {
payloadType = dubbo.RESPONSE_VALUE
}

if err := encoder.Encode(payloadType); err != nil {
return nil, err
}

// encode data
data, ok := message.Data().(iface.Message)
if !ok {
return nil, fmt.Errorf("invalid data: not hessian2.MessageWriter")
}

if err := data.Encode(encoder); err != nil {
return nil, err
}

// encode attachments if needed
if dubbo.IsAttachmentsPayloadType(payloadType) {
if err := encoder.Encode(message.Tags()); err != nil {
return nil, err
}
}

return encoder.Buffer(), nil
}

func (m *Hessian2Codec) encodeExceptionPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
encoder := hessian.NewEncoder()
var payloadType dubbo.PayloadType
if len(message.Tags()) != 0 {
payloadType = dubbo.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
} else {
payloadType = dubbo.RESPONSE_WITH_EXCEPTION
}
DMwangnima marked this conversation as resolved.
Show resolved Hide resolved

if err := encoder.Encode(payloadType); err != nil {
return nil, err
}

// encode exception
data := message.Data()
errRaw, ok := data.(error)
if !ok {
return nil, fmt.Errorf("%v exception does not implement Error", data)
}
if exception, ok := data.(java_exception.Throwabler); ok {
if err := encoder.Encode(exception); err != nil {
return nil, err
}
} else {
if err := encoder.Encode(java_exception.NewException(errRaw.Error())); err != nil {
return nil, err
}
}

if dubbo.IsAttachmentsPayloadType(payloadType) {
if err := encoder.Encode(message.Tags()); err != nil {
return nil, err
}
}

return encoder.Buffer(), nil
}

func (m *Hessian2Codec) buildDubboHeader(message remote.Message, status dubbo.StatusCode, size int) *dubbo.DubboHeader {
msgType := message.MessageType()
return &dubbo.DubboHeader{
IsRequest: msgType == remote.Call || msgType == remote.Oneway,
IsEvent: false,
IsOneWay: msgType == remote.Oneway,
SerializationID: dubbo.SERIALIZATION_ID_HESSIAN,
Status: status,
RequestID: uint64(message.RPCInfo().Invocation().SeqID()),
DataLength: uint32(size),
}
Expand Down Expand Up @@ -151,32 +243,98 @@ func (m *Hessian2Codec) Decode(ctx context.Context, message remote.Message, in r
if err := header.Decode(in); err != nil {
return err
}
if err := codec.SetOrCheckSeqID(int32(header.RequestID), message); err != nil {
return err
}

// parse body part
body, err := in.Peek(int(header.DataLength))
if header.IsRequest {
return m.decodeRequestBody(ctx, header, message, in)
}
return m.decodeResponseBody(ctx, header, message, in)
}

func (m *Hessian2Codec) decodeRequestBody(ctx context.Context, header *dubbo.DubboHeader, message remote.Message, in remote.ByteBuffer) error {
length := int(header.DataLength)
if in.ReadableLen() < length {
return errors.New("invalid dubbo package with body length being less than header specified")
}
body, err := in.Next(length)
if err != nil {
return err
}

decoder := hessian.NewDecoder(body)
service := new(dubbo.Service)
if err := service.Decode(decoder); err != nil {
return err
}

// decode payload
// there is no need to make use of types
if _, err = decoder.Decode(); err != nil {
return err
}
if err := codec.NewDataIfNeeded(service.Method, message); err != nil {
return err
}
arg, ok := message.Data().(iface.Message)
if !ok {
return fmt.Errorf("invalid data: not hessian2.MessageReader")
}
if err := arg.Decode(decoder); err != nil {
return err
}
if err := codec.SetOrCheckMethodName(service.Method, message); err != nil {
return err
}

if err := processAttachments(decoder, message); err != nil {
return err
}

return nil
}

func (m *Hessian2Codec) decodeResponseBody(ctx context.Context, header *dubbo.DubboHeader, message remote.Message, in remote.ByteBuffer) error {
length := int(header.DataLength)
if in.ReadableLen() < length {
return errors.New("invalid dubbo package with body length being less than header specified")
}
body, err := in.Next(length)
if err != nil {
return err
}

decoder := hessian.NewDecoder(body)
payloadType, err := decoder.Decode()
payloadType, err := dubbo.DecodePayloadType(decoder)
if err != nil {
return err
}
switch payloadType {
// todo: processing other payload types with attachments
case dubbo.RESPONSE_VALUE:
case dubbo.RESPONSE_VALUE, dubbo.RESPONSE_VALUE_WITH_ATTACHMENTS:
msg, ok := message.Data().(iface.Message)
if !ok {
return fmt.Errorf("invalid data %v: not hessian2.MessageReader", msg)
}
if err := msg.Decode(decoder); err != nil {
return err
}
case dubbo.RESPONSE_WITH_EXCEPTION:
if dubbo.IsAttachmentsPayloadType(payloadType) {
if err := processAttachments(decoder, message); err != nil {
return err
}
}
case dubbo.RESPONSE_WITH_EXCEPTION, dubbo.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
exception, err := decoder.Decode()
if err != nil {
return err
}
if dubbo.IsAttachmentsPayloadType(payloadType) {
if err := processAttachments(decoder, message); err != nil {
return err
}
}
if exceptionErr, ok := exception.(error); ok {
return exceptionErr
}
Expand All @@ -186,3 +344,21 @@ func (m *Hessian2Codec) Decode(ctx context.Context, message remote.Message, in r
}
return nil
}

func processAttachments(decoder iface.Decoder, message remote.Message) error {
// decode attachments
attachmentsRaw, err := decoder.Decode()
if err != nil {
return err
}
if attachments, ok := attachmentsRaw.(map[interface{}]interface{}); ok {
for keyRaw, val := range attachments {
if key, ok := keyRaw.(string); ok {
message.Tags()[key] = val
}
}
return nil
}

return fmt.Errorf("unsupported attachments: %v", attachmentsRaw)
}
19 changes: 10 additions & 9 deletions pkg/dubbo/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ const (
MAGIC_HIGH = 0xda
MAGIC_LOW = 0xbb

IS_REQUEST = 1
IS_RESPONSE = 0
REQUEST_BIT_SHIT = 7
IS_REQUEST = 1
IS_RESPONSE = 0
REQUEST_BIT_SHIFT = 7

IS_ONEWAY = 0
IS_PINGPONG = 1
Expand Down Expand Up @@ -78,7 +78,7 @@ type DubboHeader struct {

func (h *DubboHeader) RequestResponseByte() byte {
if h.IsRequest {
return IS_REQUEST << REQUEST_BIT_SHIT
return IS_REQUEST << REQUEST_BIT_SHIFT
}
return 0
}
Expand Down Expand Up @@ -117,10 +117,11 @@ func (h *DubboHeader) DecodeFromByteSlice(buf []byte) error {
if buf[0] != MAGIC_HIGH || buf[1] != MAGIC_LOW {
return ErrInvalidHeader
}
h.IsRequest = isRequest(buf[3])
h.IsOneWay = isOneWay(buf[3])
h.IsEvent = isEvent(buf[3])
h.SerializationID = getSerializationID(buf[3])
h.IsRequest = isRequest(buf[2])
h.IsOneWay = isOneWay(buf[2])
h.IsEvent = isEvent(buf[2])
h.SerializationID = getSerializationID(buf[2])
// todo: process status
h.RequestID = binary.BigEndian.Uint64(buf[4:12])
h.DataLength = binary.BigEndian.Uint32(buf[12:])
return nil
Expand All @@ -143,7 +144,7 @@ func BitTest(b byte, shift int, expected byte) bool {
}

func isRequest(b byte) bool {
return BitTest(b, REQUEST_BIT_SHIT, IS_REQUEST)
return BitTest(b, REQUEST_BIT_SHIFT, IS_REQUEST)
}

func isOneWay(b byte) bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/dubbo/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDubboHeader_RequestResponseByte(t *testing.T) {
fields: fields{
IsRequest: true,
},
want: IS_REQUEST << REQUEST_BIT_SHIT,
want: IS_REQUEST << REQUEST_BIT_SHIFT,
},
{
name: "response",
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestDubboHeader_EncodeToByteSlice(t *testing.T) {
want: []byte{
MAGIC_HIGH,
MAGIC_LOW,
(IS_REQUEST << REQUEST_BIT_SHIT) | (IS_PINGPONG << ONEWAY_BIT_SHIFT) | (IS_EVENT << EVENT_BIT_SHIFT) | SERIALIZATION_ID_HESSIAN,
(IS_REQUEST << REQUEST_BIT_SHIFT) | (IS_PINGPONG << ONEWAY_BIT_SHIFT) | (IS_EVENT << EVENT_BIT_SHIFT) | SERIALIZATION_ID_HESSIAN,
byte(StatusOK),
0x12, 0x34, 0x56, 0x78, 0x87, 0x65, 0x43, 0x21,
0x12, 0x34, 0x43, 0x21,
Expand Down
Loading
Loading