Skip to content
Snippets Groups Projects
Commit da11226d authored by fangyincheng's avatar fangyincheng
Browse files

Mod: move callService to invoker

parent 4379d392
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,13 @@
package proxy_factory
import (
"github.com/apache/dubbo-go/common/logger"
perrors "github.com/pkg/errors"
"reflect"
"strings"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -51,6 +58,83 @@ func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *comm
return proxy.NewProxy(invoker, nil, attachments)
}
func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker {
// todo: call service
return protocol.NewBaseInvoker(url)
return &ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
}
}
type ProxyInvoker struct {
protocol.BaseInvoker
}
func (pi *ProxyInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
result := &protocol.RPCResult{}
methodName := invocation.MethodName()
proto := pi.GetUrl().Protocol
path := strings.TrimPrefix(pi.GetUrl().Path, "/")
args := invocation.Arguments()
// get service
svc := common.ServiceMap.GetService(proto, path)
if svc == nil {
logger.Errorf("cannot find service [%s] in %s", path, proto)
result.SetError(perrors.Errorf("cannot find service [%s] in %s", path, proto))
return result
}
// get method
method := svc.Method()[methodName]
if method == nil {
logger.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto)
result.SetError(perrors.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto))
return result
}
in := []reflect.Value{svc.Rcvr()}
if method.CtxType() != nil {
in = append(in, method.SuiteContext(nil)) // todo: ctx will be used later.
}
// prepare argv
if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" {
in = append(in, reflect.ValueOf(args))
} else {
for i := 0; i < len(args); i++ {
t := reflect.ValueOf(args[i])
if !t.IsValid() {
at := method.ArgsType()[i]
if at.Kind() == reflect.Ptr {
at = at.Elem()
}
t = reflect.New(at)
}
in = append(in, t)
}
}
// prepare replyv
var replyv reflect.Value
if method.ReplyType() == nil && len(method.ArgsType()) > 0 {
replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem())
in = append(in, replyv)
}
returnValues := method.Method().Func.Call(in)
var retErr interface{}
if len(returnValues) == 1 {
retErr = returnValues[0].Interface()
} else {
replyv = returnValues[0]
retErr = returnValues[1].Interface()
}
if retErr != nil {
result.SetError(retErr.(error))
} else {
if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) {
result.SetResult(replyv.Interface())
}
}
return result
}
......@@ -2,6 +2,7 @@ module github.com/apache/dubbo-go
require (
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dubbogo/getty v1.2.2
github.com/dubbogo/gost v1.1.1
github.com/magiconair/properties v1.8.1
......
......@@ -33,6 +33,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/protocol"
)
......@@ -70,8 +71,13 @@ func TestClient_Call(t *testing.T) {
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
user := &User{}
err := c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user)
var (
user *User
err error
)
user = &User{}
err = c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user)
assert.NoError(t, err)
assert.NotEqual(t, "", user.Id)
assert.NotEqual(t, "", user.Name)
......@@ -209,7 +215,9 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
proto.Export(protocol.NewBaseInvoker(url))
proto.Export(&proxy_factory.ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
})
time.Sleep(time.Second * 2)
......
......@@ -18,10 +18,8 @@
package dubbo
import (
"context"
"fmt"
"net/url"
"reflect"
"sync"
"time"
)
......@@ -208,6 +206,27 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
twoway = false
}
defer func() {
if e := recover(); e != nil {
p.Header.ResponseStatus = hessian.Response_SERVER_ERROR
if err, ok := e.(error); ok {
logger.Errorf("OnMessage panic: %+v", perrors.WithStack(err))
p.Body = perrors.WithStack(err)
} else if err, ok := e.(string); ok {
logger.Errorf("OnMessage panic: %+v", perrors.New(err))
p.Body = perrors.New(err)
} else {
logger.Errorf("OnMessage panic: %+v, this is impossible.", e)
p.Body = e
}
}
if !twoway {
return
}
h.reply(session, p, hessian.PackageResponse)
}()
u := common.NewURLWithOptions(common.WithPath(p.Service.Path), common.WithParams(url.Values{}),
common.WithParamsValue(constant.GROUP_KEY, p.Service.Group),
common.WithParamsValue(constant.INTERFACE_KEY, p.Service.Interface),
......@@ -232,18 +251,13 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
if err := result.Error(); err != nil {
p.Header.ResponseStatus = hessian.Response_OK
p.Body = err
h.reply(session, p, hessian.PackageResponse)
return
}
if res := result.Result(); res != nil {
} else {
res := result.Result()
p.Header.ResponseStatus = hessian.Response_OK
p.Body = res
h.reply(session, p, hessian.PackageResponse)
return
}
}
h.callService(p, nil)
if !twoway {
return
}
......@@ -275,91 +289,6 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
}
}
func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) {
defer func() {
if e := recover(); e != nil {
req.Header.ResponseStatus = hessian.Response_SERVER_ERROR
if err, ok := e.(error); ok {
logger.Errorf("callService panic: %+v", perrors.WithStack(err))
req.Body = perrors.WithStack(err)
} else if err, ok := e.(string); ok {
logger.Errorf("callService panic: %+v", perrors.New(err))
req.Body = perrors.New(err)
} else {
logger.Errorf("callService panic: %+v, this is impossible.", e)
req.Body = e
}
}
}()
svcIf := req.Body.(map[string]interface{})["service"]
if svcIf == nil {
logger.Errorf("service not found!")
req.Header.ResponseStatus = hessian.Response_BAD_REQUEST
req.Body = perrors.New("service not found")
return
}
svc := svcIf.(*common.Service)
method := svc.Method()[req.Service.Method]
if method == nil {
logger.Errorf("method not found!")
req.Header.ResponseStatus = hessian.Response_BAD_REQUEST
req.Body = perrors.New("method not found")
return
}
in := []reflect.Value{svc.Rcvr()}
if method.CtxType() != nil {
in = append(in, method.SuiteContext(ctx))
}
// prepare argv
argv := req.Body.(map[string]interface{})["args"]
if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" {
in = append(in, reflect.ValueOf(argv))
} else {
for i := 0; i < len(argv.([]interface{})); i++ {
t := reflect.ValueOf(argv.([]interface{})[i])
if !t.IsValid() {
at := method.ArgsType()[i]
if at.Kind() == reflect.Ptr {
at = at.Elem()
}
t = reflect.New(at)
}
in = append(in, t)
}
}
// prepare replyv
var replyv reflect.Value
if method.ReplyType() == nil && len(method.ArgsType()) > 0 {
replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem())
in = append(in, replyv)
}
returnValues := method.Method().Func.Call(in)
var retErr interface{}
if len(returnValues) == 1 {
retErr = returnValues[0].Interface()
} else {
replyv = returnValues[0]
retErr = returnValues[1].Interface()
}
if retErr != nil {
req.Header.ResponseStatus = hessian.Response_OK
req.Body = retErr
} else {
if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) {
req.Body = replyv.Interface()
} else {
req.Body = nil
}
}
}
func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
resp := &DubboPackage{
Header: hessian.DubboHeader{
......
......@@ -32,6 +32,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/protocol"
)
......@@ -60,7 +61,9 @@ func TestHTTPClient_Call(t *testing.T) {
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
proto.Export(protocol.NewBaseInvoker(url))
proto.Export(&proxy_factory.ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
})
time.Sleep(time.Second * 2)
client := NewHTTPClient(&HTTPOptions{})
......
......@@ -29,6 +29,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
......@@ -47,7 +48,9 @@ func TestJsonrpcInvoker_Invoke(t *testing.T) {
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
proto.Export(protocol.NewBaseInvoker(url))
proto.Export(&proxy_factory.ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
})
time.Sleep(time.Second * 2)
client := NewHTTPClient(&HTTPOptions{
......
......@@ -25,7 +25,6 @@ import (
"io/ioutil"
"net"
"net/http"
"reflect"
"runtime"
"runtime/debug"
"sync"
......@@ -330,13 +329,16 @@ func serveRequest(ctx context.Context,
constant.VERSION_KEY: codec.req.Version,
}))
if err := result.Error(); err != nil {
if errRsp := sendErrorResp(header, []byte(err.Error())); errRsp != nil {
rspStream, err := codec.Write(err.Error(), invalidRequest)
if err != nil {
return perrors.WithStack(err)
}
if errRsp := sendErrorResp(header, rspStream); errRsp != nil {
logger.Warnf("Exporter: sendErrorResp(header:%#v, error:%v) = error:%s",
header, err, errRsp)
return perrors.WithStack(errRsp)
}
}
if res := result.Result(); res != nil {
} else {
res := result.Result()
rspStream, err := codec.Write("", res)
if err != nil {
return perrors.WithStack(err)
......@@ -344,102 +346,9 @@ func serveRequest(ctx context.Context,
if errRsp := sendResp(header, rspStream); errRsp != nil {
logger.Warnf("Exporter: sendResp(header:%#v, error:%v) = error:%s",
header, err, errRsp)
return perrors.WithStack(errRsp)
}
}
}
// get method
svc := common.ServiceMap.GetService(JSONRPC, path)
if svc == nil {
return perrors.New("cannot find svc " + path)
}
method := svc.Method()[methodName]
if method == nil {
return perrors.New("cannot find method " + methodName + " of svc " + path)
}
in := []reflect.Value{svc.Rcvr()}
if method.CtxType() != nil {
in = append(in, method.SuiteContext(ctx))
}
// prepare argv
if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" {
in = append(in, reflect.ValueOf(args))
} else {
for i := 0; i < len(args); i++ {
t := reflect.ValueOf(args[i])
if !t.IsValid() {
at := method.ArgsType()[i]
if at.Kind() == reflect.Ptr {
at = at.Elem()
}
t = reflect.New(at)
}
in = append(in, t)
}
}
// prepare replyv
var replyv reflect.Value
if method.ReplyType() == nil && len(method.ArgsType()) > 0 {
replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem())
in = append(in, replyv)
}
returnValues := method.Method().Func.Call(in)
var (
retErr interface{}
errMsg string
)
if len(returnValues) == 1 {
retErr = returnValues[0].Interface()
} else {
replyv = returnValues[0]
retErr = returnValues[1].Interface()
}
if retErr != nil {
errMsg = retErr.(error).Error()
}
// write response
code := 200
var rspReply interface{}
if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) {
rspReply = replyv.Interface()
}
if len(errMsg) != 0 {
code = 500
rspReply = invalidRequest
}
rspStream, err := codec.Write(errMsg, rspReply)
if err != nil {
return perrors.WithStack(err)
}
rsp := &http.Response{
StatusCode: code,
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(http.Header),
ContentLength: int64(len(rspStream)),
Body: ioutil.NopCloser(bytes.NewReader(rspStream)),
}
delete(header, "Content-Type")
delete(header, "Content-Length")
delete(header, "Timeout")
for k, v := range header {
rsp.Header.Set(k, v)
}
rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize))
rspBuf.Reset()
if err = rsp.Write(rspBuf); err != nil {
logger.Warnf("rsp.Write(rsp:%#v) = error:%s", rsp, err)
return nil
}
if _, err = rspBuf.WriteTo(conn); err != nil {
logger.Warnf("rspBuf.WriteTo(conn:%#v) = error:%s", conn, err)
}
return nil
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment