Skip to content

Commit

Permalink
feat: change related module and struct name from hessian2 to dubbo, e…
Browse files Browse the repository at this point in the history
…nhance exception support
  • Loading branch information
DMwangnima committed Aug 16, 2023
1 parent cb3ca9c commit bab3919
Show file tree
Hide file tree
Showing 28 changed files with 123 additions and 97 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# codec-hessian2
# codec-dubbo


---

> **Notice: When decoding, the java version of hessian will default skip and ignore non-exist fields.**

It's a golang hessian library used by kitex (https://github.com/cloudwego/kitex).
It's a golang dubbo library used by kitex (https://github.com/cloudwego/kitex).


## Feature List
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/kitex-contrib/codec-hessian2
module github.com/kitex-contrib/codec-dubbo

go 1.13

Expand Down
2 changes: 1 addition & 1 deletion go_format.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

# format go imports style
go install golang.org/x/tools/cmd/goimports
goimports -local [email protected]/kitex-contrib/codec-hessian2 -w .
goimports -local [email protected]/kitex-contrib/codec-dubbo -w .

# format licence style
go install github.com/apache/skywalking-eyes/cmd/license-eye@latest
Expand Down
134 changes: 80 additions & 54 deletions pkg/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,57 +17,59 @@
* limitations under the License.
*/

package hessian2
package dubbo

import (
"context"
"errors"
"fmt"

commons "github.com/kitex-contrib/codec-hessian2/pkg/common"
commons "github.com/kitex-contrib/codec-dubbo/pkg/common"

hessian "github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go-hessian2/java_exception"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/remote/codec"
"github.com/kitex-contrib/codec-hessian2/pkg/dubbo"
"github.com/kitex-contrib/codec-hessian2/pkg/iface"
"github.com/kitex-contrib/codec-dubbo/pkg/dubbo_spec"
"github.com/kitex-contrib/codec-dubbo/pkg/iface"
)

var _ remote.Codec = (*Hessian2Codec)(nil)
var _ remote.Codec = (*DubboCodec)(nil)

// Hessian2Codec NewHessian2Codec creates the hessian2 codec.
type Hessian2Codec struct{}
// DubboCodec NewDubboCodec creates the dubbo codec.
type DubboCodec struct{}

