Skip to content
Snippets Groups Projects
Unverified Commit e21eb408 authored by fangyincheng's avatar fangyincheng Committed by GitHub
Browse files

Merge pull request #736 from georgehao/refact-seri

Refact seri
parents 566a7e35 2d8d2f02
No related branches found
No related tags found
No related merge requests found
......@@ -28,6 +28,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/remoting/getty"
)
......@@ -40,7 +41,7 @@ const (
"side=provider&timeout=3000&timestamp=1556509797245"
)
func init() {
func initDubboInvokerTest() {
getty.SetServerConfig(getty.ServerConfig{
SessionNumber: 700,
SessionTimeout: "20s",
......@@ -80,7 +81,9 @@ func init() {
},
})
}
func TestDubboProtocol_Export(t *testing.T) {
initDubboInvokerTest()
srvCfg := getty.GetDefaultServerConfig()
getty.SetServerConfig(srvCfg)
// Export
......@@ -117,6 +120,7 @@ func TestDubboProtocol_Export(t *testing.T) {
func TestDubboProtocol_Refer_No_connect(t *testing.T) {
// Refer
initDubboInvokerTest()
proto := GetProtocol()
url, err := common.NewURL(mockCommonUrl)
assert.NoError(t, err)
......@@ -125,15 +129,18 @@ func TestDubboProtocol_Refer_No_connect(t *testing.T) {
}
func TestDubboProtocol_Refer(t *testing.T) {
initDubboInvokerTest()
cliCfg := getty.GetDefaultClientConfig()
getty.SetClientConf(cliCfg)
// Refer
proto := GetProtocol()
url, err := common.NewURL(mockCommonUrl)
proto.Export(&proxy_factory.ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
})
assert.NoError(t, err)
invoker := proto.Refer(url)
// make sure url
eq := invoker.GetUrl().URLEqual(url)
assert.True(t, eq)
......
......@@ -20,9 +20,20 @@ package getty
// it is used to unit test.
import (
"bytes"
"strconv"
"time"
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/dubbo/impl"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting"
)
......@@ -34,26 +45,232 @@ func init() {
type DubboTestCodec struct {
}
// encode request for transport
func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
return &bytes.Buffer{}, nil
if request.Event {
return c.encodeHeartbeartReqeust(request)
}
invoc, ok := request.Data.(*invocation.RPCInvocation)
if !ok {
return nil, perrors.Errorf("encode request failed for parameter type :%+v", request)
}
invocation := *invoc
svc := impl.Service{}
svc.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
svc.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "")
svc.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "")
svc.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "")
svc.Method = invocation.MethodName()
timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, strconv.Itoa(constant.DEFAULT_REMOTING_TIMEOUT)))
if err != nil {
// it will be wrapped in readwrite.Write .
return nil, perrors.WithStack(err)
}
svc.Timeout = time.Duration(timeout)
header := impl.DubboHeader{}
serialization := invocation.AttachmentsByKey(constant.SERIALIZATION_KEY, constant.HESSIAN2_SERIALIZATION)
if serialization == constant.PROTOBUF_SERIALIZATION {
header.SerialID = constant.S_Proto
} else {
header.SerialID = constant.S_Hessian2
}
header.ID = request.ID
if request.TwoWay {
header.Type = impl.PackageRequest_TwoWay
} else {
header.Type = impl.PackageRequest
}
pkg := &impl.DubboPackage{
Header: header,
Service: svc,
Body: impl.NewRequestPayload(invocation.Arguments(), invocation.Attachments()),
Err: nil,
Codec: impl.NewDubboCodec(nil),
}
if err := impl.LoadSerializer(pkg); err != nil {
return nil, perrors.WithStack(err)
}
return pkg.Marshal()
}
// encode heartbeart request
func (c *DubboTestCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) {
return &bytes.Buffer{}, nil
header := impl.DubboHeader{
Type: impl.PackageHeartbeat,
SerialID: constant.S_Hessian2,
ID: request.ID,
}
pkg := &impl.DubboPackage{
Header: header,
Service: impl.Service{},
Body: impl.NewRequestPayload([]interface{}{}, nil),
Err: nil,
Codec: impl.NewDubboCodec(nil),
}
if err := impl.LoadSerializer(pkg); err != nil {
return nil, err
}
return pkg.Marshal()
}
// encode response
func (c *DubboTestCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) {
return &bytes.Buffer{}, nil
var ptype = impl.PackageResponse
if response.IsHeartbeat() {
ptype = impl.PackageHeartbeat
}
resp := &impl.DubboPackage{
Header: impl.DubboHeader{
SerialID: response.SerialID,
Type: ptype,
ID: response.ID,
ResponseStatus: response.Status,
},
}
if !response.IsHeartbeat() {
resp.Body = &impl.ResponsePayload{
RspObj: response.Result.(protocol.RPCResult).Rest,
Exception: response.Result.(protocol.RPCResult).Err,
Attachments: response.Result.(protocol.RPCResult).Attrs,
}
}
codec := impl.NewDubboCodec(nil)
pkg, err := codec.Encode(*resp)
if err != nil {
return nil, perrors.WithStack(err)
}
return bytes.NewBuffer(pkg), nil
}
// Decode data, including request and response.
func (c *DubboTestCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
return remoting.DecodeResult{}, 0, nil
if c.isRequest(data) {
req, len, err := c.decodeRequest(data)
if err != nil {
return remoting.DecodeResult{}, len, perrors.WithStack(err)
}
return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err)
} else {
resp, len, err := c.decodeResponse(data)
if err != nil {
return remoting.DecodeResult{}, len, perrors.WithStack(err)
}
return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err)
}
}
func (c *DubboTestCodec) isRequest(data []byte) bool {
if data[2]&byte(0x80) == 0x00 {
return false
}
return true
}
// decode request
func (c *DubboTestCodec) decodeRequest(data []byte) (*remoting.Request, int, error) {
return nil, 0, nil
var request *remoting.Request = nil
buf := bytes.NewBuffer(data)
pkg := impl.NewDubboPackage(buf)
pkg.SetBody(make([]interface{}, 7))
err := pkg.Unmarshal()
if err != nil {
originErr := perrors.Cause(err)
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
//FIXME
return nil, 0, originErr
}
return request, 0, perrors.WithStack(err)
}
request = &remoting.Request{
ID: pkg.Header.ID,
SerialID: pkg.Header.SerialID,
TwoWay: pkg.Header.Type&impl.PackageRequest_TwoWay != 0x00,
Event: pkg.Header.Type&impl.PackageHeartbeat != 0x00,
}
if (pkg.Header.Type & impl.PackageHeartbeat) == 0x00 {
// convert params of request
req := pkg.Body.(map[string]interface{})
//invocation := request.Data.(*invocation.RPCInvocation)
var methodName string
var args []interface{}
var attachments map[string]string = make(map[string]string)
if req[impl.DubboVersionKey] != nil {
//dubbo version
request.Version = req[impl.DubboVersionKey].(string)
}
//path
attachments[constant.PATH_KEY] = pkg.Service.Path
//version
attachments[constant.VERSION_KEY] = pkg.Service.Version
//method
methodName = pkg.Service.Method
args = req[impl.ArgsKey].([]interface{})
attachments = req[impl.AttachmentsKey].(map[string]string)
invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments),
invocation.WithArguments(args), invocation.WithMethodName(methodName))
request.Data = invoc
}
return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
}
// decode response
func (c *DubboTestCodec) decodeResponse(data []byte) (*remoting.Response, int, error) {
return nil, 0, nil
buf := bytes.NewBuffer(data)
pkg := impl.NewDubboPackage(buf)
response := &remoting.Response{}
err := pkg.Unmarshal()
if err != nil {
originErr := perrors.Cause(err)
// if the data is very big, so the receive need much times.
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
return nil, 0, originErr
}
return nil, 0, perrors.WithStack(err)
}
response = &remoting.Response{
ID: pkg.Header.ID,
//Version: pkg.Header.,
SerialID: pkg.Header.SerialID,
Status: pkg.Header.ResponseStatus,
Event: (pkg.Header.Type & impl.PackageHeartbeat) != 0,
}
var error error
if pkg.Header.Type&impl.PackageHeartbeat != 0x00 {
if pkg.Header.Type&impl.PackageResponse != 0x00 {
if pkg.Err != nil {
error = pkg.Err
}
} else {
response.Status = hessian.Response_OK
//reply(session, p, hessian.PackageHeartbeat)
}
return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, error
}
rpcResult := &protocol.RPCResult{}
response.Result = rpcResult
if pkg.Header.Type&impl.PackageRequest == 0x00 {
if pkg.Err != nil {
rpcResult.Err = pkg.Err
} else if pkg.Body.(*impl.ResponsePayload).Exception != nil {
rpcResult.Err = pkg.Body.(*impl.ResponsePayload).Exception
response.Error = rpcResult.Err
}
rpcResult.Attrs = pkg.Body.(*impl.ResponsePayload).Attachments
rpcResult.Rest = pkg.Body.(*impl.ResponsePayload).RspObj
}
return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
}
......@@ -48,7 +48,6 @@ func TestRunSuite(t *testing.T) {
testRequestOneWay(t, svr, url, client)
testClient_Call(t, svr, url, client)
testClient_AsyncCall(t, svr, url, client)
svr.Stop()
}
......@@ -106,15 +105,10 @@ func testClient_Call(t *testing.T, svr *Server, url common.URL, c *Client) {
testGetUser5(t, c)
testGetUser6(t, c)
testGetUser61(t, c)
}
func testGetBigPkg(t *testing.T, c *Client) {
var (
user *User
err error
)
user = &User{}
func testGetBigPkg(t *testing.T, c *Client) {
user := &User{}
request := remoting.NewRequest("2.0.2")
invocation := createInvocation("GetBigPkg", nil, nil, []interface{}{[]interface{}{nil}, user},
[]reflect.Value{reflect.ValueOf([]interface{}{nil}), reflect.ValueOf(user)})
......@@ -126,17 +120,14 @@ func testGetBigPkg(t *testing.T, c *Client) {
pendingResponse := remoting.NewPendingResponse(request.ID)
pendingResponse.Reply = user
remoting.AddPendingResponse(pendingResponse)
err = c.Request(request, 8*time.Second, pendingResponse)
err := c.Request(request, 8*time.Second, pendingResponse)
assert.NoError(t, err)
assert.NotEqual(t, "", user.Id)
assert.NotEqual(t, "", user.Name)
}
func testGetUser(t *testing.T, c *Client) {
var (
user *User
err error
)
user = &User{}
user := &User{}
request := remoting.NewRequest("2.0.2")
invocation := createInvocation("GetUser", nil, nil, []interface{}{"1", "username"},
[]reflect.Value{reflect.ValueOf("1"), reflect.ValueOf("username")})
......@@ -148,7 +139,7 @@ func testGetUser(t *testing.T, c *Client) {
pendingResponse := remoting.NewPendingResponse(request.ID)
pendingResponse.Reply = user
remoting.AddPendingResponse(pendingResponse)
err = c.Request(request, 3*time.Second, pendingResponse)
err := c.Request(request, 3*time.Second, pendingResponse)
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: "username"}, *user)
}
......@@ -175,6 +166,7 @@ func testGetUser0(t *testing.T, c *Client) {
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: "username"}, *user)
}
func testGetUser1(t *testing.T, c *Client) {
var (
err error
......@@ -194,6 +186,7 @@ func testGetUser1(t *testing.T, c *Client) {
err = c.Request(request, 3*time.Second, pendingResponse)
assert.NoError(t, err)
}
func testGetUser2(t *testing.T, c *Client) {
var (
err error
......@@ -211,6 +204,7 @@ func testGetUser2(t *testing.T, c *Client) {
err = c.Request(request, 3*time.Second, pendingResponse)
assert.EqualError(t, err, "error")
}
func testGetUser3(t *testing.T, c *Client) {
var (
err error
......@@ -231,6 +225,7 @@ func testGetUser3(t *testing.T, c *Client) {
assert.NoError(t, err)
assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0])
}
func testGetUser4(t *testing.T, c *Client) {
var (
err error
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment