diff --git a/cluster/cluster.go b/cluster/cluster.go index 6a48bfd08fcb0d0d3c28620c09854fa1c993409c..bb9f89d0fc54352672995de3442c78bf77dbb2e8 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -5,4 +5,3 @@ import "github.com/dubbo/dubbo-go/protocol" type Cluster interface { Join(Directory) protocol.Invoker } - diff --git a/cluster/support/failover_cluster.go b/cluster/support/failover_cluster.go index 585f5c4125831d96cdf8a147b0e386e4ac7a5fd9..71161e53cada724d2e15adddbc81537a4e26a56a 100644 --- a/cluster/support/failover_cluster.go +++ b/cluster/support/failover_cluster.go @@ -22,5 +22,5 @@ func NewFailoverCluster() cluster.Cluster { } func (cluster *FailoverCluster) Join(directory cluster.Directory) protocol.Invoker { - return NewFailoverClusterInvoker( directory) + return NewFailoverClusterInvoker(directory) } diff --git a/common/constant/key.go b/common/constant/key.go index 30f3f304b96e28341147d5927c777dc3f51dff80..63a881423cd92625793a199aad845b426986f557 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -27,3 +27,7 @@ const ( WARMUP_KEY = "warmup" RETRIES_KEY = "retries" ) + +const ( + DUBBOGO_CTX_KEY = "dubbogo-ctx" +) diff --git a/common/proxy/.gitkeep b/common/proxy/.gitkeep deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go new file mode 100644 index 0000000000000000000000000000000000000000..e1133b3fc673ed4e4f2e1c6af86ecb6e6e839ed9 --- /dev/null +++ b/common/proxy/proxy.go @@ -0,0 +1,108 @@ +package proxy + +import ( + "fmt" + "reflect" +) +import ( + log "github.com/AlexStocks/log4go" +) + +import ( + "github.com/dubbo/dubbo-go/protocol" +) + +// Proxy struct +type Proxy struct { + v interface{} + invoke protocol.Invoker + callBack interface{} + attachments map[string]string +} + +var typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type() + +func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[string]string) *Proxy { + return &Proxy{ + invoke: invoke, + callBack: callBack, + attachments: attachments, + } +} + +// proxy implement +func (p *Proxy) Implement(v interface{}) error { + + // check parameters, incoming interface must be a elem's pointer. + valueOf := reflect.ValueOf(v) + log.Debug("[Implement] reflect.TypeOf: %s", valueOf.String()) + if valueOf.Kind() != reflect.Ptr { + return fmt.Errorf("%s must be a pointer", valueOf) + } + + valueOfElem := valueOf.Elem() + typeOf := valueOfElem.Type() + + // check incoming interface, incoming interface's elem must be a struct. + if typeOf.Kind() != reflect.Struct { + return fmt.Errorf("%s must be a struct ptr", valueOf.String()) + } + + makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value { + return func(in []reflect.Value) []reflect.Value { + // Convert input parameters to interface. + var argsInterface = make([]interface{}, len(in)) + for k, v := range in { + argsInterface[k] = v.Interface() + } + + //todo:call + inv := protocol.NewRPCInvocationForConsumer(methodName, nil, argsInterface, in[2].Interface(), p.callBack, p.attachments, nil) + result := p.invoke.Invoke(inv) + var err error + err = result.Error() + return []reflect.Value{reflect.ValueOf(&err).Elem()} + } + } + + numField := valueOfElem.NumField() + for i := 0; i < numField; i++ { + t := typeOf.Field(i) + f := valueOfElem.Field(i) + if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { + + if t.Type.NumIn() != 3 && t.Type.NumIn() != 4 { + log.Error("method %s of mtype %v has wrong number of in parameters %d; needs exactly 3/4", + t.Name, t.Type.String(), t.Type.NumIn()) + return fmt.Errorf("method %s of mtype %v has wrong number of in parameters %d; needs exactly 3/4", + t.Name, t.Type.String(), t.Type.NumIn()) + } + + // Method needs one out. + if t.Type.NumOut() != 1 { + log.Error("method %q has %d out parameters; needs exactly 1", t.Name, t.Type.NumOut()) + return fmt.Errorf("method %q has %d out parameters; needs exactly 1", t.Name, t.Type.NumOut()) + } + // The return type of the method must be error. + if returnType := t.Type.Out(0); returnType != typError { + log.Error("return type %s of method %q is not error", returnType, t.Name) + return fmt.Errorf("return type %s of method %q is not error", returnType, t.Name) + } + + var funcOuts = make([]reflect.Type, t.Type.NumOut()) + funcOuts[i] = t.Type.Out(0) + + // do method proxy here: + f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(t.Name, funcOuts))) + log.Debug("set method [%s]", t.Name) + } + } + + p.v = v + + return nil +} + +func (p *Proxy) Get() interface{} { + return p.v +} diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 2155af0ad377ce6f50593b27c786d552dfe4fd57..9d737877cf060a2501eb0f59481b662531fef2d4 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -4,7 +4,6 @@ import ( "errors" "strconv" "sync" - "time" ) import ( @@ -49,27 +48,15 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } if async { if callBack, ok := inv.CallBack().(func(response CallResponse)); ok { - result.Err = di.client.AsyncCall(url.Location, *url, inv.MethodName(), inv.Arguments(), callBack, inv.Reply(), - WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), - WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), - WithCallSerialID(inv.Params()["serialID"].(SerialID)), - WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{}))) + result.Err = di.client.AsyncCall(url.Location, *url, inv.MethodName(), inv.Arguments(), callBack, inv.Reply()) } else { - result.Err = di.client.CallOneway(url.Location, *url, inv.MethodName(), inv.Arguments(), - WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), - WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), - WithCallSerialID(inv.Params()["serialID"].(SerialID)), - WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{}))) + result.Err = di.client.CallOneway(url.Location, *url, inv.MethodName(), inv.Arguments()) } } else { if inv.Reply() == nil { result.Err = Err_No_Reply } else { - result.Err = di.client.Call(url.Location, *url, inv.MethodName(), inv.Arguments(), inv.Reply(), - WithCallRequestTimeout(inv.Params()["requestTimeout"].(time.Duration)), - WithCallResponseTimeout(inv.Params()["responseTimeout"].(time.Duration)), - WithCallSerialID(inv.Params()["serialID"].(SerialID)), - WithCallMeta_All(inv.Params()["callMeta"].(map[interface{}]interface{}))) + result.Err = di.client.Call(url.Location, *url, inv.MethodName(), inv.Arguments(), inv.Reply()) result.Rest = inv.Reply() // reply should be set to result.Rest when sync } } diff --git a/protocol/invocation.go b/protocol/invocation.go index 5e079f3dc2aa54e095f0c3ebb5e3416e7fe07f8b..3953d3b07e41eb05e60ca32df49069d0a2ba281e 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -26,12 +26,11 @@ type RPCInvocation struct { callBack interface{} attachments map[string]string invoker Invoker - params map[string]interface{} // Store some parameters that are not easy to refine } // todo: arguments table is too many func NewRPCInvocationForConsumer(methodName string, parameterTypes []reflect.Type, arguments []interface{}, - reply interface{}, callBack interface{}, attachments map[string]string, invoker Invoker, params map[string]interface{}) *RPCInvocation { + reply interface{}, callBack interface{}, attachments map[string]string, invoker Invoker) *RPCInvocation { return &RPCInvocation{ methodName: methodName, parameterTypes: parameterTypes, @@ -40,7 +39,6 @@ func NewRPCInvocationForConsumer(methodName string, parameterTypes []reflect.Typ callBack: callBack, attachments: attachments, invoker: invoker, - params: params, } } @@ -71,6 +69,9 @@ func (r *RPCInvocation) Attachments() map[string]string { } func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string { + if r.attachments == nil { + return defaultValue + } value, ok := r.attachments[key] if ok { return value @@ -81,14 +82,12 @@ func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string func (r *RPCInvocation) Invoker() Invoker { return r.invoker } + +// SetInvoker is called while getting url, maybe clusterInvoker? func (r *RPCInvocation) SetInvoker() Invoker { return r.invoker } -func (r *RPCInvocation) Params() map[string]interface{} { - return r.params -} - func (r *RPCInvocation) CallBack() interface{} { return r.callBack } diff --git a/protocol/jsonrpc/http.go b/protocol/jsonrpc/http.go index 256616eb89e8f437f2230f773e8f7f73861b43f7..4a898a6049a5a86ef1bf1263c1c61d3d34f81852 100644 --- a/protocol/jsonrpc/http.go +++ b/protocol/jsonrpc/http.go @@ -20,8 +20,8 @@ import ( ) import ( + "github.com/dubbo/dubbo-go/common/constant" "github.com/dubbo/dubbo-go/config" - "github.com/dubbo/dubbo-go/public" ) ////////////////////////////////////////////// @@ -104,7 +104,7 @@ func (c *HTTPClient) Call(ctx context.Context, service config.URL, req *Request, reqTimeout = 1e8 } httpHeader.Set("Timeout", reqTimeout.String()) - if md, ok := ctx.Value(public.DUBBOGO_CTX_KEY).(map[string]string); ok { + if md, ok := ctx.Value(constant.DUBBOGO_CTX_KEY).(map[string]string); ok { for k := range md { httpHeader.Set(k, md[k]) } diff --git a/protocol/jsonrpc/jsonrpc_invoker.go b/protocol/jsonrpc/jsonrpc_invoker.go index 9667c46f40d2d72961cf083a6ba9057cc4cade92..38b929c02fae7991e2bdc7fd77060089bc0a0c89 100644 --- a/protocol/jsonrpc/jsonrpc_invoker.go +++ b/protocol/jsonrpc/jsonrpc_invoker.go @@ -10,9 +10,9 @@ import ( ) import ( + "github.com/dubbo/dubbo-go/common/constant" "github.com/dubbo/dubbo-go/config" "github.com/dubbo/dubbo-go/protocol" - "github.com/dubbo/dubbo-go/public" ) type JsonrpcInvoker struct { @@ -37,7 +37,7 @@ func (ji *JsonrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result url := inv.Invoker().GetUrl().(*config.URL) req := ji.client.NewRequest(*url, inv.MethodName(), inv.Arguments()) - ctx := context.WithValue(context.Background(), public.DUBBOGO_CTX_KEY, map[string]string{ + ctx := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ "X-Proxy-Id": "dubbogo", "X-Services": url.Service, "X-Method": inv.MethodName(), diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go index 8fe385bae3adbbc65ca485a140a1897495957fe3..cb6cd5258ce4eb5c5af26d1d4681b3d142577dc1 100644 --- a/protocol/jsonrpc/jsonrpc_protocol.go +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -7,6 +7,7 @@ import ( import ( "github.com/dubbo/dubbo-go/common/extension" "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/config/support" "github.com/dubbo/dubbo-go/protocol" ) @@ -45,8 +46,8 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { func (jp *JsonrpcProtocol) Refer(url config.IURL) protocol.Invoker { invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{ - HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout, - HTTPTimeout: config.GetConsumerConfig().RequestTimeout, + HandshakeTimeout: support.GetConsumerConfig().ConnectTimeout, + HTTPTimeout: support.GetConsumerConfig().RequestTimeout, })) jp.SetInvokers(invoker) log.Info("Refer service: ", url.(*config.URL).String()) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 6aa3a1ca8e1a0fa1fb14158b87fb94ff27cc4c82..c7d9997bf8f49c6e483e7fca2c21f3d348d17724 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -64,7 +64,7 @@ type ZkRegistry struct { zkPath map[string]int // key = protocol://ip:port/interface } -func NewZkRegistry( url *config.RegistryURL) (registry.Registry, error) { +func NewZkRegistry(url *config.RegistryURL) (registry.Registry, error) { var ( err error r *ZkRegistry