diff --git a/common/constant/default.go b/common/constant/default.go index 85f61f30f115493d3d520f2a68f36921735055d7..f67f2158fd814f00b52c867eb2be5a15d5e94dec 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -26,7 +26,6 @@ const ( DEFAULT_LOADBALANCE = "random" DEFAULT_RETRIES = 2 DEFAULT_PROTOCOL = "dubbo" - DEFAULT_VERSION = "" DEFAULT_REG_TIMEOUT = "10s" DEFAULT_CLUSTER = "failover" ) @@ -37,3 +36,7 @@ const ( DEFAULT_REFERENCE_FILTERS = "" ECHO = "$echo" ) + +const ( + ANY_VALUE = "*" +) diff --git a/common/constant/key.go b/common/constant/key.go index e49ddae7259bc8fd07351e38964594f2b28d3795..e82a41aa4a686aa55ed48ad859bfc8458ebdc1b0 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -44,6 +44,7 @@ const ( WEIGHT_KEY = "weight" WARMUP_KEY = "warmup" RETRIES_KEY = "retries" + BEAN_NAME = "bean.name" ) const ( diff --git a/common/logger/logger.go b/common/logger/logger.go index f6522c489d047a1d4ec7000565bf4cfda6a50466..e8dd5a608b68c7cbde0d8c6b85c1601c847fea01 100644 --- a/common/logger/logger.go +++ b/common/logger/logger.go @@ -56,7 +56,7 @@ func init() { logConfFile := os.Getenv(constant.APP_LOG_CONF_FILE) err := InitLog(logConfFile) if err != nil { - log.Printf("[InitLog] error: %v", err) + log.Printf("[InitLog] warn: %v", err) } } @@ -110,7 +110,7 @@ func InitLogger(conf *zap.Config) { } else { zapLoggerConfig = *conf } - zapLogger, _ := zapLoggerConfig.Build() + zapLogger, _ := zapLoggerConfig.Build(zap.AddCallerSkip(1)) logger = zapLogger.Sugar() } diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index fe4c003306360af928a0b1a6612042e9811a9af3..96d42eb21152e64d170f50276bbce88e1bf8db69 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -19,6 +19,7 @@ package proxy import ( "reflect" + "sync" ) import ( @@ -34,6 +35,8 @@ type Proxy struct { invoke protocol.Invoker callBack interface{} attachments map[string]string + + once sync.Once } var typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type() @@ -78,23 +81,31 @@ func (p *Proxy) Implement(v common.RPCService) { methodName = "$echo" } - start := 0 - end := len(in) - if in[0].Type().String() == "context.Context" { - start += 1 - } - if len(outs) == 1 { - end -= 1 - reply = in[len(in)-1] - } else { + if len(outs) == 2 { if outs[0].Kind() == reflect.Ptr { reply = reflect.New(outs[0].Elem()) } else { reply = reflect.New(outs[0]) } + } else { + reply = valueOf + } + + start := 0 + end := len(in) + if end > 0 { + if in[0].Type().String() == "context.Context" { + start += 1 + } + if len(outs) == 1 && in[end-1].Type().Kind() == reflect.Ptr { + end -= 1 + reply = in[len(in)-1] + } } - if v, ok := in[start].Interface().([]interface{}); ok && end-start == 1 { + if end-start <= 0 { + inArr = []interface{}{} + } else if v, ok := in[start].Interface().([]interface{}); ok && end-start == 1 { inArr = v } else { inArr = make([]interface{}, end-start) @@ -134,7 +145,6 @@ func (p *Proxy) Implement(v common.RPCService) { } f := valueOfElem.Field(i) if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { - inNum := t.Type.NumIn() outNum := t.Type.NumOut() if outNum != 1 && outNum != 2 { @@ -149,12 +159,6 @@ func (p *Proxy) Implement(v common.RPCService) { continue } - // reply must be Ptr when outNum == 1 - if outNum == 1 && t.Type.In(inNum-1).Kind() != reflect.Ptr { - logger.Warnf("reply type of method %q is not a pointer", t.Name) - continue - } - var funcOuts = make([]reflect.Type, outNum) for i := 0; i < outNum; i++ { funcOuts[i] = t.Type.Out(i) @@ -166,7 +170,9 @@ func (p *Proxy) Implement(v common.RPCService) { } } - p.rpc = v + p.once.Do(func() { + p.rpc = v + }) } diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index 016bbcf514514e3bea67d87c83f01aebb166be23..1665a7346e09016570dd36c56d231d3706b96a54 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -51,6 +51,6 @@ func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *comm return proxy.NewProxy(invoker, nil, attachments) } func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { - //TODO:yincheng need to do the service invoker refactor + // todo: call service return protocol.NewBaseInvoker(url) } diff --git a/common/proxy/proxy_factory/default_test.go b/common/proxy/proxy_factory/default_test.go index d9c8ffa2525551fc0197e2476e85b5ee51a18ec6..b6a6b675baf992b2d64ffd19291ee2dc009bd1e3 100644 --- a/common/proxy/proxy_factory/default_test.go +++ b/common/proxy/proxy_factory/default_test.go @@ -21,22 +21,25 @@ import ( "testing" ) +import ( + "github.com/stretchr/testify/assert" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/protocol" - "github.com/stretchr/testify/assert" ) func Test_GetProxy(t *testing.T) { proxyFactory := NewDefaultProxyFactory() - url := common.NewURLWithOptions("testservice") + url := common.NewURLWithOptions() proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), url) assert.NotNil(t, proxy) } func Test_GetInvoker(t *testing.T) { proxyFactory := NewDefaultProxyFactory() - url := common.NewURLWithOptions("testservice") + url := common.NewURLWithOptions() invoker := proxyFactory.GetInvoker(*url) assert.True(t, invoker.IsAvailable()) } diff --git a/common/proxy/proxy_test.go b/common/proxy/proxy_test.go index 44bdaec4b845a3a864c8637ced16ac4493fc2b80..1cc30457c3b021ec139b57fe764d6ac6b9104dbc 100644 --- a/common/proxy/proxy_test.go +++ b/common/proxy/proxy_test.go @@ -36,9 +36,10 @@ import ( type TestService struct { MethodOne func(context.Context, int, bool, *interface{}) error - MethodTwo func([]interface{}, *interface{}) error + MethodTwo func([]interface{}) error MethodThree func(int, bool) (interface{}, error) MethodFour func(int, bool) (*interface{}, error) `dubbo:"methodFour"` + MethodFive func() error Echo func(interface{}, *interface{}) error } @@ -64,19 +65,25 @@ func TestProxy_Implement(t *testing.T) { p := NewProxy(invoker, nil, map[string]string{constant.ASYNC_KEY: "false"}) s := &TestService{} p.Implement(s) + err := p.Get().(*TestService).MethodOne(nil, 0, false, nil) assert.NoError(t, err) - err = p.Get().(*TestService).MethodTwo(nil, nil) + + err = p.Get().(*TestService).MethodTwo(nil) assert.NoError(t, err) ret, err := p.Get().(*TestService).MethodThree(0, false) assert.NoError(t, err) assert.Nil(t, ret) // ret is nil, because it doesn't be injection yet + ret2, err := p.Get().(*TestService).MethodFour(0, false) assert.NoError(t, err) assert.Equal(t, "*interface {}", reflect.TypeOf(ret2).String()) err = p.Get().(*TestService).Echo(nil, nil) assert.NoError(t, err) + err = p.Get().(*TestService).MethodFive() + assert.NoError(t, err) + // inherit & lowercase p.rpc = nil type S1 struct { @@ -108,24 +115,14 @@ func TestProxy_Implement(t *testing.T) { p.Implement(s2) assert.Nil(t, s2.MethodOne) - // reply type + // returns type p.rpc = nil type S3 struct { TestService - MethodOne func(context.Context, []interface{}, struct{}) error + MethodOne func(context.Context, []interface{}, *struct{}) interface{} } s3 := &S3{TestService: *s} p.Implement(s3) assert.Nil(t, s3.MethodOne) - // returns type - p.rpc = nil - type S4 struct { - TestService - MethodOne func(context.Context, []interface{}, *struct{}) interface{} - } - s4 := &S4{TestService: *s} - p.Implement(s4) - assert.Nil(t, s4.MethodOne) - } diff --git a/common/rpc_service.go b/common/rpc_service.go index d8578a143eccb05696e633cc3fd61dd6f00c3bc7..0444f0c17e7e9d96d1563c72fde2fd62b81fb744 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -40,6 +40,12 @@ type RPCService interface { Version() string } +// for lowercase func +// func MethodMapper() map[string][string] { +// return map[string][string]{} +// } +const METHOD_MAPPER = "MethodMapper" + var ( // Precompute the reflect type for error. Can't use error directly // because Typeof takes an empty interface value. This is annoying. @@ -210,20 +216,26 @@ func isExportedOrBuiltinType(t reflect.Type) bool { // suitableMethods returns suitable Rpc methods of typ func suitableMethods(typ reflect.Type) (string, map[string]*MethodType) { methods := make(map[string]*MethodType) - mts := "" + var mts []string logger.Debugf("[%s] NumMethod is %d", typ.String(), typ.NumMethod()) + method, ok := typ.MethodByName(METHOD_MAPPER) + var methodMapper map[string]string + if ok && method.Type.NumIn() == 1 && method.Type.NumOut() == 1 && method.Type.Out(0).String() == "map[string]string" { + methodMapper = method.Func.Call([]reflect.Value{reflect.New(typ.Elem())})[0].Interface().(map[string]string) + } + for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) + method = typ.Method(m) if mt := suiteMethod(method); mt != nil { - methods[method.Name] = mt - if m == 0 { - mts += method.Name - } else { - mts += "," + method.Name + methodName, ok := methodMapper[method.Name] + if !ok { + methodName = method.Name } + methods[methodName] = mt + mts = append(mts, methodName) } } - return mts, methods + return strings.Join(mts, ","), methods } // suiteMethod returns a suitable Rpc methodType @@ -256,12 +268,7 @@ func suiteMethod(method reflect.Method) *MethodType { } // replyType - if outNum == 1 { - if mtype.In(inNum-1).Kind() != reflect.Ptr { - logger.Errorf("reply type of method %q is not a pointer %v", mname, replyType) - return nil - } - } else { + if outNum == 2 { replyType = mtype.Out(0) if !isExportedOrBuiltinType(replyType) { logger.Errorf("reply type of method %s not exported{%v}", mname, replyType) @@ -272,7 +279,7 @@ func suiteMethod(method reflect.Method) *MethodType { index := 1 // ctxType - if mtype.In(1).String() == "context.Context" { + if inNum > 1 && mtype.In(1).String() == "context.Context" { ctxType = mtype.In(1) index = 2 } diff --git a/common/rpc_service_test.go b/common/rpc_service_test.go index e5bab2c7153daeb34068c2005def6811b8478532..ec4371da4768298fe0928ba6ef88c2be7060832e 100644 --- a/common/rpc_service_test.go +++ b/common/rpc_service_test.go @@ -30,18 +30,26 @@ import ( type TestService struct { } -func (s *TestService) MethodOne(ctx context.Context, args []interface{}, rsp *struct{}) error { +func (s *TestService) MethodOne(ctx context.Context, arg1, arg2, arg3 interface{}) error { return nil } -func (s *TestService) MethodTwo(args []interface{}) (struct{}, error) { +func (s *TestService) MethodTwo(arg1, arg2, arg3 interface{}) (interface{}, error) { return struct{}{}, nil } +func (s *TestService) MethodThree() error { + return nil +} func (s *TestService) Service() string { return "com.test.Path" } func (s *TestService) Version() string { return "" } +func (s *TestService) MethodMapper() map[string]string { + return map[string]string{ + "MethodTwo": "methodTwo", + } +} type testService struct { } @@ -49,15 +57,12 @@ type testService struct { func (s *testService) Method1(ctx context.Context, args testService, rsp *struct{}) error { return nil } -func (s *testService) Method2(ctx context.Context, args []interface{}, rsp struct{}) error { - return nil -} -func (s *testService) Method3(ctx context.Context, args []interface{}) (testService, error) { +func (s *testService) Method2(ctx context.Context, args []interface{}) (testService, error) { return testService{}, nil } -func (s *testService) Method4(ctx context.Context, args []interface{}, rsp *struct{}) { +func (s *testService) Method3(ctx context.Context, args []interface{}, rsp *struct{}) { } -func (s *testService) Method5(ctx context.Context, args []interface{}, rsp *struct{}) *testService { +func (s *testService) Method4(ctx context.Context, args []interface{}, rsp *struct{}) *testService { return nil } func (s *testService) Service() string { @@ -87,7 +92,7 @@ func TestServiceMap_Register(t *testing.T) { s := &TestService{} methods, err = ServiceMap.Register("testporotocol", s) assert.NoError(t, err) - assert.Equal(t, "MethodOne,MethodTwo", methods) + assert.Equal(t, "MethodOne,MethodThree,methodTwo", methods) // repeat methods, err = ServiceMap.Register("testporotocol", s) @@ -139,10 +144,11 @@ func TestSuiteMethod(t *testing.T) { assert.True(t, ok) methodType := suiteMethod(method) method = methodType.Method() - assert.Equal(t, "func(*common.TestService, context.Context, []interface {}, *struct {}) error", method.Type.String()) + assert.Equal(t, "func(*common.TestService, context.Context, interface {}, interface {}, interface {}) error", method.Type.String()) at := methodType.ArgsType() - assert.Equal(t, "[]interface {}", at[0].String()) - assert.Equal(t, "*struct {}", at[1].String()) + assert.Equal(t, "interface {}", at[0].String()) + assert.Equal(t, "interface {}", at[1].String()) + assert.Equal(t, "interface {}", at[2].String()) ct := methodType.CtxType() assert.Equal(t, "context.Context", ct.String()) rt := methodType.ReplyType() @@ -152,12 +158,25 @@ func TestSuiteMethod(t *testing.T) { assert.True(t, ok) methodType = suiteMethod(method) method = methodType.Method() - assert.Equal(t, "func(*common.TestService, []interface {}) (struct {}, error)", method.Type.String()) + assert.Equal(t, "func(*common.TestService, interface {}, interface {}, interface {}) (interface {}, error)", method.Type.String()) + at = methodType.ArgsType() + assert.Equal(t, "interface {}", at[0].String()) + assert.Equal(t, "interface {}", at[1].String()) + assert.Equal(t, "interface {}", at[2].String()) + assert.Nil(t, methodType.CtxType()) + rt = methodType.ReplyType() + assert.Equal(t, "interface {}", rt.String()) + + method, ok = reflect.TypeOf(s).MethodByName("MethodThree") + assert.True(t, ok) + methodType = suiteMethod(method) + method = methodType.Method() + assert.Equal(t, "func(*common.TestService) error", method.Type.String()) at = methodType.ArgsType() - assert.Equal(t, "[]interface {}", at[0].String()) + assert.Equal(t, 0, len(at)) assert.Nil(t, methodType.CtxType()) rt = methodType.ReplyType() - assert.Equal(t, "struct {}", rt.String()) + assert.Nil(t, rt) // wrong number of in return s1 := &testService{} @@ -172,26 +191,20 @@ func TestSuiteMethod(t *testing.T) { methodType = suiteMethod(method) assert.Nil(t, methodType) - // replyType != Ptr - method, ok = reflect.TypeOf(s1).MethodByName("Method2") - assert.True(t, ok) - methodType = suiteMethod(method) - assert.Nil(t, methodType) - // Reply not exported - method, ok = reflect.TypeOf(s1).MethodByName("Method3") + method, ok = reflect.TypeOf(s1).MethodByName("Method2") assert.True(t, ok) methodType = suiteMethod(method) assert.Nil(t, methodType) // no return - method, ok = reflect.TypeOf(s1).MethodByName("Method4") + method, ok = reflect.TypeOf(s1).MethodByName("Method3") assert.True(t, ok) methodType = suiteMethod(method) assert.Nil(t, methodType) // return value is not error - method, ok = reflect.TypeOf(s1).MethodByName("Method5") + method, ok = reflect.TypeOf(s1).MethodByName("Method4") assert.True(t, ok) methodType = suiteMethod(method) assert.Nil(t, methodType) diff --git a/common/url.go b/common/url.go index 4fb1af767fa7db6e84aefd3e33afddc099ebfcb1..9249dd055acedc67a0494e97571468bd3ed67cb0 100644 --- a/common/url.go +++ b/common/url.go @@ -18,6 +18,7 @@ package common import ( + "bytes" "context" "fmt" "math" @@ -128,16 +129,14 @@ func WithPort(port string) option { } } -//func WithPath(path string) option { -// return func(url *URL) { -// url.Path = path -// } -//} - -func NewURLWithOptions(service string, opts ...option) *URL { - url := &URL{ - Path: "/" + service, +func WithPath(path string) option { + return func(url *URL) { + url.Path = "/" + strings.TrimPrefix(path, "/") } +} + +func NewURLWithOptions(opts ...option) *URL { + url := &URL{} for _, opt := range opts { opt(url) } @@ -205,32 +204,27 @@ func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error) return s, nil } -// -//func (c URL) Key() string { -// return fmt.Sprintf( -// "%s://%s:%s@%s:%s/%s", -// c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Path) -//} - func (c URL) URLEqual(url URL) bool { c.Ip = "" c.Port = "" url.Ip = "" url.Port = "" - if c.Key() != url.Key() { + cGroup := c.GetParam(constant.GROUP_KEY, "") + urlGroup := url.GetParam(constant.GROUP_KEY, "") + cKey := c.Key() + urlKey := url.Key() + + if cGroup == constant.ANY_VALUE { + cKey = strings.Replace(cKey, "group=*", "group="+urlGroup, 1) + } else if urlGroup == constant.ANY_VALUE { + urlKey = strings.Replace(urlKey, "group=*", "group="+cGroup, 1) + } + if cKey != urlKey { return false } return true } -//func (c SubURL) String() string { -// return fmt.Sprintf( -// "DefaultServiceURL{protocol:%s, Location:%s, Path:%s, Ip:%s, Port:%s, "+ -// "Timeout:%s, Version:%s, Group:%s, Params:%+v}", -// c.protocol, c.Location, c.Path, c.Ip, c.Port, -// c.Timeout, c.Version, c.Group, c.Params) -//} - func (c URL) String() string { buildString := fmt.Sprintf( "%s://%s:%s@%s:%s%s?", @@ -241,10 +235,33 @@ func (c URL) String() string { func (c URL) Key() string { buildString := fmt.Sprintf( - "%s://%s:%s@%s:%s/%s?group=%s&version=%s", - c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")), c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, constant.DEFAULT_VERSION)) - + "%s://%s:%s@%s:%s/?interface=%s&group=%s&version=%s", + c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Service(), c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, "")) return buildString + //return c.ServiceKey() +} + +func (c URL) ServiceKey() string { + intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) + if intf == "" { + return "" + } + buf := &bytes.Buffer{} + group := c.GetParam(constant.GROUP_KEY, "") + if group != "" { + buf.WriteString(group) + buf.WriteString("/") + } + + buf.WriteString(intf) + + version := c.GetParam(constant.VERSION_KEY, "") + if version != "" && version != "0.0.0" { + buf.WriteString(":") + buf.WriteString(version) + } + + return buf.String() } func (c URL) Context() context.Context { @@ -252,11 +269,11 @@ func (c URL) Context() context.Context { } func (c URL) Service() string { - service := strings.TrimPrefix(c.Path, "/") + service := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) if service != "" { return service } else if c.SubURL != nil { - service = strings.TrimPrefix(c.SubURL.Path, "/") + service = c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) if service != "" { //if url.path is "" then return suburl's path, special for registry Url return service } diff --git a/common/url_test.go b/common/url_test.go index 1093516ce5f5ccedcee2ab22898e0124eae0eca2..bae37237f1b7c70a0b5214acdc4252e5382c7ab3 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -24,15 +24,18 @@ import ( ) import ( - "github.com/apache/dubbo-go/common/constant" "github.com/stretchr/testify/assert" ) +import ( + "github.com/apache/dubbo-go/common/constant" +) + func TestNewURLWithOptions(t *testing.T) { methods := []string{"Methodone,methodtwo"} params := url.Values{} params.Set("key", "value") - u := NewURLWithOptions("com.test.Service", + u := NewURLWithOptions(WithPath("com.test.Service"), WithUsername("username"), WithPassword("password"), WithProtocol("testprotocol"), diff --git a/config/config_loader.go b/config/config_loader.go index 2560b78bd15686d221cdb7f2e0e613954688e20e..e8c3951bcf6f9997731eeb8ef2a909598cba96df 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -33,6 +33,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/version" @@ -203,8 +204,13 @@ func loadProtocol(protocolsIds string, protocols []ProtocolConfig) []ProtocolCon return returnProtocols } +var ( + refConfigs map[string]*ReferenceConfig // record reference config loaded + srvConfigs map[string]*ServiceConfig // record service config loaded +) + // Dubbo Init -func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) { +func Load() (int, int) { var refMap map[string]*ReferenceConfig var srvMap map[string]*ServiceConfig @@ -217,13 +223,13 @@ func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) { for index := 0; index < length; index++ { con := &consumerConfig.References[index] rpcService := GetConsumerService(con.InterfaceName) + con.Refer() + refMap[con.InterfaceName] = con if rpcService == nil { logger.Warnf("%s is not exsist!", con.InterfaceName) continue } - con.Refer() con.Implement(rpcService) - refMap[con.InterfaceName] = con } //wait for invoker is available, if wait over default 3s, then panic @@ -278,5 +284,17 @@ func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) { } } - return refMap, srvMap + refConfigs = refMap + srvConfigs = srvMap + return len(refMap), len(srvMap) +} + +// get rpc service for consumer +func GetRPCService(name string) common.RPCService { + return refConfigs[name].GetRPCService() +} + +// create rpc service for consumer +func RPCService(service common.RPCService) { + refConfigs[service.Service()].Implement(service) } diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 7c3343fd35a1ce9d3e519d3f45435be775755bdf..9d062ce2fc5a043ceb07f9175ba5f62eba055abd 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -59,17 +59,25 @@ func TestLoad(t *testing.T) { doInit() doinit() - SetConsumerService(&MockService{}) - SetProviderService(&MockService{}) + ms := &MockService{} + SetConsumerService(ms) + SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) consumerConfig.References[0].Registries = []ConfigRegistry{"shanghai_reg1"} - refConfigs, svcConfigs := Load() - assert.NotEqual(t, 0, len(refConfigs)) - assert.NotEqual(t, 0, len(svcConfigs)) + refLen, svcLen := Load() + assert.NotEqual(t, 0, refLen) + assert.NotEqual(t, 0, svcLen) + + assert.Equal(t, ms, GetRPCService(ms.Service())) + ms2 := &struct { + MockService + }{} + RPCService(ms2) + assert.NotEqual(t, ms2, GetRPCService(ms2.Service())) conServices = map[string]common.RPCService{} proServices = map[string]common.RPCService{} diff --git a/config/reference_config.go b/config/reference_config.go index e0c93b3e54248e886982dbf49e13a42a063eee77..ffac5b4267b94b717da31980c13f8c63a28b7a66 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -77,7 +77,7 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro } func (refconfig *ReferenceConfig) Refer() { - url := common.NewURLWithOptions(refconfig.InterfaceName, common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap())) + url := common.NewURLWithOptions(common.WithPath(refconfig.InterfaceName), common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap())) //1. user specified URL, could be peer-to-peer address, or register center's address. if refconfig.Url != "" { diff --git a/config/service_config.go b/config/service_config.go index 958737ca2d77045f23e33880d2c5767f8c6f4f2c..e225c954231ca647bfe18167c41b005fb2054e3d 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -104,7 +104,7 @@ func (srvconfig *ServiceConfig) Export() error { //if contextPath == "" { // contextPath = providerConfig.Path //} - url := common.NewURLWithOptions(srvconfig.InterfaceName, + url := common.NewURLWithOptions(common.WithPath(srvconfig.InterfaceName), common.WithProtocol(proto.Name), common.WithIp(proto.Ip), common.WithPort(proto.Port), diff --git a/examples/README.md b/examples/README.md index c71e843e060a198f23b747fa80adee81554bcc0d..497926f1d3822f7b7d33640fa18cbc2bd65bdbb9 100644 --- a/examples/README.md +++ b/examples/README.md @@ -19,19 +19,19 @@ sh build.sh ``` go server + +* sh ./assembly/\[os]/\[environment].sh ```bash cd dubbo/go-server -#linux, mac windows represent the os -#release, dev and test represent the environment -sh ./assembly/linux/release.sh +# $ARCH = [linux, mac, windows] and $ENV = [dev, release, test] +sh ./assembly/$ARCH/$ENV.sh ``` go client ```bash cd dubbo/go-client -#linux, mac windows represent the os -#release, dev and test represent the environment -sh ./assembly/linux/release.sh +# $ARCH = [linux, mac, windows] and $ENV = [dev, release, test] +sh ./assembly/$ARCH/$ENV.sh ``` #### Run by these command: @@ -53,8 +53,6 @@ sh ./bin/server.sh start ``` go server -> It must not listen on IP 127.0.0.1 when called by java-client. -> You should change IP in dubbo/go-server/target/linux/user_info_server-0.3.1-20190517-0930-release/conf/server.yml ```bash cd dubbo/go-server/target/linux/user_info_server-0.3.1-20190517-0930-release #conf suffix appoint config file, @@ -66,10 +64,10 @@ sh ./bin/load.sh start [conf suffix] go client ```bash cd dubbo/go-client/target/linux/user_info_client-0.3.1-20190517-0921-release -#conf suffix appoint config file, -#such as client_zookeeper.yml when "sh ./bin/load.sh start is zookeeper", -#default client.yml -sh ./bin/load_user_info_client.sh start [conf suffix] +# $SUFFIX is a suffix of config file, +# such as client_zookeeper.yml when $SUFFIX = zookeeper", +# if $SUFFIX = "", config file is client.yml +sh ./bin/load_user_info_client.sh start $SUFFIX ``` ## jsonrpc diff --git a/examples/dubbo/go-client/app/client.go b/examples/dubbo/go-client/app/client.go index 25b3263ac5c916df585031438b9c489b83cefb3c..8fea06a05b18012a0aa00ce3f0d227d8bfac97e5 100644 --- a/examples/dubbo/go-client/app/client.go +++ b/examples/dubbo/go-client/app/client.go @@ -57,13 +57,13 @@ func main() { hessian.RegisterJavaEnum(Gender(WOMAN)) hessian.RegisterPOJO(&User{}) - conMap, _ := config.Load() - if conMap == nil { + conLen, _ := config.Load() + if conLen == 0 { panic("conMap is nil") } println("\n\n\necho") - res, err := conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).Echo(context.TODO(), "OK") + res, err := userProvider.Echo(context.TODO(), "OK") if err != nil { panic(err) } @@ -73,44 +73,51 @@ func main() { println("\n\n\nstart to test dubbo") user := &User{} - err = conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetUser(context.TODO(), []interface{}{"A003"}, user) + err = userProvider.GetUser(context.TODO(), []interface{}{"A003"}, user) if err != nil { panic(err) } println("response result: %v", user) println("\n\n\nstart to test dubbo - GetUser0") - ret, err := conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetUser0("A003", "Moorse") + ret, err := userProvider.GetUser0("A003", "Moorse") if err != nil { panic(err) } println("response result: %v", ret) println("\n\n\nstart to test dubbo - GetUsers") - ret1, err := conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetUsers([]interface{}{[]interface{}{"A002", "A003"}}) + ret1, err := userProvider.GetUsers([]interface{}{[]interface{}{"A002", "A003"}}) if err != nil { panic(err) } println("response result: %v", ret1) - println("\n\n\nstart to test dubbo - getUser2") + println("\n\n\nstart to test dubbo - getUser") user = &User{} - err = conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetUser2(context.TODO(), []interface{}{1}, user) + var i int32 = 1 + err = userProvider.GetUser2(context.TODO(), []interface{}{i}, user) if err != nil { - println("getUser - error: %v", err) - } else { - println("response result: %v", user) + panic(err) + } + println("response result: %v", user) + + println("\n\n\nstart to test dubbo - GetUser3") + err = userProvider.GetUser3() + if err != nil { + panic(err) } + println("succ!") println("\n\n\nstart to test dubbo - getErr") user = &User{} - err = conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetErr(context.TODO(), []interface{}{"A003"}, user) + err = userProvider.GetErr(context.TODO(), []interface{}{"A003"}, user) if err != nil { println("getErr - error: %v", err) } println("\n\n\nstart to test dubbo illegal method") - err = conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetUser1(context.TODO(), []interface{}{"A003"}, user) + err = userProvider.GetUser1(context.TODO(), []interface{}{"A003"}, user) if err != nil { panic(err) } diff --git a/examples/dubbo/go-client/app/user.go b/examples/dubbo/go-client/app/user.go index 59e105109ecb7213a071ca9071737e14fff0869e..d491c3633384ad9ee6acdb2786d383e420f26db3 100644 --- a/examples/dubbo/go-client/app/user.go +++ b/examples/dubbo/go-client/app/user.go @@ -34,8 +34,10 @@ import ( type Gender hessian.JavaEnum +var userProvider = new(UserProvider) + func init() { - config.SetConsumerService(new(UserProvider)) + config.SetConsumerService(userProvider) } const ( @@ -101,7 +103,8 @@ type UserProvider struct { GetUser func(ctx context.Context, req []interface{}, rsp *User) error GetUser0 func(id string, name string) (User, error) GetUser1 func(ctx context.Context, req []interface{}, rsp *User) error - GetUser2 func(ctx context.Context, req []interface{}, rsp *User) error `dubbo:"getUser"` + GetUser2 func(ctx context.Context, req []interface{}, rsp *User) error `dubbo:"getUser"` + GetUser3 func() error Echo func(ctx context.Context, req interface{}) (interface{}, error) // Echo represent EchoFilter will be used } diff --git a/examples/dubbo/go-client/profiles/dev/client.yml b/examples/dubbo/go-client/profiles/dev/client.yml index 3fcbd2cb15cd4217a8900a148753b8a7337ac553..2eae902e50b757886cf9a377291d017f4384978d 100644 --- a/examples/dubbo/go-client/profiles/dev/client.yml +++ b/examples/dubbo/go-client/profiles/dev/client.yml @@ -32,9 +32,11 @@ registries : references: - registries : - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "dubbo" +# version: "2.0" +# group: "as" # url: "dubbo://127.0.0.1:20000" interface : "com.ikurento.user.UserProvider" cluster: "failover" diff --git a/examples/dubbo/go-client/profiles/release/client.yml b/examples/dubbo/go-client/profiles/release/client.yml index 998f232fcac87da0b793ca2c062cf6cf6de8f6f3..d30a9907f460e5bd61d0f4f2df1c1e6f576030d1 100644 --- a/examples/dubbo/go-client/profiles/release/client.yml +++ b/examples/dubbo/go-client/profiles/release/client.yml @@ -32,9 +32,11 @@ registries : references: - registries : - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "dubbo" +# version: "2.0" +# group: "as" interface : "com.ikurento.user.UserProvider" cluster: "failover" methods : diff --git a/examples/dubbo/go-client/profiles/test/client.yml b/examples/dubbo/go-client/profiles/test/client.yml index 998f232fcac87da0b793ca2c062cf6cf6de8f6f3..d30a9907f460e5bd61d0f4f2df1c1e6f576030d1 100644 --- a/examples/dubbo/go-client/profiles/test/client.yml +++ b/examples/dubbo/go-client/profiles/test/client.yml @@ -32,9 +32,11 @@ registries : references: - registries : - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "dubbo" +# version: "2.0" +# group: "as" interface : "com.ikurento.user.UserProvider" cluster: "failover" methods : diff --git a/examples/dubbo/go-server/app/server.go b/examples/dubbo/go-server/app/server.go index a5c89be7a6b36ddaaae8b515729aaccd203c19f1..2a032a624ca0fd636c587a32f0a604da6f7764b6 100644 --- a/examples/dubbo/go-server/app/server.go +++ b/examples/dubbo/go-server/app/server.go @@ -58,8 +58,8 @@ func main() { hessian.RegisterPOJO(&User{}) // ------------ - _, proMap := config.Load() - if proMap == nil { + _, proLen := config.Load() + if proLen == 0 { panic("proMap is nil") } diff --git a/examples/dubbo/go-server/app/user.go b/examples/dubbo/go-server/app/user.go index e4400cc270ad46c84085b52e8879bbd49969bcb0..fcd9ea7b8677add705127b817799bcb4beb6dabb 100644 --- a/examples/dubbo/go-server/app/user.go +++ b/examples/dubbo/go-server/app/user.go @@ -25,12 +25,12 @@ import ( ) import ( + "github.com/dubbogo/hessian2" perrors "github.com/pkg/errors" ) import ( "github.com/apache/dubbo-go/config" - hessian "github.com/dubbogo/hessian2" ) type Gender hessian.JavaEnum @@ -145,10 +145,6 @@ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User return err } -func (u *UserProvider) GetErr(ctx context.Context, req []interface{}, rsp *User) error { - return hessian.NewThrowable("exception") -} - func (u *UserProvider) GetUser0(id string, name string) (User, error) { var err error @@ -163,6 +159,22 @@ func (u *UserProvider) GetUser0(id string, name string) (User, error) { return *user, err } +func (u *UserProvider) GetUser2(ctx context.Context, req []interface{}, rsp *User) error { + var err error + + println("req:%#v", req) + rsp.Id = strconv.Itoa(int(req[0].(int32))) + return err +} + +func (u *UserProvider) GetUser3() error { + return nil +} + +func (u *UserProvider) GetErr(ctx context.Context, req []interface{}, rsp *User) error { + return hessian.NewThrowable("exception") +} + func (u *UserProvider) GetUsers(req []interface{}) ([]interface{}, error) { var err error @@ -182,6 +194,12 @@ func (u *UserProvider) GetUsers(req []interface{}) ([]interface{}, error) { return []interface{}{user, user1}, err } +func (s *UserProvider) MethodMapper() map[string]string { + return map[string]string{ + "GetUser2": "getUser", + } +} + func (u *UserProvider) Service() string { return "com.ikurento.user.UserProvider" } diff --git a/examples/dubbo/go-server/profiles/dev/server.yml b/examples/dubbo/go-server/profiles/dev/server.yml index 619f2ddca6a0164e222f4a08669beb79dc077914..521f916faaaf41cc14fcc5e1eaf45447a788eb14 100644 --- a/examples/dubbo/go-server/profiles/dev/server.yml +++ b/examples/dubbo/go-server/profiles/dev/server.yml @@ -28,7 +28,7 @@ registries : services: - registries: - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "dubbo" # 相当于dubbo.xml中的interface interface : "com.ikurento.user.UserProvider" diff --git a/examples/dubbo/go-server/profiles/release/server.yml b/examples/dubbo/go-server/profiles/release/server.yml index c4d98eb746d7c5418dce6d0dca2fe89da137d680..5431df1172f4230a112a70b9410258a46dafda9c 100644 --- a/examples/dubbo/go-server/profiles/release/server.yml +++ b/examples/dubbo/go-server/profiles/release/server.yml @@ -28,7 +28,7 @@ registries : services: - registries: - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "dubbo" # 相当于dubbo.xml中的interface interface : "com.ikurento.user.UserProvider" diff --git a/examples/dubbo/go-server/profiles/test/server.yml b/examples/dubbo/go-server/profiles/test/server.yml index c4d98eb746d7c5418dce6d0dca2fe89da137d680..5431df1172f4230a112a70b9410258a46dafda9c 100644 --- a/examples/dubbo/go-server/profiles/test/server.yml +++ b/examples/dubbo/go-server/profiles/test/server.yml @@ -28,7 +28,7 @@ registries : services: - registries: - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "dubbo" # 相当于dubbo.xml中的interface interface : "com.ikurento.user.UserProvider" diff --git a/examples/dubbo/java-client/src/main/java/com/ikurento/user/Consumer.java b/examples/dubbo/java-client/src/main/java/com/ikurento/user/Consumer.java index f1100e79ac2ee31ea13b98af345d5472eb08488e..edf4c0d2b20a08c17241132cd03bf16a51b2fbb8 100644 --- a/examples/dubbo/java-client/src/main/java/com/ikurento/user/Consumer.java +++ b/examples/dubbo/java-client/src/main/java/com/ikurento/user/Consumer.java @@ -51,6 +51,12 @@ public class Consumer { System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + " UserInfo, Id:" + user2.getId() + ", name:" + user2.getName() + ", sex:" + user2.getSex().toString() + ", age:" + user2.getAge() + ", time:" + user2.getTime().toString()); + User user3 = userProvider.getUser(1); + System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + + " UserInfo, Id:" + user3.getId() + ", name:" + user3.getName() + ", sex:" + user3.getSex().toString() + + ", age:" + user3.getAge() + ", time:" + user3.getTime().toString()); + userProvider.GetUser3(); + System.out.println("GetUser3 succ"); User user9 = userProvider.GetUser1("A003"); } catch (Exception e) { diff --git a/examples/dubbo/java-client/src/main/java/com/ikurento/user/UserProvider.java b/examples/dubbo/java-client/src/main/java/com/ikurento/user/UserProvider.java index 036f46332105710ffcc53465b487ada860c25172..ed765d4c0c6b6b9b382666b29d90c9dd25206f8d 100644 --- a/examples/dubbo/java-client/src/main/java/com/ikurento/user/UserProvider.java +++ b/examples/dubbo/java-client/src/main/java/com/ikurento/user/UserProvider.java @@ -22,7 +22,8 @@ public interface UserProvider { User GetUser(String userId); User GetErr(String userId) throws Exception; User GetUser1(String userId); - + User getUser(int usercode); + void GetUser3(); List<User> GetUsers(List<String> userIdList); User GetUser0(String userId, String name); } diff --git a/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProvider.java b/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProvider.java index bb1d9a9d42444b1782d919c6a62ac2fc60e509b1..24567d23be662312a4750f07b605c685a8dfa5fe 100644 --- a/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProvider.java +++ b/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProvider.java @@ -12,6 +12,8 @@ public interface UserProvider { List<User> GetUsers(List<String> userIdList); + void GetUser3(); + User GetUser0(String userId, String name); User GetErr(String userId) throws Exception; diff --git a/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProviderAnotherImpl.java b/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProviderAnotherImpl.java index 9eaf989a8204be5a7b0065b23dfec88eff837854..d600545c5084a40f1318e47a6a1c20bfcd6d36bc 100644 --- a/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProviderAnotherImpl.java +++ b/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProviderAnotherImpl.java @@ -35,6 +35,8 @@ public class UserProviderAnotherImpl implements UserProvider { public User GetUser0(String userId, String name) { return new User(userId, name, 48); } + public void GetUser3() { + } public User GetErr(String userId) throws Exception { throw new Exception("exception"); } diff --git a/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProviderImpl.java b/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProviderImpl.java index f8875651096ef719182eb0eeffb779de16425702..47a4e2d9732aa8d8d9279d47af5bb4fb3db37195 100644 --- a/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProviderImpl.java +++ b/examples/dubbo/java-server/src/main/java/com/ikurento/user/UserProviderImpl.java @@ -56,6 +56,9 @@ public class UserProviderImpl implements UserProvider { return userList; } + public void GetUser3() { + } + public Map<String, User> GetUserMap(List<String> userIdList) { Iterator it = userIdList.iterator(); Map<String, User> map = new HashMap<String, User>(); diff --git a/examples/dubbo/java-server/src/main/resources/META-INF/spring/dubbo.provider.xml b/examples/dubbo/java-server/src/main/resources/META-INF/spring/dubbo.provider.xml index b3a1b19d6764ca6db895719709412c07b348f13e..f8dd13a833e6095485d0504e21cec272a3c9a288 100644 --- a/examples/dubbo/java-server/src/main/resources/META-INF/spring/dubbo.provider.xml +++ b/examples/dubbo/java-server/src/main/resources/META-INF/spring/dubbo.provider.xml @@ -30,9 +30,9 @@ <dubbo:protocol id="dubbo" name="dubbo" host="127.0.0.1" port="20010" /> <dubbo:protocol id="jsonrpc" name="jsonrpc" host="127.0.0.1" port="10010" /> <!-- 声明需要暴露的服务接口 --> - <dubbo:service registry="ikurento,ikurento2" timeout="3000" interface="com.ikurento.user.UserProvider" ref="demoService"/> - <dubbo:service registry="ikurento" timeout="3000" interface="com.ikurento.user.UserProvider" ref="otherService" version="2.0"/> - <dubbo:service registry="ikurento" timeout="3000" interface="com.ikurento.user.UserProvider" ref="otherService" group="as" version="2.0"/> + <dubbo:service id="aaa" registry="ikurento" timeout="3000" interface="com.ikurento.user.UserProvider" ref="demoService"/> + <dubbo:service id="bbb" registry="ikurento" timeout="3000" interface="com.ikurento.user.UserProvider" ref="otherService" version="2.0"/> + <dubbo:service id="ccc" registry="ikurento" timeout="3000" interface="com.ikurento.user.UserProvider" ref="otherService" group="as" version="2.0"/> <bean id="demoService" class="com.ikurento.user.UserProviderImpl" /> <bean id="otherService" class="com.ikurento.user.UserProviderAnotherImpl"/> diff --git a/examples/jsonrpc/go-client/app/client.go b/examples/jsonrpc/go-client/app/client.go index 30a4ed1692d650a2580c008dde6f834e2c29b81e..4c7b5146f9682867e50549d6dfa419f430c5c558 100644 --- a/examples/jsonrpc/go-client/app/client.go +++ b/examples/jsonrpc/go-client/app/client.go @@ -49,13 +49,13 @@ var ( // export APP_LOG_CONF_FILE="xxx" func main() { - conMap, _ := config.Load() - if conMap == nil { + conLen, _ := config.Load() + if conLen == 0 { panic("conMap is nil") } println("\n\n\necho") - res, err := conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).Echo(context.TODO(), "OK") + res, err := userProvider.Echo(context.TODO(), "OK") if err != nil { println("echo - error: %v", err) } else { @@ -66,21 +66,21 @@ func main() { println("\n\n\nstart to test jsonrpc") user := &JsonRPCUser{} - err = conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetUser(context.TODO(), []interface{}{"A003"}, user) + err = userProvider.GetUser(context.TODO(), []interface{}{"A003"}, user) if err != nil { panic(err) } println("response result: %v", user) println("\n\n\nstart to test jsonrpc - GetUser0") - ret, err := conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetUser0("A003", "Moorse") + ret, err := userProvider.GetUser0("A003", "Moorse") if err != nil { panic(err) } println("response result: %v", ret) println("\n\n\nstart to test jsonrpc - GetUsers") - ret1, err := conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetUsers([]interface{}{[]interface{}{"A002", "A003"}}) + ret1, err := userProvider.GetUsers([]interface{}{[]interface{}{"A002", "A003"}}) if err != nil { panic(err) } @@ -88,15 +88,21 @@ func main() { println("\n\n\nstart to test jsonrpc - getUser") user = &JsonRPCUser{} - err = conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetUser2(context.TODO(), []interface{}{1}, user) + err = userProvider.GetUser2(context.TODO(), []interface{}{1}, user) if err != nil { - println("getUser - error: %v", err) - } else { - println("response result: %v", user) + panic(err) + } + println("response result: %v", user) + + println("\n\n\nstart to test jsonrpc - GetUser3") + err = userProvider.GetUser3() + if err != nil { + panic(err) } + println("succ!") println("\n\n\nstart to test jsonrpc illegal method") - err = conMap["com.ikurento.user.UserProvider"].GetRPCService().(*UserProvider).GetUser1(context.TODO(), []interface{}{"A003"}, user) + err = userProvider.GetUser1(context.TODO(), []interface{}{"A003"}, user) if err != nil { panic(err) } diff --git a/examples/jsonrpc/go-client/app/user.go b/examples/jsonrpc/go-client/app/user.go index 6e2a97081ce74b28424e9faa8a74194cf9ea141f..ca98b1af0b3c1379c73623162546db9fb4fc95d6 100644 --- a/examples/jsonrpc/go-client/app/user.go +++ b/examples/jsonrpc/go-client/app/user.go @@ -27,8 +27,10 @@ import ( "github.com/apache/dubbo-go/config" ) +var userProvider = new(UserProvider) + func init() { - config.SetConsumerService(new(UserProvider)) + config.SetConsumerService(userProvider) } type JsonRPCUser struct { @@ -52,7 +54,8 @@ type UserProvider struct { GetUser0 func(id string, name string) (JsonRPCUser, error) GetUser1 func(ctx context.Context, req []interface{}, rsp *JsonRPCUser) error GetUser2 func(ctx context.Context, req []interface{}, rsp *JsonRPCUser) error `dubbo:"getUser"` - Echo func(ctx context.Context, req interface{}) (interface{}, error) // Echo represent EchoFilter will be used + GetUser3 func() error + Echo func(ctx context.Context, req interface{}) (interface{}, error) // Echo represent EchoFilter will be used } func (u *UserProvider) Service() string { diff --git a/examples/jsonrpc/go-client/profiles/dev/client.yml b/examples/jsonrpc/go-client/profiles/dev/client.yml index 020adc5e6bd1294a8174f5c0e663f379fe03c88e..e82a200e1ce8ea85ea98773fb562f46ca250e80f 100644 --- a/examples/jsonrpc/go-client/profiles/dev/client.yml +++ b/examples/jsonrpc/go-client/profiles/dev/client.yml @@ -33,9 +33,11 @@ registries : references: - registries : - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "jsonrpc" +# version : "2.0" +# group: "as" interface : "com.ikurento.user.UserProvider" cluster: "failover" methods : diff --git a/examples/jsonrpc/go-client/profiles/release/client.yml b/examples/jsonrpc/go-client/profiles/release/client.yml index 3e3bd63f5b2a73312936fc6dbc07404bcbe11687..f7e7df4036ff7a0ba70587542bb61592126f6d09 100644 --- a/examples/jsonrpc/go-client/profiles/release/client.yml +++ b/examples/jsonrpc/go-client/profiles/release/client.yml @@ -34,9 +34,11 @@ registries : references: - registries : - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "jsonrpc" +# version : "2.0" +# group: "as" interface : "com.ikurento.user.UserProvider" cluster: "failover" methods : diff --git a/examples/jsonrpc/go-client/profiles/test/client.yml b/examples/jsonrpc/go-client/profiles/test/client.yml index e79f7f47f72f598775be0c226b2f63a3d334f8fe..e4a6c4b166feddb042ee2193e5b6569703976e6e 100644 --- a/examples/jsonrpc/go-client/profiles/test/client.yml +++ b/examples/jsonrpc/go-client/profiles/test/client.yml @@ -33,9 +33,11 @@ registries : references: - registries : - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "jsonrpc" +# version : "2.0" +# group: "as" interface : "com.ikurento.user.UserProvider" cluster: "failover" methods : diff --git a/examples/jsonrpc/go-server/app/server.go b/examples/jsonrpc/go-server/app/server.go index 45692b4d1259ef0a8a8c4d784575488184289658..ba747497bf608deadcb4e8392e3def30bebfc6f5 100644 --- a/examples/jsonrpc/go-server/app/server.go +++ b/examples/jsonrpc/go-server/app/server.go @@ -48,8 +48,8 @@ var ( // export APP_LOG_CONF_FILE="xxx" func main() { - _, proMap := config.Load() - if proMap == nil { + _, proLen := config.Load() + if proLen == 0 { panic("proMap is nil") } diff --git a/examples/jsonrpc/go-server/app/user.go b/examples/jsonrpc/go-server/app/user.go index fbe6f3339c212d2bd42d52b6bbf7c7fcec6fb9c3..e86d915417cc54b05faa25ebaa06dae2c5fb6dd1 100644 --- a/examples/jsonrpc/go-server/app/user.go +++ b/examples/jsonrpc/go-server/app/user.go @@ -20,6 +20,7 @@ package main import ( "context" "fmt" + "strconv" "time" ) @@ -127,6 +128,19 @@ func (u *UserProvider) GetUser0(id string, name string) (User, error) { return *user, err } +func (u *UserProvider) GetUser2(ctx context.Context, req []interface{}, rsp *User) error { + var err error + + println("req:%#v", req) + rsp.Id = strconv.FormatFloat(req[0].(float64), 'f', 0, 64) + rsp.Sex = Gender(MAN).String() + return err +} + +func (u *UserProvider) GetUser3() error { + return nil +} + func (u *UserProvider) GetUsers(req []interface{}) ([]User, error) { var err error @@ -146,6 +160,12 @@ func (u *UserProvider) GetUsers(req []interface{}) ([]User, error) { return []User{*user, *user1}, err } +func (s *UserProvider) MethodMapper() map[string]string { + return map[string]string{ + "GetUser2": "getUser", + } +} + func (u *UserProvider) Service() string { return "com.ikurento.user.UserProvider" } diff --git a/examples/jsonrpc/go-server/profiles/dev/server.yml b/examples/jsonrpc/go-server/profiles/dev/server.yml index 0175847a68c19427a35d000f6a7e6d215fd8cf64..70e62b20c8f1a323486f0468463d10241cf801b4 100644 --- a/examples/jsonrpc/go-server/profiles/dev/server.yml +++ b/examples/jsonrpc/go-server/profiles/dev/server.yml @@ -28,7 +28,7 @@ registries : services: - registries: - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "jsonrpc" # 相当于dubbo.xml中的interface interface : "com.ikurento.user.UserProvider" diff --git a/examples/jsonrpc/go-server/profiles/release/server.yml b/examples/jsonrpc/go-server/profiles/release/server.yml index 3c70e82483be227d60e7e9806ed19d3f371ded11..1ded448a136807bfd602fa2f0bdf93d7f4bd46d5 100644 --- a/examples/jsonrpc/go-server/profiles/release/server.yml +++ b/examples/jsonrpc/go-server/profiles/release/server.yml @@ -28,7 +28,7 @@ registries : services: - registries: - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "jsonrpc" # 相当于dubbo.xml中的interface interface : "com.ikurento.user.UserProvider" diff --git a/examples/jsonrpc/go-server/profiles/test/server.yml b/examples/jsonrpc/go-server/profiles/test/server.yml index a8ad5746b35804cc0154830fa2d59be249bbc46b..5948fd78f6dbda86e63211cdbfac663570a94dbd 100644 --- a/examples/jsonrpc/go-server/profiles/test/server.yml +++ b/examples/jsonrpc/go-server/profiles/test/server.yml @@ -27,7 +27,7 @@ registries : services: - registries: - "hangzhouzk" - - "shanghaizk" +# - "shanghaizk" protocol : "jsonrpc" # 相当于dubbo.xml中的interface interface : "com.ikurento.user.UserProvider" diff --git a/examples/jsonrpc/java-client/src/main/java/com/ikurento/user/Consumer.java b/examples/jsonrpc/java-client/src/main/java/com/ikurento/user/Consumer.java index b2b8e95f94b5112721e12bf738b05bdd3bd9c419..ddf899aa10979d65f9c88bc0b79ccbb065812417 100644 --- a/examples/jsonrpc/java-client/src/main/java/com/ikurento/user/Consumer.java +++ b/examples/jsonrpc/java-client/src/main/java/com/ikurento/user/Consumer.java @@ -55,6 +55,13 @@ public class Consumer { System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + " UserInfo, Id:" + user2.getId() + ", name:" + user2.getName() + ", sex:" + user2.getSex().toString() + ", age:" + user2.getAge() + ", time:" + user2.getTime().toString()); + User user3 = userProvider.getUser(1); + System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + + " UserInfo, Id:" + user3.getId() + ", name:" + user3.getName() + ", sex:" + user3.getSex().toString() + + ", age:" + user3.getAge() + ", time:" + user3.getTime().toString()); + + userProvider.GetUser3(); + System.out.println("GetUser3 succ"); } catch (Exception e) { e.printStackTrace(); } diff --git a/examples/jsonrpc/java-client/src/main/java/com/ikurento/user/UserProvider.java b/examples/jsonrpc/java-client/src/main/java/com/ikurento/user/UserProvider.java index d5bce8105673a24d78ddd3a636788d1ccf8e57a6..93e7aadf02a869163f41a8f3dda84d2ebf18a475 100644 --- a/examples/jsonrpc/java-client/src/main/java/com/ikurento/user/UserProvider.java +++ b/examples/jsonrpc/java-client/src/main/java/com/ikurento/user/UserProvider.java @@ -20,7 +20,8 @@ import java.util.List; public interface UserProvider { User GetUser(String userId); - + User getUser(int usercode); + void GetUser3(); List<User> GetUsers(List<String> userIdList); User GetUser0(String userId, String name); } diff --git a/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProvider.java b/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProvider.java index b75740bbcd26a6438d22f7d3bf08fa5e316f7aa7..a74b2222018988e13b29f5d3b74cee00815ec40e 100644 --- a/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProvider.java +++ b/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProvider.java @@ -14,6 +14,8 @@ public interface UserProvider { User GetUser0(String userId, String name); + void GetUser3(); + Map<String, User> GetUserMap(List<String> userIdList); User getUser(int usercode); diff --git a/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProviderAnotherImpl.java b/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProviderAnotherImpl.java index 157253575b9e5e75dadaaeaffa1e256374fefa5d..753a6f89a5f60e0f4884711d4c3b79e52ed2f094 100644 --- a/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProviderAnotherImpl.java +++ b/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProviderAnotherImpl.java @@ -32,9 +32,12 @@ public class UserProviderAnotherImpl implements UserProvider { return new User(userId, "Joe", 48); } - public User GetUser0(String userId, String name) { - return new User(userId, name, 48); - } + public User GetUser0(String userId, String name) { + return new User(userId, name, 48); + } + + public void GetUser3() { + } public List<User> GetUsers(ArrayList<String> userIdList) { Iterator it = userIdList.iterator(); diff --git a/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProviderImpl.java b/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProviderImpl.java index 25e97dd1c4482f2ff6ae7acb1ecb01a5ed66b328..960c678cf76cf4bafb3de9d5ce2a587b61aa1bac 100644 --- a/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProviderImpl.java +++ b/examples/jsonrpc/java-server/src/main/java/com/ikurento/user/UserProviderImpl.java @@ -78,7 +78,8 @@ public class UserProviderImpl implements UserProvider { public Map<String, User> queryAll() { return userMap; } - + public void GetUser3() { + } public User getUser(int userCode) { return new User(String.valueOf(userCode), "userCode get", 48); diff --git a/examples/jsonrpc/java-server/src/main/resources/META-INF/spring/dubbo.provider.xml b/examples/jsonrpc/java-server/src/main/resources/META-INF/spring/dubbo.provider.xml index b3a1b19d6764ca6db895719709412c07b348f13e..6eb03bc94bbf34b23b6df890b7c70568466ffad8 100644 --- a/examples/jsonrpc/java-server/src/main/resources/META-INF/spring/dubbo.provider.xml +++ b/examples/jsonrpc/java-server/src/main/resources/META-INF/spring/dubbo.provider.xml @@ -30,7 +30,7 @@ <dubbo:protocol id="dubbo" name="dubbo" host="127.0.0.1" port="20010" /> <dubbo:protocol id="jsonrpc" name="jsonrpc" host="127.0.0.1" port="10010" /> <!-- 声明需要暴露的服务接口 --> - <dubbo:service registry="ikurento,ikurento2" timeout="3000" interface="com.ikurento.user.UserProvider" ref="demoService"/> + <dubbo:service registry="ikurento" timeout="3000" interface="com.ikurento.user.UserProvider" ref="demoService"/> <dubbo:service registry="ikurento" timeout="3000" interface="com.ikurento.user.UserProvider" ref="otherService" version="2.0"/> <dubbo:service registry="ikurento" timeout="3000" interface="com.ikurento.user.UserProvider" ref="otherService" group="as" version="2.0"/> diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index d22ac91ed562468a40daf693948d79962bf07e0d..d6155b6021a07e51e4d4a779e56aff5e62bd40ce 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -222,7 +222,7 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string p.Service.Path = strings.TrimPrefix(svcUrl.Path, "/") p.Service.Target = svcUrl.GetParam(constant.INTERFACE_KEY, "") p.Service.Interface = svcUrl.GetParam(constant.INTERFACE_KEY, "") - p.Service.Version = svcUrl.GetParam(constant.VERSION_KEY, constant.DEFAULT_VERSION) + p.Service.Version = svcUrl.GetParam(constant.VERSION_KEY, "") p.Service.Method = method p.Service.Timeout = opts.RequestTimeout if opts.SerialID == 0 { diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index d9ba540dc7f1a4c927bf0449aa2bd56ea16dcc65..1ea9e4fb0e02a1e1234e8026f5a291398508133c 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -36,17 +36,6 @@ import ( "github.com/apache/dubbo-go/protocol" ) -type ( - User struct { - Id string `json:"id"` - Name string `json:"name"` - } - - UserProvider struct { - user map[string]User - } -) - func TestClient_CallOneway(t *testing.T) { proto, url := InitTest(t) @@ -74,41 +63,53 @@ func TestClient_Call(t *testing.T) { c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) user := &User{} - err := c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{}, user) - assert.NoError(t, err) - assert.NotEqual(t, "", user.Id) - assert.NotEqual(t, "", user.Name) + //err := c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user) + //assert.NoError(t, err) + //assert.NotEqual(t, "", user.Id) + //assert.NotEqual(t, "", user.Name) user = &User{} - err = c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user) + err := c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) user = &User{} - err = c.Call("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", "username"}, user) + err = c.Call("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", nil, "username"}, user) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) - user = &User{} - err = c.Call("127.0.0.1:20000", url, "GetUser1", []interface{}{"1", "username"}, user) + err = c.Call("127.0.0.1:20000", url, "GetUser1", []interface{}{}, user) + assert.NoError(t, err) + + err = c.Call("127.0.0.1:20000", url, "GetUser2", []interface{}{}, user) assert.EqualError(t, err, "error") user2 := []interface{}{} - err = c.Call("127.0.0.1:20000", url, "GetUser2", []interface{}{"1", "username"}, &user2) + err = c.Call("127.0.0.1:20000", url, "GetUser3", []interface{}{}, &user2) assert.NoError(t, err) assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0]) user2 = []interface{}{} - err = c.Call("127.0.0.1:20000", url, "GetUser3", []interface{}{[]interface{}{"1", "username"}}, &user2) + err = c.Call("127.0.0.1:20000", url, "GetUser4", []interface{}{[]interface{}{"1", "username"}}, &user2) assert.NoError(t, err) assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0]) user3 := map[interface{}]interface{}{} - err = c.Call("127.0.0.1:20000", url, "GetUser4", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, &user3) + err = c.Call("127.0.0.1:20000", url, "GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, &user3) assert.NoError(t, err) assert.NotNil(t, user3) assert.Equal(t, &User{Id: "1", Name: "username"}, user3["key"]) + user = &User{} + err = c.Call("127.0.0.1:20000", url, "GetUser6", []interface{}{0}, user) + assert.NoError(t, err) + assert.Equal(t, User{Id: "", Name: ""}, *user) + + user = &User{} + err = c.Call("127.0.0.1:20000", url, "GetUser6", []interface{}{1}, user) + assert.NoError(t, err) + assert.Equal(t, User{Id: "1", Name: ""}, *user) + // destroy proto.Destroy() } @@ -144,7 +145,7 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { methods, err := common.ServiceMap.Register("dubbo", &UserProvider{}) assert.NoError(t, err) - assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4", methods) + assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6", methods) // config SetClientConf(ClientConfig{ @@ -207,6 +208,21 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { return proto, url } +////////////////////////////////// +// provider +////////////////////////////////// + +type ( + User struct { + Id string `json:"id"` + Name string `json:"name"` + } + + UserProvider struct { + user map[string]User + } +) + // size:4801228 func (u *UserProvider) GetBigPkg(ctx context.Context, req []interface{}, rsp *User) error { argBuf := new(bytes.Buffer) @@ -225,28 +241,39 @@ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User return nil } -func (u *UserProvider) GetUser0(id string, name string) (User, error) { +func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) { return User{Id: id, Name: name}, nil } -func (u *UserProvider) GetUser1(ctx context.Context, req []interface{}, rsp *User) error { +func (u *UserProvider) GetUser1() error { + return nil +} + +func (u *UserProvider) GetUser2() error { return perrors.New("error") } -func (u *UserProvider) GetUser2(ctx context.Context, req []interface{}, rsp *[]interface{}) error { - *rsp = append(*rsp, User{Id: req[0].(string), Name: req[1].(string)}) +func (u *UserProvider) GetUser3(rsp *[]interface{}) error { + *rsp = append(*rsp, User{Id: "1", Name: "username"}) return nil } -func (u *UserProvider) GetUser3(ctx context.Context, req []interface{}) ([]interface{}, error) { +func (u *UserProvider) GetUser4(ctx context.Context, req []interface{}) ([]interface{}, error) { return []interface{}{User{Id: req[0].([]interface{})[0].(string), Name: req[0].([]interface{})[1].(string)}}, nil } -func (u *UserProvider) GetUser4(ctx context.Context, req []interface{}) (map[interface{}]interface{}, error) { +func (u *UserProvider) GetUser5(ctx context.Context, req []interface{}) (map[interface{}]interface{}, error) { return map[interface{}]interface{}{"key": User{Id: req[0].(map[interface{}]interface{})["id"].(string), Name: req[0].(map[interface{}]interface{})["name"].(string)}}, nil } +func (u *UserProvider) GetUser6(id int64) (*User, error) { + if id == 0 { + return nil, nil + } + return &User{Id: "1"}, nil +} + func (u *UserProvider) Service() string { return "com.ikurento.user.UserProvider" } diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 269a38cf6f9e5891bf622aa6a63820ebc5989194..a2df3d91b2fa6b1ef8907ecba8832368d0613b8e 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -22,6 +22,7 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" + "sync" ) const ( @@ -38,7 +39,8 @@ var ( type DubboProtocol struct { protocol.BaseProtocol - serverMap map[string]*Server + serverMap map[string]*Server + serverLock sync.Mutex } func NewDubboProtocol() *DubboProtocol { @@ -50,7 +52,7 @@ func NewDubboProtocol() *DubboProtocol { func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { url := invoker.GetUrl() - serviceKey := url.Key() + serviceKey := url.ServiceKey() exporter := NewDubboExporter(serviceKey, invoker, dp.ExporterMap()) dp.SetExporterMap(serviceKey, exporter) logger.Infof("Export service: %s", url.String()) @@ -80,18 +82,27 @@ func (dp *DubboProtocol) Destroy() { } func (dp *DubboProtocol) openServer(url common.URL) { - exporter, ok := dp.ExporterMap().Load(url.Key()) + _, ok := dp.serverMap[url.Location] if !ok { - panic("[DubboProtocol]" + url.Key() + "is not existing") + _, ok := dp.ExporterMap().Load(url.ServiceKey()) + if !ok { + panic("[DubboProtocol]" + url.Key() + "is not existing") + } + + dp.serverLock.Lock() + _, ok = dp.serverMap[url.Location] + if !ok { + srv := NewServer() + dp.serverMap[url.Location] = srv + srv.Start(url) + } + dp.serverLock.Unlock() } - srv := NewServer(exporter.(protocol.Exporter)) - dp.serverMap[url.Location] = srv - srv.Start(url) } func GetProtocol() protocol.Protocol { - if dubboProtocol != nil { - return dubboProtocol + if dubboProtocol == nil { + dubboProtocol = NewDubboProtocol() } - return NewDubboProtocol() + return dubboProtocol } diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go index 0c4bfdcc7a7089873b66f278028f2e4358b013d3..3543d8da803b00befe9e08286bd67c09cd0afef2 100644 --- a/protocol/dubbo/dubbo_protocol_test.go +++ b/protocol/dubbo/dubbo_protocol_test.go @@ -19,6 +19,7 @@ package dubbo import ( "context" + "github.com/apache/dubbo-go/common/constant" "testing" ) @@ -34,24 +35,36 @@ import ( func TestDubboProtocol_Export(t *testing.T) { // Export proto := GetProtocol() + srvConf = &ServerConfig{} url, err := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245") assert.NoError(t, err) - srvConf = &ServerConfig{} exporter := proto.Export(protocol.NewBaseInvoker(url)) // make sure url eq := exporter.GetInvoker().GetUrl().URLEqual(url) assert.True(t, eq) + // second service: the same path and the diferent version + url2, err := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ + "side=provider&timeout=3000×tamp=1556509797245", common.WithParamsValue(constant.VERSION_KEY, "v1.1")) + assert.NoError(t, err) + exporter2 := proto.Export(protocol.NewBaseInvoker(url2)) + // make sure url + eq2 := exporter2.GetInvoker().GetUrl().URLEqual(url2) + assert.True(t, eq2) + // make sure exporterMap after 'Unexport' - _, ok := proto.(*DubboProtocol).ExporterMap().Load(url.Key()) + _, ok := proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey()) assert.True(t, ok) exporter.Unexport() - _, ok = proto.(*DubboProtocol).ExporterMap().Load(url.Key()) + _, ok = proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey()) assert.False(t, ok) // make sure serverMap after 'Destroy' diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index ea8c38db9817ae2df078ef9b21caaf7b13751b3e..15e222676afe5579fc53df46a3983b55e5d3f2b4 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -19,6 +19,8 @@ package dubbo import ( "context" + "fmt" + "net/url" "reflect" "sync" "time" @@ -133,16 +135,14 @@ func (h *RpcClientHandler) OnCron(session getty.Session) { //////////////////////////////////////////// type RpcServerHandler struct { - exporter protocol.Exporter maxSessionNum int sessionTimeout time.Duration sessionMap map[getty.Session]*rpcSession rwlock sync.RWMutex } -func NewRpcServerHandler(exporter protocol.Exporter, maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler { +func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler { return &RpcServerHandler{ - exporter: exporter, maxSessionNum: maxSessionNum, sessionTimeout: sessionTimeout, sessionMap: make(map[getty.Session]*rpcSession), @@ -208,11 +208,28 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { twoway = false } - invoker := h.exporter.GetInvoker() + group := p.Body.(map[string]interface{})["attachments"].(map[interface{}]interface{})[constant.GROUP_KEY] + if group == nil { + group = "" + } + u := common.NewURLWithOptions(common.WithPath(p.Service.Path), common.WithParams(url.Values{}), + common.WithParamsValue(constant.GROUP_KEY, group.(string)), + common.WithParamsValue(constant.INTERFACE_KEY, p.Service.Interface), + common.WithParamsValue(constant.VERSION_KEY, p.Service.Version)) + exporter, _ := dubboProtocol.ExporterMap().Load(u.ServiceKey()) + if exporter == nil { + err := fmt.Errorf("don't have this exporter, key: %s", u.ServiceKey()) + logger.Errorf(err.Error()) + p.Header.ResponseStatus = hessian.Response_OK + p.Body = err + h.reply(session, p, hessian.PackageResponse) + return + } + invoker := exporter.(protocol.Exporter).GetInvoker() if invoker != nil { result := invoker.Invoke(invocation.NewRPCInvocationForProvider(p.Service.Method, p.Body.(map[string]interface{})["args"].([]interface{}), map[string]string{ - constant.PATH_KEY: p.Service.Path, - //attachments[constant.GROUP_KEY] = url.GetParam(constant.GROUP_KEY, "") + constant.PATH_KEY: p.Service.Path, + constant.GROUP_KEY: group.(string), constant.INTERFACE_KEY: p.Service.Interface, constant.VERSION_KEY: p.Service.Version, })) @@ -268,13 +285,13 @@ func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) { if e := recover(); e != nil { req.Header.ResponseStatus = hessian.Response_SERVER_ERROR if err, ok := e.(error); ok { - logger.Errorf("callService panic: %#v", err) + logger.Errorf("callService panic: %+v", perrors.WithStack(err)) req.Body = perrors.WithStack(err) } else if err, ok := e.(string); ok { - logger.Errorf("callService panic: %#v", perrors.New(err)) + logger.Errorf("callService panic: %+v", perrors.New(err)) req.Body = perrors.New(err) } else { - logger.Errorf("callService panic: %#v, this is impossible.", e) + logger.Errorf("callService panic: %+v, this is impossible.", e) req.Body = e } } @@ -307,13 +324,21 @@ func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) { in = append(in, reflect.ValueOf(argv)) } else { for i := 0; i < len(argv.([]interface{})); i++ { - in = append(in, reflect.ValueOf(argv.([]interface{})[i])) + t := reflect.ValueOf(argv.([]interface{})[i]) + if !t.IsValid() { + at := method.ArgsType()[i] + if at.Kind() == reflect.Ptr { + at = at.Elem() + } + t = reflect.New(at) + } + in = append(in, t) } } // prepare replyv var replyv reflect.Value - if method.ReplyType() == nil { + if method.ReplyType() == nil && len(method.ArgsType()) > 0 { replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) in = append(in, replyv) } @@ -331,7 +356,11 @@ func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) { req.Header.ResponseStatus = hessian.Response_OK req.Body = retErr } else { - req.Body = replyv.Interface() + if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { + req.Body = replyv.Interface() + } else { + req.Body = nil + } } } diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index c3b106efcb83ce50bc1697f32e5cdf987f714d4e..de205fa75f5fe8e8ef75c7c0f12cc2f78b3c397b 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -21,7 +21,6 @@ import ( "fmt" "math/rand" "net" - "strings" "sync" "time" ) @@ -337,13 +336,3 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { } } } - -func GenerateEndpointAddr(protocol, addr string) string { - var builder strings.Builder - - builder.WriteString(protocol) - builder.WriteString("://") - builder.WriteString(addr) - - return builder.String() -} diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go index 8bed30f7465bbe22e8c61b5ca887da1c668a3ba9..ac521bdc485c5add3745ba78acf8cafab6675158 100644 --- a/protocol/dubbo/server.go +++ b/protocol/dubbo/server.go @@ -31,7 +31,6 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" - "github.com/apache/dubbo-go/protocol" ) var srvConf *ServerConfig @@ -76,21 +75,18 @@ func GetServerConfig() ServerConfig { } type Server struct { - conf ServerConfig - tcpServer getty.Server - exporter protocol.Exporter - + conf ServerConfig + tcpServer getty.Server rpcHandler *RpcServerHandler } -func NewServer(exporter protocol.Exporter) *Server { +func NewServer() *Server { s := &Server{ - exporter: exporter, - conf: *srvConf, + conf: *srvConf, } - s.rpcHandler = NewRpcServerHandler(s.exporter, s.conf.SessionNumber, s.conf.sessionTimeout) + s.rpcHandler = NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout) return s } diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index c8f45c561ef88f49d33b755f988ff9a125e30c8f..60562639395d6379179aba4c7a0e141f97b3eed9 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -48,7 +48,7 @@ func NewRPCInvocationForConsumer(methodName string, parameterTypes []reflect.Typ attachments[constant.PATH_KEY] = url.Path attachments[constant.GROUP_KEY] = url.GetParam(constant.GROUP_KEY, "") attachments[constant.INTERFACE_KEY] = url.GetParam(constant.INTERFACE_KEY, "") - attachments[constant.VERSION_KEY] = url.GetParam(constant.VERSION_KEY, constant.DEFAULT_VERSION) + attachments[constant.VERSION_KEY] = url.GetParam(constant.VERSION_KEY, "") return &RPCInvocation{ methodName: methodName, diff --git a/protocol/jsonrpc/http.go b/protocol/jsonrpc/http.go index af65db04b880a0e6d7f18fd5a96b775795be0fab..46e2da06b77c070cf15ff6ee6b4781c453022747 100644 --- a/protocol/jsonrpc/http.go +++ b/protocol/jsonrpc/http.go @@ -100,7 +100,7 @@ func (c *HTTPClient) NewRequest(service common.URL, method string, args interfac ID: atomic.AddInt64(&c.ID, 1), group: service.GetParam(constant.GROUP_KEY, ""), protocol: service.Protocol, - version: service.GetParam(constant.VERSION_KEY, constant.DEFAULT_VERSION), + version: service.GetParam(constant.VERSION_KEY, ""), service: service.Path, method: method, args: args, @@ -136,7 +136,7 @@ func (c *HTTPClient) Call(ctx context.Context, service common.URL, req *Request, return perrors.WithStack(err) } - rspBody, err := c.Do(service.Location, service.Params.Get("interface"), httpHeader, reqBody) + rspBody, err := c.Do(service.Location, service.Path, httpHeader, reqBody) if err != nil { return perrors.WithStack(err) } diff --git a/protocol/jsonrpc/http_test.go b/protocol/jsonrpc/http_test.go index 3f6f983702360b9c3036c943b0651e5c935376e3..f05f47b8c1b571846335852503bb2c3a5a8c5a8d 100644 --- a/protocol/jsonrpc/http_test.go +++ b/protocol/jsonrpc/http_test.go @@ -50,7 +50,7 @@ func TestHTTPClient_Call(t *testing.T) { methods, err := common.ServiceMap.Register("jsonrpc", &UserProvider{}) assert.NoError(t, err) - assert.Equal(t, "GetUser,GetUser0,GetUser1,GetUser2,GetUser3", methods) + assert.Equal(t, "GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4", methods) // Export proto := GetProtocol() @@ -82,9 +82,9 @@ func TestHTTPClient_Call(t *testing.T) { ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ "X-Proxy-Id": "dubbogo", "X-Services": url.Path, - "X-Method": "GetUser", + "X-Method": "GetUser0", }) - req = client.NewRequest(url, "GetUser0", []interface{}{"1", "username"}) + req = client.NewRequest(url, "GetUser0", []interface{}{"1", nil, "username"}) reply = &User{} err = client.Call(ctx, url, req, reply) assert.NoError(t, err) @@ -97,7 +97,7 @@ func TestHTTPClient_Call(t *testing.T) { "X-Services": url.Path, "X-Method": "GetUser1", }) - req = client.NewRequest(url, "GetUser1", []interface{}{""}) + req = client.NewRequest(url, "GetUser1", []interface{}{}) reply = &User{} err = client.Call(ctx, url, req, reply) assert.True(t, strings.Contains(err.Error(), "500 Internal Server Error")) @@ -107,7 +107,7 @@ func TestHTTPClient_Call(t *testing.T) { ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ "X-Proxy-Id": "dubbogo", "X-Services": url.Path, - "X-Method": "GetUser", + "X-Method": "GetUser2", }) req = client.NewRequest(url, "GetUser2", []interface{}{"1", "username"}) reply1 := []User{} @@ -119,7 +119,7 @@ func TestHTTPClient_Call(t *testing.T) { ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ "X-Proxy-Id": "dubbogo", "X-Services": url.Path, - "X-Method": "GetUser", + "X-Method": "GetUser3", }) req = client.NewRequest(url, "GetUser3", []interface{}{"1", "username"}) reply1 = []User{} @@ -127,6 +127,29 @@ func TestHTTPClient_Call(t *testing.T) { assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, reply1[0]) + // call GetUser4 + ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ + "X-Proxy-Id": "dubbogo", + "X-Services": url.Path, + "X-Method": "GetUser4", + }) + req = client.NewRequest(url, "GetUser4", []interface{}{0}) + reply = &User{} + err = client.Call(ctx, url, req, reply) + assert.NoError(t, err) + assert.Equal(t, &User{Id: "", Name: ""}, reply) + + ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ + "X-Proxy-Id": "dubbogo", + "X-Services": url.Path, + "X-Method": "GetUser4", + }) + req = client.NewRequest(url, "GetUser4", []interface{}{1}) + reply = &User{} + err = client.Call(ctx, url, req, reply) + assert.NoError(t, err) + assert.Equal(t, &User{Id: "1", Name: ""}, reply) + // destroy proto.Destroy() @@ -138,11 +161,11 @@ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User return nil } -func (u *UserProvider) GetUser0(id string, name string) (User, error) { +func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) { return User{Id: id, Name: name}, nil } -func (u *UserProvider) GetUser1(ctx context.Context, req []interface{}, rsp *User) error { +func (u *UserProvider) GetUser1() error { return perrors.New("error") } @@ -155,6 +178,13 @@ func (u *UserProvider) GetUser3(ctx context.Context, req []interface{}) ([]User, return []User{{Id: req[0].(string), Name: req[1].(string)}}, nil } +func (u *UserProvider) GetUser4(id float64) (*User, error) { + if id == 0 { + return nil, nil + } + return &User{Id: "1"}, nil +} + func (u *UserProvider) Service() string { return "com.ikurento.user.UserProvider" } diff --git a/protocol/jsonrpc/json.go b/protocol/jsonrpc/json.go index c147f9add85041b3dfaf1c9463813c560f9238d0..7ee454e8ad16d2ee96ed08e7e5f55b2209a81054 100644 --- a/protocol/jsonrpc/json.go +++ b/protocol/jsonrpc/json.go @@ -192,6 +192,9 @@ func (c *jsonClientCodec) Read(streamBytes []byte, x interface{}) error { return perrors.New(c.rsp.Error.Error()) } + if c.rsp.Result == nil { + return nil + } return perrors.WithStack(json.Unmarshal(*c.rsp.Result, x)) } diff --git a/protocol/jsonrpc/jsonrpc_invoker_test.go b/protocol/jsonrpc/jsonrpc_invoker_test.go index 9641fb532148bc78562755ea4c55696142278951..0dd427eb69127317d646c599506dc476f2859a3f 100644 --- a/protocol/jsonrpc/jsonrpc_invoker_test.go +++ b/protocol/jsonrpc/jsonrpc_invoker_test.go @@ -37,7 +37,7 @@ func TestJsonrpcInvoker_Invoke(t *testing.T) { methods, err := common.ServiceMap.Register("jsonrpc", &UserProvider{}) assert.NoError(t, err) - assert.Equal(t, "GetUser,GetUser0,GetUser1,GetUser2,GetUser3", methods) + assert.Equal(t, "GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4", methods) // Export proto := GetProtocol() diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go index fa4071b4fd75ecc4f0be94f93d31c13a8cccb8f2..c18345d413edb2d263f1acaef1741514b665f042 100644 --- a/protocol/jsonrpc/jsonrpc_protocol.go +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -17,6 +17,11 @@ package jsonrpc +import ( + "strings" + "sync" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" @@ -35,10 +40,11 @@ var jsonrpcProtocol *JsonrpcProtocol type JsonrpcProtocol struct { protocol.BaseProtocol - serverMap map[string]*Server + serverMap map[string]*Server + serverLock sync.Mutex } -func NewDubboProtocol() *JsonrpcProtocol { +func NewJsonrpcProtocol() *JsonrpcProtocol { return &JsonrpcProtocol{ BaseProtocol: protocol.NewBaseProtocol(), serverMap: make(map[string]*Server), @@ -47,7 +53,8 @@ func NewDubboProtocol() *JsonrpcProtocol { func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { url := invoker.GetUrl() - serviceKey := url.Key() + serviceKey := strings.TrimPrefix(url.Path, "/") + exporter := NewJsonrpcExporter(serviceKey, invoker, jp.ExporterMap()) jp.SetExporterMap(serviceKey, exporter) logger.Infof("Export service: %s", url.String()) @@ -81,18 +88,27 @@ func (jp *JsonrpcProtocol) Destroy() { } func (jp *JsonrpcProtocol) openServer(url common.URL) { - exporter, ok := jp.ExporterMap().Load(url.Key()) + _, ok := jp.serverMap[url.Location] if !ok { - panic("[JsonrpcProtocol]" + url.Key() + "is not existing") + _, ok := jp.ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) + if !ok { + panic("[JsonrpcProtocol]" + url.Key() + "is not existing") + } + + jp.serverLock.Lock() + _, ok = jp.serverMap[url.Location] + if !ok { + srv := NewServer() + jp.serverMap[url.Location] = srv + srv.Start(url) + } + jp.serverLock.Unlock() } - srv := NewServer(exporter.(protocol.Exporter)) - jp.serverMap[url.Location] = srv - srv.Start(url) } func GetProtocol() protocol.Protocol { - if jsonrpcProtocol != nil { - return jsonrpcProtocol + if jsonrpcProtocol == nil { + jsonrpcProtocol = NewJsonrpcProtocol() } - return NewDubboProtocol() + return jsonrpcProtocol } diff --git a/protocol/jsonrpc/jsonrpc_protocol_test.go b/protocol/jsonrpc/jsonrpc_protocol_test.go index 866f3d72be7b907cd9e2a8e56528f1b76848646d..253ab830dd85e5424811b7fd4e7e7e848adad415 100644 --- a/protocol/jsonrpc/jsonrpc_protocol_test.go +++ b/protocol/jsonrpc/jsonrpc_protocol_test.go @@ -19,6 +19,8 @@ package jsonrpc import ( "context" + "fmt" + "strings" "testing" "time" ) @@ -49,10 +51,11 @@ func TestJsonrpcProtocol_Export(t *testing.T) { assert.True(t, eq) // make sure exporterMap after 'Unexport' - _, ok := proto.(*JsonrpcProtocol).ExporterMap().Load(url.Key()) + fmt.Println(url.Path) + _, ok := proto.(*JsonrpcProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) assert.True(t, ok) exporter.Unexport() - _, ok = proto.(*JsonrpcProtocol).ExporterMap().Load(url.Key()) + _, ok = proto.(*JsonrpcProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) assert.False(t, ok) // make sure serverMap after 'Destroy' diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index a7643dc215e8ae1ef5f7e5b02236f881b0f963c9..5b5548067225bcf6d8bcbaf35cee63c829c03edc 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -40,7 +40,6 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -58,19 +57,17 @@ const ( ) type Server struct { - exporter protocol.Exporter - done chan struct{} - once sync.Once + done chan struct{} + once sync.Once sync.RWMutex wg sync.WaitGroup timeout time.Duration } -func NewServer(exporter protocol.Exporter) *Server { +func NewServer() *Server { return &Server{ - exporter: exporter, - done: make(chan struct{}), + done: make(chan struct{}), } } @@ -161,7 +158,7 @@ func (s *Server) handlePkg(conn net.Conn) { } setTimeout(conn, httpTimeout) - if err := serveRequest(ctx, reqHeader, reqBody, conn, s.exporter); err != nil { + if err := serveRequest(ctx, reqHeader, reqBody, conn); err != nil { if errRsp := sendErrorResp(r.Header, []byte(perrors.WithStack(err).Error())); errRsp != nil { logger.Warnf("sendErrorResp(header:%#v, error:%v) = error:%s", r.Header, perrors.WithStack(err), errRsp) @@ -249,8 +246,7 @@ func (s *Server) Stop() { } func serveRequest(ctx context.Context, - header map[string]string, body []byte, conn net.Conn, exporter protocol.Exporter) error { - + header map[string]string, body []byte, conn net.Conn) error { sendErrorResp := func(header map[string]string, body []byte) error { rsp := &http.Response{ Header: make(http.Header), @@ -309,27 +305,26 @@ func serveRequest(ctx context.Context, return perrors.New("server cannot decode request: " + err.Error()) } - serviceName := header["Path"] + path := header["Path"] methodName := codec.req.Method - if len(serviceName) == 0 || len(methodName) == 0 { + if len(path) == 0 || len(methodName) == 0 { codec.ReadBody(nil) - return perrors.New("service/method request ill-formed: " + serviceName + "/" + methodName) + return perrors.New("service/method request ill-formed: " + path + "/" + methodName) } // read body - var args interface{} + var args []interface{} if err = codec.ReadBody(&args); err != nil { return perrors.WithStack(err) } logger.Debugf("args: %v", args) // exporter invoke - invoker := exporter.GetInvoker() + exporter, _ := jsonrpcProtocol.ExporterMap().Load(path) + invoker := exporter.(*JsonrpcExporter).GetInvoker() if invoker != nil { - result := invoker.Invoke(invocation.NewRPCInvocationForProvider(methodName, args.([]interface{}), map[string]string{ - //attachments[constant.PATH_KEY] = url.Path - //attachments[constant.GROUP_KEY] = url.GetParam(constant.GROUP_KEY, "") - //attachments[constant.INTERFACE_KEY] = url.GetParam(constant.INTERFACE_KEY, "") + result := invoker.Invoke(invocation.NewRPCInvocationForProvider(methodName, args, map[string]string{ + constant.PATH_KEY: path, constant.VERSION_KEY: codec.req.Version, })) if err := result.Error(); err != nil { @@ -351,7 +346,7 @@ func serveRequest(ctx context.Context, } } } - + serviceName := invoker.GetUrl().Service() // get method svc := common.ServiceMap.GetService(JSONRPC, serviceName) if svc == nil { @@ -371,14 +366,22 @@ func serveRequest(ctx context.Context, if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { in = append(in, reflect.ValueOf(args)) } else { - for i := 0; i < len(args.([]interface{})); i++ { - in = append(in, reflect.ValueOf(args.([]interface{})[i])) + for i := 0; i < len(args); i++ { + t := reflect.ValueOf(args[i]) + if !t.IsValid() { + at := method.ArgsType()[i] + if at.Kind() == reflect.Ptr { + at = at.Elem() + } + t = reflect.New(at) + } + in = append(in, t) } } // prepare replyv var replyv reflect.Value - if method.ReplyType() == nil { + if method.ReplyType() == nil && len(method.ArgsType()) > 0 { replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) in = append(in, replyv) } @@ -401,7 +404,10 @@ func serveRequest(ctx context.Context, // write response code := 200 - rspReply := replyv.Interface() + var rspReply interface{} + if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { + rspReply = replyv.Interface() + } if len(errMsg) != 0 { code = 500 rspReply = invalidRequest diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go index 8131accfd3105d101042c1c51178f2794ebc47f6..8a332490f71ead601d151fe5e27390eadcc1cbd8 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper_test.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go @@ -38,7 +38,7 @@ func TestProtocolFilterWrapper_Export(t *testing.T) { filtProto := extension.GetProtocol(FILTER) filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} - u := common.NewURLWithOptions("Service", + u := common.NewURLWithOptions( common.WithParams(url.Values{}), common.WithParamsValue(constant.SERVICE_FILTER_KEY, impl.ECHO)) exporter := filtProto.Export(protocol.NewBaseInvoker(*u)) @@ -50,7 +50,7 @@ func TestProtocolFilterWrapper_Refer(t *testing.T) { filtProto := extension.GetProtocol(FILTER) filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} - u := common.NewURLWithOptions("Service", + u := common.NewURLWithOptions( common.WithParams(url.Values{}), common.WithParamsValue(constant.REFERENCE_FILTER_KEY, impl.ECHO)) invoker := filtProto.Refer(*u) diff --git a/protocol/result.go b/protocol/result.go index f6e27280216641269cfc031888ea53176cc8b225..dcdb62310d359d441067395ea92f8460df97eb22 100644 --- a/protocol/result.go +++ b/protocol/result.go @@ -18,8 +18,14 @@ package protocol type Result interface { + SetError(error) Error() error + SetResult(interface{}) Result() interface{} + SetAttachments(map[string]string) + Attachments() map[string]string + AddAttachment(string, string) + Attachment(string, string) string } ///////////////////////////// @@ -27,14 +33,43 @@ type Result interface { ///////////////////////////// type RPCResult struct { - Err error - Rest interface{} + Attrs map[string]string + Err error + Rest interface{} +} + +func (r *RPCResult) SetError(err error) { + r.Err = err } func (r *RPCResult) Error() error { return r.Err } +func (r *RPCResult) SetResult(rest interface{}) { + r.Rest = rest +} + func (r *RPCResult) Result() interface{} { return r.Rest } + +func (r *RPCResult) SetAttachments(attr map[string]string) { + r.Attrs = attr +} + +func (r *RPCResult) Attachments() map[string]string { + return r.Attrs +} + +func (r *RPCResult) AddAttachment(key, value string) { + r.Attrs[key] = value +} + +func (r *RPCResult) Attachment(key, defaultValue string) string { + v, ok := r.Attrs[key] + if !ok { + v = defaultValue + } + return v +} diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 763a6149b355aadeff212b8e2f05ae51f6457045..a4a9263156345a6e98c5a5189a49d4e54d65a76b 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -18,7 +18,6 @@ package directory import ( - "github.com/apache/dubbo-go/remoting" "sync" "time" ) @@ -36,6 +35,7 @@ import ( "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting" ) const ( @@ -171,7 +171,9 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { } if len(groupInvokersMap) == 1 { //len is 1 it means no group setting ,so do not need cluster again - groupInvokersList = groupInvokersMap[""] + for _, invokers := range groupInvokersMap { + groupInvokersList = invokers + } } else { for _, invokers := range groupInvokersMap { staticDir := directory.NewStaticDirectory(invokers) diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 99cf93a23f7d93afdc9790a1399f30da13e857ac..a40452756c73d0b80a91e5424132d0c7bf8251f4 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -19,7 +19,6 @@ package directory import ( "context" - "github.com/apache/dubbo-go/remoting" "net/url" "strconv" "testing" @@ -38,6 +37,7 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting" ) func TestSubscribe(t *testing.T) { @@ -51,7 +51,7 @@ func TestSubscribe_Delete(t *testing.T) { registryDirectory, mockRegistry := normalRegistryDir() time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 3) - mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))}) + mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.Del, Service: *common.NewURLWithOptions(common.WithPath("TEST0"), common.WithProtocol("dubbo"))}) time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 2) } @@ -74,14 +74,14 @@ func TestSubscribe_Group(t *testing.T) { mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) registryDirectory, _ := NewRegistryDirectory(®url, mockRegistry) - go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice")) + go registryDirectory.Subscribe(*common.NewURLWithOptions(common.WithPath("testservice"))) //for group1 urlmap := url.Values{} urlmap.Set(constant.GROUP_KEY, "group1") urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"), common.WithParams(urlmap))}) } //for group2 @@ -89,7 +89,7 @@ func TestSubscribe_Group(t *testing.T) { urlmap2.Set(constant.GROUP_KEY, "group2") urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"), common.WithParams(urlmap2))}) } @@ -127,9 +127,9 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) { mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) - go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice")) + go registryDirectory.Subscribe(*common.NewURLWithOptions(common.WithPath("testservice"))) for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))}) + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"))}) } return registryDirectory, mockRegistry.(*registry.MockRegistry) } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index ff6a9939694acf89f9a4e73e9566d20d55bf9c0e..21a156bc694f3b85033c692252c90d55106f3103 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -291,7 +291,7 @@ func (r *zkRegistry) register(c common.URL) error { return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) } // 先创建服务下面的provider node - dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, common.DubboNodes[common.PROVIDER]) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.PROVIDER]) r.cltLock.Lock() err = r.client.Create(dubboPath) r.cltLock.Unlock() @@ -329,11 +329,11 @@ func (r *zkRegistry) register(c common.URL) error { encodedURL = url.QueryEscape(rawURL) // 把自己注册service providers - dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.PROVIDER)).String()) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), (common.RoleType(common.PROVIDER)).String()) logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL) case common.CONSUMER: - dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, common.DubboNodes[common.CONSUMER]) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.CONSUMER]) r.cltLock.Lock() err = r.client.Create(dubboPath) r.cltLock.Unlock() @@ -341,7 +341,7 @@ func (r *zkRegistry) register(c common.URL) error { logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) return perrors.WithStack(err) } - dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, common.DubboNodes[common.PROVIDER]) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.PROVIDER]) r.cltLock.Lock() err = r.client.Create(dubboPath) r.cltLock.Unlock() @@ -358,7 +358,7 @@ func (r *zkRegistry) register(c common.URL) error { rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode()) encodedURL = url.QueryEscape(rawURL) - dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.CONSUMER)).String()) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), (common.RoleType(common.CONSUMER)).String()) logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL) default: @@ -427,7 +427,7 @@ func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListene //注册到dataconfig的interested r.dataListener.AddInterestedURL(&conf) - go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo%s/providers", conf.Path), r.dataListener) + go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", conf.Service()), r.dataListener) return zkListener, nil }