diff --git a/README.md b/README.md index d01c5df56a4e33a6bd062c64201fc985c79077b1..36e0ad78c3aefe21c3847f2ede1c4cbec8db9851 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,6 @@ --- Apache Dubbo Go Implementation. - - -Apache/Dubbo-go image, licensed under [Creative Commons 3.0 Attributions license](https://creativecommons.org/licenses/by/3.0/). ## License diff --git a/README_CN.md b/README_CN.md index 9a769a196afa87eabc88649a5315b2772a8e4fef..387070d267fbba05fb99226f83bde45e236e491c 100644 --- a/README_CN.md +++ b/README_CN.md @@ -5,9 +5,6 @@ --- Apache Dubbo Go 璇█瀹炵幇 - - -Apache/Dubbo-go image, licensed under [Creative Commons 3.0 Attributions license](https://creativecommons.org/licenses/by/3.0/). ## 璇佷功 ## diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index 1665a7346e09016570dd36c56d231d3706b96a54..bafba60b400ec59d99e2d68ecf4d067c906ba6fb 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -17,10 +17,20 @@ package proxy_factory +import ( + "reflect" + "strings" +) + +import ( + perrors "github.com/pkg/errors" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/proxy" "github.com/apache/dubbo-go/protocol" ) @@ -51,6 +61,86 @@ func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *comm return proxy.NewProxy(invoker, nil, attachments) } func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { - // todo: call service - return protocol.NewBaseInvoker(url) + return &ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + } +} + +type ProxyInvoker struct { + protocol.BaseInvoker +} + +func (pi *ProxyInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + result := &protocol.RPCResult{} + result.SetAttachments(invocation.Attachments()) + + url := pi.GetUrl() + + methodName := invocation.MethodName() + proto := url.Protocol + path := strings.TrimPrefix(url.Path, "/") + args := invocation.Arguments() + + // get service + svc := common.ServiceMap.GetService(proto, path) + if svc == nil { + logger.Errorf("cannot find service [%s] in %s", path, proto) + result.SetError(perrors.Errorf("cannot find service [%s] in %s", path, proto)) + return result + } + + // get method + method := svc.Method()[methodName] + if method == nil { + logger.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto) + result.SetError(perrors.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto)) + return result + } + + in := []reflect.Value{svc.Rcvr()} + if method.CtxType() != nil { + in = append(in, method.SuiteContext(nil)) // todo: ctx will be used later. + } + + // prepare argv + if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { + in = append(in, reflect.ValueOf(args)) + } else { + for i := 0; i < len(args); i++ { + t := reflect.ValueOf(args[i]) + if !t.IsValid() { + at := method.ArgsType()[i] + if at.Kind() == reflect.Ptr { + at = at.Elem() + } + t = reflect.New(at) + } + in = append(in, t) + } + } + + // prepare replyv + var replyv reflect.Value + if method.ReplyType() == nil && len(method.ArgsType()) > 0 { + replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) + in = append(in, replyv) + } + + returnValues := method.Method().Func.Call(in) + + var retErr interface{} + if len(returnValues) == 1 { + retErr = returnValues[0].Interface() + } else { + replyv = returnValues[0] + retErr = returnValues[1].Interface() + } + if retErr != nil { + result.SetError(retErr.(error)) + } else { + if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { + result.SetResult(replyv.Interface()) + } + } + return result } diff --git a/examples/README.md b/examples/README.md index 497926f1d3822f7b7d33640fa18cbc2bd65bdbb9..d520c5cceecd30ea6d4cae9bf416ae079b3a3f84 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,74 +1,80 @@ # examples -Examples of go-for-apache-dubbo +Examples of dubbo-go -## dubbo +## What does this contain -#### Build by these command +* helloworld -java server -```bash -cd dubbo/java-server -sh build.sh -``` + A simplest example. It contain 'go-client', 'go-server', 'java-server' of dubbo protocol. -java client -```bash -cd dubbo/java-client -sh build.sh -``` +* general -go server + A general example. It had validated zookeeper registry and different parameter lists of service. + And it has a comprehensive testing with dubbo/jsonrpc protocol. You can refer to it to create your first complete dubbo-go project. -* sh ./assembly/\[os]/\[environment].sh -```bash -cd dubbo/go-server -# $ARCH = [linux, mac, windows] and $ENV = [dev, release, test] -sh ./assembly/$ARCH/$ENV.sh -``` +* generic -go client -```bash -cd dubbo/go-client -# $ARCH = [linux, mac, windows] and $ENV = [dev, release, test] -sh ./assembly/$ARCH/$ENV.sh -``` + A generic reference example. It show how to use generic reference of dubbo-go. + +* configcenter + + Some examples of different config center. There is only one -- zookeeper at present. -#### Run by these command: +## How to build and run + +> Take `helloworld` as an example java server + ```bash -cd dubbo/java-server/target +cd helloworld/dubbo/java-server +sh build.sh + +cd ./target tar -zxvf user-info-server-0.2.0-assembly.tar.gz cd ./user-info-server-0.2.0 sh ./bin/server.sh start ``` java client + ```bash -cd dubbo/java-client/target +cd helloworld/dubbo/java-client +sh build.sh + +cd ./target tar -zxvf user-info-client-0.2.0-assembly.tar.gz cd ./user-info-client-0.2.0 sh ./bin/server.sh start ``` go server + +* $ARCH = [linux, mac, windows] and $ENV = [dev, release, test] + ```bash -cd dubbo/go-server/target/linux/user_info_server-0.3.1-20190517-0930-release -#conf suffix appoint config file, -#such as server_zookeeper.yml when "sh ./bin/load.sh start is zookeeper", -#default server.yml -sh ./bin/load.sh start [conf suffix] +cd helloworld/dubbo/go-server +sh ./assembly/$ARCH/$ENV.sh + +cd ./target/linux/user_info_server-0.3.1-20190517-0930-release +# $SUFFIX is a suffix of config file, +# such as server_zookeeper.yml when $SUFFIX is "zookeeper", +# if $SUFFIX = "", default server.yml +sh ./bin/load.sh start $SUFFIX ``` go client + +* $ARCH = [linux, mac, windows] and $ENV = [dev, release, test] + ```bash -cd dubbo/go-client/target/linux/user_info_client-0.3.1-20190517-0921-release +cd helloworld/dubbo/go-client +sh ./assembly/$ARCH/$ENV.sh + +cd ./target/linux/user_info_client-0.3.1-20190517-0921-release # $SUFFIX is a suffix of config file, # such as client_zookeeper.yml when $SUFFIX = zookeeper", # if $SUFFIX = "", config file is client.yml sh ./bin/load_user_info_client.sh start $SUFFIX ``` - -## jsonrpc -Similar to dubbo diff --git a/examples/general/dubbo/go-client/app/client.go b/examples/general/dubbo/go-client/app/client.go index 60adad356b8a8ff843034dd8e6086a2b832c7792..f3ac4b99ab0abdd472056682ce8f4b3c4b41bc8b 100644 --- a/examples/general/dubbo/go-client/app/client.go +++ b/examples/general/dubbo/go-client/app/client.go @@ -33,9 +33,9 @@ import ( "github.com/apache/dubbo-go/common/logger" _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/config" - _ "github.com/apache/dubbo-go/registry/protocol" - _ "github.com/apache/dubbo-go/filter/impl" + _ "github.com/apache/dubbo-go/protocol/dubbo" + _ "github.com/apache/dubbo-go/registry/protocol" _ "github.com/apache/dubbo-go/cluster/cluster_impl" _ "github.com/apache/dubbo-go/cluster/loadbalance" @@ -168,7 +168,7 @@ func test1() { time.Sleep(3e9) - println("\n\n\nstart to test dubbo") + println("\n\n\nstart to test1 dubbo") user := &User{} err = userProvider1.GetUser(context.TODO(), []interface{}{"A003"}, user) if err != nil { @@ -176,7 +176,7 @@ func test1() { } println("response result: %v", user) - println("\n\n\nstart to test dubbo - GetUser0") + println("\n\n\nstart to test1 dubbo - GetUser0") ret, err := userProvider1.GetUser0("A003", "Moorse") if err != nil { panic(err) @@ -190,7 +190,7 @@ func test1() { } println("response result: %v", ret1) - println("\n\n\nstart to test dubbo - getUser") + println("\n\n\nstart to test1 dubbo - getUser") user = &User{} var i int32 = 1 err = userProvider1.GetUser2(context.TODO(), []interface{}{i}, user) @@ -206,7 +206,7 @@ func test1() { } println("succ!") - println("\n\n\nstart to test dubbo - getErr") + println("\n\n\nstart to test1 dubbo - getErr") user = &User{} err = userProvider1.GetErr(context.TODO(), []interface{}{"A003"}, user) if err == nil { @@ -214,7 +214,7 @@ func test1() { } println("getErr - error: %v", err) - println("\n\n\nstart to test dubbo illegal method") + println("\n\n\nstart to test1 dubbo illegal method") err = userProvider1.GetUser1(context.TODO(), []interface{}{"A003"}, user) if err == nil { panic("err is nil") @@ -232,7 +232,7 @@ func test2() { time.Sleep(3e9) - println("\n\n\nstart to test dubbo") + println("\n\n\nstart to test2 dubbo") user := &User{} err = userProvider2.GetUser(context.TODO(), []interface{}{"A003"}, user) if err != nil { @@ -240,21 +240,21 @@ func test2() { } println("response result: %v", user) - println("\n\n\nstart to test dubbo - GetUser0") + println("\n\n\nstart to test2 dubbo - GetUser0") ret, err := userProvider2.GetUser0("A003", "Moorse") if err != nil { panic(err) } println("response result: %v", ret) - println("\n\n\nstart to test dubbo - GetUsers") + println("\n\n\nstart to test2 dubbo - GetUsers") ret1, err := userProvider2.GetUsers([]interface{}{[]interface{}{"A002", "A003"}}) if err != nil { panic(err) } println("response result: %v", ret1) - println("\n\n\nstart to test dubbo - getUser") + println("\n\n\nstart to test2 dubbo - getUser") user = &User{} var i int32 = 1 err = userProvider2.GetUser2(context.TODO(), []interface{}{i}, user) @@ -263,14 +263,14 @@ func test2() { } println("response result: %v", user) - println("\n\n\nstart to test dubbo - GetUser3") + println("\n\n\nstart to test2 dubbo - GetUser3") err = userProvider2.GetUser3() if err != nil { panic(err) } println("succ!") - println("\n\n\nstart to test dubbo - getErr") + println("\n\n\nstart to test2 dubbo - getErr") user = &User{} err = userProvider2.GetErr(context.TODO(), []interface{}{"A003"}, user) if err == nil { @@ -278,7 +278,7 @@ func test2() { } println("getErr - error: %v", err) - println("\n\n\nstart to test dubbo illegal method") + println("\n\n\nstart to test2 dubbo illegal method") err = userProvider2.GetUser1(context.TODO(), []interface{}{"A003"}, user) if err == nil { panic("err is nil") diff --git a/filter/impl/echo_filter.go b/filter/impl/echo_filter.go index 5eb5a37fa500bd8c180d879240d1c1e367df31ce..18e42c8cb2b15acb27573c5e24f11a8b69e0d496 100644 --- a/filter/impl/echo_filter.go +++ b/filter/impl/echo_filter.go @@ -43,7 +43,8 @@ func (ef *EchoFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invoc logger.Debugf("%v,%v", invocation.MethodName(), len(invocation.Arguments())) if invocation.MethodName() == constant.ECHO && len(invocation.Arguments()) == 1 { return &protocol.RPCResult{ - Rest: invocation.Arguments()[0], + Rest: invocation.Arguments()[0], + Attrs: invocation.Attachments(), } } diff --git a/go.mod b/go.mod index 20a8fc836788b1b30ca0368574f65f3b54d94d3a..264559029655be4505faff2d714e8c6e05c87074 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/Workiva/go-datastructures v1.0.50 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect - github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 + github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.13+incompatible diff --git a/go.sum b/go.sum index 4c6eb9b2553042914b00f2cd121277804b896a57..b04deb316931680ad3db2a68ca5b4f3c8dd3b92d 100644 --- a/go.sum +++ b/go.sum @@ -26,17 +26,17 @@ github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIO github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo= github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= -github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vajsk51NtYIcwSTkXr+JGrMd36kTDJw= -github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af h1:DBNMBMuMiWYu0b+8KMJuWmfCkcxl09JwdlqwDZZ6U14= github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af/go.mod h1:5Jv4cbFiHJMsVxt52+i0Ha45fjshj6wxYr1r19tB9bw= +github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vajsk51NtYIcwSTkXr+JGrMd36kTDJw= +github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= -github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 h1:7zJlM+8bpCAUhv03TZnXkT4MLlLWng1s7An8CLuN73E= -github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= +github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22 h1:Ku+3LFRYVelgo/INS9893QOUeIiKNeNKzK3CzDcqt/4= +github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 6ac5e7215429338aa7cabb646b3afd3f1a136b68..2cec5aa8faf2d3e60af93b333ed69e6e6b077a72 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -150,46 +150,74 @@ func NewClient(opt Options) *Client { return c } +type Request struct { + addr string + svcUrl common.URL + method string + args interface{} + atta map[string]string +} + +func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request { + return &Request{ + addr: addr, + svcUrl: svcUrl, + method: method, + args: args, + atta: atta, + } +} + +type Response struct { + reply interface{} + atta map[string]string +} + +func NewResponse(reply interface{}, atta map[string]string) *Response { + return &Response{ + reply: reply, + atta: atta, + } +} + // call one way -func (c *Client) CallOneway(addr string, svcUrl common.URL, method string, args interface{}) error { +func (c *Client) CallOneway(request *Request) error { - return perrors.WithStack(c.call(CT_OneWay, addr, svcUrl, method, args, nil, nil)) + return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil)) } -// if @reply is nil, the transport layer will get the response without notify the invoker. -func (c *Client) Call(addr string, svcUrl common.URL, method string, args, reply interface{}) error { +// if @response is nil, the transport layer will get the response without notify the invoker. +func (c *Client) Call(request *Request, response *Response) error { ct := CT_TwoWay - if reply == nil { + if response.reply == nil { ct = CT_OneWay } - return perrors.WithStack(c.call(ct, addr, svcUrl, method, args, reply, nil)) + return perrors.WithStack(c.call(ct, request, response, nil)) } -func (c *Client) AsyncCall(addr string, svcUrl common.URL, method string, args interface{}, - callback AsyncCallback, reply interface{}) error { +func (c *Client) AsyncCall(request *Request, callback AsyncCallback, response *Response) error { - return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback)) + return perrors.WithStack(c.call(CT_TwoWay, request, response, callback)) } -func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string, - args, reply interface{}, callback AsyncCallback) error { +func (c *Client) call(ct CallType, request *Request, response *Response, callback AsyncCallback) error { p := &DubboPackage{} - p.Service.Path = strings.TrimPrefix(svcUrl.Path, "/") - p.Service.Interface = svcUrl.GetParam(constant.INTERFACE_KEY, "") - p.Service.Version = svcUrl.GetParam(constant.VERSION_KEY, "") - p.Service.Method = method + p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/") + p.Service.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "") + p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "") + p.Service.Method = request.method p.Service.Timeout = c.opts.RequestTimeout p.Header.SerialID = byte(S_Dubbo) - p.Body = args + p.Body = hessian.NewRequest(request.args, request.atta) var rsp *PendingResponse if ct != CT_OneWay { p.Header.Type = hessian.PackageRequest_TwoWay rsp = NewPendingResponse() - rsp.reply = reply + rsp.response = response rsp.callback = callback } else { p.Header.Type = hessian.PackageRequest @@ -200,7 +228,7 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string session getty.Session conn *gettyRPCClient ) - conn, session, err = c.selectSession(addr) + conn, session, err = c.selectSession(request.addr) if err != nil { return perrors.WithStack(err) } @@ -259,6 +287,7 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage, if pkg == nil { pkg = &DubboPackage{} + pkg.Body = hessian.NewRequest([]interface{}{}, nil) pkg.Body = []interface{}{} pkg.Header.Type = hessian.PackageHeartbeat pkg.Header.SerialID = byte(S_Dubbo) diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index cd961d382933443e37a08c21b4e4de5edb971860..eb1f15c862a910120e118c06bf9b572e93f58832 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -33,6 +33,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" ) @@ -50,7 +51,7 @@ func TestClient_CallOneway(t *testing.T) { c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) //user := &User{} - err := c.CallOneway("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}) + err := c.CallOneway(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil)) assert.NoError(t, err) // destroy @@ -70,51 +71,56 @@ func TestClient_Call(t *testing.T) { } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) - user := &User{} - err := c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user) + var ( + user *User + err error + ) + + user = &User{} + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, nil), NewResponse(user, nil)) assert.NoError(t, err) assert.NotEqual(t, "", user.Id) assert.NotEqual(t, "", user.Name) user = &User{} - err = c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), NewResponse(user, nil)) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) user = &User{} - err = c.Call("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", nil, "username"}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", nil, "username"}, nil), NewResponse(user, nil)) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) - err = c.Call("127.0.0.1:20000", url, "GetUser1", []interface{}{}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser1", []interface{}{}, nil), NewResponse(user, nil)) assert.NoError(t, err) - err = c.Call("127.0.0.1:20000", url, "GetUser2", []interface{}{}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser2", []interface{}{}, nil), NewResponse(user, nil)) assert.EqualError(t, err, "error") user2 := []interface{}{} - err = c.Call("127.0.0.1:20000", url, "GetUser3", []interface{}{}, &user2) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser3", []interface{}{}, nil), NewResponse(&user2, nil)) assert.NoError(t, err) assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0]) user2 = []interface{}{} - err = c.Call("127.0.0.1:20000", url, "GetUser4", []interface{}{[]interface{}{"1", "username"}}, &user2) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser4", []interface{}{[]interface{}{"1", "username"}}, nil), NewResponse(&user2, nil)) assert.NoError(t, err) assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0]) user3 := map[interface{}]interface{}{} - err = c.Call("127.0.0.1:20000", url, "GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, &user3) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, nil), NewResponse(&user3, nil)) assert.NoError(t, err) assert.NotNil(t, user3) assert.Equal(t, &User{Id: "1", Name: "username"}, user3["key"]) user = &User{} - err = c.Call("127.0.0.1:20000", url, "GetUser6", []interface{}{0}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser6", []interface{}{0}, nil), NewResponse(user, nil)) assert.NoError(t, err) assert.Equal(t, User{Id: "", Name: ""}, *user) user = &User{} - err = c.Call("127.0.0.1:20000", url, "GetUser6", []interface{}{1}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser6", []interface{}{1}, nil), NewResponse(user, nil)) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: ""}, *user) @@ -138,10 +144,10 @@ func TestClient_AsyncCall(t *testing.T) { user := &User{} lock := sync.Mutex{} lock.Lock() - err := c.AsyncCall("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, func(response CallResponse) { - assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*User)) + err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response CallResponse) { + assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) lock.Unlock() - }, user) + }, NewResponse(user, nil)) assert.NoError(t, err) assert.Equal(t, User{}, *user) @@ -209,7 +215,9 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") assert.NoError(t, err) - proto.Export(protocol.NewBaseInvoker(url)) + proto.Export(&proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }) time.Sleep(time.Second * 2) diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 98c29a4e5bc576f7e37f74c0c0abbbab4687717b..a878ffd91e29d6949870ec25fed9481f301b435a 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -91,9 +91,8 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) if !ok { return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) - } else { - p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).reply} } + p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply} } // read body @@ -111,14 +110,15 @@ type PendingResponse struct { start time.Time readStart time.Time callback AsyncCallback - reply interface{} + response *Response done chan struct{} } func NewPendingResponse() *PendingResponse { return &PendingResponse{ - start: time.Now(), - done: make(chan struct{}), + start: time.Now(), + response: &Response{}, + done: make(chan struct{}), } } @@ -127,6 +127,6 @@ func (r PendingResponse) GetCallResponse() CallResponse { Cause: r.err, Start: r.start, ReadStart: r.readStart, - Reply: r.reply, + Reply: r.response, } } diff --git a/protocol/dubbo/codec_test.go b/protocol/dubbo/codec_test.go index 52bb1fc130bb2dad866799f01c43d11ffd10a220..c192c2294db5597517ace011224e34f8affefb1f 100644 --- a/protocol/dubbo/codec_test.go +++ b/protocol/dubbo/codec_test.go @@ -64,11 +64,11 @@ func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) { assert.Equal(t, hessian.PackageRequest, pkgres.Header.Type) assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID) assert.Equal(t, int64(10086), pkgres.Header.ID) - assert.Equal(t, "2.5.4", pkgres.Body.([]interface{})[0]) + assert.Equal(t, "2.0.2", pkgres.Body.([]interface{})[0]) assert.Equal(t, "path", pkgres.Body.([]interface{})[1]) assert.Equal(t, "2.6", pkgres.Body.([]interface{})[2]) assert.Equal(t, "Method", pkgres.Body.([]interface{})[3]) assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4]) assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5]) - assert.Equal(t, map[interface{}]interface{}{"group": "", "interface": "Service", "path": "path", "timeout": "1000"}, pkgres.Body.([]interface{})[6]) + assert.Equal(t, map[string]string{"dubbo": "2.0.2", "group": "", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, pkgres.Body.([]interface{})[6]) } diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 46b3dcc8463d8406c494d80149b28f0dba7444d0..4bfc1324cf9dcdb2412e7026214f6a3951a9c7c1 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -34,7 +34,7 @@ import ( invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) -var Err_No_Reply = perrors.New("request need @reply") +var Err_No_Reply = perrors.New("request need @response") type DubboInvoker struct { protocol.BaseInvoker @@ -64,21 +64,23 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { logger.Errorf("ParseBool - error: %v", err) async = false } + response := NewResponse(inv.Reply(), nil) if async { if callBack, ok := inv.CallBack().(func(response CallResponse)); ok { - result.Err = di.client.AsyncCall(url.Location, url, inv.MethodName(), inv.Arguments(), callBack, inv.Reply()) + result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) } else { - result.Err = di.client.CallOneway(url.Location, url, inv.MethodName(), inv.Arguments()) + result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())) } } else { if inv.Reply() == nil { result.Err = Err_No_Reply } else { - result.Err = di.client.Call(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Reply()) + result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response) } } if result.Err == nil { result.Rest = inv.Reply() + result.Attrs = response.atta } logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest) diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 09a4c128b600e605de616a65027da9b2ce6fcb20..0a765356f7353829c8486fddba986e3a444441a1 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -49,12 +49,13 @@ func TestDubboInvoker_Invoke(t *testing.T) { user := &User{} inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), invocation.WithArguments([]interface{}{"1", "username"}), - invocation.WithReply(user)) + invocation.WithReply(user), invocation.WithAttachments(map[string]string{"test_key": "test_value"})) // Call res := invoker.Invoke(inv) assert.NoError(t, res.Error()) assert.Equal(t, User{Id: "1", Name: "username"}, *res.Result().(*User)) + assert.Equal(t, "test_value", res.Attachments()["test_key"]) // test attachments for request/response // CallOneway inv.SetAttachments(constant.ASYNC_KEY, "true") @@ -65,7 +66,7 @@ func TestDubboInvoker_Invoke(t *testing.T) { lock := sync.Mutex{} lock.Lock() inv.SetCallBack(func(response CallResponse) { - assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*User)) + assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) lock.Unlock() }) res = invoker.Invoke(inv) @@ -75,7 +76,7 @@ func TestDubboInvoker_Invoke(t *testing.T) { inv.SetAttachments(constant.ASYNC_KEY, "false") inv.SetReply(nil) res = invoker.Invoke(inv) - assert.EqualError(t, res.Error(), "request need @reply") + assert.EqualError(t, res.Error(), "request need @response") // destroy lock.Lock() diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 4438a0b3d0e32127536b818806d190a2d2a5a2ba..59d1ea05160696754b46dfead5713684aa7a94f7 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -17,13 +17,16 @@ package dubbo +import ( + "sync" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/protocol" - "sync" ) const ( diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go index 26ce4a1906d5d6fe425f23984586914c293f47a4..a6b0bc1df3cf2eb46e07c9dab149d04f62f78012 100644 --- a/protocol/dubbo/dubbo_protocol_test.go +++ b/protocol/dubbo/dubbo_protocol_test.go @@ -19,7 +19,6 @@ package dubbo import ( "context" - "github.com/apache/dubbo-go/common/constant" "testing" ) @@ -29,6 +28,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" ) diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index aa208284825665dae4c23f871117a7f34c548d16..1a7b002819271d7841e4749881d2ab379f9a7240 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -18,10 +18,8 @@ package dubbo import ( - "context" "fmt" "net/url" - "reflect" "sync" "time" ) @@ -107,6 +105,8 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { pendingResponse.err = p.Err } + pendingResponse.response.atta = p.Body.(*Response).atta + if pendingResponse.callback == nil { pendingResponse.done <- struct{}{} } else { @@ -209,6 +209,28 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { twoway = false } + defer func() { + if e := recover(); e != nil { + p.Header.ResponseStatus = hessian.Response_SERVER_ERROR + if err, ok := e.(error); ok { + logger.Errorf("OnMessage panic: %+v", perrors.WithStack(err)) + p.Body = perrors.WithStack(err) + } else if err, ok := e.(string); ok { + logger.Errorf("OnMessage panic: %+v", perrors.New(err)) + p.Body = perrors.New(err) + } else { + logger.Errorf("OnMessage panic: %+v, this is impossible.", e) + p.Body = e + } + + if !twoway { + return + } + h.reply(session, p, hessian.PackageResponse) + } + + }() + u := common.NewURLWithOptions(common.WithPath(p.Service.Path), common.WithParams(url.Values{}), common.WithParamsValue(constant.GROUP_KEY, p.Service.Group), common.WithParamsValue(constant.INTERFACE_KEY, p.Service.Interface), @@ -224,27 +246,18 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { } invoker := exporter.(protocol.Exporter).GetInvoker() if invoker != nil { - result := invoker.Invoke(invocation.NewRPCInvocation(p.Service.Method, p.Body.(map[string]interface{})["args"].([]interface{}), map[string]string{ - constant.PATH_KEY: p.Service.Path, - constant.GROUP_KEY: p.Service.Group, - constant.INTERFACE_KEY: p.Service.Interface, - constant.VERSION_KEY: p.Service.Version, - })) + result := invoker.Invoke(invocation.NewRPCInvocation(p.Service.Method, p.Body.(map[string]interface{})["args"].([]interface{}), + p.Body.(map[string]interface{})["attachments"].(map[string]string))) if err := result.Error(); err != nil { p.Header.ResponseStatus = hessian.Response_OK - p.Body = err - h.reply(session, p, hessian.PackageResponse) - return - } - if res := result.Result(); res != nil { + p.Body = hessian.NewResponse(nil, err, result.Attachments()) + } else { + res := result.Result() p.Header.ResponseStatus = hessian.Response_OK - p.Body = res - h.reply(session, p, hessian.PackageResponse) - return + p.Body = hessian.NewResponse(res, nil, result.Attachments()) } } - h.callService(p, nil) if !twoway { return } @@ -276,91 +289,6 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { } } -func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) { - - defer func() { - if e := recover(); e != nil { - req.Header.ResponseStatus = hessian.Response_SERVER_ERROR - if err, ok := e.(error); ok { - logger.Errorf("callService panic: %+v", perrors.WithStack(err)) - req.Body = perrors.WithStack(err) - } else if err, ok := e.(string); ok { - logger.Errorf("callService panic: %+v", perrors.New(err)) - req.Body = perrors.New(err) - } else { - logger.Errorf("callService panic: %+v, this is impossible.", e) - req.Body = e - } - } - }() - - svcIf := req.Body.(map[string]interface{})["service"] - if svcIf == nil { - logger.Errorf("service not found!") - req.Header.ResponseStatus = hessian.Response_BAD_REQUEST - req.Body = perrors.New("service not found") - return - } - svc := svcIf.(*common.Service) - method := svc.Method()[req.Service.Method] - if method == nil { - logger.Errorf("method not found!") - req.Header.ResponseStatus = hessian.Response_BAD_REQUEST - req.Body = perrors.New("method not found") - return - } - - in := []reflect.Value{svc.Rcvr()} - if method.CtxType() != nil { - in = append(in, method.SuiteContext(ctx)) - } - - // prepare argv - argv := req.Body.(map[string]interface{})["args"] - if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { - in = append(in, reflect.ValueOf(argv)) - } else { - for i := 0; i < len(argv.([]interface{})); i++ { - t := reflect.ValueOf(argv.([]interface{})[i]) - if !t.IsValid() { - at := method.ArgsType()[i] - if at.Kind() == reflect.Ptr { - at = at.Elem() - } - t = reflect.New(at) - } - in = append(in, t) - } - } - - // prepare replyv - var replyv reflect.Value - if method.ReplyType() == nil && len(method.ArgsType()) > 0 { - replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) - in = append(in, replyv) - } - - returnValues := method.Method().Func.Call(in) - - var retErr interface{} - if len(returnValues) == 1 { - retErr = returnValues[0].Interface() - } else { - replyv = returnValues[0] - retErr = returnValues[1].Interface() - } - if retErr != nil { - req.Header.ResponseStatus = hessian.Response_OK - req.Body = retErr - } else { - if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { - req.Body = replyv.Interface() - } else { - req.Body = nil - } - } -} - func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { resp := &DubboPackage{ Header: hessian.DubboHeader{ diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index 8c6c8a5a53af4df9a89eae5db5752eb07f3aa446..eb5cbe786a0e00d19902a725dbf1b19cc236817f 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -23,7 +23,7 @@ import ( ) import ( - hessian "github.com/apache/dubbo-go-hessian2" + "github.com/apache/dubbo-go-hessian2" "github.com/dubbogo/getty" perrors "github.com/pkg/errors" ) @@ -63,7 +63,7 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface } pkg.Err = pkg.Body.(*hessian.Response).Exception - pkg.Body = pkg.Body.(*hessian.Response).RspObj + pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments) return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } @@ -118,7 +118,7 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface if len(req) > 0 { var dubboVersion, argsTypes string var args []interface{} - var attachments map[interface{}]interface{} + var attachments map[string]string if req[0] != nil { dubboVersion = req[0].(string) } @@ -138,14 +138,14 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface args = req[5].([]interface{}) } if req[6] != nil { - attachments = req[6].(map[interface{}]interface{}) + attachments = req[6].(map[string]string) } - pkg.Service.Interface = attachments[constant.INTERFACE_KEY].(string) - if pkg.Service.Path == "" && attachments[constant.PATH_KEY] != nil { - pkg.Service.Path = attachments[constant.PATH_KEY].(string) + pkg.Service.Interface = attachments[constant.INTERFACE_KEY] + if len(pkg.Service.Path) == 0 && len(attachments[constant.PATH_KEY]) != 0 { + pkg.Service.Path = attachments[constant.PATH_KEY] } - if attachments[constant.GROUP_KEY] != nil { - pkg.Service.Group = attachments[constant.GROUP_KEY].(string) + if len(attachments[constant.GROUP_KEY]) != 0 { + pkg.Service.Group = attachments[constant.GROUP_KEY] } pkg.Body = map[string]interface{}{ "dubboVersion": dubboVersion, diff --git a/protocol/jsonrpc/http_test.go b/protocol/jsonrpc/http_test.go index 1f446803fd6c5f174f51e3fe9496c49ae4991691..9be55e247a730460a3adee5622fa978ef2defbfb 100644 --- a/protocol/jsonrpc/http_test.go +++ b/protocol/jsonrpc/http_test.go @@ -32,6 +32,7 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" ) @@ -60,7 +61,9 @@ func TestHTTPClient_Call(t *testing.T) { "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") assert.NoError(t, err) - proto.Export(protocol.NewBaseInvoker(url)) + proto.Export(&proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }) time.Sleep(time.Second * 2) client := NewHTTPClient(&HTTPOptions{}) diff --git a/protocol/jsonrpc/jsonrpc_invoker_test.go b/protocol/jsonrpc/jsonrpc_invoker_test.go index bc88759bf522a35a30e8585429f1db614c3a15ce..8c910339858f4960ad0e394ae6271863d7654adc 100644 --- a/protocol/jsonrpc/jsonrpc_invoker_test.go +++ b/protocol/jsonrpc/jsonrpc_invoker_test.go @@ -29,6 +29,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -47,7 +48,9 @@ func TestJsonrpcInvoker_Invoke(t *testing.T) { "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") assert.NoError(t, err) - proto.Export(protocol.NewBaseInvoker(url)) + proto.Export(&proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }) time.Sleep(time.Second * 2) client := NewHTTPClient(&HTTPOptions{ diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index 6b3a39c68b4fdb417e8d2efaec4a43806acb2219..dc85e0f5e76fd07dbcd11646ae529c98e5323a15 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -25,7 +25,6 @@ import ( "io/ioutil" "net" "net/http" - "reflect" "runtime" "runtime/debug" "sync" @@ -330,13 +329,16 @@ func serveRequest(ctx context.Context, constant.VERSION_KEY: codec.req.Version, })) if err := result.Error(); err != nil { - if errRsp := sendErrorResp(header, []byte(err.Error())); errRsp != nil { + rspStream, err := codec.Write(err.Error(), invalidRequest) + if err != nil { + return perrors.WithStack(err) + } + if errRsp := sendErrorResp(header, rspStream); errRsp != nil { logger.Warnf("Exporter: sendErrorResp(header:%#v, error:%v) = error:%s", header, err, errRsp) - return perrors.WithStack(errRsp) } - } - if res := result.Result(); res != nil { + } else { + res := result.Result() rspStream, err := codec.Write("", res) if err != nil { return perrors.WithStack(err) @@ -344,102 +346,9 @@ func serveRequest(ctx context.Context, if errRsp := sendResp(header, rspStream); errRsp != nil { logger.Warnf("Exporter: sendResp(header:%#v, error:%v) = error:%s", header, err, errRsp) - return perrors.WithStack(errRsp) - } - } - } - // get method - svc := common.ServiceMap.GetService(JSONRPC, path) - if svc == nil { - return perrors.New("cannot find svc " + path) - } - method := svc.Method()[methodName] - if method == nil { - return perrors.New("cannot find method " + methodName + " of svc " + path) - } - - in := []reflect.Value{svc.Rcvr()} - if method.CtxType() != nil { - in = append(in, method.SuiteContext(ctx)) - } - - // prepare argv - if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { - in = append(in, reflect.ValueOf(args)) - } else { - for i := 0; i < len(args); i++ { - t := reflect.ValueOf(args[i]) - if !t.IsValid() { - at := method.ArgsType()[i] - if at.Kind() == reflect.Ptr { - at = at.Elem() - } - t = reflect.New(at) } - in = append(in, t) } } - // prepare replyv - var replyv reflect.Value - if method.ReplyType() == nil && len(method.ArgsType()) > 0 { - replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) - in = append(in, replyv) - } - - returnValues := method.Method().Func.Call(in) - - var ( - retErr interface{} - errMsg string - ) - if len(returnValues) == 1 { - retErr = returnValues[0].Interface() - } else { - replyv = returnValues[0] - retErr = returnValues[1].Interface() - } - if retErr != nil { - errMsg = retErr.(error).Error() - } - - // write response - code := 200 - var rspReply interface{} - if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { - rspReply = replyv.Interface() - } - if len(errMsg) != 0 { - code = 500 - rspReply = invalidRequest - } - rspStream, err := codec.Write(errMsg, rspReply) - if err != nil { - return perrors.WithStack(err) - } - rsp := &http.Response{ - StatusCode: code, - ProtoMajor: 1, - ProtoMinor: 1, - Header: make(http.Header), - ContentLength: int64(len(rspStream)), - Body: ioutil.NopCloser(bytes.NewReader(rspStream)), - } - delete(header, "Content-Type") - delete(header, "Content-Length") - delete(header, "Timeout") - for k, v := range header { - rsp.Header.Set(k, v) - } - - rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize)) - rspBuf.Reset() - if err = rsp.Write(rspBuf); err != nil { - logger.Warnf("rsp.Write(rsp:%#v) = error:%s", rsp, err) - return nil - } - if _, err = rspBuf.WriteTo(conn); err != nil { - logger.Warnf("rspBuf.WriteTo(conn:%#v) = error:%s", conn, err) - } return nil } diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go index 98bd206325ffd38a325b667f7f0bc2684bbf626f..dc376313549c24da1cc6cb64a42e8445ef4fe346 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper_test.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go @@ -32,7 +32,6 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/filter" - //"github.com/apache/dubbo-go/filter/impl" "github.com/apache/dubbo-go/protocol" ) diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index ff33b5fe6e063257c12035f3262e0daae874363e..d01c92ec5ccf273de1ad60d3dd0b5804178b11c2 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -26,6 +26,7 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" @@ -183,20 +184,17 @@ func GetProtocol() protocol.Protocol { type wrappedInvoker struct { invoker protocol.Invoker - url common.URL protocol.BaseInvoker } func newWrappedInvoker(invoker protocol.Invoker, url common.URL) *wrappedInvoker { return &wrappedInvoker{ invoker: invoker, - url: url, - BaseInvoker: *protocol.NewBaseInvoker(common.URL{}), + BaseInvoker: *protocol.NewBaseInvoker(url), } } -func (ivk *wrappedInvoker) GetUrl() common.URL { - return ivk.url -} -func (ivk *wrappedInvoker) getInvoker() protocol.Invoker { - return ivk.invoker +func (ivk *wrappedInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + // get right url + ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl()) + return ivk.invoker.Invoke(invocation) }