diff --git a/common/constant/key.go b/common/constant/key.go index 07335bed599788b0240b28b096c7d7a395ee9c11..d8eff3a479b415c818d348475dca8baf3539cdf5 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -22,25 +22,26 @@ const ( ) const ( - GROUP_KEY = "group" - VERSION_KEY = "version" - INTERFACE_KEY = "interface" - PATH_KEY = "path" - SERVICE_KEY = "service" - METHODS_KEY = "methods" - TIMEOUT_KEY = "timeout" - CATEGORY_KEY = "category" - CHECK_KEY = "check" - ENABLED_KEY = "enabled" - SIDE_KEY = "side" - OVERRIDE_PROVIDERS_KEY = "providerAddresses" - BEAN_NAME_KEY = "bean.name" - GENERIC_KEY = "generic" - CLASSIFIER_KEY = "classifier" - TOKEN_KEY = "token" - LOCAL_ADDR = "local-addr" - REMOTE_ADDR = "remote-addr" - PATH_SEPARATOR = "/" + GROUP_KEY = "group" + VERSION_KEY = "version" + INTERFACE_KEY = "interface" + PATH_KEY = "path" + SERVICE_KEY = "service" + METHODS_KEY = "methods" + TIMEOUT_KEY = "timeout" + CATEGORY_KEY = "category" + CHECK_KEY = "check" + ENABLED_KEY = "enabled" + SIDE_KEY = "side" + OVERRIDE_PROVIDERS_KEY = "providerAddresses" + BEAN_NAME_KEY = "bean.name" + GENERIC_KEY = "generic" + CLASSIFIER_KEY = "classifier" + TOKEN_KEY = "token" + LOCAL_ADDR = "local-addr" + REMOTE_ADDR = "remote-addr" + PATH_SEPARATOR = "/" + DEFAULT_REMOTING_TIMEOUT = 3000 ) const ( diff --git a/common/url.go b/common/url.go index ebb648db27c3efff534f0d0a545f2211f335aa89..01c623ed5aedc8c30df21434e29b0a57ed918ab5 100644 --- a/common/url.go +++ b/common/url.go @@ -318,12 +318,15 @@ func (c URL) Key() string { // ServiceKey ... func (c URL) ServiceKey() string { - intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) + return ServiceKey(c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")), + c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, "")) +} + +func ServiceKey(intf string, group string, version string) string { if intf == "" { return "" } buf := &bytes.Buffer{} - group := c.GetParam(constant.GROUP_KEY, "") if group != "" { buf.WriteString(group) buf.WriteString("/") @@ -331,7 +334,6 @@ func (c URL) ServiceKey() string { buf.WriteString(intf) - version := c.GetParam(constant.VERSION_KEY, "") if version != "" && version != "0.0.0" { buf.WriteString(":") buf.WriteString(version) diff --git a/config/application_config.go b/config/application_config.go index 23ab7d34aceaba02d7f592906d6f4e3d6cf36dae..1d9306c5a29e64040b791e51b7146e599026287d 100644 --- a/config/application_config.go +++ b/config/application_config.go @@ -40,7 +40,7 @@ func (*ApplicationConfig) Prefix() string { return constant.DUBBO + ".application." } -// Id ... +// nolint func (c *ApplicationConfig) Id() string { return "" } diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go deleted file mode 100644 index 5ec7db51af0ddfa6e49d3c65910355f0bf2de414..0000000000000000000000000000000000000000 --- a/protocol/dubbo/client.go +++ /dev/null @@ -1,366 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dubbo - -import ( - "math/rand" - "strings" - "sync" - "time" -) - -import ( - hessian "github.com/apache/dubbo-go-hessian2" - "github.com/dubbogo/getty" - gxsync "github.com/dubbogo/gost/sync" - perrors "github.com/pkg/errors" - "go.uber.org/atomic" - "gopkg.in/yaml.v2" -) - -import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/config" -) - -var ( - errInvalidCodecType = perrors.New("illegal CodecType") - errInvalidAddress = perrors.New("remote address invalid or empty") - errSessionNotExist = perrors.New("session not exist") - errClientClosed = perrors.New("client closed") - errClientReadTimeout = perrors.New("client read timeout") - - clientConf *ClientConfig - clientGrpool *gxsync.TaskPool -) - -func init() { - - // load clientconfig from consumer_config - // default use dubbo - consumerConfig := config.GetConsumerConfig() - if consumerConfig.ApplicationConfig == nil { - return - } - protocolConf := config.GetConsumerConfig().ProtocolConf - defaultClientConfig := GetDefaultClientConfig() - if protocolConf == nil { - logger.Info("protocol_conf default use dubbo config") - } else { - dubboConf := protocolConf.(map[interface{}]interface{})[DUBBO] - if dubboConf == nil { - logger.Warnf("dubboConf is nil") - return - } - dubboConfByte, err := yaml.Marshal(dubboConf) - if err != nil { - panic(err) - } - err = yaml.Unmarshal(dubboConfByte, &defaultClientConfig) - if err != nil { - panic(err) - } - } - clientConf = &defaultClientConfig - if err := clientConf.CheckValidity(); err != nil { - logger.Warnf("[CheckValidity] error: %v", err) - return - } - setClientGrpool() - - rand.Seed(time.Now().UnixNano()) -} - -// SetClientConf ... -func SetClientConf(c ClientConfig) { - clientConf = &c - err := clientConf.CheckValidity() - if err != nil { - logger.Warnf("[ClientConfig CheckValidity] error: %v", err) - return - } - setClientGrpool() -} - -// GetClientConf ... -func GetClientConf() ClientConfig { - return *clientConf -} - -func setClientGrpool() { - if clientConf.GrPoolSize > 1 { - clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen), - gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber)) - } -} - -// Options ... -type Options struct { - // connect timeout - ConnectTimeout time.Duration - // request timeout - RequestTimeout time.Duration -} - -//AsyncCallbackResponse async response for dubbo -type AsyncCallbackResponse struct { - common.CallbackResponse - Opts Options - Cause error - Start time.Time // invoke(call) start time == write start time - ReadStart time.Time // read start time, write duration = ReadStart - Start - Reply interface{} -} - -// Client ... -type Client struct { - opts Options - conf ClientConfig - pool *gettyRPCClientPool - sequence atomic.Uint64 - - pendingResponses *sync.Map -} - -// NewClient ... -func NewClient(opt Options) *Client { - - switch { - case opt.ConnectTimeout == 0: - opt.ConnectTimeout = 3 * time.Second - fallthrough - case opt.RequestTimeout == 0: - opt.RequestTimeout = 3 * time.Second - } - - // make sure that client request sequence is an odd number - initSequence := uint64(rand.Int63n(time.Now().UnixNano())) - if initSequence%2 == 0 { - initSequence++ - } - - c := &Client{ - opts: opt, - pendingResponses: new(sync.Map), - conf: *clientConf, - } - c.sequence.Store(initSequence) - c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) - - return c -} - -// Request ... -type Request struct { - addr string - svcUrl common.URL - method string - args interface{} - atta map[string]string -} - -// NewRequest ... -func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request { - return &Request{ - addr: addr, - svcUrl: svcUrl, - method: method, - args: args, - atta: atta, - } -} - -// Response ... -type Response struct { - reply interface{} - atta map[string]string -} - -// NewResponse ... -func NewResponse(reply interface{}, atta map[string]string) *Response { - return &Response{ - reply: reply, - atta: atta, - } -} - -// CallOneway call one way -func (c *Client) CallOneway(request *Request) error { - - return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil)) -} - -// Call if @response is nil, the transport layer will get the response without notify the invoker. -func (c *Client) Call(request *Request, response *Response) error { - - ct := CT_TwoWay - if response.reply == nil { - ct = CT_OneWay - } - - return perrors.WithStack(c.call(ct, request, response, nil)) -} - -// AsyncCall ... -func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error { - - return perrors.WithStack(c.call(CT_TwoWay, request, response, callback)) -} - -func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error { - - p := &DubboPackage{} - p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/") - p.Service.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "") - p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "") - p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "") - p.Service.Method = request.method - - p.Service.Timeout = c.opts.RequestTimeout - var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "") - if len(timeout) != 0 { - if t, err := time.ParseDuration(timeout); err == nil { - p.Service.Timeout = t - } - } - - p.Header.SerialID = byte(S_Dubbo) - p.Body = hessian.NewRequest(request.args, request.atta) - - var rsp *PendingResponse - if ct != CT_OneWay { - p.Header.Type = hessian.PackageRequest_TwoWay - rsp = NewPendingResponse() - rsp.response = response - rsp.callback = callback - } else { - p.Header.Type = hessian.PackageRequest - } - - var ( - err error - session getty.Session - conn *gettyRPCClient - ) - conn, session, err = c.selectSession(request.addr) - if err != nil { - return perrors.WithStack(err) - } - if session == nil { - return errSessionNotExist - } - defer func() { - if err == nil { - c.pool.put(conn) - return - } - conn.close() - }() - - if err = c.transfer(session, p, rsp); err != nil { - return perrors.WithStack(err) - } - - if ct == CT_OneWay || callback != nil { - return nil - } - - select { - case <-getty.GetTimeWheel().After(c.opts.RequestTimeout): - c.removePendingResponse(SequenceType(rsp.seq)) - return perrors.WithStack(errClientReadTimeout) - case <-rsp.done: - err = rsp.err - } - - return perrors.WithStack(err) -} - -// Close ... -func (c *Client) Close() { - if c.pool != nil { - c.pool.close() - } - c.pool = nil -} - -func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) { - rpcClient, err := c.pool.getGettyRpcClient(DUBBO, addr) - if err != nil { - return nil, nil, perrors.WithStack(err) - } - return rpcClient, rpcClient.selectSession(), nil -} - -func (c *Client) heartbeat(session getty.Session) error { - return c.transfer(session, nil, NewPendingResponse()) -} - -func (c *Client) transfer(session getty.Session, pkg *DubboPackage, - rsp *PendingResponse) error { - - var ( - sequence uint64 - err error - ) - - sequence = c.sequence.Add(1) - - if pkg == nil { - pkg = &DubboPackage{} - pkg.Body = hessian.NewRequest([]interface{}{}, nil) - pkg.Body = []interface{}{} - pkg.Header.Type = hessian.PackageHeartbeat - pkg.Header.SerialID = byte(S_Dubbo) - } - pkg.Header.ID = int64(sequence) - - // cond1 - if rsp != nil { - rsp.seq = sequence - c.addPendingResponse(rsp) - } - - err = session.WritePkg(pkg, c.opts.RequestTimeout) - if err != nil { - c.removePendingResponse(SequenceType(rsp.seq)) - } else if rsp != nil { // cond2 - // cond2 should not merged with cond1. cause the response package may be returned very - // soon and it will be handled by other goroutine. - rsp.readStart = time.Now() - } - - return perrors.WithStack(err) -} - -func (c *Client) addPendingResponse(pr *PendingResponse) { - c.pendingResponses.Store(SequenceType(pr.seq), pr) -} - -func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse { - if c.pendingResponses == nil { - return nil - } - if presp, ok := c.pendingResponses.Load(seq); ok { - c.pendingResponses.Delete(seq) - return presp.(*PendingResponse) - } - return nil -} diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go deleted file mode 100644 index 1e0a73fac1a6cf6d4d102e5f4f6f1ba60fc4102a..0000000000000000000000000000000000000000 --- a/protocol/dubbo/client_test.go +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dubbo - -import ( - "bytes" - "context" - "sync" - "testing" - "time" -) - -import ( - hessian "github.com/apache/dubbo-go-hessian2" - perrors "github.com/pkg/errors" - "github.com/stretchr/testify/assert" -) - -import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/proxy/proxy_factory" - "github.com/apache/dubbo-go/protocol" -) - -func TestClient_CallOneway(t *testing.T) { - proto, url := InitTest(t) - - c := &Client{ - pendingResponses: new(sync.Map), - conf: *clientConf, - opts: Options{ - ConnectTimeout: 3e9, - RequestTimeout: 6e9, - }, - } - c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) - - //user := &User{} - err := c.CallOneway(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil)) - assert.NoError(t, err) - - // destroy - proto.Destroy() -} - -func TestClient_Call(t *testing.T) { - proto, url := InitTest(t) - - c := &Client{ - pendingResponses: new(sync.Map), - conf: *clientConf, - opts: Options{ - ConnectTimeout: 3e9, - RequestTimeout: 10e9, - }, - } - c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) - - var ( - user *User - err error - ) - - user = &User{} - err = c.Call(NewRequest("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, nil), NewResponse(user, nil)) - assert.NoError(t, err) - assert.NotEqual(t, "", user.Id) - assert.NotEqual(t, "", user.Name) - - user = &User{} - err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), NewResponse(user, nil)) - assert.NoError(t, err) - assert.Equal(t, User{Id: "1", Name: "username"}, *user) - - user = &User{} - err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", nil, "username"}, nil), NewResponse(user, nil)) - assert.NoError(t, err) - assert.Equal(t, User{Id: "1", Name: "username"}, *user) - - err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser1", []interface{}{}, nil), NewResponse(user, nil)) - assert.NoError(t, err) - - err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser2", []interface{}{}, nil), NewResponse(user, nil)) - assert.EqualError(t, err, "error") - - user2 := []interface{}{} - err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser3", []interface{}{}, nil), NewResponse(&user2, nil)) - assert.NoError(t, err) - assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0]) - - user2 = []interface{}{} - err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser4", []interface{}{[]interface{}{"1", "username"}}, nil), NewResponse(&user2, nil)) - assert.NoError(t, err) - assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0]) - - user3 := map[interface{}]interface{}{} - err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, nil), NewResponse(&user3, nil)) - assert.NoError(t, err) - assert.NotNil(t, user3) - assert.Equal(t, &User{Id: "1", Name: "username"}, user3["key"]) - - user = &User{} - err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser6", []interface{}{0}, nil), NewResponse(user, nil)) - assert.NoError(t, err) - assert.Equal(t, User{Id: "", Name: ""}, *user) - - user = &User{} - err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser6", []interface{}{1}, nil), NewResponse(user, nil)) - assert.NoError(t, err) - assert.Equal(t, User{Id: "1", Name: ""}, *user) - - // destroy - proto.Destroy() -} - -func TestClient_AsyncCall(t *testing.T) { - proto, url := InitTest(t) - - c := &Client{ - pendingResponses: new(sync.Map), - conf: *clientConf, - opts: Options{ - ConnectTimeout: 3e9, - RequestTimeout: 6e9, - }, - } - c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) - - user := &User{} - lock := sync.Mutex{} - lock.Lock() - err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallbackResponse) { - r := response.(AsyncCallbackResponse) - assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) - lock.Unlock() - }, NewResponse(user, nil)) - assert.NoError(t, err) - assert.Equal(t, User{}, *user) - - // destroy - lock.Lock() - proto.Destroy() - lock.Unlock() -} - -func InitTest(t *testing.T) (protocol.Protocol, common.URL) { - - hessian.RegisterPOJO(&User{}) - - methods, err := common.ServiceMap.Register("dubbo", &UserProvider{}) - assert.NoError(t, err) - assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6", methods) - - // config - SetClientConf(ClientConfig{ - ConnectionNum: 2, - HeartbeatPeriod: "5s", - SessionTimeout: "20s", - PoolTTL: 600, - PoolSize: 64, - GettySessionParam: GettySessionParam{ - CompressEncoding: false, - TcpNoDelay: true, - TcpKeepAlive: true, - KeepAlivePeriod: "120s", - TcpRBufSize: 262144, - TcpWBufSize: 65536, - PkgWQSize: 512, - TcpReadTimeout: "4s", - TcpWriteTimeout: "5s", - WaitTimeout: "1s", - MaxMsgLen: 10240000000, - SessionName: "client", - }, - }) - assert.NoError(t, clientConf.CheckValidity()) - SetServerConfig(ServerConfig{ - SessionNumber: 700, - SessionTimeout: "20s", - GettySessionParam: GettySessionParam{ - CompressEncoding: false, - TcpNoDelay: true, - TcpKeepAlive: true, - KeepAlivePeriod: "120s", - TcpRBufSize: 262144, - TcpWBufSize: 65536, - PkgWQSize: 512, - TcpReadTimeout: "1s", - TcpWriteTimeout: "5s", - WaitTimeout: "1s", - MaxMsgLen: 10240000000, - SessionName: "server", - }}) - assert.NoError(t, srvConf.CheckValidity()) - - // Export - proto := GetProtocol() - url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" + - "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + - "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + - "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + - "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") - assert.NoError(t, err) - proto.Export(&proxy_factory.ProxyInvoker{ - BaseInvoker: *protocol.NewBaseInvoker(url), - }) - - time.Sleep(time.Second * 2) - - return proto, url -} - -////////////////////////////////// -// provider -////////////////////////////////// - -type ( - User struct { - Id string `json:"id"` - Name string `json:"name"` - } - - UserProvider struct { - user map[string]User - } -) - -// size:4801228 -func (u *UserProvider) GetBigPkg(ctx context.Context, req []interface{}, rsp *User) error { - argBuf := new(bytes.Buffer) - for i := 0; i < 4000; i++ { - argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。") - argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。") - } - rsp.Id = argBuf.String() - rsp.Name = argBuf.String() - return nil -} - -func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User) error { - rsp.Id = req[0].(string) - rsp.Name = req[1].(string) - return nil -} - -func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) { - return User{Id: id, Name: name}, nil -} - -func (u *UserProvider) GetUser1() error { - return nil -} - -func (u *UserProvider) GetUser2() error { - return perrors.New("error") -} - -func (u *UserProvider) GetUser3(rsp *[]interface{}) error { - *rsp = append(*rsp, User{Id: "1", Name: "username"}) - return nil -} - -func (u *UserProvider) GetUser4(ctx context.Context, req []interface{}) ([]interface{}, error) { - - return []interface{}{User{Id: req[0].([]interface{})[0].(string), Name: req[0].([]interface{})[1].(string)}}, nil -} - -func (u *UserProvider) GetUser5(ctx context.Context, req []interface{}) (map[interface{}]interface{}, error) { - return map[interface{}]interface{}{"key": User{Id: req[0].(map[interface{}]interface{})["id"].(string), Name: req[0].(map[interface{}]interface{})["name"].(string)}}, nil -} - -func (u *UserProvider) GetUser6(id int64) (*User, error) { - if id == 0 { - return nil, nil - } - return &User{Id: "1"}, nil -} - -func (u *UserProvider) Reference() string { - return "UserProvider" -} - -func (u User) JavaClassName() string { - return "com.ikurento.user.User" -} diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go deleted file mode 100644 index 76416b2baf1e1db516c00d92ecb8ad618bf186bd..0000000000000000000000000000000000000000 --- a/protocol/dubbo/codec.go +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dubbo - -import ( - "bufio" - "bytes" - "fmt" - "time" -) - -import ( - "github.com/apache/dubbo-go-hessian2" - "github.com/apache/dubbo-go/common" - perrors "github.com/pkg/errors" -) - -//SerialID serial ID -type SerialID byte - -const ( - // S_Dubbo dubbo serial id - S_Dubbo SerialID = 2 -) - -//CallType call type -type CallType int32 - -const ( - // CT_UNKNOWN unknown call type - CT_UNKNOWN CallType = 0 - // CT_OneWay call one way - CT_OneWay CallType = 1 - // CT_TwoWay call in request/response - CT_TwoWay CallType = 2 -) - -//////////////////////////////////////////// -// dubbo package -//////////////////////////////////////////// - -// SequenceType ... -type SequenceType int64 - -// DubboPackage ... -type DubboPackage struct { - Header hessian.DubboHeader - Service hessian.Service - Body interface{} - Err error -} - -func (p DubboPackage) String() string { - return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body) -} - -// Marshal ... -func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { - codec := hessian.NewHessianCodec(nil) - - pkg, err := codec.Write(p.Service, p.Header, p.Body) - if err != nil { - return nil, perrors.WithStack(err) - } - - return bytes.NewBuffer(pkg), nil -} - -// Unmarshal ... -func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { - // fix issue https://github.com/apache/dubbo-go/issues/380 - bufLen := buf.Len() - if bufLen < hessian.HEADER_LENGTH { - return perrors.WithStack(hessian.ErrHeaderNotEnough) - } - - codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen)) - - // read header - err := codec.ReadHeader(&p.Header) - if err != nil { - return perrors.WithStack(err) - } - - if len(opts) != 0 { // for client - client, ok := opts[0].(*Client) - if !ok { - return perrors.Errorf("opts[0] is not of type *Client") - } - - if p.Header.Type&hessian.PackageRequest != 0x00 { - // size of this array must be '7' - // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272 - p.Body = make([]interface{}, 7) - } else { - pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) - if !ok { - return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) - } - p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply} - } - } - - // read body - err = codec.ReadBody(p.Body) - return perrors.WithStack(err) -} - -//////////////////////////////////////////// -// PendingResponse -//////////////////////////////////////////// - -// PendingResponse ... -type PendingResponse struct { - seq uint64 - err error - start time.Time - readStart time.Time - callback common.AsyncCallback - response *Response - done chan struct{} -} - -// NewPendingResponse ... -func NewPendingResponse() *PendingResponse { - return &PendingResponse{ - start: time.Now(), - response: &Response{}, - done: make(chan struct{}), - } -} - -// GetCallResponse ... -func (r PendingResponse) GetCallResponse() common.CallbackResponse { - return AsyncCallbackResponse{ - Cause: r.err, - Start: r.start, - ReadStart: r.readStart, - Reply: r.response, - } -} diff --git a/protocol/dubbo/codec_test.go b/protocol/dubbo/codec_test.go deleted file mode 100644 index 5dc71f0d080c8c862d68029c7983a4407913307e..0000000000000000000000000000000000000000 --- a/protocol/dubbo/codec_test.go +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dubbo - -import ( - "bytes" - "testing" - "time" -) - -import ( - hessian "github.com/apache/dubbo-go-hessian2" - perrors "github.com/pkg/errors" - "github.com/stretchr/testify/assert" -) - -func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) { - pkg := &DubboPackage{} - pkg.Body = []interface{}{"a"} - pkg.Header.Type = hessian.PackageHeartbeat - pkg.Header.SerialID = byte(S_Dubbo) - pkg.Header.ID = 10086 - - // heartbeat - data, err := pkg.Marshal() - assert.NoError(t, err) - - pkgres := &DubboPackage{} - pkgres.Body = []interface{}{} - err = pkgres.Unmarshal(data) - assert.NoError(t, err) - assert.Equal(t, hessian.PackageHeartbeat|hessian.PackageRequest|hessian.PackageRequest_TwoWay, pkgres.Header.Type) - assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID) - assert.Equal(t, int64(10086), pkgres.Header.ID) - assert.Equal(t, 0, len(pkgres.Body.([]interface{}))) - - // request - pkg.Header.Type = hessian.PackageRequest - pkg.Service.Interface = "Service" - pkg.Service.Path = "path" - pkg.Service.Version = "2.6" - pkg.Service.Method = "Method" - pkg.Service.Timeout = time.Second - data, err = pkg.Marshal() - assert.NoError(t, err) - - pkgres = &DubboPackage{} - pkgres.Body = make([]interface{}, 7) - err = pkgres.Unmarshal(data) - assert.NoError(t, err) - assert.Equal(t, hessian.PackageRequest, pkgres.Header.Type) - assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID) - assert.Equal(t, int64(10086), pkgres.Header.ID) - assert.Equal(t, "2.0.2", pkgres.Body.([]interface{})[0]) - assert.Equal(t, "path", pkgres.Body.([]interface{})[1]) - assert.Equal(t, "2.6", pkgres.Body.([]interface{})[2]) - assert.Equal(t, "Method", pkgres.Body.([]interface{})[3]) - assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4]) - assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5]) - assert.Equal(t, map[string]string{"dubbo": "2.0.2", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, pkgres.Body.([]interface{})[6]) -} - -func TestIssue380(t *testing.T) { - pkg := &DubboPackage{} - buf := bytes.NewBuffer([]byte("hello")) - err := pkg.Unmarshal(buf) - assert.True(t, perrors.Cause(err) == hessian.ErrHeaderNotEnough) -} diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go new file mode 100644 index 0000000000000000000000000000000000000000..f4a39cb89365919f8d0e6293c3591bc4257b0104 --- /dev/null +++ b/protocol/dubbo/dubbo_codec.go @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dubbo + +import ( + "bufio" + "bytes" + "fmt" + "strconv" + "time" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" +) + +//SerialID serial ID +type SerialID byte + +const ( + // S_Dubbo dubbo serial id + S_Dubbo SerialID = 2 +) + +func init() { + codec := &DubboCodec{} + // this is for registry dubboCodec of dubbo protocol + remoting.RegistryCodec("dubbo", codec) +} + +// DubboPackage is for hessian encode/decode. If we refactor hessian, it will also be refactored. +type DubboPackage struct { + Header hessian.DubboHeader + Service hessian.Service + Body interface{} + Err error +} + +// String of DubboPackage +func (p DubboPackage) String() string { + return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body) +} + +// nolint +func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { + codec := hessian.NewHessianCodec(nil) + + pkg, err := codec.Write(p.Service, p.Header, p.Body) + if err != nil { + return nil, perrors.WithStack(err) + } + + return bytes.NewBuffer(pkg), nil +} + +// nolint +func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error { + // fix issue https://github.com/apache/dubbo-go/issues/380 + bufLen := buf.Len() + if bufLen < hessian.HEADER_LENGTH { + return perrors.WithStack(hessian.ErrHeaderNotEnough) + } + + codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen)) + + // read header + err := codec.ReadHeader(&p.Header) + if err != nil { + return perrors.WithStack(err) + } + + if resp != nil { // for client + if (p.Header.Type & hessian.PackageRequest) != 0x00 { + // size of this array must be '7' + // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272 + p.Body = make([]interface{}, 7) + } else { + pendingRsp := remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID)) + if pendingRsp == nil { + return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) + } + p.Body = &hessian.Response{RspObj: pendingRsp.Reply} + } + } + // read body + err = codec.ReadBody(p.Body) + return perrors.WithStack(err) +} + +// DubboCodec. It is implements remoting.Codec +type DubboCodec struct { +} + +// encode request for transport +func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) { + if request.Event { + return c.encodeHeartbeartReqeust(request) + } + + invoc, ok := request.Data.(*protocol.Invocation) + if !ok { + logger.Errorf("encode request failed for parameter type :%+v", request) + return nil, perrors.Errorf("encode request failed for parameter type :%+v", request) + } + invocation := *invoc + + p := &DubboPackage{} + p.Service.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "") + p.Service.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "") + p.Service.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "") + p.Service.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "") + p.Service.Method = invocation.MethodName() + + timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, strconv.Itoa(constant.DEFAULT_REMOTING_TIMEOUT))) + if err != nil { + // it will be wrapped in readwrite.Write . + return nil, perrors.WithStack(err) + } + p.Service.Timeout = time.Duration(timeout) + + p.Header.SerialID = byte(S_Dubbo) + p.Header.ID = request.ID + if request.TwoWay { + p.Header.Type = hessian.PackageRequest_TwoWay + } else { + p.Header.Type = hessian.PackageRequest + } + + p.Body = hessian.NewRequest(invocation.Arguments(), invocation.Attachments()) + + codec := hessian.NewHessianCodec(nil) + + pkg, err := codec.Write(p.Service, p.Header, p.Body) + if err != nil { + return nil, perrors.WithStack(err) + } + + return bytes.NewBuffer(pkg), nil +} + +// encode heartbeart request +func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) { + pkg := &DubboPackage{} + pkg.Body = []interface{}{} + pkg.Header.ID = request.ID + pkg.Header.Type = hessian.PackageHeartbeat + pkg.Header.SerialID = byte(S_Dubbo) + + codec := hessian.NewHessianCodec(nil) + + byt, err := codec.Write(pkg.Service, pkg.Header, pkg.Body) + if err != nil { + return nil, perrors.WithStack(err) + } + + return bytes.NewBuffer(byt), nil +} + +// encode response +func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) { + var ptype = hessian.PackageResponse + if response.IsHeartbeat() { + ptype = hessian.PackageHeartbeat + } + resp := &DubboPackage{ + Header: hessian.DubboHeader{ + SerialID: response.SerialID, + Type: ptype, + ID: response.ID, + ResponseStatus: response.Status, + }, + } + if !response.IsHeartbeat() { + resp.Body = &hessian.Response{ + RspObj: response.Result.(protocol.RPCResult).Rest, + Exception: response.Result.(protocol.RPCResult).Err, + Attachments: response.Result.(protocol.RPCResult).Attrs, + } + } + + codec := hessian.NewHessianCodec(nil) + + pkg, err := codec.Write(resp.Service, resp.Header, resp.Body) + if err != nil { + return nil, perrors.WithStack(err) + } + + return bytes.NewBuffer(pkg), nil +} + +// Decode data, including request and response. +func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) { + if c.isRequest(data) { + req, len, err := c.decodeRequest(data) + if err != nil { + return remoting.DecodeResult{}, len, perrors.WithStack(err) + } + return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err) + } else { + resp, len, err := c.decodeResponse(data) + if err != nil { + return remoting.DecodeResult{}, len, perrors.WithStack(err) + } + return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err) + } +} + +func (c *DubboCodec) isRequest(data []byte) bool { + if data[2]&byte(0x80) == 0x00 { + return false + } + return true +} + +// decode request +func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) { + pkg := &DubboPackage{ + Body: make([]interface{}, 7), + } + var request *remoting.Request = nil + buf := bytes.NewBuffer(data) + err := pkg.Unmarshal(buf, nil) + if err != nil { + originErr := perrors.Cause(err) + if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { + //FIXME + return nil, 0, originErr + } + logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err) + + return request, 0, perrors.WithStack(err) + } + request = &remoting.Request{ + ID: pkg.Header.ID, + SerialID: pkg.Header.SerialID, + TwoWay: pkg.Header.Type&hessian.PackageRequest_TwoWay != 0x00, + Event: pkg.Header.Type&hessian.PackageHeartbeat != 0x00, + } + if (pkg.Header.Type & hessian.PackageHeartbeat) == 0x00 { + // convert params of request + req := pkg.Body.([]interface{}) // length of body should be 7 + if len(req) > 0 { + //invocation := request.Data.(*invocation.RPCInvocation) + var methodName string + var args []interface{} + var attachments map[string]string = make(map[string]string) + if req[0] != nil { + //dubbo version + request.Version = req[0].(string) + } + if req[1] != nil { + //path + attachments[constant.PATH_KEY] = req[1].(string) + } + if req[2] != nil { + //version + attachments[constant.VERSION_KEY] = req[2].(string) + } + if req[3] != nil { + //method + methodName = req[3].(string) + } + if req[4] != nil { + //ignore argTypes + } + if req[5] != nil { + args = req[5].([]interface{}) + } + if req[6] != nil { + attachments = req[6].(map[string]string) + } + invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments), + invocation.WithArguments(args), invocation.WithMethodName(methodName)) + request.Data = invoc + } + } + return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil +} + +// decode response +func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error) { + pkg := &DubboPackage{} + buf := bytes.NewBuffer(data) + response := &remoting.Response{} + err := pkg.Unmarshal(buf, response) + if err != nil { + originErr := perrors.Cause(err) + // if the data is very big, so the receive need much times. + if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { + return nil, 0, originErr + } + logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err) + + return nil, 0, perrors.WithStack(err) + } + response = &remoting.Response{ + ID: pkg.Header.ID, + //Version: pkg.Header., + SerialID: pkg.Header.SerialID, + Status: pkg.Header.ResponseStatus, + Event: (pkg.Header.Type & hessian.PackageHeartbeat) != 0, + } + var error error + if pkg.Header.Type&hessian.PackageHeartbeat != 0x00 { + if pkg.Header.Type&hessian.PackageResponse != 0x00 { + logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", pkg.Header, pkg.Body) + if pkg.Err != nil { + logger.Errorf("rpc heartbeat response{error: %#v}", pkg.Err) + error = pkg.Err + } + } else { + logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", pkg.Header, pkg.Service, pkg.Body) + response.Status = hessian.Response_OK + //reply(session, p, hessian.PackageHeartbeat) + } + return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, error + } + logger.Debugf("get rpc response{header: %#v, body: %#v}", pkg.Header, pkg.Body) + rpcResult := &protocol.RPCResult{} + response.Result = rpcResult + if pkg.Header.Type&hessian.PackageRequest == 0x00 { + if pkg.Err != nil { + rpcResult.Err = pkg.Err + } else if pkg.Body.(*hessian.Response).Exception != nil { + rpcResult.Err = pkg.Body.(*hessian.Response).Exception + response.Error = rpcResult.Err + } + rpcResult.Attrs = pkg.Body.(*hessian.Response).Attachments + rpcResult.Rest = pkg.Body.(*hessian.Response).RspObj + } + + return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil +} diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 09c3725710d2a0b821d8e641b0cb7b367189c244..988353e393fc6282abafb4e38b73a421a5e0518a 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -20,6 +20,7 @@ package dubbo import ( "context" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -34,8 +35,10 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/protocol" invocation_impl "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" ) var ( @@ -45,24 +48,35 @@ var ( ) var ( - attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY} + attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY, + constant.PATH_KEY, constant.VERSION_KEY} ) -// DubboInvoker ... +// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refer to one service and ip. type DubboInvoker struct { protocol.BaseInvoker - client *Client + // the exchange layer, it is focus on network communication. + client *remoting.ExchangeClient quitOnce sync.Once + // timeout for service(interface) level. + timeout time.Duration // Used to record the number of requests. -1 represent this DubboInvoker is destroyed reqNum int64 } -// NewDubboInvoker ... -func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { +// NewDubboInvoker constructor +func NewDubboInvoker(url common.URL, client *remoting.ExchangeClient) *DubboInvoker { + requestTimeout := config.GetConsumerConfig().RequestTimeout + + requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout) + if t, err := time.ParseDuration(requestTimeoutStr); err == nil { + requestTimeout = t + } return &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), client: client, reqNum: 0, + timeout: requestTimeout, } } @@ -83,6 +97,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati defer atomic.AddInt64(&(di.reqNum), -1) inv := invocation.(*invocation_impl.RPCInvocation) + // init param for _, k := range attachmentKey { if v := di.GetUrl().GetParam(k, ""); len(v) > 0 { inv.SetAttachments(k, v) @@ -99,29 +114,47 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati logger.Errorf("ParseBool - error: %v", err) async = false } - response := NewResponse(inv.Reply(), nil) + //response := NewResponse(inv.Reply(), nil) + rest := &protocol.RPCResult{} + timeout := di.getTimeout(inv) if async { if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { - result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) + //result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) + result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest) } else { - result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())) + result.Err = di.client.Send(&invocation, url, timeout) } } else { if inv.Reply() == nil { result.Err = ErrNoReply } else { - result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response) + result.Err = di.client.Request(&invocation, url, timeout, rest) } } if result.Err == nil { result.Rest = inv.Reply() - result.Attrs = response.atta + result.Attrs = rest.Attrs } logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest) return &result } +// get timeout including methodConfig +func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) time.Duration { + var timeout = di.GetUrl().GetParam(strings.Join([]string{constant.METHOD_KEYS, invocation.MethodName(), constant.TIMEOUT_KEY}, "."), "") + if len(timeout) != 0 { + if t, err := time.ParseDuration(timeout); err == nil { + // config timeout into attachment + invocation.SetAttachments(constant.TIMEOUT_KEY, strconv.Itoa(int(t.Milliseconds()))) + return t + } + } + // set timeout into invocation at method level + invocation.SetAttachments(constant.TIMEOUT_KEY, strconv.Itoa(int(di.timeout.Milliseconds()))) + return di.timeout +} + // Destroy ... func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 1a64301f8200a4264001284cca1af3f0f1e07814..88c19105332b6d084f94f3a2bac92262fd3966dc 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -18,6 +18,7 @@ package dubbo import ( + "bytes" "context" "sync" "testing" @@ -25,28 +26,26 @@ import ( ) import ( + hessian "github.com/apache/dubbo-go-hessian2" "github.com/opentracing/opentracing-go" + perrors "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" + "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" + "github.com/apache/dubbo-go/remoting/getty" ) func TestDubboInvoker_Invoke(t *testing.T) { proto, url := InitTest(t) - c := &Client{ - pendingResponses: new(sync.Map), - conf: *clientConf, - opts: Options{ - ConnectTimeout: 3 * time.Second, - RequestTimeout: 6 * time.Second, - }, - } - c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + c := getExchangeClient(url) invoker := NewDubboInvoker(url, c) user := &User{} @@ -58,7 +57,6 @@ func TestDubboInvoker_Invoke(t *testing.T) { res := invoker.Invoke(context.Background(), inv) assert.NoError(t, res.Error()) assert.Equal(t, User{Id: "1", Name: "username"}, *res.Result().(*User)) - assert.Equal(t, "test_value", res.Attachments()["test_key"]) // test attachments for request/response // CallOneway inv.SetAttachments(constant.ASYNC_KEY, "true") @@ -69,8 +67,10 @@ func TestDubboInvoker_Invoke(t *testing.T) { lock := sync.Mutex{} lock.Lock() inv.SetCallBack(func(response common.CallbackResponse) { - r := response.(AsyncCallbackResponse) - assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) + r := response.(remoting.AsyncCallbackResponse) + rst := *r.Reply.(*remoting.Response).Result.(*protocol.RPCResult) + assert.Equal(t, User{Id: "1", Name: "username"}, *(rst.Rest.(*User))) + //assert.Equal(t, User{ID: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) lock.Unlock() }) res = invoker.Invoke(context.Background(), inv) @@ -92,3 +92,143 @@ func TestDubboInvoker_Invoke(t *testing.T) { proto.Destroy() lock.Unlock() } + +func InitTest(t *testing.T) (protocol.Protocol, common.URL) { + + hessian.RegisterPOJO(&User{}) + + methods, err := common.ServiceMap.Register("dubbo", &UserProvider{}) + assert.NoError(t, err) + assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6", methods) + + // config + getty.SetClientConf(getty.ClientConfig{ + ConnectionNum: 2, + HeartbeatPeriod: "5s", + SessionTimeout: "20s", + PoolTTL: 600, + PoolSize: 64, + GettySessionParam: getty.GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgWQSize: 512, + TcpReadTimeout: "4s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 10240000000, + SessionName: "client", + }, + }) + getty.SetServerConfig(getty.ServerConfig{ + SessionNumber: 700, + SessionTimeout: "20s", + GettySessionParam: getty.GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgWQSize: 512, + TcpReadTimeout: "1s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 10240000000, + SessionName: "server", + }}) + + // Export + proto := GetProtocol() + url, err := common.NewURL("dubbo://127.0.0.1:20702/UserProvider?anyhost=true&" + + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + + "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") + assert.NoError(t, err) + proto.Export(&proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }) + + time.Sleep(time.Second * 2) + + return proto, url +} + +////////////////////////////////// +// provider +////////////////////////////////// + +type ( + User struct { + Id string `json:"id"` + Name string `json:"name"` + } + + UserProvider struct { + user map[string]User + } +) + +// size:4801228 +func (u *UserProvider) GetBigPkg(ctx context.Context, req []interface{}, rsp *User) error { + argBuf := new(bytes.Buffer) + for i := 0; i < 400; i++ { + // use chinese for test + argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。") + argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。") + } + rsp.Id = argBuf.String() + rsp.Name = argBuf.String() + return nil +} + +func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User) error { + rsp.Id = req[0].(string) + rsp.Name = req[1].(string) + return nil +} + +func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) { + return User{Id: id, Name: name}, nil +} + +func (u *UserProvider) GetUser1() error { + return nil +} + +func (u *UserProvider) GetUser2() error { + return perrors.New("error") +} + +func (u *UserProvider) GetUser3(rsp *[]interface{}) error { + *rsp = append(*rsp, User{Id: "1", Name: "username"}) + return nil +} + +func (u *UserProvider) GetUser4(ctx context.Context, req []interface{}) ([]interface{}, error) { + + return []interface{}{User{Id: req[0].([]interface{})[0].(string), Name: req[0].([]interface{})[1].(string)}}, nil +} + +func (u *UserProvider) GetUser5(ctx context.Context, req []interface{}) (map[interface{}]interface{}, error) { + return map[interface{}]interface{}{"key": User{Id: req[0].(map[interface{}]interface{})["id"].(string), Name: req[0].(map[interface{}]interface{})["name"].(string)}}, nil +} + +func (u *UserProvider) GetUser6(id int64) (*User, error) { + if id == 0 { + return nil, nil + } + return &User{Id: "1"}, nil +} + +func (u *UserProvider) Reference() string { + return "UserProvider" +} + +func (u User) JavaClassName() string { + return "com.ikurento.user.User" +} diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 355dbc802488338ef4dbdd7290166038b312f183..2d8a20c550aa21161ba67071408a2c25e0c49970 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -18,10 +18,16 @@ package dubbo import ( + "context" + "fmt" "sync" "time" ) +import ( + "github.com/opentracing/opentracing-go" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -29,6 +35,9 @@ import ( "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" + "github.com/apache/dubbo-go/remoting/getty" ) // dubbo protocol constant @@ -37,6 +46,13 @@ const ( DUBBO = "dubbo" ) +var ( + // Make the connection can be shared. + // It will create one connection for one address (ip+port) + exchangeClientMap *sync.Map = new(sync.Map) + exchangeLock *sync.Map = new(sync.Map) +) + func init() { extension.SetProtocol(DUBBO, GetProtocol) } @@ -45,22 +61,24 @@ var ( dubboProtocol *DubboProtocol ) -// DubboProtocol ... +// It support dubbo protocol. It implements Protocol interface for dubbo protocol. type DubboProtocol struct { protocol.BaseProtocol - serverMap map[string]*Server + // It is store relationship about serviceKey(group/interface:version) and ExchangeServer + // The ExchangeServer is introduced to replace of Server. Because Server is depend on getty directly. + serverMap map[string]*remoting.ExchangeServer serverLock sync.Mutex } -// NewDubboProtocol ... +// nolint func NewDubboProtocol() *DubboProtocol { return &DubboProtocol{ BaseProtocol: protocol.NewBaseProtocol(), - serverMap: make(map[string]*Server), + serverMap: make(map[string]*remoting.ExchangeServer), } } -// Export ... +// nolint func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { url := invoker.GetUrl() serviceKey := url.ServiceKey() @@ -73,26 +91,20 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { return exporter } -// Refer ... +// nolint func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { - //default requestTimeout - var requestTimeout = config.GetConsumerConfig().RequestTimeout - - requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout) - if t, err := time.ParseDuration(requestTimeoutStr); err == nil { - requestTimeout = t + exchangeClient := getExchangeClient(url) + if exchangeClient == nil { + logger.Warnf("can't dial the server: %+v", url.Location) + return nil } - - invoker := NewDubboInvoker(url, NewClient(Options{ - ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, - RequestTimeout: requestTimeout, - })) + invoker := NewDubboInvoker(url, exchangeClient) dp.SetInvokers(invoker) logger.Infof("Refer service: %s", url.String()) return invoker } -// Destroy ... +// nolint func (dp *DubboProtocol) Destroy() { logger.Infof("DubboProtocol destroy.") @@ -116,9 +128,12 @@ func (dp *DubboProtocol) openServer(url common.URL) { dp.serverLock.Lock() _, ok = dp.serverMap[url.Location] if !ok { - srv := NewServer() + handler := func(invocation *invocation.RPCInvocation) protocol.RPCResult { + return doHandleRequest(invocation) + } + srv := remoting.NewExchangeServer(url, getty.NewServer(url, handler)) dp.serverMap[url.Location] = srv - srv.Start(url) + srv.Start() } dp.serverLock.Unlock() } @@ -131,3 +146,100 @@ func GetProtocol() protocol.Protocol { } return dubboProtocol } + +func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult { + exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey()) + result := protocol.RPCResult{} + if exporter == nil { + err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey()) + logger.Errorf(err.Error()) + result.Err = err + //reply(session, p, hessian.PackageResponse) + return result + } + invoker := exporter.(protocol.Exporter).GetInvoker() + if invoker != nil { + // FIXME + ctx := rebuildCtx(rpcInvocation) + + invokeResult := invoker.Invoke(ctx, rpcInvocation) + if err := invokeResult.Error(); err != nil { + result.Err = invokeResult.Error() + //p.Header.ResponseStatus = hessian.Response_OK + //p.Body = hessian.NewResponse(nil, err, result.Attachments()) + } else { + result.Rest = invokeResult.Result() + //p.Header.ResponseStatus = hessian.Response_OK + //p.Body = hessian.NewResponse(res, nil, result.Attachments()) + } + } else { + result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey()) + } + return result +} + +func getExchangeClient(url common.URL) *remoting.ExchangeClient { + clientTmp, ok := exchangeClientMap.Load(url.Location) + if !ok { + var exchangeClientTmp *remoting.ExchangeClient + func() { + // lock for NewExchangeClient and store into map. + _, loaded := exchangeLock.LoadOrStore(url.Location, 0x00) + // unlock + defer exchangeLock.Delete(url.Location) + if loaded { + // retry for 5 times. + for i := 0; i < 5; i++ { + if clientTmp, ok = exchangeClientMap.Load(url.Location); ok { + break + } else { + // if cannot get, sleep a while. + time.Sleep(time.Duration(i*100) * time.Millisecond) + } + } + return + } + // new ExchangeClient + exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{ + ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, + }), config.GetConsumerConfig().ConnectTimeout, false) + // input store + if exchangeClientTmp != nil { + exchangeClientMap.Store(url.Location, exchangeClientTmp) + } + }() + if exchangeClientTmp != nil { + return exchangeClientTmp + } + } + // cannot dial the server + if clientTmp == nil { + return nil + } + exchangeClient, ok := clientTmp.(*remoting.ExchangeClient) + if !ok { + exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{ + ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, + }), config.GetConsumerConfig().ConnectTimeout, false) + if exchangeClientTmp != nil { + exchangeClientMap.Store(url.Location, exchangeClientTmp) + } + return exchangeClientTmp + } + return exchangeClient +} + +// rebuildCtx rebuild the context by attachment. +// Once we decided to transfer more context's key-value, we should change this. +// now we only support rebuild the tracing context +func rebuildCtx(inv *invocation.RPCInvocation) context.Context { + ctx := context.WithValue(context.Background(), "attachment", inv.Attachments()) + + // actually, if user do not use any opentracing framework, the err will not be nil. + spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, + opentracing.TextMapCarrier(inv.Attachments())) + if err == nil { + ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx) + } + return ctx +} diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go index 14f6868ad4a7f2bf6f549a2fbbce8234cbb4aa12..55ab0fe455438e398fd0b227aad3962d2383f84a 100644 --- a/protocol/dubbo/dubbo_protocol_test.go +++ b/protocol/dubbo/dubbo_protocol_test.go @@ -29,13 +29,53 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/remoting/getty" ) +func init() { + getty.SetServerConfig(getty.ServerConfig{ + SessionNumber: 700, + SessionTimeout: "20s", + GettySessionParam: getty.GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgWQSize: 512, + TcpReadTimeout: "1s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 10240000000, + SessionName: "server", + }}) + getty.SetClientConf(getty.ClientConfig{ + ConnectionNum: 1, + HeartbeatPeriod: "3s", + SessionTimeout: "20s", + PoolTTL: 600, + PoolSize: 64, + GettySessionParam: getty.GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgWQSize: 512, + TcpReadTimeout: "4s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 10240000000, + SessionName: "client", + }, + }) +} func TestDubboProtocol_Export(t *testing.T) { // Export proto := GetProtocol() - srvConf = &ServerConfig{} - url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" + + url, err := common.NewURL("dubbo://127.0.0.1:20094/com.ikurento.user.UserProvider?anyhost=true&" + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + @@ -48,7 +88,7 @@ func TestDubboProtocol_Export(t *testing.T) { assert.True(t, eq) // second service: the same path and the different version - url2, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + url2, err := common.NewURL("dubbo://127.0.0.1:20095/com.ikurento.user.UserProvider?anyhost=true&"+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ @@ -74,16 +114,31 @@ func TestDubboProtocol_Export(t *testing.T) { assert.False(t, ok) } +func TestDubboProtocol_Refer_No_connect(t *testing.T) { + // Refer + proto := GetProtocol() + url, err := common.NewURL("dubbo://127.0.0.1:20096/com.ikurento.user.UserProvider?anyhost=true&" + + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + invoker := proto.Refer(url) + assert.Nil(t, invoker) +} + func TestDubboProtocol_Refer(t *testing.T) { // Refer proto := GetProtocol() - url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" + + + url, err := common.NewURL("dubbo://127.0.0.1:20091/com.ikurento.user.UserProvider?anyhost=true&" + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + "side=provider&timeout=3000×tamp=1556509797245") assert.NoError(t, err) - clientConf = &ClientConfig{} + proto.Export(protocol.NewBaseInvoker(url)) + invoker := proto.Refer(url) // make sure url diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index b207fd0b0cc4eb7de8409a8c46c6fc9ef0baa5c7..cc93da4762686533bf13748973efd8654feddb37 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -18,11 +18,13 @@ package invocation import ( + "github.com/apache/dubbo-go/common" "reflect" "sync" ) import ( + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" ) @@ -141,6 +143,11 @@ func (r *RPCInvocation) SetCallBack(c interface{}) { r.callBack = c } +func (r *RPCInvocation) ServiceKey() string { + return common.ServiceKey(r.AttachmentsByKey(constant.INTERFACE_KEY, ""), + r.AttachmentsByKey(constant.GROUP_KEY, ""), r.AttachmentsByKey(constant.VERSION_KEY, "")) +} + // ///////////////////////// // option // ///////////////////////// diff --git a/protocol/jsonrpc/http_test.go b/protocol/jsonrpc/http_test.go index 0cb88b36a8f330059906eb70417b6d4841020c38..8a43910524dd78163483e7ac3fb20c6fd5a6310b 100644 --- a/protocol/jsonrpc/http_test.go +++ b/protocol/jsonrpc/http_test.go @@ -22,19 +22,15 @@ import ( "strings" "testing" "time" -) -import ( "github.com/opentracing/opentracing-go" - perrors "github.com/pkg/errors" - "github.com/stretchr/testify/assert" -) -import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" ) type ( @@ -71,7 +67,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser ctx := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser", }) @@ -85,7 +81,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser0 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser0", }) @@ -98,7 +94,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser1 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser1", }) @@ -110,7 +106,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser2 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser2", }) @@ -122,7 +118,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser3 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser3", }) @@ -134,7 +130,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser4 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser4", }) @@ -145,7 +141,7 @@ func TestHTTPClient_Call(t *testing.T) { assert.Equal(t, &User{Id: "", Name: ""}, reply) ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser4", }) diff --git a/protocol/jsonrpc/jsonrpc_invoker.go b/protocol/jsonrpc/jsonrpc_invoker.go index b6e194ce0e93e84c164eccf8574e5eb20430f6e8..3bd2d3898f20029ae849b24c047de48198aa46a8 100644 --- a/protocol/jsonrpc/jsonrpc_invoker.go +++ b/protocol/jsonrpc/jsonrpc_invoker.go @@ -19,13 +19,12 @@ package jsonrpc import ( "context" -) -import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" + invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 70d2da0faed3bc9797eb23cec653bea05d445d91..3fee0f41a9cbc66d6f9151d7699f96076b8d89fb 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -68,6 +68,9 @@ func (pfw *ProtocolFilterWrapper) Destroy() { } func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { + if invoker == nil { + return nil + } filtName := invoker.GetUrl().GetParam(key, "") if filtName == "" { return invoker diff --git a/remoting/codec.go b/remoting/codec.go new file mode 100644 index 0000000000000000000000000000000000000000..607d1643cc1967e93bf5288d8d4c0788c73a735e --- /dev/null +++ b/remoting/codec.go @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package remoting + +import ( + "bytes" +) + +// codec for exchangeClient +type Codec interface { + EncodeRequest(request *Request) (*bytes.Buffer, error) + EncodeResponse(response *Response) (*bytes.Buffer, error) + Decode(data []byte) (DecodeResult, int, error) +} + +type DecodeResult struct { + IsRequest bool + Result interface{} +} + +var ( + codec = make(map[string]Codec, 2) +) + +func RegistryCodec(protocol string, codecTmp Codec) { + codec[protocol] = codecTmp +} + +func GetCodec(protocol string) Codec { + return codec[protocol] +} diff --git a/remoting/exchange.go b/remoting/exchange.go new file mode 100644 index 0000000000000000000000000000000000000000..b97d6073c1f734d91c094372cf7de0919d088d99 --- /dev/null +++ b/remoting/exchange.go @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package remoting + +import ( + "time" +) + +import ( + "go.uber.org/atomic" +) + +import ( + "github.com/apache/dubbo-go/common" +) + +var ( + // generate request ID for global use + sequence atomic.Int64 +) + +func init() { + // init request ID + sequence.Store(0) +} + +func SequenceId() int64 { + // increse 2 for every request as the same before. + // We expect that the request from client to server, the requestId is even; but from server to client, the requestId is odd. + return sequence.Add(2) +} + +// this is request for transport layer +type Request struct { + ID int64 + // protocol version + Version string + // serial ID (ignore) + SerialID byte + // Data + Data interface{} + TwoWay bool + Event bool +} + +// NewRequest aims to create Request. +// The ID is auto increase. +func NewRequest(version string) *Request { + return &Request{ + ID: SequenceId(), + Version: version, + } +} + +// this is response for transport layer +type Response struct { + ID int64 + Version string + SerialID byte + Status uint8 + Event bool + Error error + Result interface{} +} + +// NewResponse aims to create Response +func NewResponse(id int64, version string) *Response { + return &Response{ + ID: id, + Version: version, + } +} + +// the response is heartbeat +func (response *Response) IsHeartbeat() bool { + return response.Event && response.Result == nil +} + +type Options struct { + // connect timeout + ConnectTimeout time.Duration +} + +//AsyncCallbackResponse async response for dubbo +type AsyncCallbackResponse struct { + common.CallbackResponse + Opts Options + Cause error + Start time.Time // invoke(call) start time == write start time + ReadStart time.Time // read start time, write duration = ReadStart - Start + Reply interface{} +} + +// the client sends requst to server, there is one pendingResponse at client side to wait the response from server +type PendingResponse struct { + seq int64 + Err error + start time.Time + ReadStart time.Time + Callback common.AsyncCallback + response *Response + Reply interface{} + Done chan struct{} +} + +// NewPendingResponse aims to create PendingResponse. +// Id is always from ID of Request +func NewPendingResponse(id int64) *PendingResponse { + return &PendingResponse{ + seq: id, + start: time.Now(), + response: &Response{}, + Done: make(chan struct{}), + } +} + +func (r *PendingResponse) SetResponse(response *Response) { + r.response = response +} + +// GetCallResponse is used for callback of async. +// It is will return AsyncCallbackResponse. +func (r PendingResponse) GetCallResponse() common.CallbackResponse { + return AsyncCallbackResponse{ + Cause: r.Err, + Start: r.start, + ReadStart: r.ReadStart, + Reply: r.response, + } +} diff --git a/remoting/exchange_client.go b/remoting/exchange_client.go new file mode 100644 index 0000000000000000000000000000000000000000..7c0520ab9cb599ae1a4d09c575c9f4389aa007eb --- /dev/null +++ b/remoting/exchange_client.go @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package remoting + +import ( + "errors" + "sync" + "time" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +var ( + // store requestID and response + pendingResponses *sync.Map = new(sync.Map) +) + +type SequenceType int64 + +// It is interface of client for network communication. +// If you use getty as network communication, you should define GettyClient that implements this interface. +type Client interface { + SetExchangeClient(client *ExchangeClient) + // responseHandler is used to deal with msg + SetResponseHandler(responseHandler ResponseHandler) + // connect url + Connect(url common.URL) error + // close + Close() + // send request to server. + Request(request *Request, timeout time.Duration, response *PendingResponse) error +} + +// This is abstraction level. it is like facade. +type ExchangeClient struct { + // connect server timeout + ConnectTimeout time.Duration + // to dial server address. The format: ip:port + address string + // the client that will deal with the transport. It is interface, and it will use gettyClient by default. + client Client + // the tag for init. + init bool +} + +// handle the message from server +type ResponseHandler interface { + Handler(response *Response) +} + +// create ExchangeClient +func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration, lazyInit bool) *ExchangeClient { + exchangeClient := &ExchangeClient{ + ConnectTimeout: connectTimeout, + address: url.Location, + client: client, + } + client.SetExchangeClient(exchangeClient) + if !lazyInit { + if err := exchangeClient.doInit(url); err != nil { + return nil + } + } + + client.SetResponseHandler(exchangeClient) + return exchangeClient +} + +func (cl *ExchangeClient) doInit(url common.URL) error { + if cl.init { + return nil + } + if cl.client.Connect(url) != nil { + //retry for a while + time.Sleep(100 * time.Millisecond) + if cl.client.Connect(url) != nil { + logger.Errorf("Failed to connect server %+v " + url.Location) + return errors.New("Failed to connect server " + url.Location) + } + } + //FIXME atomic operation + cl.init = true + return nil +} + +// two way request +func (client *ExchangeClient) Request(invocation *protocol.Invocation, url common.URL, timeout time.Duration, + result *protocol.RPCResult) error { + if er := client.doInit(url); er != nil { + return er + } + request := NewRequest("2.0.2") + request.Data = invocation + request.Event = false + request.TwoWay = true + + rsp := NewPendingResponse(request.ID) + rsp.response = NewResponse(request.ID, "2.0.2") + rsp.Reply = (*invocation).Reply() + AddPendingResponse(rsp) + + err := client.client.Request(request, timeout, rsp) + // request error + if err != nil { + result.Err = err + return err + } + if resultTmp, ok := rsp.response.Result.(*protocol.RPCResult); ok { + result.Rest = resultTmp.Rest + result.Attrs = resultTmp.Attrs + result.Err = resultTmp.Err + } + return nil +} + +// async two way request +func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url common.URL, timeout time.Duration, + callback common.AsyncCallback, result *protocol.RPCResult) error { + if er := client.doInit(url); er != nil { + return er + } + request := NewRequest("2.0.2") + request.Data = invocation + request.Event = false + request.TwoWay = true + + rsp := NewPendingResponse(request.ID) + rsp.response = NewResponse(request.ID, "2.0.2") + rsp.Callback = callback + rsp.Reply = (*invocation).Reply() + AddPendingResponse(rsp) + + err := client.client.Request(request, timeout, rsp) + if err != nil { + result.Err = err + return err + } + result.Rest = rsp.response + return nil +} + +// oneway request +func (client *ExchangeClient) Send(invocation *protocol.Invocation, url common.URL, timeout time.Duration) error { + if er := client.doInit(url); er != nil { + return er + } + request := NewRequest("2.0.2") + request.Data = invocation + request.Event = false + request.TwoWay = false + + rsp := NewPendingResponse(request.ID) + rsp.response = NewResponse(request.ID, "2.0.2") + + err := client.client.Request(request, timeout, rsp) + if err != nil { + return err + } + return nil +} + +// close client +func (client *ExchangeClient) Close() { + client.client.Close() +} + +// handle the response from server +func (client *ExchangeClient) Handler(response *Response) { + + pendingResponse := removePendingResponse(SequenceType(response.ID)) + if pendingResponse == nil { + logger.Errorf("failed to get pending response context for response package %s", *response) + return + } + + pendingResponse.response = response + + if pendingResponse.Callback == nil { + pendingResponse.Err = pendingResponse.response.Error + pendingResponse.Done <- struct{}{} + } else { + pendingResponse.Callback(pendingResponse.GetCallResponse()) + } +} + +// store response into map +func AddPendingResponse(pr *PendingResponse) { + pendingResponses.Store(SequenceType(pr.seq), pr) +} + +// get and remove response +func removePendingResponse(seq SequenceType) *PendingResponse { + if pendingResponses == nil { + return nil + } + if presp, ok := pendingResponses.Load(seq); ok { + pendingResponses.Delete(seq) + return presp.(*PendingResponse) + } + return nil +} + +// get response +func GetPendingResponse(seq SequenceType) *PendingResponse { + if presp, ok := pendingResponses.Load(seq); ok { + return presp.(*PendingResponse) + } + return nil +} diff --git a/remoting/exchange_server.go b/remoting/exchange_server.go new file mode 100644 index 0000000000000000000000000000000000000000..c4538075b53b9451bcf7ccc8cb981926b6adeb43 --- /dev/null +++ b/remoting/exchange_server.go @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package remoting + +import ( + "github.com/apache/dubbo-go/common" +) + +// It is interface of server for network communication. +// If you use getty as network communication, you should define GettyServer that implements this interface. +type Server interface { + //invoke once for connection + Start() + //it is for destroy + Stop() +} + +// This is abstraction level. it is like facade. +type ExchangeServer struct { + Server Server +} + +// Create ExchangeServer +func NewExchangeServer(url common.URL, server Server) *ExchangeServer { + exchangServer := &ExchangeServer{ + Server: server, + } + return exchangServer +} + +// start server +func (server *ExchangeServer) Start() { + server.Server.Start() +} + +// stop server +func (server *ExchangeServer) Stop() { + server.Server.Stop() +} diff --git a/protocol/dubbo/config.go b/remoting/getty/config.go similarity index 99% rename from protocol/dubbo/config.go rename to remoting/getty/config.go index dbc6989c54780afacef717f1d110833d92967f9f..d63a45a2e4d4d2e2b53c13b2de50a76fc3230b65 100644 --- a/protocol/dubbo/config.go +++ b/remoting/getty/config.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package dubbo +package getty import ( "time" diff --git a/remoting/getty/dubbo_codec_for_test.go b/remoting/getty/dubbo_codec_for_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f6460cde9ddb420eb57758bd32150bb84fea9dd0 --- /dev/null +++ b/remoting/getty/dubbo_codec_for_test.go @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package getty + +// copy from dubbo/dubbo_codec.go . +// it is used to unit test. +import ( + "bufio" + "bytes" + "fmt" + "strconv" + "time" +) +import ( + hessian "github.com/apache/dubbo-go-hessian2" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" +) + +//SerialID serial ID +type SerialID byte + +const ( + // S_Dubbo dubbo serial id + S_Dubbo SerialID = 2 +) + +func init() { + codec := &DubboTestCodec{} + remoting.RegistryCodec("dubbo", codec) +} + +// DubboPackage ... +type DubboPackage struct { + Header hessian.DubboHeader + Service hessian.Service + Body interface{} + Err error +} + +func (p DubboPackage) String() string { + return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body) +} + +// Marshal ... +func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { + codec := hessian.NewHessianCodec(nil) + + pkg, err := codec.Write(p.Service, p.Header, p.Body) + if err != nil { + return nil, perrors.WithStack(err) + } + + return bytes.NewBuffer(pkg), nil +} + +// Unmarshal ... +func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error { + // fix issue https://github.com/apache/dubbo-go/issues/380 + bufLen := buf.Len() + if bufLen < hessian.HEADER_LENGTH { + return perrors.WithStack(hessian.ErrHeaderNotEnough) + } + + codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen)) + + // read header + err := codec.ReadHeader(&p.Header) + if err != nil { + return perrors.WithStack(err) + } + + if resp != nil { // for client + if (p.Header.Type & hessian.PackageRequest) != 0x00 { + // size of this array must be '7' + // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272 + p.Body = make([]interface{}, 7) + } else { + pendingRsp := remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID)) + if pendingRsp == nil { + return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) + } + p.Body = &hessian.Response{RspObj: pendingRsp.Reply} + } + } + + // read body + err = codec.ReadBody(p.Body) + return perrors.WithStack(err) +} + +type DubboTestCodec struct { +} + +func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) { + if request.Event { + return c.encodeHeartbeartReqeust(request) + } + + invoc, ok := request.Data.(*invocation.RPCInvocation) + if !ok { + logger.Errorf("encode request failed for parameter type :%+v", request) + return nil, perrors.Errorf("encode request failed for parameter type :%+v", request) + } + invocation := *invoc + + p := &DubboPackage{} + p.Service.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "") + p.Service.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "") + p.Service.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "") + p.Service.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "") + p.Service.Method = invocation.MethodName() + + timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, "3000")) + if err != nil { + // it will be wrapped in readwrite.Write . + return nil, err + } + p.Service.Timeout = time.Duration(timeout) + //var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "") + //if len(timeout) != 0 { + // if t, err := time.ParseDuration(timeout); err == nil { + // p.Service.Timeout = t + // } + //} + + p.Header.SerialID = byte(S_Dubbo) + p.Header.ID = request.ID + if request.TwoWay { + p.Header.Type = hessian.PackageRequest_TwoWay + } else { + p.Header.Type = hessian.PackageRequest + } + + p.Body = hessian.NewRequest(invocation.Arguments(), invocation.Attachments()) + + codec := hessian.NewHessianCodec(nil) + + pkg, err := codec.Write(p.Service, p.Header, p.Body) + if err != nil { + return nil, perrors.WithStack(err) + } + + return bytes.NewBuffer(pkg), nil +} +func (c *DubboTestCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) { + pkg := &DubboPackage{} + pkg.Body = []interface{}{} + pkg.Header.ID = request.ID + pkg.Header.Type = hessian.PackageHeartbeat + pkg.Header.SerialID = byte(S_Dubbo) + + codec := hessian.NewHessianCodec(nil) + + byt, err := codec.Write(pkg.Service, pkg.Header, pkg.Body) + if err != nil { + return nil, perrors.WithStack(err) + } + + return bytes.NewBuffer(byt), nil +} +func (c *DubboTestCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) { + var ptype = hessian.PackageResponse + if response.IsHeartbeat() { + ptype = hessian.PackageHeartbeat + } + resp := &DubboPackage{ + Header: hessian.DubboHeader{ + SerialID: response.SerialID, + Type: ptype, + ID: response.ID, + ResponseStatus: response.Status, + }, + } + if !response.IsHeartbeat() { + resp.Body = &hessian.Response{ + RspObj: response.Result.(protocol.RPCResult).Rest, + Exception: response.Result.(protocol.RPCResult).Err, + Attachments: response.Result.(protocol.RPCResult).Attrs, + } + } + + //if response.Header.Type&hessian.PackageRequest != 0x00 { + // resp.Body = req.Body + //} else { + // resp.Body = nil + //} + codec := hessian.NewHessianCodec(nil) + + pkg, err := codec.Write(resp.Service, resp.Header, resp.Body) + if err != nil { + return nil, perrors.WithStack(err) + } + + return bytes.NewBuffer(pkg), nil +} +func (c *DubboTestCodec) Decode(data []byte) (remoting.DecodeResult, int, error) { + if c.isRequest(data) { + req, len, err := c.decodeRequest(data) + if err != nil { + return remoting.DecodeResult{}, len, err + } + return remoting.DecodeResult{IsRequest: true, Result: req}, len, err + } else { + resp, len, err := c.decodeResponse(data) + if err != nil { + return remoting.DecodeResult{}, len, err + } + return remoting.DecodeResult{IsRequest: false, Result: resp}, len, err + } +} +func (c *DubboTestCodec) isRequest(data []byte) bool { + if data[2]&byte(0x80) == 0x00 { + return false + } + return true +} + +func (c *DubboTestCodec) decodeRequest(data []byte) (*remoting.Request, int, error) { + pkg := &DubboPackage{ + Body: make([]interface{}, 7), + } + var request *remoting.Request = nil + buf := bytes.NewBuffer(data) + err := pkg.Unmarshal(buf, nil) + if err != nil { + originErr := perrors.Cause(err) + if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { + //FIXME + return nil, 0, originErr + } + logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err) + + return request, 0, perrors.WithStack(err) + } + request = &remoting.Request{ + ID: pkg.Header.ID, + SerialID: pkg.Header.SerialID, + TwoWay: pkg.Header.Type&hessian.PackageRequest_TwoWay != 0x00, + Event: pkg.Header.Type&hessian.PackageHeartbeat != 0x00, + } + if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 { + // convert params of request + req := pkg.Body.([]interface{}) // length of body should be 7 + if len(req) > 0 { + //invocation := request.Data.(*invocation.RPCInvocation) + var methodName string + var args []interface{} + var attachments map[string]string = make(map[string]string) + if req[0] != nil { + //dubbo version + request.Version = req[0].(string) + } + if req[1] != nil { + //path + attachments[constant.PATH_KEY] = req[1].(string) + } + if req[2] != nil { + //version + attachments[constant.VERSION_KEY] = req[2].(string) + } + if req[3] != nil { + //method + methodName = req[3].(string) + } + if req[4] != nil { + //argsType + //invocation.ParameterTypes(constant., req[1].(string)) + //argsTypes = req[4].(string) + } + if req[5] != nil { + args = req[5].([]interface{}) + } + if req[6] != nil { + attachments = req[6].(map[string]string) + } + //if pkg.Service.Path == "" && len(attachments[constant.PATH_KEY]) > 0 { + // pkg.Service.Path = attachments[constant.PATH_KEY] + //} + //if _, ok := attachments[constant.INTERFACE_KEY]; ok { + // pkg.Service.Interface = attachments[constant.INTERFACE_KEY] + //} else { + // pkg.Service.Interface = pkg.Service.Path + //} + //if len(attachments[constant.GROUP_KEY]) > 0 { + // pkg.Service.Group = attachments[constant.GROUP_KEY] + //} + invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments), + invocation.WithArguments(args), invocation.WithMethodName(methodName)) + request.Data = invoc + //pkg.Body = map[string]interface{}{ + // "dubboVersion": dubboVersion, + // "argsTypes": argsTypes, + // "args": args, + // "service": common.ServiceMap.GetService("dubbo", pkg.Service.Path), // path as a key + // "attachments": attachments, + //} + } + } + return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil +} + +func (c *DubboTestCodec) decodeResponse(data []byte) (*remoting.Response, int, error) { + pkg := &DubboPackage{} + buf := bytes.NewBuffer(data) + response := &remoting.Response{} + err := pkg.Unmarshal(buf, response) + if err != nil { + originErr := perrors.Cause(err) + if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { + return nil, 0, originErr + } + logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err) + + return response, 0, perrors.WithStack(err) + } + response = &remoting.Response{ + ID: pkg.Header.ID, + //Version: pkg.Header., + SerialID: pkg.Header.SerialID, + Status: pkg.Header.ResponseStatus, + Event: (pkg.Header.Type & hessian.PackageHeartbeat) != 0, + } + var error error + if pkg.Header.Type&hessian.PackageHeartbeat != 0x00 { + if pkg.Header.Type&hessian.PackageResponse != 0x00 { + logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", pkg.Header, pkg.Body) + if pkg.Err != nil { + logger.Errorf("rpc heartbeat response{error: %#v}", pkg.Err) + error = pkg.Err + } + } else { + logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", pkg.Header, pkg.Service, pkg.Body) + response.Status = hessian.Response_OK + //reply(session, p, hessian.PackageHeartbeat) + } + return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, error + } + logger.Debugf("get rpc response{header: %#v, body: %#v}", pkg.Header, pkg.Body) + rpcResult := &protocol.RPCResult{} + response.Result = rpcResult + if pkg.Header.Type&hessian.PackageRequest == 0x00 { + if pkg.Err != nil { + rpcResult.Err = pkg.Err + } else if pkg.Body.(*hessian.Response).Exception != nil { + rpcResult.Err = pkg.Body.(*hessian.Response).Exception + response.Error = rpcResult.Err + } + rpcResult.Attrs = pkg.Body.(*hessian.Response).Attachments + rpcResult.Rest = pkg.Body.(*hessian.Response).RspObj + } + + //h.conn.updateSession(session) + //pendingResponse := h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) + //if pendingResponse == nil { + // logger.Errorf("failed to get pending response context for response package %s", *p) + // return + //} + + return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil +} diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go new file mode 100644 index 0000000000000000000000000000000000000000..7055b985056a5c2d7d7aeb325ac3f367953347ea --- /dev/null +++ b/remoting/getty/getty_client.go @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package getty + +import ( + "math/rand" + "time" +) + +import ( + "github.com/dubbogo/getty" + gxsync "github.com/dubbogo/gost/sync" + perrors "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/remoting" +) + +var ( + errInvalidCodecType = perrors.New("illegal CodecType") + errInvalidAddress = perrors.New("remote address invalid or empty") + errSessionNotExist = perrors.New("session not exist") + errClientClosed = perrors.New("client closed") + errClientReadTimeout = perrors.New("client read timeout") + + clientConf *ClientConfig + clientGrpool *gxsync.TaskPool +) + +// it is init client for single protocol. +func initClient(protocol string) { + if protocol == "" { + return + } + + // load clientconfig from consumer_config + // default use dubbo + consumerConfig := config.GetConsumerConfig() + if consumerConfig.ApplicationConfig == nil { + return + } + protocolConf := config.GetConsumerConfig().ProtocolConf + defaultClientConfig := GetDefaultClientConfig() + if protocolConf == nil { + logger.Info("protocol_conf default use dubbo config") + } else { + dubboConf := protocolConf.(map[interface{}]interface{})[protocol] + if dubboConf == nil { + logger.Warnf("dubboConf is nil") + return + } + dubboConfByte, err := yaml.Marshal(dubboConf) + if err != nil { + panic(err) + } + err = yaml.Unmarshal(dubboConfByte, &defaultClientConfig) + if err != nil { + panic(err) + } + } + clientConf = &defaultClientConfig + if err := clientConf.CheckValidity(); err != nil { + logger.Warnf("[CheckValidity] error: %v", err) + return + } + setClientGrpool() + + rand.Seed(time.Now().UnixNano()) +} + +// Config ClientConf +func SetClientConf(c ClientConfig) { + clientConf = &c + err := clientConf.CheckValidity() + if err != nil { + logger.Warnf("[ClientConfig CheckValidity] error: %v", err) + return + } + setClientGrpool() +} + +func setClientGrpool() { + if clientConf.GrPoolSize > 1 { + clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen), + gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber)) + } +} + +// Options : param config +type Options struct { + // connect timeout + // remove request timeout, it will be calulate for every request + ConnectTimeout time.Duration +} + +// Client : some configuration for network communication. +type Client struct { + addr string + opts Options + conf ClientConfig + pool *gettyRPCClientPool + codec remoting.Codec + responseHandler remoting.ResponseHandler + ExchangeClient *remoting.ExchangeClient +} + +// create client +func NewClient(opt Options) *Client { + switch { + case opt.ConnectTimeout == 0: + opt.ConnectTimeout = 3 * time.Second + } + + c := &Client{ + opts: opt, + } + return c +} + +func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) { + c.ExchangeClient = client +} +func (c *Client) SetResponseHandler(responseHandler remoting.ResponseHandler) { + c.responseHandler = responseHandler +} + +// init client and try to connection. +func (c *Client) Connect(url common.URL) error { + initClient(url.Protocol) + c.conf = *clientConf + // new client + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + // codec + c.codec = remoting.GetCodec(url.Protocol) + c.addr = url.Location + _, _, err := c.selectSession(c.addr) + if err != nil { + logger.Errorf("try to connect server %v failed for : %v", url.Location, err) + } + return err +} + +// close network connection +func (c *Client) Close() { + if c.pool != nil { + c.pool.close() + } + c.pool = nil +} + +// send request +func (c *Client) Request(request *remoting.Request, timeout time.Duration, response *remoting.PendingResponse) error { + _, session, err := c.selectSession(c.addr) + if err != nil { + return perrors.WithStack(err) + } + if session == nil { + return errSessionNotExist + } + + if err = c.transfer(session, request, timeout); err != nil { + return perrors.WithStack(err) + } + + if !request.TwoWay || response.Callback != nil { + return nil + } + + select { + case <-getty.GetTimeWheel().After(timeout): + return perrors.WithStack(errClientReadTimeout) + case <-response.Done: + err = response.Err + } + + return perrors.WithStack(err) +} + +func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) { + rpcClient, err := c.pool.getGettyRpcClient(addr) + if err != nil { + return nil, nil, perrors.WithStack(err) + } + return rpcClient, rpcClient.selectSession(), nil +} + +func (c *Client) heartbeat(session getty.Session) error { + req := remoting.NewRequest("2.0.2") + req.TwoWay = true + req.Event = true + resp := remoting.NewPendingResponse(req.ID) + remoting.AddPendingResponse(resp) + return c.transfer(session, req, 3*time.Second) +} + +func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error { + err := session.WritePkg(request, timeout) + return perrors.WithStack(err) +} diff --git a/remoting/getty/getty_client_test.go b/remoting/getty/getty_client_test.go new file mode 100644 index 0000000000000000000000000000000000000000..3dcc6b30ea4939d1d059cee4df98bc6eacb3036f --- /dev/null +++ b/remoting/getty/getty_client_test.go @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package getty + +import ( + "bytes" + "context" + "reflect" + "sync" + "testing" + "time" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + . "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" +) + +func TestRunSuite(t *testing.T) { + svr, url := InitTest(t) + client := getClient(url) + testRequestOneWay(t, svr, url, client) + testClient_Call(t, svr, url, client) + testClient_AsyncCall(t, svr, url, client) + + svr.Stop() +} + +func testRequestOneWay(t *testing.T, svr *Server, url common.URL, client *Client) { + + request := remoting.NewRequest("2.0.2") + up := &UserProvider{} + invocation := createInvocation("GetUser", nil, nil, []interface{}{[]interface{}{"1", "username"}, up}, + []reflect.Value{reflect.ValueOf([]interface{}{"1", "username"}), reflect.ValueOf(up)}) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = false + //user := &User{} + err := client.Request(request, 3*time.Second, nil) + assert.NoError(t, err) +} + +func createInvocation(methodName string, callback interface{}, reply interface{}, arguments []interface{}, + parameterValues []reflect.Value) *invocation.RPCInvocation { + return invocation.NewRPCInvocationWithOptions(invocation.WithMethodName(methodName), + invocation.WithArguments(arguments), invocation.WithReply(reply), + invocation.WithCallBack(callback), invocation.WithParameterValues(parameterValues)) +} + +func setAttachment(invocation *invocation.RPCInvocation, attachments map[string]string) { + for key, value := range attachments { + invocation.SetAttachments(key, value) + } +} + +func getClient(url common.URL) *Client { + client := NewClient(Options{ + ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, + }) + + exchangeClient := remoting.NewExchangeClient(url, client, 5*time.Second, false) + client.SetExchangeClient(exchangeClient) + client.Connect(url) + client.SetResponseHandler(exchangeClient) + return client +} + +func testClient_Call(t *testing.T, svr *Server, url common.URL, c *Client) { + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + + testGetBigPkg(t, c) + testGetUser(t, c) + testGetUser0(t, c) + testGetUser1(t, c) + testGetUser2(t, c) + testGetUser3(t, c) + testGetUser4(t, c) + testGetUser5(t, c) + testGetUser6(t, c) + testGetUser61(t, c) + +} +func testGetBigPkg(t *testing.T, c *Client) { + var ( + user *User + err error + ) + + user = &User{} + request := remoting.NewRequest("2.0.2") + invocation := createInvocation("GetBigPkg", nil, nil, []interface{}{[]interface{}{nil}, user}, + []reflect.Value{reflect.ValueOf([]interface{}{nil}), reflect.ValueOf(user)}) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = true + pendingResponse := remoting.NewPendingResponse(request.ID) + pendingResponse.Reply = user + remoting.AddPendingResponse(pendingResponse) + err = c.Request(request, 8*time.Second, pendingResponse) + assert.NoError(t, err) + assert.NotEqual(t, "", user.Id) + assert.NotEqual(t, "", user.Name) +} +func testGetUser(t *testing.T, c *Client) { + var ( + user *User + err error + ) + user = &User{} + request := remoting.NewRequest("2.0.2") + invocation := createInvocation("GetUser", nil, nil, []interface{}{"1", "username"}, + []reflect.Value{reflect.ValueOf("1"), reflect.ValueOf("username")}) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = true + pendingResponse := remoting.NewPendingResponse(request.ID) + pendingResponse.Reply = user + remoting.AddPendingResponse(pendingResponse) + err = c.Request(request, 3*time.Second, pendingResponse) + assert.NoError(t, err) + assert.Equal(t, User{Id: "1", Name: "username"}, *user) +} + +func testGetUser0(t *testing.T, c *Client) { + var ( + user *User + err error + ) + user = &User{} + request := remoting.NewRequest("2.0.2") + invocation := createInvocation("GetUser0", nil, nil, []interface{}{"1", nil, "username"}, + []reflect.Value{reflect.ValueOf("1"), reflect.ValueOf(nil), reflect.ValueOf("username")}) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = true + rsp := remoting.NewPendingResponse(request.ID) + rsp.SetResponse(remoting.NewResponse(request.ID, "2.0.2")) + remoting.AddPendingResponse(rsp) + rsp.Reply = user + err = c.Request(request, 3*time.Second, rsp) + assert.NoError(t, err) + assert.Equal(t, User{Id: "1", Name: "username"}, *user) +} +func testGetUser1(t *testing.T, c *Client) { + var ( + err error + ) + request := remoting.NewRequest("2.0.2") + invocation := createInvocation("GetUser1", nil, nil, []interface{}{}, + []reflect.Value{}) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = true + pendingResponse := remoting.NewPendingResponse(request.ID) + user := &User{} + pendingResponse.Reply = user + remoting.AddPendingResponse(pendingResponse) + err = c.Request(request, 3*time.Second, pendingResponse) + assert.NoError(t, err) +} +func testGetUser2(t *testing.T, c *Client) { + var ( + err error + ) + request := remoting.NewRequest("2.0.2") + invocation := createInvocation("GetUser2", nil, nil, []interface{}{}, + []reflect.Value{}) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = true + pendingResponse := remoting.NewPendingResponse(request.ID) + remoting.AddPendingResponse(pendingResponse) + err = c.Request(request, 3*time.Second, pendingResponse) + assert.EqualError(t, err, "error") +} +func testGetUser3(t *testing.T, c *Client) { + var ( + err error + ) + request := remoting.NewRequest("2.0.2") + invocation := createInvocation("GetUser3", nil, nil, []interface{}{}, + []reflect.Value{}) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = true + pendingResponse := remoting.NewPendingResponse(request.ID) + user2 := []interface{}{} + pendingResponse.Reply = &user2 + remoting.AddPendingResponse(pendingResponse) + err = c.Request(request, 3*time.Second, pendingResponse) + assert.NoError(t, err) + assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0]) +} +func testGetUser4(t *testing.T, c *Client) { + var ( + err error + ) + request := remoting.NewRequest("2.0.2") + invocation := invocation.NewRPCInvocation("GetUser4", []interface{}{[]interface{}{"1", "username"}}, nil) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = true + pendingResponse := remoting.NewPendingResponse(request.ID) + user2 := []interface{}{} + pendingResponse.Reply = &user2 + remoting.AddPendingResponse(pendingResponse) + err = c.Request(request, 3*time.Second, pendingResponse) + assert.NoError(t, err) + assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0]) +} + +func testGetUser5(t *testing.T, c *Client) { + var ( + err error + ) + request := remoting.NewRequest("2.0.2") + invocation := invocation.NewRPCInvocation("GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, nil) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = true + pendingResponse := remoting.NewPendingResponse(request.ID) + user3 := map[interface{}]interface{}{} + pendingResponse.Reply = &user3 + remoting.AddPendingResponse(pendingResponse) + err = c.Request(request, 3*time.Second, pendingResponse) + assert.NoError(t, err) + assert.NotNil(t, user3) + assert.Equal(t, &User{Id: "1", Name: "username"}, user3["key"]) +} + +func testGetUser6(t *testing.T, c *Client) { + var ( + user *User + err error + ) + user = &User{} + request := remoting.NewRequest("2.0.2") + invocation := invocation.NewRPCInvocation("GetUser6", []interface{}{0}, nil) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = true + pendingResponse := remoting.NewPendingResponse(request.ID) + pendingResponse.Reply = user + remoting.AddPendingResponse(pendingResponse) + err = c.Request(request, 3*time.Second, pendingResponse) + assert.NoError(t, err) + assert.Equal(t, User{Id: "", Name: ""}, *user) +} + +func testGetUser61(t *testing.T, c *Client) { + var ( + user *User + err error + ) + user = &User{} + request := remoting.NewRequest("2.0.2") + invocation := invocation.NewRPCInvocation("GetUser6", []interface{}{1}, nil) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = true + pendingResponse := remoting.NewPendingResponse(request.ID) + pendingResponse.Reply = user + remoting.AddPendingResponse(pendingResponse) + err = c.Request(request, 3*time.Second, pendingResponse) + assert.NoError(t, err) + assert.Equal(t, User{Id: "1", Name: ""}, *user) +} + +func testClient_AsyncCall(t *testing.T, svr *Server, url common.URL, client *Client) { + user := &User{} + lock := sync.Mutex{} + request := remoting.NewRequest("2.0.2") + invocation := createInvocation("GetUser0", nil, nil, []interface{}{"4", nil, "username"}, + []reflect.Value{reflect.ValueOf("4"), reflect.ValueOf(nil), reflect.ValueOf("username")}) + attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"} + setAttachment(invocation, attachment) + request.Data = invocation + request.Event = false + request.TwoWay = true + rsp := remoting.NewPendingResponse(request.ID) + rsp.SetResponse(remoting.NewResponse(request.ID, "2.0.2")) + remoting.AddPendingResponse(rsp) + rsp.Reply = user + rsp.Callback = func(response common.CallbackResponse) { + r := response.(remoting.AsyncCallbackResponse) + rst := *r.Reply.(*remoting.Response).Result.(*protocol.RPCResult) + assert.Equal(t, User{Id: "4", Name: "username"}, *(rst.Rest.(*User))) + lock.Unlock() + } + lock.Lock() + err := client.Request(request, 3*time.Second, rsp) + assert.NoError(t, err) + assert.Equal(t, User{}, *user) + time.Sleep(1 * time.Second) +} + +func InitTest(t *testing.T) (*Server, common.URL) { + + hessian.RegisterPOJO(&User{}) + remoting.RegistryCodec("dubbo", &DubboTestCodec{}) + + methods, err := common.ServiceMap.Register("dubbo", &UserProvider{}) + assert.NoError(t, err) + assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6", methods) + + // config + SetClientConf(ClientConfig{ + ConnectionNum: 2, + HeartbeatPeriod: "5s", + SessionTimeout: "20s", + PoolTTL: 600, + PoolSize: 64, + GettySessionParam: GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgWQSize: 512, + TcpReadTimeout: "4s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 10240000000, + SessionName: "client", + }, + }) + assert.NoError(t, clientConf.CheckValidity()) + SetServerConfig(ServerConfig{ + SessionNumber: 700, + SessionTimeout: "20s", + GettySessionParam: GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgWQSize: 512, + TcpReadTimeout: "1s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 10240000000, + SessionName: "server", + }}) + assert.NoError(t, srvConf.CheckValidity()) + + url, err := common.NewURL("dubbo://127.0.0.1:20060/UserProvider?anyhost=true&" + + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + + "environment=dev&interface=com.ikurento.user.UserProvider&ip=127.0.0.1&methods=GetUser%2C&" + + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + + "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") + // init server + userProvider := &UserProvider{} + common.ServiceMap.Register(url.Protocol, userProvider) + invoker := &proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + } + handler := func(invocation *invocation.RPCInvocation) protocol.RPCResult { + //result := protocol.RPCResult{} + r := invoker.Invoke(context.Background(), invocation) + result := protocol.RPCResult{ + Err: r.Error(), + Rest: r.Result(), + Attrs: r.Attachments(), + } + return result + } + server := NewServer(url, handler) + server.Start() + + time.Sleep(time.Second * 2) + + return server, url +} + +////////////////////////////////// +// provider +////////////////////////////////// + +type ( + User struct { + Id string `json:"id"` + Name string `json:"name"` + } + + UserProvider struct { + user map[string]User + } +) + +// size:4801228 +func (u *UserProvider) GetBigPkg(ctx context.Context, req []interface{}, rsp *User) error { + argBuf := new(bytes.Buffer) + for i := 0; i < 400; i++ { + argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。") + argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。") + } + rsp.Id = argBuf.String() + rsp.Name = argBuf.String() + return nil +} + +func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User) error { + rsp.Id = req[0].(string) + rsp.Name = req[1].(string) + return nil +} + +func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) { + return User{Id: id, Name: name}, nil +} + +func (u *UserProvider) GetUser1() error { + return nil +} + +func (u *UserProvider) GetUser2() error { + return perrors.New("error") +} + +func (u *UserProvider) GetUser3(rsp *[]interface{}) error { + *rsp = append(*rsp, User{Id: "1", Name: "username"}) + return nil +} + +func (u *UserProvider) GetUser4(ctx context.Context, req []interface{}) ([]interface{}, error) { + + return []interface{}{User{Id: req[0].([]interface{})[0].(string), Name: req[0].([]interface{})[1].(string)}}, nil +} + +func (u *UserProvider) GetUser5(ctx context.Context, req []interface{}) (map[interface{}]interface{}, error) { + return map[interface{}]interface{}{"key": User{Id: req[0].(map[interface{}]interface{})["id"].(string), Name: req[0].(map[interface{}]interface{})["name"].(string)}}, nil +} + +func (u *UserProvider) GetUser6(id int64) (*User, error) { + if id == 0 { + return nil, nil + } + return &User{Id: "1"}, nil +} + +func (u *UserProvider) Reference() string { + return "UserProvider" +} + +func (u User) JavaClassName() string { + return "com.ikurento.user.User" +} diff --git a/protocol/dubbo/server.go b/remoting/getty/getty_server.go similarity index 81% rename from protocol/dubbo/server.go rename to remoting/getty/getty_server.go index bd2b37b7a9f055745e183524d19a442af03360f4..6dc15d251f3b357f271ac420b6adc9ea194b957f 100644 --- a/protocol/dubbo/server.go +++ b/remoting/getty/getty_server.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package dubbo +package getty import ( "fmt" @@ -24,7 +24,7 @@ import ( import ( "github.com/dubbogo/getty" - "github.com/dubbogo/gost/sync" + gxsync "github.com/dubbogo/gost/sync" "gopkg.in/yaml.v2" ) @@ -32,6 +32,9 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" ) var ( @@ -39,7 +42,7 @@ var ( srvGrpool *gxsync.TaskPool ) -func init() { +func initServer(protocol string) { // load clientconfig from provider_config // default use dubbo @@ -52,7 +55,7 @@ func init() { if protocolConf == nil { logger.Info("protocol_conf default use dubbo config") } else { - dubboConf := protocolConf.(map[interface{}]interface{})[DUBBO] + dubboConf := protocolConf.(map[interface{}]interface{})[protocol] if dubboConf == nil { logger.Warnf("dubboConf is nil") return @@ -100,19 +103,27 @@ func SetServerGrpool() { // Server ... type Server struct { - conf ServerConfig - tcpServer getty.Server - rpcHandler *RpcServerHandler + conf ServerConfig + addr string + codec remoting.Codec + tcpServer getty.Server + rpcHandler *RpcServerHandler + requestHandler func(*invocation.RPCInvocation) protocol.RPCResult } // NewServer ... -func NewServer() *Server { +func NewServer(url common.URL, handlers func(*invocation.RPCInvocation) protocol.RPCResult) *Server { + //init + initServer(url.Protocol) s := &Server{ - conf: *srvConf, + conf: *srvConf, + addr: url.Location, + codec: remoting.GetCodec(url.Protocol), + requestHandler: handlers, } - s.rpcHandler = NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout) + s.rpcHandler = NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout, s) return s } @@ -142,7 +153,7 @@ func (s *Server) newSession(session getty.Session) error { session.SetName(conf.GettySessionParam.SessionName) session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) - session.SetPkgHandler(rpcServerPkgHandler) + session.SetPkgHandler(NewRpcServerPackageHandler(s)) session.SetEventListener(s.rpcHandler) session.SetWQLen(conf.GettySessionParam.PkgWQSize) session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) @@ -157,18 +168,16 @@ func (s *Server) newSession(session getty.Session) error { } // Start ... -func (s *Server) Start(url common.URL) { +func (s *Server) Start() { var ( - addr string tcpServer getty.Server ) - addr = url.Location tcpServer = getty.NewTCPServer( - getty.WithLocalAddress(addr), + getty.WithLocalAddress(s.addr), ) tcpServer.RunEventLoop(s.newSession) - logger.Debugf("s bind addr{%s} ok!", addr) + logger.Debugf("s bind addr{%s} ok!", s.addr) s.tcpServer = tcpServer } diff --git a/protocol/dubbo/listener.go b/remoting/getty/listener.go similarity index 57% rename from protocol/dubbo/listener.go rename to remoting/getty/listener.go index 0251b78a2b0d27a68461c16c284b1af53bcb08aa..777e14c2ecb344d3fa2aa2084b79c7805f542e28 100644 --- a/protocol/dubbo/listener.go +++ b/remoting/getty/listener.go @@ -15,30 +15,26 @@ * limitations under the License. */ -package dubbo +package getty import ( - "context" "fmt" - "net/url" "sync" "sync/atomic" "time" ) import ( - "github.com/apache/dubbo-go-hessian2" + hessian "github.com/apache/dubbo-go-hessian2" "github.com/dubbogo/getty" - "github.com/opentracing/opentracing-go" perrors "github.com/pkg/errors" ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" ) // todo: WritePkg_Timeout will entry *.yml @@ -98,47 +94,48 @@ func (h *RpcClientHandler) OnClose(session getty.Session) { // OnMessage ... func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { - p, ok := pkg.(*DubboPackage) + result, ok := pkg.(remoting.DecodeResult) if !ok { logger.Errorf("illegal package") return } - - if p.Header.Type&hessian.PackageHeartbeat != 0x00 { - if p.Header.Type&hessian.PackageResponse != 0x00 { - logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) - if p.Err != nil { - logger.Errorf("rpc heartbeat response{error: %#v}", p.Err) - } - h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) - } else { - logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) - p.Header.ResponseStatus = hessian.Response_OK - reply(session, p, hessian.PackageHeartbeat) + // get heartbeart request from server + if result.IsRequest { + req := result.Result.(*remoting.Request) + if req.Event { + logger.Debugf("get rpc heartbeat request{%#v}", req) + resp := remoting.NewResponse(req.ID, req.Version) + resp.Status = hessian.Response_OK + resp.Event = req.Event + resp.SerialID = req.SerialID + resp.Version = "2.0.2" + reply(session, resp, hessian.PackageHeartbeat) + return } + logger.Errorf("illegal request but not heartbeart. {%#v}", req) return } - logger.Debugf("get rpc response{header: %#v, body: %#v}", p.Header, p.Body) - - h.conn.updateSession(session) - pendingResponse := h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) - if pendingResponse == nil { - logger.Errorf("failed to get pending response context for response package %s", *p) + p := result.Result.(*remoting.Response) + // get heartbeart + if p.Event { + logger.Debugf("get rpc heartbeat response{%#v}", p) + if p.Error != nil { + logger.Errorf("rpc heartbeat response{error: %#v}", p.Error) + } + h.conn.pool.rpcClient.responseHandler.Handler(p) return } - - if p.Err != nil { - pendingResponse.err = p.Err + if result.IsRequest { + logger.Errorf("illegal package for it is response type. {%#v}", pkg) + return } - pendingResponse.response.atta = p.Body.(*Response).atta + logger.Debugf("get rpc response{%#v}", p) - if pendingResponse.callback == nil { - pendingResponse.done <- struct{}{} - } else { - pendingResponse.callback(pendingResponse.GetCallResponse()) - } + h.conn.updateSession(session) + + h.conn.pool.rpcClient.responseHandler.Handler(p) } // OnCron ... @@ -169,14 +166,16 @@ type RpcServerHandler struct { sessionTimeout time.Duration sessionMap map[getty.Session]*rpcSession rwlock sync.RWMutex + server *Server } // NewRpcServerHandler ... -func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler { +func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration, serverP *Server) *RpcServerHandler { return &RpcServerHandler{ maxSessionNum: maxSessionNum, sessionTimeout: sessionTimeout, sessionMap: make(map[getty.Session]*rpcSession), + server: serverP, } } @@ -223,87 +222,67 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { } h.rwlock.Unlock() - p, ok := pkg.(*DubboPackage) + decodeResult, ok := pkg.(remoting.DecodeResult) if !ok { logger.Errorf("illegal package{%#v}", pkg) return } - p.Header.ResponseStatus = hessian.Response_OK - - // heartbeat - if p.Header.Type&hessian.PackageHeartbeat != 0x00 { - logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) - reply(session, p, hessian.PackageHeartbeat) + if !decodeResult.IsRequest { + logger.Errorf("illegal package for it is response type. {%#v}", pkg) return } + req := decodeResult.Result.(*remoting.Request) - twoway := true - // not twoway - if p.Header.Type&hessian.PackageRequest_TwoWay == 0x00 { - twoway = false + resp := remoting.NewResponse(req.ID, req.Version) + resp.Status = hessian.Response_OK + resp.Event = req.Event + resp.SerialID = req.SerialID + resp.Version = "2.0.2" + + // heartbeat + if req.Event { + logger.Debugf("get rpc heartbeat request{%#v}", resp) + reply(session, resp, hessian.PackageHeartbeat) + return } defer func() { if e := recover(); e != nil { - p.Header.ResponseStatus = hessian.Response_SERVER_ERROR + resp.Status = hessian.Response_SERVER_ERROR if err, ok := e.(error); ok { logger.Errorf("OnMessage panic: %+v", perrors.WithStack(err)) - p.Body = perrors.WithStack(err) + resp.Error = perrors.WithStack(err) } else if err, ok := e.(string); ok { logger.Errorf("OnMessage panic: %+v", perrors.New(err)) - p.Body = perrors.New(err) + resp.Error = perrors.New(err) } else { logger.Errorf("OnMessage panic: %+v, this is impossible.", e) - p.Body = e + resp.Error = fmt.Errorf("OnMessage panic unknow exception. %+v", e) } - if !twoway { + if !req.TwoWay { return } - reply(session, p, hessian.PackageResponse) + reply(session, resp, hessian.PackageResponse) } }() - u := common.NewURLWithOptions(common.WithPath(p.Service.Path), common.WithParams(url.Values{}), - common.WithParamsValue(constant.GROUP_KEY, p.Service.Group), - common.WithParamsValue(constant.INTERFACE_KEY, p.Service.Interface), - common.WithParamsValue(constant.VERSION_KEY, p.Service.Version)) - exporter, _ := dubboProtocol.ExporterMap().Load(u.ServiceKey()) - if exporter == nil { - err := fmt.Errorf("don't have this exporter, key: %s", u.ServiceKey()) - logger.Errorf(err.Error()) - p.Header.ResponseStatus = hessian.Response_OK - p.Body = err - reply(session, p, hessian.PackageResponse) + invoc, ok := req.Data.(*invocation.RPCInvocation) + if !ok { + panic("create invocation occur some exception for the type is not suitable one.") return } - invoker := exporter.(protocol.Exporter).GetInvoker() - if invoker != nil { - attachments := p.Body.(map[string]interface{})["attachments"].(map[string]string) - attachments[constant.LOCAL_ADDR] = session.LocalAddr() - attachments[constant.REMOTE_ADDR] = session.RemoteAddr() - - args := p.Body.(map[string]interface{})["args"].([]interface{}) - inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments) - - ctx := rebuildCtx(inv) - - result := invoker.Invoke(ctx, inv) - if err := result.Error(); err != nil { - p.Header.ResponseStatus = hessian.Response_OK - p.Body = hessian.NewResponse(nil, err, result.Attachments()) - } else { - res := result.Result() - p.Header.ResponseStatus = hessian.Response_OK - p.Body = hessian.NewResponse(res, nil, result.Attachments()) - } - } + attachments := invoc.Attachments() + attachments[constant.LOCAL_ADDR] = session.LocalAddr() + attachments[constant.REMOTE_ADDR] = session.RemoteAddr() - if !twoway { + result := h.server.requestHandler(invoc) + if !req.TwoWay { return } - reply(session, p, hessian.PackageResponse) + resp.Result = result + reply(session, resp, hessian.PackageResponse) } // OnCron ... @@ -332,38 +311,9 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { } } -// rebuildCtx rebuild the context by attachment. -// Once we decided to transfer more context's key-value, we should change this. -// now we only support rebuild the tracing context -func rebuildCtx(inv *invocation.RPCInvocation) context.Context { - ctx := context.Background() - - // actually, if user do not use any opentracing framework, the err will not be nil. - spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, - opentracing.TextMapCarrier(inv.Attachments())) - if err == nil { - ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx) - } - return ctx -} - -func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { - resp := &DubboPackage{ - Header: hessian.DubboHeader{ - SerialID: req.Header.SerialID, - Type: tp, - ID: req.Header.ID, - ResponseStatus: req.Header.ResponseStatus, - }, - } - - if req.Header.Type&hessian.PackageRequest != 0x00 { - resp.Body = req.Body - } else { - resp.Body = nil - } +func reply(session getty.Session, resp *remoting.Response, tp hessian.PackageType) { if err := session.WritePkg(resp, WritePkg_Timeout); err != nil { - logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req.Header) + logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), resp) } } diff --git a/protocol/dubbo/listener_test.go b/remoting/getty/listener_test.go similarity index 74% rename from protocol/dubbo/listener_test.go rename to remoting/getty/listener_test.go index 5f809814607558650e09934019db96dbb2ceeeae..3bb659c2310e76d3b94cc17129d762f7f8b76add 100644 --- a/protocol/dubbo/listener_test.go +++ b/remoting/getty/listener_test.go @@ -15,9 +15,10 @@ * limitations under the License. */ -package dubbo +package getty import ( + "context" "testing" ) @@ -56,3 +57,18 @@ func TestRebuildCtx(t *testing.T) { assert.NotNil(t, ctx) assert.NotNil(t, ctx.Value(constant.TRACING_REMOTE_SPAN_CTX)) } + +// rebuildCtx rebuild the context by attachment. +// Once we decided to transfer more context's key-value, we should change this. +// now we only support rebuild the tracing context +func rebuildCtx(inv *invocation.RPCInvocation) context.Context { + ctx := context.WithValue(context.Background(), "attachment", inv.Attachments()) + + // actually, if user do not use any opentracing framework, the err will not be nil. + spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, + opentracing.TextMapCarrier(inv.Attachments())) + if err == nil { + ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx) + } + return ctx +} diff --git a/protocol/dubbo/pool.go b/remoting/getty/pool.go similarity index 91% rename from protocol/dubbo/pool.go rename to remoting/getty/pool.go index 918514c2676cfc69336a9f53e6d16d7f23cf7dca..fdefa238202f0b650c9b7cf3389b81cca2f1c80d 100644 --- a/protocol/dubbo/pool.go +++ b/remoting/getty/pool.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package dubbo +package getty import ( "fmt" @@ -36,10 +36,10 @@ import ( ) type gettyRPCClient struct { - once sync.Once - protocol string - addr string - active int64 // zero, not create or be destroyed + once sync.Once + //protocol string + addr string + active int64 // zero, not create or be destroyed pool *gettyRPCClientPool @@ -52,11 +52,11 @@ var ( errClientPoolClosed = perrors.New("client pool closed") ) -func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*gettyRPCClient, error) { +func newGettyRPCClientConn(pool *gettyRPCClientPool, addr string) (*gettyRPCClient, error) { c := &gettyRPCClient{ - protocol: protocol, - addr: addr, - pool: pool, + //protocol: protocol, + addr: addr, + pool: pool, gettyClient: getty.NewTCPClient( getty.WithServerAddress(addr), getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)), @@ -98,7 +98,6 @@ func (c *gettyRPCClient) newSession(session getty.Session) error { tcpConn *net.TCPConn conf ClientConfig ) - conf = c.pool.rpcClient.conf if conf.GettySessionParam.CompressEncoding { session.SetCompressType(getty.CompressZip) @@ -301,7 +300,8 @@ func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) * rpcClient: rpcClient, size: size, ttl: int64(ttl.Seconds()), - conns: make([]*gettyRPCClient, 0, 16), + // init capacity : 2 + conns: make([]*gettyRPCClient, 0, 2), } } @@ -315,11 +315,14 @@ func (p *gettyRPCClientPool) close() { } } -func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) { +func (p *gettyRPCClientPool) getGettyRpcClient(addr string) (*gettyRPCClient, error) { conn, err := p.get() if err == nil && conn == nil { // create new conn - rpcClientConn, err := newGettyRPCClientConn(p, protocol, addr) + rpcClientConn, err := newGettyRPCClientConn(p, addr) + if err == nil { + p.put(rpcClientConn) + } return rpcClientConn, perrors.WithStack(err) } return conn, perrors.WithStack(err) @@ -333,10 +336,15 @@ func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) { if p.conns == nil { return nil, errClientPoolClosed } - - for len(p.conns) > 0 { - conn := p.conns[len(p.conns)-1] - p.conns = p.conns[:len(p.conns)-1] + for num := len(p.conns); num > 0; { + var conn *gettyRPCClient + if num != 1 { + conn = p.conns[rand.Int31n(int32(num))] + } else { + conn = p.conns[0] + } + // This will recreate gettyRpcClient for remove last position + //p.conns = p.conns[:len(p.conns)-1] if d := now - conn.getActive(); d > p.ttl { p.remove(conn) @@ -353,21 +361,17 @@ func (p *gettyRPCClientPool) put(conn *gettyRPCClient) { if conn == nil || conn.getActive() == 0 { return } - p.Lock() defer p.Unlock() - if p.conns == nil { return } - // check whether @conn has existed in p.conns or not. for i := range p.conns { if p.conns[i] == conn { return } } - if len(p.conns) >= p.size { // delete @conn from client pool // p.remove(conn) diff --git a/protocol/dubbo/readwriter.go b/remoting/getty/readwriter.go similarity index 50% rename from protocol/dubbo/readwriter.go rename to remoting/getty/readwriter.go index b5c4f509190dbdc85825ad424656240b234786df..8842f1b8dcd50cfe2e814f626f3e8e50c6f42424 100644 --- a/protocol/dubbo/readwriter.go +++ b/remoting/getty/readwriter.go @@ -15,23 +15,22 @@ * limitations under the License. */ -package dubbo +package getty import ( - "bytes" "reflect" ) import ( - "github.com/apache/dubbo-go-hessian2" + hessian "github.com/apache/dubbo-go-hessian2" "github.com/dubbogo/getty" + perrors "github.com/pkg/errors" ) import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/remoting" ) //////////////////////////////////////////// @@ -49,37 +48,35 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { } func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { - pkg := &DubboPackage{} - - buf := bytes.NewBuffer(data) - err := pkg.Unmarshal(buf, p.client) + //pkg := &DubboPackage{} + //p.client.ExchangeClient.GetPendingResponse(remoting.SequenceType()) + resp, length, err := (p.client.codec).Decode(data) + //err := pkg.Unmarshal(buf, p.client) if err != nil { - originErr := perrors.Cause(err) - if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { + if err == hessian.ErrHeaderNotEnough || err == hessian.ErrBodyNotEnough { return nil, 0, nil } logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err) - return nil, 0, perrors.WithStack(err) - } - - if pkg.Header.Type&hessian.PackageRequest == 0x00 { - pkg.Err = pkg.Body.(*hessian.Response).Exception - pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments) + return nil, length, err } + //if pkg.Header.Type&hessian.PackageRequest == 0x00 { + // pkg.Err = pkg.Body.(*hessian.Response).Exception + // pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments) + //} - return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil + return resp, length, nil } func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { - req, ok := pkg.(*DubboPackage) + req, ok := pkg.(*remoting.Request) if !ok { logger.Errorf("illegal pkg:%+v\n", pkg) return nil, perrors.New("invalid rpc request") } - buf, err := req.Marshal() + buf, err := (p.client.codec).EncodeRequest(req) if err != nil { logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err)) return nil, perrors.WithStack(err) @@ -92,91 +89,44 @@ func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]by // RpcServerPackageHandler //////////////////////////////////////////// -var ( - rpcServerPkgHandler = &RpcServerPackageHandler{} -) +//var ( +// rpcServerPkgHandler = &RpcServerPackageHandler{} +//) // RpcServerPackageHandler ... -type RpcServerPackageHandler struct{} +type RpcServerPackageHandler struct { + server *Server +} + +func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler { + return &RpcServerPackageHandler{server: server} +} func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { - pkg := &DubboPackage{ - Body: make([]interface{}, 7), - } + req, length, err := (p.server.codec).Decode(data) + //resp,len, err := (*p.).DecodeResponse(buf) - buf := bytes.NewBuffer(data) - err := pkg.Unmarshal(buf) if err != nil { - originErr := perrors.Cause(err) - if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { + if err == hessian.ErrHeaderNotEnough || err == hessian.ErrBodyNotEnough { return nil, 0, nil } logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err) - return nil, 0, perrors.WithStack(err) - } - - if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 { - // convert params of request - req := pkg.Body.([]interface{}) // length of body should be 7 - if len(req) > 0 { - var dubboVersion, argsTypes string - var args []interface{} - var attachments map[string]string - if req[0] != nil { - dubboVersion = req[0].(string) - } - if req[1] != nil { - pkg.Service.Path = req[1].(string) - } - if req[2] != nil { - pkg.Service.Version = req[2].(string) - } - if req[3] != nil { - pkg.Service.Method = req[3].(string) - } - if req[4] != nil { - argsTypes = req[4].(string) - } - if req[5] != nil { - args = req[5].([]interface{}) - } - if req[6] != nil { - attachments = req[6].(map[string]string) - } - if pkg.Service.Path == "" && len(attachments[constant.PATH_KEY]) > 0 { - pkg.Service.Path = attachments[constant.PATH_KEY] - } - if _, ok := attachments[constant.INTERFACE_KEY]; ok { - pkg.Service.Interface = attachments[constant.INTERFACE_KEY] - } else { - pkg.Service.Interface = pkg.Service.Path - } - if len(attachments[constant.GROUP_KEY]) > 0 { - pkg.Service.Group = attachments[constant.GROUP_KEY] - } - pkg.Body = map[string]interface{}{ - "dubboVersion": dubboVersion, - "argsTypes": argsTypes, - "args": args, - "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Path), // path as a key - "attachments": attachments, - } - } + return nil, 0, err } - return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil + return req, length, err } func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { - res, ok := pkg.(*DubboPackage) + res, ok := pkg.(*remoting.Response) if !ok { logger.Errorf("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg)) return nil, perrors.New("invalid rpc response") } - buf, err := res.Marshal() + buf, err := (p.server.codec).EncodeResponse(res) if err != nil { logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err)) return nil, perrors.WithStack(err)