Skip to content

Commit

Permalink
Merge pull request #26 from DMwangnima/main
Browse files Browse the repository at this point in the history
feat: implement communication between dubbo-go cli and kitex srv
  • Loading branch information
felix021 committed Aug 7, 2023
2 parents e8180ca + 27b03f2 commit 1141a29
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 33 deletions.
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@ github.com/bytedance/gopkg v0.0.0-20230531144706-a12972768317 h1:SReMVmTCeJ5Nf0h
github.com/bytedance/gopkg v0.0.0-20230531144706-a12972768317/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ=
github.com/bytedance/mockey v1.2.0/go.mod h1:+Jm/fzWZAuhEDrPXVjDf/jLM2BlLXJkwk94zf2JZ3X4=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.8.8 h1:Kj4AYbZSeENfyXicsYppYKO0K2YWab+i2UTSY7Ukz9Q=
github.com/bytedance/sonic v1.8.8/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
github.com/chenzhuoyu/iasm v0.0.0-20220818063314-28c361dae733/go.mod h1:wOQ0nsbeOLa2awv8bUYFW/EHXbjQMlZ10fAlXDB2sz8=
github.com/chenzhuoyu/iasm v0.0.0-20230222070914-0b1b64b0e762/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog=
Expand All @@ -93,6 +95,7 @@ github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86c
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudwego/configmanager v0.2.0 h1:niVpVg+wQ+npNqnH3dup96SMbR02Pk+tNErubYCJqKo=
github.com/cloudwego/configmanager v0.2.0/go.mod h1:FLIQTjxsZRGjnmDhTttWQTy6f6DghPTatfBVOs2gQLk=
github.com/cloudwego/dynamicgo v0.1.0/go.mod h1:Mdsz0XGsIImi15vxhZaHZpspNChEmBMIiWkUfD6JDKg=
github.com/cloudwego/fastpb v0.0.3/go.mod h1:/V13XFTq2TUkxj2qWReV8MwfPC4NnPcy6FsrojnsSG0=
Expand All @@ -110,6 +113,7 @@ github.com/cloudwego/netpoll v0.4.0/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY5
github.com/cloudwego/thriftgo v0.1.2/go.mod h1:LzeafuLSiHA9JTiWC8TIMIq64iadeObgRUhmVG1OC/w=
github.com/cloudwego/thriftgo v0.2.4/go.mod h1:8i9AF5uDdWHGqzUhXDlubCjx4MEfKvWXGQlMWyR0tM4=
github.com/cloudwego/thriftgo v0.2.7/go.mod h1:8i9AF5uDdWHGqzUhXDlubCjx4MEfKvWXGQlMWyR0tM4=
github.com/cloudwego/thriftgo v0.2.11 h1:uwFyTMBwmBJKpwxRdBvn46aHEVJJSgxkHo93RN0r3fw=
github.com/cloudwego/thriftgo v0.2.11/go.mod h1:dAyXHEmKXo0LfMCrblVEY3mUZsdeuA5+i0vF5f09j7E=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down Expand Up @@ -300,6 +304,7 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.1.0/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -436,6 +441,7 @@ github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcy
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/v2pro/plz v0.0.0-20221028024117-e5f9aec5b631/go.mod h1:3gacX+hQo+xvl0vtLqCMufzxuNCwt4geAVOMt2LQYfE=
github.com/v2pro/quokka v0.0.0-20171201153428-382cb39c6ee6/go.mod h1:0VP5W9AFNVWU8C1QLNeVg8TvzoEkIHWZ4vxtxEVFWUY=
Expand Down Expand Up @@ -479,6 +485,7 @@ go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
golang.org/x/arch v0.0.0-20201008161808-52c3e6f60cff/go.mod h1:flIaEI6LNU6xOCD5PaJvn9wGP0agmIOqjrtsKGRguv4=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.0.0-20220722155209-00200b7164a7/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.2.0 h1:W1sUEHXiJTfjaFJ5SLo0N6lZn+0eO5gWD1MFeTGqQEY=
golang.org/x/arch v0.2.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
194 changes: 185 additions & 9 deletions pkg/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ package hessian2