// NewHessian2Codec creates a new codec instance.
func NewHessian2Codec() *Hessian2Codec {
return &Hessian2Codec{}
// NewDubboCodec creates a new codec instance.
func NewDubboCodec() *DubboCodec {
return &DubboCodec{}
}

// Name codec name
func (m *Hessian2Codec) Name() string {
return "hessian2"
func (m *DubboCodec) Name() string {
return "dubbo"
}

// Marshal encode method
func (m *Hessian2Codec) Encode(ctx context.Context, message remote.Message, out remote.ByteBuffer) error {
func (m *DubboCodec) Encode(ctx context.Context, message remote.Message, out remote.ByteBuffer) error {
var payload []byte
var err error
var status dubbo.StatusCode
var status dubbo_spec.StatusCode
msgType := message.MessageType()
switch msgType {
case remote.Call, remote.Oneway:
payload, err = m.encodeRequestPayload(ctx, message)
case remote.Exception:
payload, err = m.encodeExceptionPayload(ctx, message)
// use StatusOK by default, regardless of whether it is Reply or Exception
status = dubbo.StatusOK
// todo(DMwangnima): refer to exception processing logic of dubbo-java, use status to determine if this exception
// is in outside layer.(eg. non-exist InterfaceName)
// for now, use StatusOK by default, regardless of whether it is in outside layer.
status = dubbo_spec.StatusOK
case remote.Reply:
payload, err = m.encodeResponsePayload(ctx, message)
status = dubbo.StatusOK
status = dubbo_spec.StatusOK
case remote.Heartbeat:
payload, err = m.encodeHeartbeatPayload(ctx, message)
status = dubbo.StatusOK
status = dubbo_spec.StatusOK
default:
return fmt.Errorf("unsupported MessageType: %v", msgType)
}
Expand All @@ -90,11 +92,11 @@ func (m *Hessian2Codec) Encode(ctx context.Context, message remote.Message, out
return nil
}

func (m *Hessian2Codec) encodeRequestPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
func (m *DubboCodec) encodeRequestPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
encoder := hessian.NewEncoder()

service := &dubbo.Service{
ProtocolVersion: dubbo.DEFAULT_DUBBO_PROTOCOL_VERSION,
service := &dubbo_spec.Service{
ProtocolVersion: dubbo_spec.DEFAULT_DUBBO_PROTOCOL_VERSION,
// todo: should be message.RPCInfo().Invocation.ServiceName
Path: message.RPCInfo().To().ServiceName(),
// todo: kitex mapping
Expand All @@ -119,13 +121,13 @@ func (m *Hessian2Codec) encodeRequestPayload(ctx context.Context, message remote
return encoder.Buffer(), nil
}

func (m *Hessian2Codec) encodeResponsePayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
func (m *DubboCodec) encodeResponsePayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
encoder := hessian.NewEncoder()
var payloadType dubbo.PayloadType
var payloadType dubbo_spec.PayloadType
if len(message.Tags()) != 0 {
payloadType = dubbo.RESPONSE_VALUE_WITH_ATTACHMENTS
payloadType = dubbo_spec.RESPONSE_VALUE_WITH_ATTACHMENTS
} else {
payloadType = dubbo.RESPONSE_VALUE
payloadType = dubbo_spec.RESPONSE_VALUE
}

if err := encoder.Encode(payloadType); err != nil {
Expand All @@ -143,7 +145,7 @@ func (m *Hessian2Codec) encodeResponsePayload(ctx context.Context, message remot
}

// encode attachments if needed
if dubbo.IsAttachmentsPayloadType(payloadType) {
if dubbo_spec.IsAttachmentsPayloadType(payloadType) {
if err := encoder.Encode(message.Tags()); err != nil {
return nil, err
}
Expand All @@ -152,13 +154,13 @@ func (m *Hessian2Codec) encodeResponsePayload(ctx context.Context, message remot
return encoder.Buffer(), nil
}

func (m *Hessian2Codec) encodeExceptionPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
func (m *DubboCodec) encodeExceptionPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
encoder := hessian.NewEncoder()
var payloadType dubbo.PayloadType
var payloadType dubbo_spec.PayloadType
if len(message.Tags()) != 0 {
payloadType = dubbo.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
payloadType = dubbo_spec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
} else {
payloadType = dubbo.RESPONSE_WITH_EXCEPTION
payloadType = dubbo_spec.RESPONSE_WITH_EXCEPTION
}

if err := encoder.Encode(payloadType); err != nil {
Expand All @@ -181,7 +183,7 @@ func (m *Hessian2Codec) encodeExceptionPayload(ctx context.Context, message remo
}
}

if dubbo.IsAttachmentsPayloadType(payloadType) {
if dubbo_spec.IsAttachmentsPayloadType(payloadType) {
if err := encoder.Encode(message.Tags()); err != nil {
return nil, err
}
Expand All @@ -196,7 +198,7 @@ func (m *Hessian2Codec) encodeExceptionPayload(ctx context.Context, message remo
// Arrays.equals(payload, getNullBytesOf(getSerializationById(proto)))
// For hessian2, NullByte is 'N'.
// As a result, we need to encode nil in heartbeat response body for both dubbo-go side and dubbo-java side.
func (m *Hessian2Codec) encodeHeartbeatPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
func (m *DubboCodec) encodeHeartbeatPayload(ctx context.Context, message remote.Message) (buf []byte, err error) {
encoder := hessian.NewEncoder()

if err := encoder.Encode(nil); err != nil {
Expand All @@ -206,26 +208,26 @@ func (m *Hessian2Codec) encodeHeartbeatPayload(ctx context.Context, message remo
return encoder.Buffer(), nil
}

func (m *Hessian2Codec) buildDubboHeader(message remote.Message, status dubbo.StatusCode, size int) *dubbo.DubboHeader {
func (m *DubboCodec) buildDubboHeader(message remote.Message, status dubbo_spec.StatusCode, size int) *dubbo_spec.DubboHeader {
msgType := message.MessageType()
return &dubbo.DubboHeader{
return &dubbo_spec.DubboHeader{
IsRequest: msgType == remote.Call || msgType == remote.Oneway,
// todo(DMwangnima): message contains heartbeat information or heartbeat flag passed in
IsEvent: false,
IsOneWay: msgType == remote.Oneway,
SerializationID: dubbo.SERIALIZATION_ID_HESSIAN,
SerializationID: dubbo_spec.SERIALIZATION_ID_HESSIAN,
Status: status,
RequestID: uint64(message.RPCInfo().Invocation().SeqID()),
DataLength: uint32(size),
}
}

func (m *Hessian2Codec) messageData(message remote.Message, e iface.Encoder) error {
func (m *DubboCodec) messageData(message remote.Message, e iface.Encoder) error {
data, ok := message.Data().(iface.Message)
if !ok {
return fmt.Errorf("invalid data: not hessian2.MessageWriter")
}
types, err := dubbo.GetTypes(data)
types, err := dubbo_spec.GetTypes(data)
if err != nil {
return err
}
Expand All @@ -235,7 +237,7 @@ func (m *Hessian2Codec) messageData(message remote.Message, e iface.Encoder) err
return data.Encode(e)
}

func (m *Hessian2Codec) messageServiceInfo(ctx context.Context, service *dubbo.Service, e iface.Encoder) error {
func (m *DubboCodec) messageServiceInfo(ctx context.Context, service *dubbo_spec.Service, e iface.Encoder) error {
if err := e.Encode(service.ProtocolVersion); err != nil {
return err
}
Expand All @@ -251,8 +253,8 @@ func (m *Hessian2Codec) messageServiceInfo(ctx context.Context, service *dubbo.S
return nil
}

func (m *Hessian2Codec) messageAttachment(ctx context.Context, service *dubbo.Service, e iface.Encoder) error {
attachment := dubbo.NewAttachment(
func (m *DubboCodec) messageAttachment(ctx context.Context, service *dubbo_spec.Service, e iface.Encoder) error {
attachment := dubbo_spec.NewAttachment(
service.Path,
service.Group,
service.Path,
Expand All @@ -263,9 +265,9 @@ func (m *Hessian2Codec) messageAttachment(ctx context.Context, service *dubbo.Se
}

// Unmarshal decode method
func (m *Hessian2Codec) Decode(ctx context.Context, message remote.Message, in remote.ByteBuffer) error {
func (m *DubboCodec) Decode(ctx context.Context, message remote.Message, in remote.ByteBuffer) error {
// parse header part
header := new(dubbo.DubboHeader)
header := new(dubbo_spec.DubboHeader)
if err := header.Decode(in); err != nil {
return err
}
Expand All @@ -281,10 +283,14 @@ func (m *Hessian2Codec) Decode(ctx context.Context, message remote.Message, in r
}
return m.decodeRequestBody(ctx, header, message, in)
}

if header.Status != dubbo_spec.StatusOK {
return m.decodeExceptionBody(ctx, header, message, in)
}
return m.decodeResponseBody(ctx, header, message, in)
}

func (m *Hessian2Codec) decodeEventBody(ctx context.Context, header *dubbo.DubboHeader, message remote.Message, in remote.ByteBuffer) error {
func (m *DubboCodec) decodeEventBody(ctx context.Context, header *dubbo_spec.DubboHeader, message remote.Message, in remote.ByteBuffer) error {
body, err := readBody(header, in)
if err != nil {
return err
Expand All @@ -299,14 +305,14 @@ func (m *Hessian2Codec) decodeEventBody(ctx context.Context, header *dubbo.Dubbo
return nil
}

func (m *Hessian2Codec) decodeRequestBody(ctx context.Context, header *dubbo.DubboHeader, message remote.Message, in remote.ByteBuffer) error {
func (m *DubboCodec) decodeRequestBody(ctx context.Context, header *dubbo_spec.DubboHeader, message remote.Message, in remote.ByteBuffer) error {
body, err := readBody(header, in)
if err != nil {
return err
}

decoder := hessian.NewDecoder(body)
service := new(dubbo.Service)
service := new(dubbo_spec.Service)
if err := service.Decode(decoder); err != nil {
return err
}
Expand Down Expand Up @@ -340,38 +346,58 @@ func (m *Hessian2Codec) decodeRequestBody(ctx context.Context, header *dubbo.Dub
return nil
}

func (m *Hessian2Codec) decodeResponseBody(ctx context.Context, header *dubbo.DubboHeader, message remote.Message, in remote.ByteBuffer) error {
// decodeExceptionBody is responsible for processing exception in the outer layer which means business logic
// in the remoting service has not been invoked. (eg. wrong request with non-exist InterfaceName)
func (m *DubboCodec) decodeExceptionBody(ctx context.Context, header *dubbo_spec.DubboHeader, message remote.Message, in remote.ByteBuffer) error {
body, err := readBody(header, in)
if err != nil {
return err
}

decoder := hessian.NewDecoder(body)
exception, err := decoder.Decode()
if err != nil {
return err
}
exceptionStr, ok := exception.(string)
if !ok {
return fmt.Errorf("exception %v is not of string", exception)
}
return fmt.Errorf("dubbo side exception: %s", exceptionStr)
}

func (m *DubboCodec) decodeResponseBody(ctx context.Context, header *dubbo_spec.DubboHeader, message remote.Message, in remote.ByteBuffer) error {
body, err := readBody(header, in)
if err != nil {
return err
}

decoder := hessian.NewDecoder(body)
payloadType, err := dubbo.DecodePayloadType(decoder)
payloadType, err := dubbo_spec.DecodePayloadType(decoder)
if err != nil {
return err
}
switch payloadType {
case dubbo.RESPONSE_VALUE, dubbo.RESPONSE_VALUE_WITH_ATTACHMENTS:
case dubbo_spec.RESPONSE_VALUE, dubbo_spec.RESPONSE_VALUE_WITH_ATTACHMENTS:
msg, ok := message.Data().(iface.Message)
if !ok {
return fmt.Errorf("invalid data %v: not hessian2.MessageReader", msg)
}
if err := msg.Decode(decoder); err != nil {
return err
}
if dubbo.IsAttachmentsPayloadType(payloadType) {
if dubbo_spec.IsAttachmentsPayloadType(payloadType) {
if err := processAttachments(decoder, message); err != nil {
return err
}
}
// business logic exception
case dubbo.RESPONSE_WITH_EXCEPTION, dubbo.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
case dubbo_spec.RESPONSE_WITH_EXCEPTION, dubbo_spec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
exception, err := decoder.Decode()
if err != nil {
return err
}
if dubbo.IsAttachmentsPayloadType(payloadType) {
if dubbo_spec.IsAttachmentsPayloadType(payloadType) {
if err := processAttachments(decoder, message); err != nil {
return err
}
Expand All @@ -380,8 +406,8 @@ func (m *Hessian2Codec) decodeResponseBody(ctx context.Context, header *dubbo.Du
return exceptionErr
}
return fmt.Errorf("dubbo side exception: %v", exception)
case dubbo.RESPONSE_NULL_VALUE, dubbo.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
if dubbo.IsAttachmentsPayloadType(payloadType) {
case dubbo_spec.RESPONSE_NULL_VALUE, dubbo_spec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
if dubbo_spec.IsAttachmentsPayloadType(payloadType) {
if err := processAttachments(decoder, message); err != nil {
return err
}
Expand Down Expand Up @@ -411,7 +437,7 @@ func processAttachments(decoder iface.Decoder, message remote.Message) error {
return fmt.Errorf("unsupported attachments: %v", attachmentsRaw)
}

func readBody(header *dubbo.DubboHeader, in remote.ByteBuffer) ([]byte, error) {
func readBody(header *dubbo_spec.DubboHeader, in remote.ByteBuffer) ([]byte, error) {
length := int(header.DataLength)
if in.ReadableLen() < length {
return nil, errors.New("invalid dubbo package with body length being less than header specified")
Expand Down
2 changes: 1 addition & 1 deletion pkg/data/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package hessian2

import (
commons "github.com/kitex-contrib/codec-hessian2/pkg/common"
commons "github.com/kitex-contrib/codec-dubbo/pkg/common"
)

// EncodeNull ::= 'N'
Expand Down
4 changes: 2 additions & 2 deletions pkg/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
* limitations under the License.
*/

package hessian2
package dubbo

import (
"bufio"
"bytes"
"fmt"

commons "github.com/kitex-contrib/codec-hessian2/pkg/common"
commons "github.com/kitex-contrib/codec-dubbo/pkg/common"
)

func NewDecoder(r bufio.Reader) *Decoder {
Expand Down
Loading

0 comments on commit bab3919

Please sign in to comment.