Skip to content
Snippets Groups Projects
Unverified Commit 256c1ed7 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #673 from apache/refact-seri

Rft: network & codec
parents 163d645c 4f3d4ffb
No related branches found
No related tags found
No related merge requests found
Showing
with 2108 additions and 273 deletions
......@@ -29,6 +29,8 @@ config_center/zookeeper/zookeeper-4unittest/
registry/zookeeper/zookeeper-4unittest/
metadata/report/zookeeper/zookeeper-4unittest/
registry/consul/agent*
metadata/report/consul/agent*
remoting/consul/agent*
config_center/apollo/mockDubbog.properties.json
# vim stuff
......
......@@ -45,6 +45,7 @@ const (
DEFAULT_REST_CLIENT = "resty"
DEFAULT_REST_SERVER = "go-restful"
DEFAULT_PORT = 20000
DEFAULT_SERIALIZATION = HESSIAN2_SERIALIZATION
)
const (
......
......@@ -22,31 +22,32 @@ const (
)
const (
PORT_KEY = "port"
GROUP_KEY = "group"
VERSION_KEY = "version"
INTERFACE_KEY = "interface"
PATH_KEY = "path"
PROTOCOL_KEY = "protocol"
SERVICE_KEY = "service"
METHODS_KEY = "methods"
TIMEOUT_KEY = "timeout"
CATEGORY_KEY = "category"
CHECK_KEY = "check"
ENABLED_KEY = "enabled"
SIDE_KEY = "side"
OVERRIDE_PROVIDERS_KEY = "providerAddresses"
BEAN_NAME_KEY = "bean.name"
GENERIC_KEY = "generic"
CLASSIFIER_KEY = "classifier"
TOKEN_KEY = "token"
LOCAL_ADDR = "local-addr"
REMOTE_ADDR = "remote-addr"
PATH_SEPARATOR = "/"
DUBBO_KEY = "dubbo"
RELEASE_KEY = "release"
ANYHOST_KEY = "anyhost"
SSL_ENABLED_KEY = "ssl-enabled"
GROUP_KEY = "group"
VERSION_KEY = "version"
INTERFACE_KEY = "interface"
PATH_KEY = "path"
SERVICE_KEY = "service"
METHODS_KEY = "methods"
TIMEOUT_KEY = "timeout"
CATEGORY_KEY = "category"
CHECK_KEY = "check"
ENABLED_KEY = "enabled"
SIDE_KEY = "side"
OVERRIDE_PROVIDERS_KEY = "providerAddresses"
BEAN_NAME_KEY = "bean.name"
GENERIC_KEY = "generic"
CLASSIFIER_KEY = "classifier"
TOKEN_KEY = "token"
LOCAL_ADDR = "local-addr"
REMOTE_ADDR = "remote-addr"
DEFAULT_REMOTING_TIMEOUT = 3000
RELEASE_KEY = "release"
ANYHOST_KEY = "anyhost"
PORT_KEY = "port"
PROTOCOL_KEY = "protocol"
PATH_SEPARATOR = "/"
DUBBO_KEY = "dubbo"
SSL_ENABLED_KEY = "ssl-enabled"
)
const (
......@@ -81,6 +82,7 @@ const (
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
PROVIDER_SHUTDOWN_FILTER = "pshutdown"
CONSUMER_SHUTDOWN_FILTER = "cshutdown"
SERIALIZATION_KEY = "serialization"
PID_KEY = "pid"
SYNC_REPORT_KEY = "sync.report"
RETRY_PERIOD_KEY = "retry.period"
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package constant
const (
S_Hessian2 byte = 2
S_Proto byte = 21
)
const (
HESSIAN2_SERIALIZATION = "hessian2"
PROTOBUF_SERIALIZATION = "protobuf"
)
......@@ -18,6 +18,7 @@
package common
import (
"bytes"
"encoding/base64"
"fmt"
"math"
......@@ -325,12 +326,15 @@ func (c URL) Key() string {
// ServiceKey gets a unique key of a service.
func (c URL) ServiceKey() string {
intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/"))
return ServiceKey(c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")),
c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, ""))
}
func ServiceKey(intf string, group string, version string) string {
if intf == "" {
return ""
}
var buf strings.Builder
group := c.GetParam(constant.GROUP_KEY, "")
buf := &bytes.Buffer{}
if group != "" {
buf.WriteString(group)
buf.WriteString("/")
......@@ -338,7 +342,6 @@ func (c URL) ServiceKey() string {
buf.WriteString(intf)
version := c.GetParam(constant.VERSION_KEY, "")
if version != "" && version != "0.0.0" {
buf.WriteString(":")
buf.WriteString(version)
......
......@@ -59,6 +59,7 @@ type ServiceConfig struct {
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Serialization string `yaml:"serialization" json:"serialization" property:"serialization"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
......@@ -270,7 +271,8 @@ func (c *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.RELEASE_KEY, "dubbo-golang-"+constant.Version)
urlMap.Set(constant.SIDE_KEY, (common.RoleType(common.PROVIDER)).Role())
// todo: move
urlMap.Set(constant.SERIALIZATION_KEY, c.Serialization)
// application info
urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name)
urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization)
......
......@@ -30,15 +30,15 @@ import (
"github.com/apache/dubbo-go/config"
_ "github.com/apache/dubbo-go/filter/filter_impl"
"github.com/apache/dubbo-go/metadata/service/inmemory"
"github.com/apache/dubbo-go/protocol/dubbo"
_ "github.com/apache/dubbo-go/protocol/dubbo"
"github.com/apache/dubbo-go/remoting/getty"
)
func TestConfigurableExporter(t *testing.T) {
dubbo.SetServerConfig(dubbo.ServerConfig{
getty.SetServerConfig(getty.ServerConfig{
SessionNumber: 700,
SessionTimeout: "20s",
GettySessionParam: dubbo.GettySessionParam{
GettySessionParam: getty.GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
TcpKeepAlive: true,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dubbo
import (
"bufio"
"bytes"
"fmt"
"time"
)
import (
"github.com/apache/dubbo-go/common"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/protocol/dubbo/hessian2"
)
//SerialID serial ID
type SerialID byte
const (
// S_Dubbo dubbo serial id
S_Dubbo SerialID = 2
)
//CallType call type
type CallType int32
const (
// CT_UNKNOWN unknown call type
CT_UNKNOWN CallType = 0
// CT_OneWay call one way
CT_OneWay CallType = 1
// CT_TwoWay call in request/response
CT_TwoWay CallType = 2
)
////////////////////////////////////////////
// dubbo package
////////////////////////////////////////////
// SequenceType sequence type
type SequenceType int64
// nolint
type DubboPackage struct {
Header hessian2.DubboHeader
Service hessian2.Service
Body interface{}
Err error
}
// String prints dubbo package detail include header、path、body etc.
func (p DubboPackage) String() string {
return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
}
// Marshal encode hessian package.
func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
codec := hessian2.NewHessianCodec(nil)
pkg, err := codec.Write(p.Service, p.Header, p.Body)
if err != nil {
return nil, perrors.WithStack(err)
}
return bytes.NewBuffer(pkg), nil
}
// Unmarshal decodes hessian package.
func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
// fix issue https://github.com/apache/dubbo-go/issues/380
bufLen := buf.Len()
if bufLen < hessian2.HEADER_LENGTH {
return perrors.WithStack(hessian2.ErrHeaderNotEnough)
}
codec := hessian2.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
// read header
err := codec.ReadHeader(&p.Header)
if err != nil {
return perrors.WithStack(err)
}
if len(opts) != 0 { // for client
client, ok := opts[0].(*Client)
if !ok {
return perrors.Errorf("opts[0] is not of type *Client")
}
if p.Header.Type&hessian2.PackageRequest != 0x00 {
// size of this array must be '7'
// https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
p.Body = make([]interface{}, 7)
} else {
pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
if !ok {
return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
}
p.Body = &hessian2.DubboResponse{RspObj: pendingRsp.(*PendingResponse).response.reply}
}
}
// read body
err = codec.ReadBody(p.Body)
return perrors.WithStack(err)
}
////////////////////////////////////////////
// PendingResponse
////////////////////////////////////////////
// PendingResponse is a pending response.
type PendingResponse struct {
seq uint64
err error
start time.Time
readStart time.Time
callback common.AsyncCallback
response *Response
done chan struct{}
}
// NewPendingResponse create a PendingResponses.
func NewPendingResponse() *PendingResponse {
return &PendingResponse{
start: time.Now(),
response: &Response{},
done: make(chan struct{}),
}
}
// GetCallResponse get AsyncCallbackResponse.
func (r PendingResponse) GetCallResponse() common.CallbackResponse {
return AsyncCallbackResponse{
Cause: r.err,
Start: r.start,
ReadStart: r.readStart,
Reply: r.response,
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dubbo
import (
"bytes"
"strconv"
"time"
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/dubbo/impl"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting"
)
//SerialID serial ID
type SerialID byte
func init() {
codec := &DubboCodec{}
// this is for registry dubboCodec of dubbo protocol
remoting.RegistryCodec("dubbo", codec)
}
// DubboCodec. It is implements remoting.Codec
type DubboCodec struct {
}
// encode request for transport
func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
if request.Event {
return c.encodeHeartbeartReqeust(request)
}
invoc, ok := request.Data.(*protocol.Invocation)
if !ok {
err := perrors.Errorf("encode request failed for parameter type :%+v", request)
logger.Errorf(err.Error())
return nil, err
}
invocation := *invoc
svc := impl.Service{}
svc.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
svc.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "")
svc.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "")
svc.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "")
svc.Method = invocation.MethodName()
timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, strconv.Itoa(constant.DEFAULT_REMOTING_TIMEOUT)))
if err != nil {
// it will be wrapped in readwrite.Write .
return nil, perrors.WithStack(err)
}
svc.Timeout = time.Duration(timeout)
header := impl.DubboHeader{}
serialization := invocation.AttachmentsByKey(constant.SERIALIZATION_KEY, constant.HESSIAN2_SERIALIZATION)
if serialization == constant.PROTOBUF_SERIALIZATION {
header.SerialID = constant.S_Proto
} else {
header.SerialID = constant.S_Hessian2
}
header.ID = request.ID
if request.TwoWay {
header.Type = impl.PackageRequest_TwoWay
} else {
header.Type = impl.PackageRequest
}
pkg := &impl.DubboPackage{
Header: header,
Service: svc,
Body: impl.NewRequestPayload(invocation.Arguments(), invocation.Attachments()),
Err: nil,
Codec: impl.NewDubboCodec(nil),
}
if err := impl.LoadSerializer(pkg); err != nil {
return nil, perrors.WithStack(err)
}
return pkg.Marshal()
}
// encode heartbeart request
func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) {
header := impl.DubboHeader{
Type: impl.PackageHeartbeat,
SerialID: constant.S_Hessian2,
ID: request.ID,
}
pkg := &impl.DubboPackage{
Header: header,
Service: impl.Service{},
Body: impl.NewRequestPayload([]interface{}{}, nil),
Err: nil,
Codec: impl.NewDubboCodec(nil),
}
if err := impl.LoadSerializer(pkg); err != nil {
return nil, err
}
return pkg.Marshal()
}
// encode response
func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) {
var ptype = impl.PackageResponse
if response.IsHeartbeat() {
ptype = impl.PackageHeartbeat
}
resp := &impl.DubboPackage{
Header: impl.DubboHeader{
SerialID: response.SerialID,
Type: ptype,
ID: response.ID,
ResponseStatus: response.Status,
},
}
if !response.IsHeartbeat() {
resp.Body = &impl.ResponsePayload{
RspObj: response.Result.(protocol.RPCResult).Rest,
Exception: response.Result.(protocol.RPCResult).Err,
Attachments: response.Result.(protocol.RPCResult).Attrs,
}
}
codec := impl.NewDubboCodec(nil)
pkg, err := codec.Encode(*resp)
if err != nil {
return nil, perrors.WithStack(err)
}
return bytes.NewBuffer(pkg), nil
}
// Decode data, including request and response.
func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
if c.isRequest(data) {
req, len, err := c.decodeRequest(data)
if err != nil {
return remoting.DecodeResult{}, len, perrors.WithStack(err)
}
return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err)
}
resp, len, err := c.decodeResponse(data)
if err != nil {
return remoting.DecodeResult{}, len, perrors.WithStack(err)
}
return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err)
}
func (c *DubboCodec) isRequest(data []byte) bool {
if data[2]&byte(0x80) == 0x00 {
return false
}
return true
}
// decode request
func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) {
var request *remoting.Request = nil
buf := bytes.NewBuffer(data)
pkg := impl.NewDubboPackage(buf)
pkg.SetBody(make([]interface{}, 7))
err := pkg.Unmarshal()
if err != nil {
originErr := perrors.Cause(err)
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
//FIXME
return nil, 0, originErr
}
logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
return request, 0, perrors.WithStack(err)
}
request = &remoting.Request{
ID: pkg.Header.ID,
SerialID: pkg.Header.SerialID,
TwoWay: pkg.Header.Type&impl.PackageRequest_TwoWay != 0x00,
Event: pkg.Header.Type&impl.PackageHeartbeat != 0x00,
}
if (pkg.Header.Type & impl.PackageHeartbeat) == 0x00 {
// convert params of request
req := pkg.Body.(map[string]interface{})
//invocation := request.Data.(*invocation.RPCInvocation)
var methodName string
var args []interface{}
attachments := make(map[string]interface{})
if req[impl.DubboVersionKey] != nil {
//dubbo version
request.Version = req[impl.DubboVersionKey].(string)
}
//path
attachments[constant.PATH_KEY] = pkg.Service.Path
//version
attachments[constant.VERSION_KEY] = pkg.Service.Version
//method
methodName = pkg.Service.Method
args = req[impl.ArgsKey].([]interface{})
attachments = req[impl.AttachmentsKey].(map[string]interface{})
invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments),
invocation.WithArguments(args), invocation.WithMethodName(methodName))
request.Data = invoc
}
return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
}
// decode response
func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error) {
buf := bytes.NewBuffer(data)
pkg := impl.NewDubboPackage(buf)
response := &remoting.Response{}
err := pkg.Unmarshal()
if err != nil {
originErr := perrors.Cause(err)
// if the data is very big, so the receive need much times.
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
return nil, 0, originErr
}
logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
return nil, 0, perrors.WithStack(err)
}
response = &remoting.Response{
ID: pkg.Header.ID,
//Version: pkg.Header.,
SerialID: pkg.Header.SerialID,
Status: pkg.Header.ResponseStatus,
Event: (pkg.Header.Type & impl.PackageHeartbeat) != 0,
}
var error error
if pkg.Header.Type&impl.PackageHeartbeat != 0x00 {
if pkg.Header.Type&impl.PackageResponse != 0x00 {
logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
if pkg.Err != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", pkg.Err)
error = pkg.Err
}
} else {
logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", pkg.Header, pkg.Service, pkg.Body)
response.Status = hessian.Response_OK
//reply(session, p, hessian.PackageHeartbeat)
}
return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, error
}
logger.Debugf("get rpc response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
rpcResult := &protocol.RPCResult{}
response.Result = rpcResult
if pkg.Header.Type&impl.PackageRequest == 0x00 {
if pkg.Err != nil {
rpcResult.Err = pkg.Err
} else if pkg.Body.(*impl.ResponsePayload).Exception != nil {
rpcResult.Err = pkg.Body.(*impl.ResponsePayload).Exception
response.Error = rpcResult.Err
}
rpcResult.Attrs = pkg.Body.(*impl.ResponsePayload).Attachments
rpcResult.Rest = pkg.Body.(*impl.ResponsePayload).RspObj
}
return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
}
......@@ -20,6 +20,7 @@ package dubbo
import (
"context"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
......@@ -34,8 +35,10 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/protocol"
invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting"
)
var (
......@@ -46,24 +49,35 @@ var (
)
var (
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY}
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY,
constant.VERSION_KEY}
)
// DubboInvoker is dubbo client invoker.
// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refer to one service and ip.
type DubboInvoker struct {
protocol.BaseInvoker
client *Client
// the exchange layer, it is focus on network communication.
client *remoting.ExchangeClient
quitOnce sync.Once
// timeout for service(interface) level.
timeout time.Duration
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
}
// NewDubboInvoker create dubbo client invoker.
func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
// NewDubboInvoker constructor
func NewDubboInvoker(url common.URL, client *remoting.ExchangeClient) *DubboInvoker {
requestTimeout := config.GetConsumerConfig().RequestTimeout
requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
requestTimeout = t
}
return &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
client: client,
reqNum: 0,
timeout: requestTimeout,
}
}
......@@ -84,6 +98,8 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
defer atomic.AddInt64(&(di.reqNum), -1)
inv := invocation.(*invocation_impl.RPCInvocation)
// init param
inv.SetAttachments(constant.PATH_KEY, di.GetUrl().GetParam(constant.INTERFACE_KEY, ""))
for _, k := range attachmentKey {
if v := di.GetUrl().GetParam(k, ""); len(v) > 0 {
inv.SetAttachments(k, v)
......@@ -94,35 +110,57 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
di.appendCtx(ctx, inv)
url := di.GetUrl()
// default hessian2 serialization, compatible
if url.GetParam(constant.SERIALIZATION_KEY, "") == "" {
url.SetParam(constant.SERIALIZATION_KEY, constant.HESSIAN2_SERIALIZATION)
}
// async
async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false"))
if err != nil {
logger.Errorf("ParseBool - error: %v", err)
async = false
}
response := NewResponse(inv.Reply(), nil)
//response := NewResponse(inv.Reply(), nil)
rest := &protocol.RPCResult{}
timeout := di.getTimeout(inv)
if async {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
//result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest)
} else {
result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()))
result.Err = di.client.Send(&invocation, url, timeout)
}
} else {
if inv.Reply() == nil {
result.Err = ErrNoReply
} else {
result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response)
result.Err = di.client.Request(&invocation, url, timeout, rest)
}
}
if result.Err == nil {
result.Rest = inv.Reply()
result.Attrs = response.atta
result.Attrs = rest.Attrs
}
logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest)
return &result
}
// get timeout including methodConfig
func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) time.Duration {
var timeout = di.GetUrl().GetParam(strings.Join([]string{constant.METHOD_KEYS, invocation.MethodName(), constant.TIMEOUT_KEY}, "."), "")
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
// config timeout into attachment
invocation.SetAttachments(constant.TIMEOUT_KEY, strconv.Itoa(int(t.Milliseconds())))
return t
}
}
// set timeout into invocation at method level
invocation.SetAttachments(constant.TIMEOUT_KEY, strconv.Itoa(int(di.timeout.Milliseconds())))
return di.timeout
}
// Destroy destroy dubbo client invoker.
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
......
......@@ -18,6 +18,7 @@
package dubbo
import (
"bytes"
"context"
"sync"
"testing"
......@@ -25,40 +26,37 @@ import (
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
"github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/getty"
)
func TestDubboInvokerInvoke(t *testing.T) {
proto, url := InitTest(t)
c := &Client{
pendingResponses: new(sync.Map),
conf: *clientConf,
opts: Options{
ConnectTimeout: 3 * time.Second,
RequestTimeout: 6 * time.Second,
},
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
c := getExchangeClient(url)
invoker := NewDubboInvoker(url, c)
user := &User{}
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName(mockMethodNameGetUser), invocation.WithArguments([]interface{}{"1", "username"}),
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), invocation.WithArguments([]interface{}{"1", "username"}),
invocation.WithReply(user), invocation.WithAttachments(map[string]interface{}{"test_key": "test_value"}))
// Call
res := invoker.Invoke(context.Background(), inv)
assert.NoError(t, res.Error())
assert.Equal(t, User{Id: "1", Name: "username"}, *res.Result().(*User))
assert.Equal(t, "test_value", res.Attachments()["test_key"]) // test attachments for request/response
// CallOneway
inv.SetAttachments(constant.ASYNC_KEY, "true")
......@@ -69,8 +67,10 @@ func TestDubboInvokerInvoke(t *testing.T) {
lock := sync.Mutex{}
lock.Lock()
inv.SetCallBack(func(response common.CallbackResponse) {
r := response.(AsyncCallbackResponse)
assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
r := response.(remoting.AsyncCallbackResponse)
rst := *r.Reply.(*remoting.Response).Result.(*protocol.RPCResult)
assert.Equal(t, User{Id: "1", Name: "username"}, *(rst.Rest.(*User)))
//assert.Equal(t, User{ID: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
lock.Unlock()
})
res = invoker.Invoke(context.Background(), inv)
......@@ -92,3 +92,143 @@ func TestDubboInvokerInvoke(t *testing.T) {
proto.Destroy()
lock.Unlock()
}
func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
hessian.RegisterPOJO(&User{})
methods, err := common.ServiceMap.Register("", "dubbo", &UserProvider{})
assert.NoError(t, err)
assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6", methods)
// config
getty.SetClientConf(getty.ClientConfig{
ConnectionNum: 2,
HeartbeatPeriod: "5s",
SessionTimeout: "20s",
PoolTTL: 600,
PoolSize: 64,
GettySessionParam: getty.GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
TcpKeepAlive: true,
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
PkgWQSize: 512,
TcpReadTimeout: "4s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
MaxMsgLen: 10240000000,
SessionName: "client",
},
})
getty.SetServerConfig(getty.ServerConfig{
SessionNumber: 700,
SessionTimeout: "20s",
GettySessionParam: getty.GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
TcpKeepAlive: true,
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
MaxMsgLen: 10240000000,
SessionName: "server",
}})
// Export
proto := GetProtocol()
url, err := common.NewURL("dubbo://127.0.0.1:20702/UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
proto.Export(&proxy_factory.ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
})
time.Sleep(time.Second * 2)
return proto, url
}
//////////////////////////////////
// provider
//////////////////////////////////
type (
User struct {
Id string `json:"id"`
Name string `json:"name"`
}
UserProvider struct {
user map[string]User
}
)
// size:4801228
func (u *UserProvider) GetBigPkg(ctx context.Context, req []interface{}, rsp *User) error {
argBuf := new(bytes.Buffer)
for i := 0; i < 400; i++ {
// use chinese for test
argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。")
argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。")
}
rsp.Id = argBuf.String()
rsp.Name = argBuf.String()
return nil
}
func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User) error {
rsp.Id = req[0].(string)
rsp.Name = req[1].(string)
return nil
}
func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) {
return User{Id: id, Name: name}, nil
}
func (u *UserProvider) GetUser1() error {
return nil
}
func (u *UserProvider) GetUser2() error {
return perrors.New("error")
}
func (u *UserProvider) GetUser3(rsp *[]interface{}) error {
*rsp = append(*rsp, User{Id: "1", Name: "username"})
return nil
}
func (u *UserProvider) GetUser4(ctx context.Context, req []interface{}) ([]interface{}, error) {
return []interface{}{User{Id: req[0].([]interface{})[0].(string), Name: req[0].([]interface{})[1].(string)}}, nil
}
func (u *UserProvider) GetUser5(ctx context.Context, req []interface{}) (map[interface{}]interface{}, error) {
return map[interface{}]interface{}{"key": User{Id: req[0].(map[interface{}]interface{})["id"].(string), Name: req[0].(map[interface{}]interface{})["name"].(string)}}, nil
}
func (u *UserProvider) GetUser6(id int64) (*User, error) {
if id == 0 {
return nil, nil
}
return &User{Id: "1"}, nil
}
func (u *UserProvider) Reference() string {
return "UserProvider"
}
func (u User) JavaClassName() string {
return "com.ikurento.user.User"
}
......@@ -18,10 +18,16 @@
package dubbo
import (
"context"
"fmt"
"sync"
"time"
)
import (
"github.com/opentracing/opentracing-go"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -29,14 +35,23 @@ import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/getty"
)
// dubbo protocol constant
const (
// DUBBO is dubbo protocol name
DUBBO = "dubbo"
)
var (
// Make the connection can be shared.
// It will create one connection for one address (ip+port)
exchangeClientMap = new(sync.Map)
exchangeLock = new(sync.Map)
)
func init() {
extension.SetProtocol(DUBBO, GetProtocol)
}
......@@ -45,10 +60,12 @@ var (
dubboProtocol *DubboProtocol
)
// DubboProtocol is a dubbo protocol implement.
// It support dubbo protocol. It implements Protocol interface for dubbo protocol.
type DubboProtocol struct {
protocol.BaseProtocol
serverMap map[string]*Server
// It is store relationship about serviceKey(group/interface:version) and ExchangeServer
// The ExchangeServer is introduced to replace of Server. Because Server is depend on getty directly.
serverMap map[string]*remoting.ExchangeServer
serverLock sync.Mutex
}
......@@ -56,7 +73,7 @@ type DubboProtocol struct {
func NewDubboProtocol() *DubboProtocol {
return &DubboProtocol{
BaseProtocol: protocol.NewBaseProtocol(),
serverMap: make(map[string]*Server),
serverMap: make(map[string]*remoting.ExchangeServer),
}
}
......@@ -67,7 +84,6 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
exporter := NewDubboExporter(serviceKey, invoker, dp.ExporterMap())
dp.SetExporterMap(serviceKey, exporter)
logger.Infof("Export service: %s", url.String())
// start server
dp.openServer(url)
return exporter
......@@ -75,18 +91,12 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
// Refer create dubbo service reference.
func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout
requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
requestTimeout = t
exchangeClient := getExchangeClient(url)
if exchangeClient == nil {
logger.Warnf("can't dial the server: %+v", url.Location)
return nil
}
invoker := NewDubboInvoker(url, NewClient(Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
RequestTimeout: requestTimeout,
}))
invoker := NewDubboInvoker(url, exchangeClient)
dp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
return invoker
......@@ -116,9 +126,12 @@ func (dp *DubboProtocol) openServer(url common.URL) {
dp.serverLock.Lock()
_, ok = dp.serverMap[url.Location]
if !ok {
srv := NewServer()
handler := func(invocation *invocation.RPCInvocation) protocol.RPCResult {
return doHandleRequest(invocation)
}
srv := remoting.NewExchangeServer(url, getty.NewServer(url, handler))
dp.serverMap[url.Location] = srv
srv.Start(url)
srv.Start()
}
dp.serverLock.Unlock()
}
......@@ -131,3 +144,91 @@ func GetProtocol() protocol.Protocol {
}
return dubboProtocol
}
func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
result := protocol.RPCResult{}
if exporter == nil {
err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
logger.Errorf(err.Error())
result.Err = err
//reply(session, p, hessian.PackageResponse)
return result
}
invoker := exporter.(protocol.Exporter).GetInvoker()
if invoker != nil {
// FIXME
ctx := rebuildCtx(rpcInvocation)
invokeResult := invoker.Invoke(ctx, rpcInvocation)
if err := invokeResult.Error(); err != nil {
result.Err = invokeResult.Error()
//p.Header.ResponseStatus = hessian.Response_OK
//p.Body = hessian.NewResponse(nil, err, result.Attachments())
} else {
result.Rest = invokeResult.Result()
//p.Header.ResponseStatus = hessian.Response_OK
//p.Body = hessian.NewResponse(res, nil, result.Attachments())
}
} else {
result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
}
return result
}
func getExchangeClient(url common.URL) *remoting.ExchangeClient {
clientTmp, ok := exchangeClientMap.Load(url.Location)
if !ok {
var exchangeClientTmp *remoting.ExchangeClient
func() {
// lock for NewExchangeClient and store into map.
_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
// unlock
defer exchangeLock.Delete(url.Location)
if loaded {
// retry for 5 times.
for i := 0; i < 5; i++ {
if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
break
} else {
// if cannot get, sleep a while.
time.Sleep(time.Duration(i*100) * time.Millisecond)
}
}
return
}
// new ExchangeClient
exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
RequestTimeout: config.GetConsumerConfig().RequestTimeout,
}), config.GetConsumerConfig().ConnectTimeout, false)
// input store
if exchangeClientTmp != nil {
exchangeClientMap.Store(url.Location, exchangeClientTmp)
}
}()
if exchangeClientTmp != nil {
return exchangeClientTmp
}
}
// cannot dial the server
if clientTmp == nil {
return nil
}
return clientTmp.(*remoting.ExchangeClient)
}
// rebuildCtx rebuild the context by attachment.
// Once we decided to transfer more context's key-value, we should change this.
// now we only support rebuild the tracing context
func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
ctx := context.WithValue(context.Background(), "attachment", inv.Attachments())
// actually, if user do not use any opentracing framework, the err will not be nil.
spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
opentracing.TextMapCarrier(filterContext(inv.Attachments())))
if err == nil {
ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
}
return ctx
}
......@@ -28,7 +28,9 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/remoting/getty"
)
const (
......@@ -39,14 +41,56 @@ const (
"side=provider&timeout=3000&timestamp=1556509797245"
)
func TestDubboProtocolExport(t *testing.T) {
func initDubboInvokerTest() {
getty.SetServerConfig(getty.ServerConfig{
SessionNumber: 700,
SessionTimeout: "20s",
GettySessionParam: getty.GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
TcpKeepAlive: true,
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
MaxMsgLen: 10240000000,
SessionName: "server",
}})
getty.SetClientConf(getty.ClientConfig{
ConnectionNum: 1,
HeartbeatPeriod: "3s",
SessionTimeout: "20s",
PoolTTL: 600,
PoolSize: 64,
GettySessionParam: getty.GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
TcpKeepAlive: true,
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
PkgWQSize: 512,
TcpReadTimeout: "4s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
MaxMsgLen: 10240000000,
SessionName: "client",
},
})
}
func TestDubboProtocol_Export(t *testing.T) {
initDubboInvokerTest()
srvCfg := getty.GetDefaultServerConfig()
getty.SetServerConfig(srvCfg)
// Export
proto := GetProtocol()
srvConf = &ServerConfig{}
url, err := common.NewURL(mockCommonUrl)
assert.NoError(t, err)
exporter := proto.Export(protocol.NewBaseInvoker(url))
// make sure url
eq := exporter.GetInvoker().GetUrl().URLEqual(url)
assert.True(t, eq)
......@@ -60,10 +104,10 @@ func TestDubboProtocolExport(t *testing.T) {
assert.True(t, eq2)
// make sure exporterMap after 'Unexport'
_, ok := proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey())
_, ok := proto.(*DubboProtocol).ExporterMap().Load(url2.ServiceKey())
assert.True(t, ok)
exporter.Unexport()
_, ok = proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey())
exporter2.Unexport()
_, ok = proto.(*DubboProtocol).ExporterMap().Load(url2.ServiceKey())
assert.False(t, ok)
// make sure serverMap after 'Destroy'
......@@ -74,14 +118,29 @@ func TestDubboProtocolExport(t *testing.T) {
assert.False(t, ok)
}
func TestDubboProtocolRefer(t *testing.T) {
func TestDubboProtocolReferNoConnect(t *testing.T) {
// Refer
initDubboInvokerTest()
proto := GetProtocol()
url, err := common.NewURL(mockCommonUrl)
assert.NoError(t, err)
clientConf = &ClientConfig{}
invoker := proto.Refer(url)
assert.Nil(t, invoker)
}
func TestDubboProtocol_Refer(t *testing.T) {
initDubboInvokerTest()
cliCfg := getty.GetDefaultClientConfig()
getty.SetClientConf(cliCfg)
// Refer
proto := GetProtocol()
url, err := common.NewURL(mockCommonUrl)
proto.Export(&proxy_factory.ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
})
assert.NoError(t, err)
invoker := proto.Refer(url)
// make sure url
eq := invoker.GetUrl().URLEqual(url)
assert.True(t, eq)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
import (
"bufio"
"encoding/binary"
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)
type ProtocolCodec struct {
reader *bufio.Reader
pkgType PackageType
bodyLen int
serializer Serializer
headerRead bool
}
func (c *ProtocolCodec) ReadHeader(header *DubboHeader) error {
var err error
if c.reader.Size() < HEADER_LENGTH {
return hessian.ErrHeaderNotEnough
}
buf, err := c.reader.Peek(HEADER_LENGTH)
if err != nil { // this is impossible
return perrors.WithStack(err)
}
_, err = c.reader.Discard(HEADER_LENGTH)
if err != nil { // this is impossible
return perrors.WithStack(err)
}
//// read header
if buf[0] != MAGIC_HIGH && buf[1] != MAGIC_LOW {
return hessian.ErrIllegalPackage
}
// Header{serialization id(5 bit), event, two way, req/response}
if header.SerialID = buf[2] & SERIAL_MASK; header.SerialID == Zero {
return perrors.Errorf("serialization ID:%v", header.SerialID)
}
flag := buf[2] & FLAG_EVENT
if flag != Zero {
header.Type |= PackageHeartbeat
}
flag = buf[2] & FLAG_REQUEST
if flag != Zero {
header.Type |= PackageRequest
flag = buf[2] & FLAG_TWOWAY
if flag != Zero {
header.Type |= PackageRequest_TwoWay
}
} else {
header.Type |= PackageResponse
header.ResponseStatus = buf[3]
if header.ResponseStatus != Response_OK {
header.Type |= PackageResponse_Exception
}
}
// Header{req id}
header.ID = int64(binary.BigEndian.Uint64(buf[4:]))
// Header{body len}
header.BodyLen = int(binary.BigEndian.Uint32(buf[12:]))
if header.BodyLen < 0 {
return hessian.ErrIllegalPackage
}
c.pkgType = header.Type
c.bodyLen = header.BodyLen
if c.reader.Buffered() < c.bodyLen {
return hessian.ErrBodyNotEnough
}
c.headerRead = true
return perrors.WithStack(err)
}
func (c *ProtocolCodec) EncodeHeader(p DubboPackage) []byte {
header := p.Header
bs := make([]byte, 0)
switch header.Type {
case PackageHeartbeat:
if header.ResponseStatus == Zero {
bs = append(bs, hessian.DubboRequestHeartbeatHeader[:]...)
} else {
bs = append(bs, hessian.DubboResponseHeartbeatHeader[:]...)
}
case PackageResponse:
bs = append(bs, hessian.DubboResponseHeaderBytes[:]...)
if header.ResponseStatus != 0 {
bs[3] = header.ResponseStatus
}
case PackageRequest_TwoWay:
bs = append(bs, hessian.DubboRequestHeaderBytesTwoWay[:]...)
}
bs[2] |= header.SerialID & hessian.SERIAL_MASK
binary.BigEndian.PutUint64(bs[4:], uint64(header.ID))
return bs
}
func (c *ProtocolCodec) Encode(p DubboPackage) ([]byte, error) {
// header
if c.serializer == nil {
return nil, perrors.New("serializer should not be nil")
}
header := p.Header
switch header.Type {
case PackageHeartbeat:
if header.ResponseStatus == Zero {
return packRequest(p, c.serializer)
}
return packResponse(p, c.serializer)
case PackageRequest, PackageRequest_TwoWay:
return packRequest(p, c.serializer)
case PackageResponse:
return packResponse(p, c.serializer)
default:
return nil, perrors.Errorf("Unrecognised message type: %v", header.Type)
}
}
func (c *ProtocolCodec) Decode(p *DubboPackage) error {
if !c.headerRead {
if err := c.ReadHeader(&p.Header); err != nil {
return err
}
}
body, err := c.reader.Peek(p.GetBodyLen())
if err != nil {
return err
}
if p.IsResponseWithException() {
logger.Infof("response with exception: %+v", p.Header)
decoder := hessian.NewDecoder(body)
exception, err := decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
rsp, ok := p.Body.(*ResponsePayload)
if !ok {
return perrors.Errorf("java exception:%s", exception.(string))
}
rsp.Exception = perrors.Errorf("java exception:%s", exception.(string))
return nil
} else if p.IsHeartBeat() {
// heartbeat no need to unmarshal contents
return nil
}
if c.serializer == nil {
return perrors.New("Codec serializer is nil")
}
if p.IsResponse() {
p.Body = &ResponsePayload{
RspObj: remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID)).Reply,
}
}
return c.serializer.Unmarshal(body, p)
}
func (c *ProtocolCodec) SetSerializer(serializer Serializer) {
c.serializer = serializer
}
func packRequest(p DubboPackage, serializer Serializer) ([]byte, error) {
var (
byteArray []byte
pkgLen int
)
header := p.Header
//////////////////////////////////////////
// byteArray
//////////////////////////////////////////
// magic
switch header.Type {
case PackageHeartbeat:
byteArray = append(byteArray, DubboRequestHeartbeatHeader[:]...)
case PackageRequest_TwoWay:
byteArray = append(byteArray, DubboRequestHeaderBytesTwoWay[:]...)
default:
byteArray = append(byteArray, DubboRequestHeaderBytes[:]...)
}
// serialization id, two way flag, event, request/response flag
// SerialID is id of serialization approach in java dubbo
byteArray[2] |= header.SerialID & SERIAL_MASK
// request id
binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID))
//////////////////////////////////////////
// body
//////////////////////////////////////////
if p.IsHeartBeat() {
byteArray = append(byteArray, byte('N'))
pkgLen = 1
} else {
body, err := serializer.Marshal(p)
if err != nil {
return nil, err
}
pkgLen = len(body)
if pkgLen > int(DEFAULT_LEN) { // 8M
return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN)
}
byteArray = append(byteArray, body...)
}
binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen))
return byteArray, nil
}
func packResponse(p DubboPackage, serializer Serializer) ([]byte, error) {
var (
byteArray []byte
)
header := p.Header
hb := p.IsHeartBeat()
// magic
if hb {
byteArray = append(byteArray, DubboResponseHeartbeatHeader[:]...)
} else {
byteArray = append(byteArray, DubboResponseHeaderBytes[:]...)
}
// set serialID, identify serialization types, eg: fastjson->6, hessian2->2
byteArray[2] |= header.SerialID & SERIAL_MASK
// response status
if header.ResponseStatus != 0 {
byteArray[3] = header.ResponseStatus
}
// request id
binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID))
// body
body, err := serializer.Marshal(p)
if err != nil {
return nil, err
}
pkgLen := len(body)
if pkgLen > int(DEFAULT_LEN) { // 8M
return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN)
}
// byteArray{body length}
binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen))
byteArray = append(byteArray, body...)
return byteArray, nil
}
func NewDubboCodec(reader *bufio.Reader) *ProtocolCodec {
s, _ := GetSerializerById(constant.S_Hessian2)
return &ProtocolCodec{
reader: reader,
pkgType: 0,
bodyLen: 0,
headerRead: false,
serializer: s.(Serializer),
}
}
......@@ -15,45 +15,46 @@
* limitations under the License.
*/
package dubbo
package impl
import (
"bytes"
"testing"
"time"
)
import (
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/protocol/dubbo/hessian2"
"github.com/apache/dubbo-go/common/constant"
)
func TestDubboPackageMarshalAndUnmarshal(t *testing.T) {
pkg := &DubboPackage{}
func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) {
pkg := NewDubboPackage(nil)
pkg.Body = []interface{}{"a"}
pkg.Header.Type = hessian2.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)
pkg.Header.Type = PackageHeartbeat
pkg.Header.SerialID = constant.S_Hessian2
pkg.Header.ID = 10086
pkg.SetSerializer(HessianSerializer{})
// heartbeat
data, err := pkg.Marshal()
assert.NoError(t, err)
pkgres := &DubboPackage{}
pkgres := NewDubboPackage(data)
pkgres.SetSerializer(HessianSerializer{})
pkgres.Body = []interface{}{}
err = pkgres.Unmarshal(data)
err = pkgres.Unmarshal()
assert.NoError(t, err)
assert.Equal(t, hessian2.PackageHeartbeat|hessian2.PackageRequest|hessian2.PackageRequest_TwoWay, pkgres.Header.Type)
assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID)
assert.Equal(t, PackageHeartbeat|PackageRequest|PackageRequest_TwoWay, pkgres.Header.Type)
assert.Equal(t, constant.S_Hessian2, pkgres.Header.SerialID)
assert.Equal(t, int64(10086), pkgres.Header.ID)
assert.Equal(t, 0, len(pkgres.Body.([]interface{})))
// request
pkg.Header.Type = hessian2.PackageRequest
pkg.Header.Type = PackageRequest
pkg.Service.Interface = "Service"
pkg.Service.Path = "path"
pkg.Service.Version = "2.6"
......@@ -62,25 +63,27 @@ func TestDubboPackageMarshalAndUnmarshal(t *testing.T) {
data, err = pkg.Marshal()
assert.NoError(t, err)
pkgres = &DubboPackage{}
pkgres = NewDubboPackage(data)
pkgres.SetSerializer(HessianSerializer{})
pkgres.Body = make([]interface{}, 7)
err = pkgres.Unmarshal(data)
err = pkgres.Unmarshal()
reassembleBody := pkgres.GetBody().(map[string]interface{})
assert.NoError(t, err)
assert.Equal(t, hessian2.PackageRequest, pkgres.Header.Type)
assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID)
assert.Equal(t, PackageRequest, pkgres.Header.Type)
assert.Equal(t, constant.S_Hessian2, pkgres.Header.SerialID)
assert.Equal(t, int64(10086), pkgres.Header.ID)
assert.Equal(t, "2.0.2", pkgres.Body.([]interface{})[0])
assert.Equal(t, "path", pkgres.Body.([]interface{})[1])
assert.Equal(t, "2.6", pkgres.Body.([]interface{})[2])
assert.Equal(t, "Method", pkgres.Body.([]interface{})[3])
assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4])
assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5])
assert.Equal(t, map[string]interface{}{"dubbo": "2.0.2", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, pkgres.Body.([]interface{})[6])
}
func TestIssue380(t *testing.T) {
pkg := &DubboPackage{}
buf := bytes.NewBuffer([]byte("hello"))
err := pkg.Unmarshal(buf)
assert.True(t, perrors.Cause(err) == hessian2.ErrHeaderNotEnough)
assert.Equal(t, "2.0.2", reassembleBody["dubboVersion"].(string))
assert.Equal(t, "path", pkgres.Service.Path)
assert.Equal(t, "2.6", pkgres.Service.Version)
assert.Equal(t, "Method", pkgres.Service.Method)
assert.Equal(t, "Ljava/lang/String;", reassembleBody["argsTypes"].(string))
assert.Equal(t, []interface{}{"a"}, reassembleBody["args"])
tmpData := map[string]interface{}{
"dubbo": "2.0.2",
"interface": "Service",
"path": "path",
"timeout": "1000",
"version": "2.6",
}
assert.Equal(t, tmpData, reassembleBody["attachments"])
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
import (
"reflect"
"regexp"
"github.com/pkg/errors"
)
const (
DUBBO = "dubbo"
)
const (
mask = byte(127)
flag = byte(128)
)
const (
// Zero : byte zero
Zero = byte(0x00)
)
// constansts
const (
TAG_READ = int32(-1)
ASCII_GAP = 32
CHUNK_SIZE = 4096
BC_BINARY = byte('B') // final chunk
BC_BINARY_CHUNK = byte('A') // non-final chunk
BC_BINARY_DIRECT = byte(0x20) // 1-byte length binary
BINARY_DIRECT_MAX = byte(0x0f)
BC_BINARY_SHORT = byte(0x34) // 2-byte length binary
BINARY_SHORT_MAX = 0x3ff // 0-1023 binary
BC_DATE = byte(0x4a) // 64-bit millisecond UTC date
BC_DATE_MINUTE = byte(0x4b) // 32-bit minute UTC date
BC_DOUBLE = byte('D') // IEEE 64-bit double
BC_DOUBLE_ZERO = byte(0x5b)
BC_DOUBLE_ONE = byte(0x5c)
BC_DOUBLE_BYTE = byte(0x5d)
BC_DOUBLE_SHORT = byte(0x5e)
BC_DOUBLE_MILL = byte(0x5f)
BC_FALSE = byte('F') // boolean false
BC_INT = byte('I') // 32-bit int
INT_DIRECT_MIN = -0x10
INT_DIRECT_MAX = byte(0x2f)
BC_INT_ZERO = byte(0x90)
INT_BYTE_MIN = -0x800
INT_BYTE_MAX = 0x7ff
BC_INT_BYTE_ZERO = byte(0xc8)
BC_END = byte('Z')
INT_SHORT_MIN = -0x40000
INT_SHORT_MAX = 0x3ffff
BC_INT_SHORT_ZERO = byte(0xd4)
BC_LIST_VARIABLE = byte(0x55)
BC_LIST_FIXED = byte('V')
BC_LIST_VARIABLE_UNTYPED = byte(0x57)
BC_LIST_FIXED_UNTYPED = byte(0x58)
_listFixedTypedLenTagMin = byte(0x70)
_listFixedTypedLenTagMax = byte(0x77)
_listFixedUntypedLenTagMin = byte(0x78)
_listFixedUntypedLenTagMax = byte(0x7f)
BC_LIST_DIRECT = byte(0x70)
BC_LIST_DIRECT_UNTYPED = byte(0x78)
LIST_DIRECT_MAX = byte(0x7)
BC_LONG = byte('L') // 64-bit signed integer
LONG_DIRECT_MIN = -0x08
LONG_DIRECT_MAX = byte(0x0f)
BC_LONG_ZERO = byte(0xe0)
LONG_BYTE_MIN = -0x800
LONG_BYTE_MAX = 0x7ff
BC_LONG_BYTE_ZERO = byte(0xf8)
LONG_SHORT_MIN = -0x40000
LONG_SHORT_MAX = 0x3ffff
BC_LONG_SHORT_ZERO = byte(0x3c)
BC_LONG_INT = byte(0x59)
BC_MAP = byte('M')
BC_MAP_UNTYPED = byte('H')
BC_NULL = byte('N') // x4e
BC_OBJECT = byte('O')
BC_OBJECT_DEF = byte('C')
BC_OBJECT_DIRECT = byte(0x60)
OBJECT_DIRECT_MAX = byte(0x0f)
BC_REF = byte(0x51)
BC_STRING = byte('S') // final string
BC_STRING_CHUNK = byte('R') // non-final string
BC_STRING_DIRECT = byte(0x00)
STRING_DIRECT_MAX = byte(0x1f)
BC_STRING_SHORT = byte(0x30)
STRING_SHORT_MAX = 0x3ff
BC_TRUE = byte('T')
P_PACKET_CHUNK = byte(0x4f)
P_PACKET = byte('P')
P_PACKET_DIRECT = byte(0x80)
PACKET_DIRECT_MAX = byte(0x7f)
P_PACKET_SHORT = byte(0x70)
PACKET_SHORT_MAX = 0xfff
ARRAY_STRING = "[string"
ARRAY_INT = "[int"
ARRAY_DOUBLE = "[double"
ARRAY_FLOAT = "[float"
ARRAY_BOOL = "[boolean"
ARRAY_LONG = "[long"
PATH_KEY = "path"
GROUP_KEY = "group"
INTERFACE_KEY = "interface"
VERSION_KEY = "version"
TIMEOUT_KEY = "timeout"
STRING_NIL = ""
STRING_TRUE = "true"
STRING_FALSE = "false"
STRING_ZERO = "0.0"
STRING_ONE = "1.0"
)
// ResponsePayload related consts
const (
Response_OK byte = 20
Response_CLIENT_TIMEOUT byte = 30
Response_SERVER_TIMEOUT byte = 31
Response_BAD_REQUEST byte = 40
Response_BAD_RESPONSE byte = 50
Response_SERVICE_NOT_FOUND byte = 60
Response_SERVICE_ERROR byte = 70
Response_SERVER_ERROR byte = 80
Response_CLIENT_ERROR byte = 90
// According to "java dubbo" There are two cases of response:
// 1. with attachments
// 2. no attachments
RESPONSE_WITH_EXCEPTION int32 = 0
RESPONSE_VALUE int32 = 1
RESPONSE_NULL_VALUE int32 = 2
RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS int32 = 3
RESPONSE_VALUE_WITH_ATTACHMENTS int32 = 4
RESPONSE_NULL_VALUE_WITH_ATTACHMENTS int32 = 5
)
/**
* the dubbo protocol header length is 16 Bytes.
* the first 2 Bytes is magic code '0xdabb'
* the next 1 Byte is message flags, in which its 16-20 bit is serial id, 21 for event, 22 for two way, 23 for request/response flag
* the next 1 Bytes is response state.
* the next 8 Bytes is package DI.
* the next 4 Bytes is package length.
**/
const (
// header length.
HEADER_LENGTH = 16
// magic header
MAGIC = uint16(0xdabb)
MAGIC_HIGH = byte(0xda)
MAGIC_LOW = byte(0xbb)
// message flag.
FLAG_REQUEST = byte(0x80)
FLAG_TWOWAY = byte(0x40)
FLAG_EVENT = byte(0x20) // for heartbeat
SERIAL_MASK = 0x1f
DUBBO_VERSION = "2.5.4"
DUBBO_VERSION_KEY = "dubbo"
DEFAULT_DUBBO_PROTOCOL_VERSION = "2.0.2" // Dubbo RPC protocol version, for compatibility, it must not be between 2.0.10 ~ 2.6.2
LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT = 2000200
DEFAULT_LEN = 8388608 // 8 * 1024 * 1024 default body max length
)
// regular
const (
JAVA_IDENT_REGEX = "(?:[_$a-zA-Z][_$a-zA-Z0-9]*)"
CLASS_DESC = "(?:L" + JAVA_IDENT_REGEX + "(?:\\/" + JAVA_IDENT_REGEX + ")*;)"
ARRAY_DESC = "(?:\\[+(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "))"
DESC_REGEX = "(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "|" + ARRAY_DESC + ")"
)
// Dubbo request response related consts
var (
DubboRequestHeaderBytesTwoWay = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY}
DubboRequestHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST}
DubboResponseHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, Zero, Response_OK}
DubboRequestHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY | FLAG_EVENT}
DubboResponseHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_EVENT}
)
// Error part
var (
ErrHeaderNotEnough = errors.New("header buffer too short")
ErrBodyNotEnough = errors.New("body buffer too short")
ErrJavaException = errors.New("got java exception")
ErrIllegalPackage = errors.New("illegal package!")
)
// DescRegex ...
var DescRegex, _ = regexp.Compile(DESC_REGEX)
var NilValue = reflect.Zero(reflect.TypeOf((*interface{})(nil)).Elem())
// Body map keys
var (
DubboVersionKey = "dubboVersion"
ArgsTypesKey = "argsTypes"
ArgsKey = "args"
ServiceKey = "service"
AttachmentsKey = "attachments"
)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
import (
"math"
"reflect"
"strconv"
"strings"
"time"
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go-hessian2/java_exception"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)
type Object interface{}
type HessianSerializer struct {
}
func (h HessianSerializer) Marshal(p DubboPackage) ([]byte, error) {
encoder := hessian.NewEncoder()
if p.IsRequest() {
return marshalRequest(encoder, p)
}
return marshalResponse(encoder, p)
}
func (h HessianSerializer) Unmarshal(input []byte, p *DubboPackage) error {
if p.IsHeartBeat() {
return nil
}
if p.IsRequest() {
return unmarshalRequestBody(input, p)
}
return unmarshalResponseBody(input, p)
}
func marshalResponse(encoder *hessian.Encoder, p DubboPackage) ([]byte, error) {
header := p.Header
response := EnsureResponsePayload(p.Body)
if header.ResponseStatus == Response_OK {
if p.IsHeartBeat() {
encoder.Encode(nil)
} else {
var version string
if attachmentVersion, ok := response.Attachments[DUBBO_VERSION_KEY]; ok {
version = attachmentVersion.(string)
}
atta := isSupportResponseAttachment(version)
var resWithException, resValue, resNullValue int32
if atta {
resWithException = RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
resValue = RESPONSE_VALUE_WITH_ATTACHMENTS
resNullValue = RESPONSE_NULL_VALUE_WITH_ATTACHMENTS
} else {
resWithException = RESPONSE_WITH_EXCEPTION
resValue = RESPONSE_VALUE
resNullValue = RESPONSE_NULL_VALUE
}
if response.Exception != nil { // throw error
encoder.Encode(resWithException)
if t, ok := response.Exception.(java_exception.Throwabler); ok {
encoder.Encode(t)
} else {
encoder.Encode(java_exception.NewThrowable(response.Exception.Error()))
}
} else {
if response.RspObj == nil {
encoder.Encode(resNullValue)
} else {
encoder.Encode(resValue)
encoder.Encode(response.RspObj) // result
}
}
if atta {
encoder.Encode(response.Attachments) // attachments
}
}
} else {
if response.Exception != nil { // throw error
encoder.Encode(response.Exception.Error())
} else {
encoder.Encode(response.RspObj)
}
}
bs := encoder.Buffer()
// encNull
bs = append(bs, byte('N'))
return bs, nil
}
func marshalRequest(encoder *hessian.Encoder, p DubboPackage) ([]byte, error) {
service := p.Service
request := EnsureRequestPayload(p.Body)
encoder.Encode(DEFAULT_DUBBO_PROTOCOL_VERSION)
encoder.Encode(service.Path)
encoder.Encode(service.Version)
encoder.Encode(service.Method)
args, ok := request.Params.([]interface{})
if !ok {
logger.Infof("request args are: %+v", request.Params)
return nil, perrors.Errorf("@params is not of type: []interface{}")
}
types, err := getArgsTypeList(args)
if err != nil {
return nil, perrors.Wrapf(err, " PackRequest(args:%+v)", args)
}
encoder.Encode(types)
for _, v := range args {
encoder.Encode(v)
}
request.Attachments[PATH_KEY] = service.Path
request.Attachments[VERSION_KEY] = service.Version
if len(service.Group) > 0 {
request.Attachments[GROUP_KEY] = service.Group
}
if len(service.Interface) > 0 {
request.Attachments[INTERFACE_KEY] = service.Interface
}
if service.Timeout != 0 {
request.Attachments[TIMEOUT_KEY] = strconv.Itoa(int(service.Timeout / time.Millisecond))
}
encoder.Encode(request.Attachments)
return encoder.Buffer(), nil
}
var versionInt = make(map[string]int)
// https://github.com/apache/dubbo/blob/dubbo-2.7.1/dubbo-common/src/main/java/org/apache/dubbo/common/Version.java#L96
// isSupportResponseAttachment is for compatibility among some dubbo version
func isSupportResponseAttachment(version string) bool {
if version == "" {
return false
}
v, ok := versionInt[version]
if !ok {
v = version2Int(version)
if v == -1 {
return false
}
}
if v >= 2001000 && v <= 2060200 { // 2.0.10 ~ 2.6.2
return false
}
return v >= LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT
}
func version2Int(version string) int {
var v = 0
varr := strings.Split(version, ".")
length := len(varr)
for key, value := range varr {
v0, err := strconv.Atoi(value)
if err != nil {
return -1
}
v += v0 * int(math.Pow10((length-key-1)*2))
}
if length == 3 {
return v * 100
}
return v
}
func unmarshalRequestBody(body []byte, p *DubboPackage) error {
if p.Body == nil {
p.SetBody(make([]interface{}, 7))
}
decoder := hessian.NewDecoder(body)
var (
err error
dubboVersion, target, serviceVersion, method, argsTypes interface{}
args []interface{}
)
req, ok := p.Body.([]interface{})
if !ok {
return perrors.Errorf("@reqObj is not of type: []interface{}")
}
dubboVersion, err = decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
req[0] = dubboVersion
target, err = decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
req[1] = target
serviceVersion, err = decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
req[2] = serviceVersion
method, err = decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
req[3] = method
argsTypes, err = decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
req[4] = argsTypes
ats := hessian.DescRegex.FindAllString(argsTypes.(string), -1)
var arg interface{}
for i := 0; i < len(ats); i++ {
arg, err = decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
args = append(args, arg)
}
req[5] = args
attachments, err := decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
if v, ok := attachments.(map[interface{}]interface{}); ok {
v[DUBBO_VERSION_KEY] = dubboVersion
req[6] = ToMapStringInterface(v)
buildServerSidePackageBody(p)
return nil
}
return perrors.Errorf("get wrong attachments: %+v", attachments)
}
func unmarshalResponseBody(body []byte, p *DubboPackage) error {
decoder := hessian.NewDecoder(body)
rspType, err := decoder.Decode()
if p.Body == nil {
p.SetBody(&ResponsePayload{})
}
if err != nil {
return perrors.WithStack(err)
}
response := EnsureResponsePayload(p.Body)
switch rspType {
case RESPONSE_WITH_EXCEPTION, RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
expt, err := decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
if rspType == RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS {
attachments, err := decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
if v, ok := attachments.(map[interface{}]interface{}); ok {
atta := ToMapStringInterface(v)
response.Attachments = atta
} else {
return perrors.Errorf("get wrong attachments: %+v", attachments)
}
}
if e, ok := expt.(error); ok {
response.Exception = e
} else {
response.Exception = perrors.Errorf("got exception: %+v", expt)
}
return nil
case RESPONSE_VALUE, RESPONSE_VALUE_WITH_ATTACHMENTS:
rsp, err := decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
if rspType == RESPONSE_VALUE_WITH_ATTACHMENTS {
attachments, err := decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
if v, ok := attachments.(map[interface{}]interface{}); ok {
atta := ToMapStringInterface(v)
response.Attachments = atta
} else {
return perrors.Errorf("get wrong attachments: %+v", attachments)
}
}
return perrors.WithStack(hessian.ReflectResponse(rsp, response.RspObj))
case RESPONSE_NULL_VALUE, RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
if rspType == RESPONSE_NULL_VALUE_WITH_ATTACHMENTS {
attachments, err := decoder.Decode()
if err != nil {
return perrors.WithStack(err)
}
if v, ok := attachments.(map[interface{}]interface{}); ok {
atta := ToMapStringInterface(v)
response.Attachments = atta
} else {
return perrors.Errorf("get wrong attachments: %+v", attachments)
}
}
return nil
}
return nil
}
func buildServerSidePackageBody(pkg *DubboPackage) {
req := pkg.GetBody().([]interface{}) // length of body should be 7
if len(req) > 0 {
var dubboVersion, argsTypes string
var args []interface{}
var attachments map[string]interface{}
svc := Service{}
if req[0] != nil {
dubboVersion = req[0].(string)
}
if req[1] != nil {
svc.Path = req[1].(string)
}
if req[2] != nil {
svc.Version = req[2].(string)
}
if req[3] != nil {
svc.Method = req[3].(string)
}
if req[4] != nil {
argsTypes = req[4].(string)
}
if req[5] != nil {
args = req[5].([]interface{})
}
if req[6] != nil {
attachments = req[6].(map[string]interface{})
}
if svc.Path == "" && attachments[constant.PATH_KEY] != nil && len(attachments[constant.PATH_KEY].(string)) > 0 {
svc.Path = attachments[constant.PATH_KEY].(string)
}
if _, ok := attachments[constant.INTERFACE_KEY]; ok {
svc.Interface = attachments[constant.INTERFACE_KEY].(string)
} else {
svc.Interface = svc.Path
}
if _, ok := attachments[constant.GROUP_KEY]; ok {
svc.Group = attachments[constant.GROUP_KEY].(string)
}
pkg.SetService(svc)
pkg.SetBody(map[string]interface{}{
"dubboVersion": dubboVersion,
"argsTypes": argsTypes,
"args": args,
"service": common.ServiceMap.GetService(DUBBO, svc.Path), // path as a key
"attachments": attachments,
})
}
}
func getArgsTypeList(args []interface{}) (string, error) {
var (
typ string
types string
)
for i := range args {
typ = getArgType(args[i])
if typ == "" {
return types, perrors.Errorf("cat not get arg %#v type", args[i])
}
if !strings.Contains(typ, ".") {
types += typ
} else if strings.Index(typ, "[") == 0 {
types += strings.Replace(typ, ".", "/", -1)
} else {
// java.util.List -> Ljava/util/List;
types += "L" + strings.Replace(typ, ".", "/", -1) + ";"
}
}
return types, nil
}
func getArgType(v interface{}) string {
if v == nil {
return "V"
}
switch v.(type) {
// Serialized tags for base types
case nil:
return "V"
case bool:
return "Z"
case []bool:
return "[Z"
case byte:
return "B"
case []byte:
return "[B"
case int8:
return "B"
case []int8:
return "[B"
case int16:
return "S"
case []int16:
return "[S"
case uint16: // Equivalent to Char of Java
return "C"
case []uint16:
return "[C"
// case rune:
// return "C"
case int:
return "J"
case []int:
return "[J"
case int32:
return "I"
case []int32:
return "[I"
case int64:
return "J"
case []int64:
return "[J"
case time.Time:
return "java.util.Date"
case []time.Time:
return "[Ljava.util.Date"
case float32:
return "F"
case []float32:
return "[F"
case float64:
return "D"
case []float64:
return "[D"
case string:
return "java.lang.String"
case []string:
return "[Ljava.lang.String;"
case []Object:
return "[Ljava.lang.Object;"
case map[interface{}]interface{}:
// return "java.util.HashMap"
return "java.util.Map"
case hessian.POJOEnum:
return v.(hessian.POJOEnum).JavaClassName()
// Serialized tags for complex types
default:
t := reflect.TypeOf(v)
if reflect.Ptr == t.Kind() {
t = reflect.TypeOf(reflect.ValueOf(v).Elem())
}
switch t.Kind() {
case reflect.Struct:
return "java.lang.Object"
case reflect.Slice, reflect.Array:
if t.Elem().Kind() == reflect.Struct {
return "[Ljava.lang.Object;"
}
// return "java.util.ArrayList"
return "java.util.List"
case reflect.Map: // Enter here, map may be map[string]int
return "java.util.Map"
default:
return ""
}
}
// unreachable
// return "java.lang.RuntimeException"
}
func ToMapStringInterface(origin map[interface{}]interface{}) map[string]interface{} {
dest := make(map[string]interface{}, len(origin))
for k, v := range origin {
if kv, ok := k.(string); ok {
if v == nil {
dest[kv] = ""
continue
}
dest[kv] = v
}
}
return dest
}
func init() {
SetSerializer("hessian2", HessianSerializer{})
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
import (
"bufio"
"bytes"
"fmt"
"time"
)
import (
"github.com/pkg/errors"
)
type PackageType int
// enum part
const (
PackageError = PackageType(0x01)
PackageRequest = PackageType(0x02)
PackageResponse = PackageType(0x04)
PackageHeartbeat = PackageType(0x08)
PackageRequest_TwoWay = PackageType(0x10)
PackageResponse_Exception = PackageType(0x20)
PackageType_BitSize = 0x2f
)
type DubboHeader struct {
SerialID byte
Type PackageType
ID int64
BodyLen int
ResponseStatus byte
}
// Service defines service instance
type Service struct {
Path string
Interface string
Group string
Version string
Method string
Timeout time.Duration // request timeout
}
type DubboPackage struct {
Header DubboHeader
Service Service
Body interface{}
Err error
Codec *ProtocolCodec
}
func (p DubboPackage) String() string {
return fmt.Sprintf("HessianPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
}
func (p *DubboPackage) ReadHeader() error {
return p.Codec.ReadHeader(&p.Header)
}
func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
if p.Codec == nil {
return nil, errors.New("Codec is nil")
}
pkg, err := p.Codec.Encode(*p)
if err != nil {
return nil, errors.WithStack(err)
}
return bytes.NewBuffer(pkg), nil
}
func (p *DubboPackage) Unmarshal() error {
if p.Codec == nil {
return errors.New("Codec is nil")
}
return p.Codec.Decode(p)
}
func (p DubboPackage) IsHeartBeat() bool {
return p.Header.Type&PackageHeartbeat != 0
}
func (p DubboPackage) IsRequest() bool {
return p.Header.Type&(PackageRequest_TwoWay|PackageRequest) != 0
}
func (p DubboPackage) IsResponse() bool {
return p.Header.Type == PackageResponse
}
func (p DubboPackage) IsResponseWithException() bool {
flag := PackageResponse | PackageResponse_Exception
return p.Header.Type&flag == flag
}
func (p DubboPackage) GetBodyLen() int {
return p.Header.BodyLen
}
func (p DubboPackage) GetLen() int {
return HEADER_LENGTH + p.Header.BodyLen
}
func (p DubboPackage) GetBody() interface{} {
return p.Body
}
func (p *DubboPackage) SetBody(body interface{}) {
p.Body = body
}
func (p *DubboPackage) SetHeader(header DubboHeader) {
p.Header = header
}
func (p *DubboPackage) SetService(svc Service) {
p.Service = svc
}
func (p *DubboPackage) SetID(id int64) {
p.Header.ID = id
}
func (p DubboPackage) GetHeader() DubboHeader {
return p.Header
}
func (p DubboPackage) GetService() Service {
return p.Service
}
func (p *DubboPackage) SetResponseStatus(status byte) {
p.Header.ResponseStatus = status
}
func (p *DubboPackage) SetSerializer(serializer Serializer) {
p.Codec.SetSerializer(serializer)
}
func NewDubboPackage(data *bytes.Buffer) *DubboPackage {
var codec *ProtocolCodec
if data == nil {
codec = NewDubboCodec(nil)
} else {
codec = NewDubboCodec(bufio.NewReaderSize(data, len(data.Bytes())))
}
return &DubboPackage{
Header: DubboHeader{},
Service: Service{},
Body: nil,
Err: nil,
Codec: codec,
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
type RequestPayload struct {
Params interface{}
Attachments map[string]interface{}
}
func NewRequestPayload(args interface{}, atta map[string]interface{}) *RequestPayload {
if atta == nil {
atta = make(map[string]interface{})
}
return &RequestPayload{
Params: args,
Attachments: atta,
}
}
func EnsureRequestPayload(body interface{}) *RequestPayload {
if req, ok := body.(*RequestPayload); ok {
return req
}
return NewRequestPayload(body, nil)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
type ResponsePayload struct {
RspObj interface{}
Exception error
Attachments map[string]interface{}
}
// NewResponse create a new ResponsePayload
func NewResponsePayload(rspObj interface{}, exception error, attachments map[string]interface{}) *ResponsePayload {
if attachments == nil {
attachments = make(map[string]interface{})
}
return &ResponsePayload{
RspObj: rspObj,
Exception: exception,
Attachments: attachments,
}
}
func EnsureResponsePayload(body interface{}) *ResponsePayload {
if res, ok := body.(*ResponsePayload); ok {
return res
}
if exp, ok := body.(error); ok {
return NewResponsePayload(nil, exp, nil)
}
return NewResponsePayload(body, nil, nil)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment