diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index 1665a7346e09016570dd36c56d231d3706b96a54..33d4dcc71ef0c9f3bce2d048e58bb7d202a19976 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -17,6 +17,13 @@ package proxy_factory +import ( + "github.com/apache/dubbo-go/common/logger" + perrors "github.com/pkg/errors" + "reflect" + "strings" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -51,6 +58,83 @@ func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *comm return proxy.NewProxy(invoker, nil, attachments) } func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { - // todo: call service - return protocol.NewBaseInvoker(url) + return &ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + } +} + +type ProxyInvoker struct { + protocol.BaseInvoker +} + +func (pi *ProxyInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + result := &protocol.RPCResult{} + + methodName := invocation.MethodName() + proto := pi.GetUrl().Protocol + path := strings.TrimPrefix(pi.GetUrl().Path, "/") + args := invocation.Arguments() + + // get service + svc := common.ServiceMap.GetService(proto, path) + if svc == nil { + logger.Errorf("cannot find service [%s] in %s", path, proto) + result.SetError(perrors.Errorf("cannot find service [%s] in %s", path, proto)) + return result + } + + // get method + method := svc.Method()[methodName] + if method == nil { + logger.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto) + result.SetError(perrors.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto)) + return result + } + + in := []reflect.Value{svc.Rcvr()} + if method.CtxType() != nil { + in = append(in, method.SuiteContext(nil)) // todo: ctx will be used later. + } + + // prepare argv + if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { + in = append(in, reflect.ValueOf(args)) + } else { + for i := 0; i < len(args); i++ { + t := reflect.ValueOf(args[i]) + if !t.IsValid() { + at := method.ArgsType()[i] + if at.Kind() == reflect.Ptr { + at = at.Elem() + } + t = reflect.New(at) + } + in = append(in, t) + } + } + + // prepare replyv + var replyv reflect.Value + if method.ReplyType() == nil && len(method.ArgsType()) > 0 { + replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) + in = append(in, replyv) + } + + returnValues := method.Method().Func.Call(in) + + var retErr interface{} + if len(returnValues) == 1 { + retErr = returnValues[0].Interface() + } else { + replyv = returnValues[0] + retErr = returnValues[1].Interface() + } + if retErr != nil { + result.SetError(retErr.(error)) + } else { + if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { + result.SetResult(replyv.Interface()) + } + } + return result } diff --git a/go.mod b/go.mod index 61ce25b5cd909ffb50298e35122eb78fde1365ea..3dece6c19bacf8954f6eaebfd8ee9a0ed83d877f 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,7 @@ module github.com/apache/dubbo-go require ( github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dubbogo/getty v1.2.2 github.com/dubbogo/gost v1.1.1 github.com/magiconair/properties v1.8.1 diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index cd961d382933443e37a08c21b4e4de5edb971860..531c53a50849b18045a503aa2ed18f8af58a39cb 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -33,6 +33,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" ) @@ -70,8 +71,13 @@ func TestClient_Call(t *testing.T) { } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) - user := &User{} - err := c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user) + var ( + user *User + err error + ) + + user = &User{} + err = c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user) assert.NoError(t, err) assert.NotEqual(t, "", user.Id) assert.NotEqual(t, "", user.Name) @@ -209,7 +215,9 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") assert.NoError(t, err) - proto.Export(protocol.NewBaseInvoker(url)) + proto.Export(&proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }) time.Sleep(time.Second * 2) diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index c0d5fe29169606c0655ac60e571ba9f59912ffa0..14d83ae1dde836161fe0832af2b06144b6f3c883 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -18,10 +18,8 @@ package dubbo import ( - "context" "fmt" "net/url" - "reflect" "sync" "time" ) @@ -208,6 +206,27 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { twoway = false } + defer func() { + if e := recover(); e != nil { + p.Header.ResponseStatus = hessian.Response_SERVER_ERROR + if err, ok := e.(error); ok { + logger.Errorf("OnMessage panic: %+v", perrors.WithStack(err)) + p.Body = perrors.WithStack(err) + } else if err, ok := e.(string); ok { + logger.Errorf("OnMessage panic: %+v", perrors.New(err)) + p.Body = perrors.New(err) + } else { + logger.Errorf("OnMessage panic: %+v, this is impossible.", e) + p.Body = e + } + } + + if !twoway { + return + } + h.reply(session, p, hessian.PackageResponse) + }() + u := common.NewURLWithOptions(common.WithPath(p.Service.Path), common.WithParams(url.Values{}), common.WithParamsValue(constant.GROUP_KEY, p.Service.Group), common.WithParamsValue(constant.INTERFACE_KEY, p.Service.Interface), @@ -232,18 +251,13 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { if err := result.Error(); err != nil { p.Header.ResponseStatus = hessian.Response_OK p.Body = err - h.reply(session, p, hessian.PackageResponse) - return - } - if res := result.Result(); res != nil { + } else { + res := result.Result() p.Header.ResponseStatus = hessian.Response_OK p.Body = res - h.reply(session, p, hessian.PackageResponse) - return } } - h.callService(p, nil) if !twoway { return } @@ -275,91 +289,6 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { } } -func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) { - - defer func() { - if e := recover(); e != nil { - req.Header.ResponseStatus = hessian.Response_SERVER_ERROR - if err, ok := e.(error); ok { - logger.Errorf("callService panic: %+v", perrors.WithStack(err)) - req.Body = perrors.WithStack(err) - } else if err, ok := e.(string); ok { - logger.Errorf("callService panic: %+v", perrors.New(err)) - req.Body = perrors.New(err) - } else { - logger.Errorf("callService panic: %+v, this is impossible.", e) - req.Body = e - } - } - }() - - svcIf := req.Body.(map[string]interface{})["service"] - if svcIf == nil { - logger.Errorf("service not found!") - req.Header.ResponseStatus = hessian.Response_BAD_REQUEST - req.Body = perrors.New("service not found") - return - } - svc := svcIf.(*common.Service) - method := svc.Method()[req.Service.Method] - if method == nil { - logger.Errorf("method not found!") - req.Header.ResponseStatus = hessian.Response_BAD_REQUEST - req.Body = perrors.New("method not found") - return - } - - in := []reflect.Value{svc.Rcvr()} - if method.CtxType() != nil { - in = append(in, method.SuiteContext(ctx)) - } - - // prepare argv - argv := req.Body.(map[string]interface{})["args"] - if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { - in = append(in, reflect.ValueOf(argv)) - } else { - for i := 0; i < len(argv.([]interface{})); i++ { - t := reflect.ValueOf(argv.([]interface{})[i]) - if !t.IsValid() { - at := method.ArgsType()[i] - if at.Kind() == reflect.Ptr { - at = at.Elem() - } - t = reflect.New(at) - } - in = append(in, t) - } - } - - // prepare replyv - var replyv reflect.Value - if method.ReplyType() == nil && len(method.ArgsType()) > 0 { - replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) - in = append(in, replyv) - } - - returnValues := method.Method().Func.Call(in) - - var retErr interface{} - if len(returnValues) == 1 { - retErr = returnValues[0].Interface() - } else { - replyv = returnValues[0] - retErr = returnValues[1].Interface() - } - if retErr != nil { - req.Header.ResponseStatus = hessian.Response_OK - req.Body = retErr - } else { - if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { - req.Body = replyv.Interface() - } else { - req.Body = nil - } - } -} - func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { resp := &DubboPackage{ Header: hessian.DubboHeader{ diff --git a/protocol/jsonrpc/http_test.go b/protocol/jsonrpc/http_test.go index 1f446803fd6c5f174f51e3fe9496c49ae4991691..9be55e247a730460a3adee5622fa978ef2defbfb 100644 --- a/protocol/jsonrpc/http_test.go +++ b/protocol/jsonrpc/http_test.go @@ -32,6 +32,7 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" ) @@ -60,7 +61,9 @@ func TestHTTPClient_Call(t *testing.T) { "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") assert.NoError(t, err) - proto.Export(protocol.NewBaseInvoker(url)) + proto.Export(&proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }) time.Sleep(time.Second * 2) client := NewHTTPClient(&HTTPOptions{}) diff --git a/protocol/jsonrpc/jsonrpc_invoker_test.go b/protocol/jsonrpc/jsonrpc_invoker_test.go index bc88759bf522a35a30e8585429f1db614c3a15ce..8c910339858f4960ad0e394ae6271863d7654adc 100644 --- a/protocol/jsonrpc/jsonrpc_invoker_test.go +++ b/protocol/jsonrpc/jsonrpc_invoker_test.go @@ -29,6 +29,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -47,7 +48,9 @@ func TestJsonrpcInvoker_Invoke(t *testing.T) { "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") assert.NoError(t, err) - proto.Export(protocol.NewBaseInvoker(url)) + proto.Export(&proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }) time.Sleep(time.Second * 2) client := NewHTTPClient(&HTTPOptions{ diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index 6b3a39c68b4fdb417e8d2efaec4a43806acb2219..dc85e0f5e76fd07dbcd11646ae529c98e5323a15 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -25,7 +25,6 @@ import ( "io/ioutil" "net" "net/http" - "reflect" "runtime" "runtime/debug" "sync" @@ -330,13 +329,16 @@ func serveRequest(ctx context.Context, constant.VERSION_KEY: codec.req.Version, })) if err := result.Error(); err != nil { - if errRsp := sendErrorResp(header, []byte(err.Error())); errRsp != nil { + rspStream, err := codec.Write(err.Error(), invalidRequest) + if err != nil { + return perrors.WithStack(err) + } + if errRsp := sendErrorResp(header, rspStream); errRsp != nil { logger.Warnf("Exporter: sendErrorResp(header:%#v, error:%v) = error:%s", header, err, errRsp) - return perrors.WithStack(errRsp) } - } - if res := result.Result(); res != nil { + } else { + res := result.Result() rspStream, err := codec.Write("", res) if err != nil { return perrors.WithStack(err) @@ -344,102 +346,9 @@ func serveRequest(ctx context.Context, if errRsp := sendResp(header, rspStream); errRsp != nil { logger.Warnf("Exporter: sendResp(header:%#v, error:%v) = error:%s", header, err, errRsp) - return perrors.WithStack(errRsp) - } - } - } - // get method - svc := common.ServiceMap.GetService(JSONRPC, path) - if svc == nil { - return perrors.New("cannot find svc " + path) - } - method := svc.Method()[methodName] - if method == nil { - return perrors.New("cannot find method " + methodName + " of svc " + path) - } - - in := []reflect.Value{svc.Rcvr()} - if method.CtxType() != nil { - in = append(in, method.SuiteContext(ctx)) - } - - // prepare argv - if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { - in = append(in, reflect.ValueOf(args)) - } else { - for i := 0; i < len(args); i++ { - t := reflect.ValueOf(args[i]) - if !t.IsValid() { - at := method.ArgsType()[i] - if at.Kind() == reflect.Ptr { - at = at.Elem() - } - t = reflect.New(at) } - in = append(in, t) } } - // prepare replyv - var replyv reflect.Value - if method.ReplyType() == nil && len(method.ArgsType()) > 0 { - replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) - in = append(in, replyv) - } - - returnValues := method.Method().Func.Call(in) - - var ( - retErr interface{} - errMsg string - ) - if len(returnValues) == 1 { - retErr = returnValues[0].Interface() - } else { - replyv = returnValues[0] - retErr = returnValues[1].Interface() - } - if retErr != nil { - errMsg = retErr.(error).Error() - } - - // write response - code := 200 - var rspReply interface{} - if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { - rspReply = replyv.Interface() - } - if len(errMsg) != 0 { - code = 500 - rspReply = invalidRequest - } - rspStream, err := codec.Write(errMsg, rspReply) - if err != nil { - return perrors.WithStack(err) - } - rsp := &http.Response{ - StatusCode: code, - ProtoMajor: 1, - ProtoMinor: 1, - Header: make(http.Header), - ContentLength: int64(len(rspStream)), - Body: ioutil.NopCloser(bytes.NewReader(rspStream)), - } - delete(header, "Content-Type") - delete(header, "Content-Length") - delete(header, "Timeout") - for k, v := range header { - rsp.Header.Set(k, v) - } - - rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize)) - rspBuf.Reset() - if err = rsp.Write(rspBuf); err != nil { - logger.Warnf("rsp.Write(rsp:%#v) = error:%s", rsp, err) - return nil - } - if _, err = rspBuf.WriteTo(conn); err != nil { - logger.Warnf("rspBuf.WriteTo(conn:%#v) = error:%s", conn, err) - } return nil }