Skip to content
Snippets Groups Projects
Select Git revision
  • c67d34a91067698e288f7318f2cee29b5534cd2d
  • master default protected
  • 3.0
  • develop
  • revert-2069-tripleVersion
  • 3.1
  • rest-protocol
  • feat/remoting_rocketmq
  • dapr-support
  • 1.5
  • 1.4
  • 1.3
  • 1.2
  • 1.1
  • v3.0.3-rc2
  • v3.0.3-rc1
  • v3.0.2
  • v1.5.8
  • v1.5.9-rc1
  • v3.0.1
  • v1.5.8-rc1
  • v3.0.0
  • v3.0.0-rc4-1
  • v3.0.0-rc4
  • v3.0.0-rc3
  • v1.5.7
  • v1.5.7-rc2
  • v3.0.0-rc2
  • remove
  • v1.5.7-rc1
  • v3.0.0-rc1
  • v1.5.7-rc1-tmp
  • 1.5.6
  • v1.5.6
34 results

dubbo_codec.go

Blame
  • dubbo_codec.go 10.46 KiB
    package dubbo
    
    import (
    	"bufio"
    	"bytes"
    	"fmt"
    	"strconv"
    	"time"
    
    	hessian "github.com/apache/dubbo-go-hessian2"
    	"github.com/apache/dubbo-go/common/constant"
    	"github.com/apache/dubbo-go/common/logger"
    	"github.com/apache/dubbo-go/protocol"
    	"github.com/apache/dubbo-go/protocol/invocation"
    	"github.com/apache/dubbo-go/remoting"
    	perrors "github.com/pkg/errors"
    )
    
    //SerialID serial ID
    type SerialID byte
    
    const (
    	// S_Dubbo dubbo serial id
    	S_Dubbo SerialID = 2
    )
    
    func init() {
    	codec := &DubboCodec{}
    	remoting.NewCodec("dubbo", codec)
    }
    
    // DubboPackage ...
    type DubboPackage struct {
    	Header  hessian.DubboHeader
    	Service hessian.Service
    	Body    interface{}
    	Err     error
    }
    
    func (p DubboPackage) String() string {
    	return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
    }
    
    // Marshal ...
    func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
    	codec := hessian.NewHessianCodec(nil)
    
    	pkg, err := codec.Write(p.Service, p.Header, p.Body)
    	if err != nil {
    		return nil, perrors.WithStack(err)
    	}
    
    	return bytes.NewBuffer(pkg), nil
    }
    
    // Unmarshal ...
    func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error {
    	// fix issue https://github.com/apache/dubbo-go/issues/380
    	bufLen := buf.Len()
    	if bufLen < hessian.HEADER_LENGTH {
    		return perrors.WithStack(hessian.ErrHeaderNotEnough)
    	}
    
    	codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
    
    	// read header
    	err := codec.ReadHeader(&p.Header)
    	if err != nil {
    		return perrors.WithStack(err)
    	}
    
    	if resp != nil { // for client
    		if p.Header.Type&hessian.PackageRequest != 0x00 {
    			// size of this array must be '7'
    			// https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
    			p.Body = make([]interface{}, 7)
    		} else {
    			pendingRsp := remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID))
    			if pendingRsp == nil {
    				return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
    			}
    			p.Body = &hessian.Response{RspObj: pendingRsp.Reply}
    		}
    	}
    	// read body
    	err = codec.ReadBody(p.Body)
    	return perrors.WithStack(err)
    }
    
    type DubboCodec struct {
    }
    
    func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
    	if request.Event {
    		return c.encodeHeartbeartReqeust(request)
    	}
    
    	invoc, ok := request.Data.(*protocol.Invocation)
    	if !ok {
    		logger.Errorf("encode request failed for parameter type :%+v", request)
    		return nil, perrors.Errorf("encode request failed for parameter type :%+v", request)
    	}
    	invocation := *invoc
    
    	p := &DubboPackage{}
    	p.Service.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
    	p.Service.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "")
    	p.Service.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "")
    	p.Service.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "")
    	p.Service.Method = invocation.MethodName()
    
    	timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, "3000"))
    	if err != nil {
    		panic(err)
    	}
    	p.Service.Timeout = time.Duration(timeout)
    	//var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "")
    	//if len(timeout) != 0 {
    	//	if t, err := time.ParseDuration(timeout); err == nil {
    	//		p.Service.Timeout = t
    	//	}
    	//}
    
    	p.Header.SerialID = byte(S_Dubbo)
    	p.Header.ID = request.Id
    	if request.TwoWay {
    		p.Header.Type = hessian.PackageRequest_TwoWay
    	} else {
    		p.Header.Type = hessian.PackageRequest
    	}
    
    	p.Body = hessian.NewRequest(invocation.Arguments(), invocation.Attachments())
    
    	codec := hessian.NewHessianCodec(nil)
    
    	pkg, err := codec.Write(p.Service, p.Header, p.Body)
    	if err != nil {
    		return nil, perrors.WithStack(err)
    	}
    
    	return bytes.NewBuffer(pkg), nil
    }
    func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) {
    	pkg := &DubboPackage{}
    	pkg.Body = []interface{}{}
    	pkg.Header.ID = request.Id
    	pkg.Header.Type = hessian.PackageHeartbeat
    	pkg.Header.SerialID = byte(S_Dubbo)
    
    	codec := hessian.NewHessianCodec(nil)
    
    	byt, err := codec.Write(pkg.Service, pkg.Header, pkg.Body)
    	if err != nil {
    		return nil, perrors.WithStack(err)
    	}
    
    	return bytes.NewBuffer(byt), nil
    }
    func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) {
    	var ptype = hessian.PackageResponse
    	if response.IsHeartbeat() {
    		ptype = hessian.PackageHeartbeat
    	}
    	resp := &DubboPackage{
    		Header: hessian.DubboHeader{
    			SerialID:       response.SerialID,
    			Type:           ptype,
    			ID:             response.Id,
    			ResponseStatus: response.Status,
    		},
    	}
    	if !response.IsHeartbeat() {
    		resp.Body = &hessian.Response{
    			RspObj:      response.Result.(protocol.RPCResult).Rest,
    			Exception:   response.Result.(protocol.RPCResult).Err,
    			Attachments: response.Result.(protocol.RPCResult).Attrs,
    		}
    	}
    
    	//if response.Header.Type&hessian.PackageRequest != 0x00 {
    	//	resp.Body = req.Body
    	//} else {
    	//	resp.Body = nil
    	//}
    	codec := hessian.NewHessianCodec(nil)
    
    	pkg, err := codec.Write(resp.Service, resp.Header, resp.Body)
    	if err != nil {
    		return nil, perrors.WithStack(err)
    	}
    
    	return bytes.NewBuffer(pkg), nil
    }
    func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
    	if c.isRequest(data) {
    		req, len, err := c.decodeRequest(data)
    		if err != nil {
    			return remoting.DecodeResult{}, len, err
    		}
    		return remoting.DecodeResult{IsRequest: true, Result: req}, len, err
    	} else {
    		resp, len, err := c.decodeResponse(data)
    		if err != nil {
    			return remoting.DecodeResult{}, len, err
    		}
    		return remoting.DecodeResult{IsRequest: false, Result: resp}, len, err
    	}
    }
    func (c *DubboCodec) isRequest(data []byte) bool {
    	if data[2]&byte(0x80) == 0x00 {
    		return false
    	}
    	return true
    }
    
    func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) {
    	pkg := &DubboPackage{
    		Body: make([]interface{}, 7),
    	}
    	var request *remoting.Request = nil
    	buf := bytes.NewBuffer(data)
    	err := pkg.Unmarshal(buf, nil)
    	if err != nil {
    		originErr := perrors.Cause(err)
    		if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
    			//FIXME
    			return nil, 0, originErr
    		}
    		logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
    
    		return request, 0, perrors.WithStack(err)
    	}
    	request = &remoting.Request{
    		Id:       pkg.Header.ID,
    		SerialID: pkg.Header.SerialID,
    		TwoWay:   pkg.Header.Type&hessian.PackageRequest_TwoWay != 0x00,
    		Event:    pkg.Header.Type&hessian.PackageHeartbeat != 0x00,
    	}
    	if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 {
    		// convert params of request
    		req := pkg.Body.([]interface{}) // length of body should be 7
    		if len(req) > 0 {
    			//invocation := request.Data.(*invocation.RPCInvocation)
    			var methodName string
    			var args []interface{}
    			var attachments map[string]string = make(map[string]string)
    			if req[0] != nil {
    				//dubbo version
    				request.Version = req[0].(string)
    			}
    			if req[1] != nil {
    				//path
    				attachments[constant.PATH_KEY] = req[1].(string)
    			}
    			if req[2] != nil {
    				//version
    				attachments[constant.VERSION_KEY] = req[2].(string)
    			}
    			if req[3] != nil {
    				//method
    				methodName = req[3].(string)
    			}
    			if req[4] != nil {
    				//argsType
    				//invocation.ParameterTypes(constant., req[1].(string))
    				//argsTypes = req[4].(string)
    			}
    			if req[5] != nil {
    				args = req[5].([]interface{})
    			}
    			if req[6] != nil {
    				attachments = req[6].(map[string]string)
    			}
    			//if pkg.Service.Path == "" && len(attachments[constant.PATH_KEY]) > 0 {
    			//	pkg.Service.Path = attachments[constant.PATH_KEY]
    			//}
    			//if _, ok := attachments[constant.INTERFACE_KEY]; ok {
    			//	pkg.Service.Interface = attachments[constant.INTERFACE_KEY]
    			//} else {
    			//	pkg.Service.Interface = pkg.Service.Path
    			//}
    			//if len(attachments[constant.GROUP_KEY]) > 0 {
    			//	pkg.Service.Group = attachments[constant.GROUP_KEY]
    			//}
    			invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments),
    				invocation.WithArguments(args), invocation.WithMethodName(methodName))
    			request.Data = invoc
    			//pkg.Body = map[string]interface{}{
    			//	"dubboVersion": dubboVersion,
    			//	"argsTypes":    argsTypes,
    			//	"args":         args,
    			//	"service":      common.ServiceMap.GetService("dubbo", pkg.Service.Path), // path as a key
    			//	"attachments":  attachments,
    			//}
    		}
    	}
    	return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
    }
    
    func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error) {
    	pkg := &DubboPackage{}
    	buf := bytes.NewBuffer(data)
    	response := &remoting.Response{}
    	err := pkg.Unmarshal(buf, response)
    	if err != nil {
    		originErr := perrors.Cause(err)
    		if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
    			return nil, 0, originErr
    		}
    		logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
    
    		return nil, 0, perrors.WithStack(err)
    	}
    	response = &remoting.Response{
    		Id: pkg.Header.ID,
    		//Version:  pkg.Header.,
    		SerialID: pkg.Header.SerialID,
    		Status:   pkg.Header.ResponseStatus,
    		Event:    (pkg.Header.Type & hessian.PackageHeartbeat) != 0,
    	}
    	var error error
    	if pkg.Header.Type&hessian.PackageHeartbeat != 0x00 {
    		if pkg.Header.Type&hessian.PackageResponse != 0x00 {
    			logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
    			if pkg.Err != nil {
    				logger.Errorf("rpc heartbeat response{error: %#v}", pkg.Err)
    				error = pkg.Err
    			}
    		} else {
    			logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", pkg.Header, pkg.Service, pkg.Body)
    			response.Status = hessian.Response_OK
    			//reply(session, p, hessian.PackageHeartbeat)
    		}
    		return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, error
    	}
    	logger.Debugf("get rpc response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
    	rpcResult := &protocol.RPCResult{}
    	response.Result = rpcResult
    	if pkg.Header.Type&hessian.PackageRequest == 0x00 {
    		if pkg.Err != nil {
    			rpcResult.Err = pkg.Err
    		} else if pkg.Body.(*hessian.Response).Exception != nil {
    			rpcResult.Err = pkg.Body.(*hessian.Response).Exception
    			response.Error = rpcResult.Err
    		}
    		rpcResult.Attrs = pkg.Body.(*hessian.Response).Attachments
    		rpcResult.Rest = pkg.Body.(*hessian.Response).RspObj
    	}
    
    	//h.conn.updateSession(session)
    	//pendingResponse := h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
    	//if pendingResponse == nil {
    	//	logger.Errorf("failed to get pending response context for response package %s", *p)
    	//	return
    	//}
    
    	return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
    }