import (
"context"
"errors"
"fmt"

hessian "github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go-hessian2/java_exception"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/remote/codec"
"github.com/kitex-contrib/codec-hessian2/pkg/dubbo"
"github.com/kitex-contrib/codec-hessian2/pkg/iface"
)
Expand All @@ -46,12 +49,29 @@ func (m *Hessian2Codec) Name() string {

// Marshal encode method
func (m *Hessian2Codec) Encode(ctx context.Context, message remote.Message, out remote.ByteBuffer) error {
payload, err := m.buildPayload(ctx, message)
var payload []byte
var err error
var status dubbo.StatusCode
msgType := message.MessageType()
switch msgType {
case remote.Call, remote.Oneway:
payload, err = m.encodeRequestPayload(ctx, message)
case remote.Exception:
payload, err = m.encodeExceptionPayload(ctx, message)
// use StatusOK by default, regardless of whether it is Reply or Exception
status = dubbo.StatusOK
case remote.Reply:
payload, err = m.encodeResponsePayload(ctx, message)
status = dubbo.StatusOK
default:
return fmt.Errorf("unsupported MessageType: %v", msgType)
}

if err != nil {
return err
}

header := m.buildDubboHeader(message, len(payload))
header := m.buildDubboHeader(message, status, len(payload))

// write header
if err := header.Encode(out); err != nil {
Expand All @@ -65,7 +85,7 @@ func (m *Hessian2Codec) Encode(ctx context.Context, message remote.Message, out
return nil
}

func (m *Hessian2Codec) buildPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
func (m *Hessian2Codec) encodeRequestPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
encoder := hessian.NewEncoder()

service := &dubbo.Service{
Expand Down Expand Up @@ -94,13 +114,85 @@ func (m *Hessian2Codec) buildPayload(ctx context.Context, message remote.Message
return encoder.Buffer(), nil
}

func (m *Hessian2Codec) buildDubboHeader(message remote.Message, size int) *dubbo.DubboHeader {
func (m *Hessian2Codec) encodeResponsePayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
encoder := hessian.NewEncoder()
var payloadType dubbo.PayloadType
if len(message.Tags()) != 0 {
payloadType = dubbo.RESPONSE_VALUE_WITH_ATTACHMENTS
} else {
payloadType = dubbo.RESPONSE_VALUE
}

if err := encoder.Encode(payloadType); err != nil {
return nil, err
}

// encode data
data, ok := message.Data().(iface.Message)
if !ok {
return nil, fmt.Errorf("invalid data: not hessian2.MessageWriter")
}

if err := data.Encode(encoder); err != nil {
return nil, err
}

// encode attachments if needed
if dubbo.IsAttachmentsPayloadType(payloadType) {
if err := encoder.Encode(message.Tags()); err != nil {
return nil, err
}
}

return encoder.Buffer(), nil
}

func (m *Hessian2Codec) encodeExceptionPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
encoder := hessian.NewEncoder()
var payloadType dubbo.PayloadType
if len(message.Tags()) != 0 {
payloadType = dubbo.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
} else {
payloadType = dubbo.RESPONSE_WITH_EXCEPTION
}

if err := encoder.Encode(payloadType); err != nil {
return nil, err
}

// encode exception
data := message.Data()
errRaw, ok := data.(error)
if !ok {
return nil, fmt.Errorf("%v exception does not implement Error", data)
}
if exception, ok := data.(java_exception.Throwabler); ok {
if err := encoder.Encode(exception); err != nil {
return nil, err
}
} else {
if err := encoder.Encode(java_exception.NewException(errRaw.Error())); err != nil {
return nil, err
}
}

if dubbo.IsAttachmentsPayloadType(payloadType) {
if err := encoder.Encode(message.Tags()); err != nil {
return nil, err
}
}

return encoder.Buffer(), nil
}

func (m *Hessian2Codec) buildDubboHeader(message remote.Message, status dubbo.StatusCode, size int) *dubbo.DubboHeader {
msgType := message.MessageType()
return &dubbo.DubboHeader{
IsRequest: msgType == remote.Call || msgType == remote.Oneway,
IsEvent: false,
IsOneWay: msgType == remote.Oneway,
SerializationID: dubbo.SERIALIZATION_ID_HESSIAN,
Status: status,
RequestID: uint64(message.RPCInfo().Invocation().SeqID()),
DataLength: uint32(size),
}
Expand Down Expand Up @@ -151,32 +243,98 @@ func (m *Hessian2Codec) Decode(ctx context.Context, message remote.Message, in r
if err := header.Decode(in); err != nil {
return err
}
if err := codec.SetOrCheckSeqID(int32(header.RequestID), message); err != nil {
return err
}

// parse body part
body, err := in.Peek(int(header.DataLength))
if header.IsRequest {
return m.decodeRequestBody(ctx, header, message, in)
}
return m.decodeResponseBody(ctx, header, message, in)
}

func (m *Hessian2Codec) decodeRequestBody(ctx context.Context, header *dubbo.DubboHeader, message remote.Message, in remote.ByteBuffer) error {
length := int(header.DataLength)
if in.ReadableLen() < length {
return errors.New("invalid dubbo package with body length being less than header specified")
}
body, err := in.Next(length)
if err != nil {
return err
}

decoder := hessian.NewDecoder(body)
service := new(dubbo.Service)
if err := service.Decode(decoder); err != nil {
return err
}

// decode payload
// there is no need to make use of types
if _, err = decoder.Decode(); err != nil {
return err
}
if err := codec.NewDataIfNeeded(service.Method, message); err != nil {
return err
}
arg, ok := message.Data().(iface.Message)
if !ok {
return fmt.Errorf("invalid data: not hessian2.MessageReader")
}
if err := arg.Decode(decoder); err != nil {
return err
}
if err := codec.SetOrCheckMethodName(service.Method, message); err != nil {
return err
}

if err := processAttachments(decoder, message); err != nil {
return err
}

return nil
}

func (m *Hessian2Codec) decodeResponseBody(ctx context.Context, header *dubbo.DubboHeader, message remote.Message, in remote.ByteBuffer) error {
length := int(header.DataLength)
if in.ReadableLen() < length {
return errors.New("invalid dubbo package with body length being less than header specified")
}
body, err := in.Next(length)
if err != nil {
return err
}

decoder := hessian.NewDecoder(body)
payloadType, err := decoder.Decode()
payloadType, err := dubbo.DecodePayloadType(decoder)
if err != nil {
return err
}
switch payloadType {
// todo: processing other payload types with attachments
case dubbo.RESPONSE_VALUE:
case dubbo.RESPONSE_VALUE, dubbo.RESPONSE_VALUE_WITH_ATTACHMENTS:
msg, ok := message.Data().(iface.Message)
if !ok {
return fmt.Errorf("invalid data %v: not hessian2.MessageReader", msg)
}
if err := msg.Decode(decoder); err != nil {
return err
}
case dubbo.RESPONSE_WITH_EXCEPTION:
if dubbo.IsAttachmentsPayloadType(payloadType) {
if err := processAttachments(decoder, message); err != nil {
return err
}
}
case dubbo.RESPONSE_WITH_EXCEPTION, dubbo.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
exception, err := decoder.Decode()
if err != nil {
return err
}
if dubbo.IsAttachmentsPayloadType(payloadType) {
if err := processAttachments(decoder, message); err != nil {
return err
}
}
if exceptionErr, ok := exception.(error); ok {
return exceptionErr
}
Expand All @@ -186,3 +344,21 @@ func (m *Hessian2Codec) Decode(ctx context.Context, message remote.Message, in r
}
return nil
}

func processAttachments(decoder iface.Decoder, message remote.Message) error {
// decode attachments
attachmentsRaw, err := decoder.Decode()
if err != nil {
return err
}
if attachments, ok := attachmentsRaw.(map[interface{}]interface{}); ok {
for keyRaw, val := range attachments {
if key, ok := keyRaw.(string); ok {
message.Tags()[key] = val
}
}
return nil
}

return fmt.Errorf("unsupported attachments: %v", attachmentsRaw)
}
19 changes: 10 additions & 9 deletions pkg/dubbo/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ const (
MAGIC_HIGH = 0xda
MAGIC_LOW = 0xbb

IS_REQUEST = 1
IS_RESPONSE = 0
REQUEST_BIT_SHIT = 7
IS_REQUEST = 1
IS_RESPONSE = 0
REQUEST_BIT_SHIFT = 7

IS_ONEWAY = 0
IS_PINGPONG = 1
Expand Down Expand Up @@ -78,7 +78,7 @@ type DubboHeader struct {

func (h *DubboHeader) RequestResponseByte() byte {
if h.IsRequest {
return IS_REQUEST << REQUEST_BIT_SHIT
return IS_REQUEST << REQUEST_BIT_SHIFT
}
return 0
}
Expand Down Expand Up @@ -117,10 +117,11 @@ func (h *DubboHeader) DecodeFromByteSlice(buf []byte) error {
if buf[0] != MAGIC_HIGH || buf[1] != MAGIC_LOW {
return ErrInvalidHeader
}
h.IsRequest = isRequest(buf[3])
h.IsOneWay = isOneWay(buf[3])
h.IsEvent = isEvent(buf[3])
h.SerializationID = getSerializationID(buf[3])
h.IsRequest = isRequest(buf[2])
h.IsOneWay = isOneWay(buf[2])
h.IsEvent = isEvent(buf[2])
h.SerializationID = getSerializationID(buf[2])
// todo: process status
h.RequestID = binary.BigEndian.Uint64(buf[4:12])
h.DataLength = binary.BigEndian.Uint32(buf[12:])
return nil
Expand All @@ -143,7 +144,7 @@ func BitTest(b byte, shift int, expected byte) bool {
}

func isRequest(b byte) bool {
return BitTest(b, REQUEST_BIT_SHIT, IS_REQUEST)
return BitTest(b, REQUEST_BIT_SHIFT, IS_REQUEST)
}

func isOneWay(b byte) bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/dubbo/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDubboHeader_RequestResponseByte(t *testing.T) {
fields: fields{
IsRequest: true,
},
want: IS_REQUEST << REQUEST_BIT_SHIT,
want: IS_REQUEST << REQUEST_BIT_SHIFT,
},
{
name: "response",
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestDubboHeader_EncodeToByteSlice(t *testing.T) {
want: []byte{
MAGIC_HIGH,
MAGIC_LOW,
(IS_REQUEST << REQUEST_BIT_SHIT) | (IS_PINGPONG << ONEWAY_BIT_SHIFT) | (IS_EVENT << EVENT_BIT_SHIFT) | SERIALIZATION_ID_HESSIAN,
(IS_REQUEST << REQUEST_BIT_SHIFT) | (IS_PINGPONG << ONEWAY_BIT_SHIFT) | (IS_EVENT << EVENT_BIT_SHIFT) | SERIALIZATION_ID_HESSIAN,
byte(StatusOK),
0x12, 0x34, 0x56, 0x78, 0x87, 0x65, 0x43, 0x21,
0x12, 0x34, 0x43, 0x21,
Expand Down
Loading

0 comments on commit 1141a29

Please sign in to comment.