diff --git a/.gitignore b/.gitignore index e917b9dfe024bf641baf6c35f8f8d1a1676fdad5..f369c2833aeacbff3aa85a6cd1cdc25520928209 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.dll *.so *.dylib +*.jar # Test binary, build with `go test -c` *.test diff --git a/README.md b/README.md index d01c5df56a4e33a6bd062c64201fc985c79077b1..36e0ad78c3aefe21c3847f2ede1c4cbec8db9851 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,6 @@ --- Apache Dubbo Go Implementation. - - -Apache/Dubbo-go image, licensed under [Creative Commons 3.0 Attributions license](https://creativecommons.org/licenses/by/3.0/). ## License diff --git a/README_CN.md b/README_CN.md index 9a769a196afa87eabc88649a5315b2772a8e4fef..387070d267fbba05fb99226f83bde45e236e491c 100644 --- a/README_CN.md +++ b/README_CN.md @@ -5,9 +5,6 @@ --- Apache Dubbo Go 璇█瀹炵幇 - - -Apache/Dubbo-go image, licensed under [Creative Commons 3.0 Attributions license](https://creativecommons.org/licenses/by/3.0/). ## 璇佷功 ## diff --git a/cluster/cluster_impl/failback_cluster_invoker.go b/cluster/cluster_impl/failback_cluster_invoker.go index 027461ccb7f32d6383d756ac986295b6300f249c..c8dbeda09f62e88b51dd4ad2b6b09d5715f0b224 100644 --- a/cluster/cluster_impl/failback_cluster_invoker.go +++ b/cluster/cluster_impl/failback_cluster_invoker.go @@ -18,6 +18,7 @@ package cluster_impl import ( + "strconv" "sync" "time" ) @@ -54,15 +55,18 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { invoker := &failbackClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), } - retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES) - if retriesConfig <= 0 { - retriesConfig = constant.DEFAULT_FAILBACK_TIMES + retriesConfig := invoker.GetUrl().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES) + retries, err := strconv.Atoi(retriesConfig) + if err != nil || retries < 0 { + logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.") + retries = constant.DEFAULT_FAILBACK_TIMES_INT } + failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS) if failbackTasksConfig <= 0 { failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS } - invoker.maxRetries = retriesConfig + invoker.maxRetries = int64(retries) invoker.failbackTasks = failbackTasksConfig return invoker } diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go index 3dd8c41dfb7b7512d7c81fa6d7f8a7c35cec0dcb..70db6d4c1cdf5150d607d6c5250dfb3da631e95a 100644 --- a/cluster/cluster_impl/failover_cluster_invoker.go +++ b/cluster/cluster_impl/failover_cluster_invoker.go @@ -17,6 +17,10 @@ package cluster_impl +import ( + "strconv" +) + import ( perrors "github.com/pkg/errors" ) @@ -24,6 +28,7 @@ import ( import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/utils" "github.com/apache/dubbo-go/protocol" ) @@ -53,16 +58,21 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr url := invokers[0].GetUrl() //get reties - retries := url.GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_RETRIES) + retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES) //Get the service method loadbalance config if have - if v := url.GetMethodParamInt(methodName, constant.RETRIES_KEY, 0); v != 0 { - retries = v + if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 { + retriesConfig = v + } + retries, err := strconv.Atoi(retriesConfig) + if err != nil || retries < 0 { + logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.") + retries = constant.DEFAULT_RETRIES_INT } invoked := []protocol.Invoker{} providers := []string{} var result protocol.Result - for i := int64(0); i < retries; i++ { + for i := 0; i <= retries; i++ { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if i > 0 { diff --git a/cluster/cluster_impl/failover_cluster_test.go b/cluster/cluster_impl/failover_cluster_test.go index dc039db8de41ab6722b20f99c5a0c5536a42a7e6..78b799320dfa58d55e531c658ec5eb0e69306cff 100644 --- a/cluster/cluster_impl/failover_cluster_test.go +++ b/cluster/cluster_impl/failover_cluster_test.go @@ -118,14 +118,14 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio } func Test_FailoverInvokeSuccess(t *testing.T) { urlParams := url.Values{} - result := normalInvoke(t, 2, urlParams) + result := normalInvoke(t, 3, urlParams) assert.NoError(t, result.Error()) count = 0 } func Test_FailoverInvokeFail(t *testing.T) { urlParams := url.Values{} - result := normalInvoke(t, 3, urlParams) + result := normalInvoke(t, 4, urlParams) assert.Errorf(t, result.Error(), "error") count = 0 } @@ -133,7 +133,7 @@ func Test_FailoverInvokeFail(t *testing.T) { func Test_FailoverInvoke1(t *testing.T) { urlParams := url.Values{} urlParams.Set(constant.RETRIES_KEY, "3") - result := normalInvoke(t, 3, urlParams) + result := normalInvoke(t, 4, urlParams) assert.NoError(t, result.Error()) count = 0 } @@ -144,7 +144,7 @@ func Test_FailoverInvoke2(t *testing.T) { urlParams.Set("methods.test."+constant.RETRIES_KEY, "3") ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test")) - result := normalInvoke(t, 3, urlParams, ivc) + result := normalInvoke(t, 4, urlParams, ivc) assert.NoError(t, result.Error()) count = 0 } diff --git a/common/constant/default.go b/common/constant/default.go index 9144249ef6d2b04a1e00d2eac210be814e74025d..d937fcb3edb5075922655db8824c9ca2ede5ce1c 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -32,13 +32,15 @@ const ( ) const ( - DEFAULT_LOADBALANCE = "random" - DEFAULT_RETRIES = 2 - DEFAULT_PROTOCOL = "dubbo" - DEFAULT_REG_TIMEOUT = "10s" - DEFAULT_CLUSTER = "failover" - DEFAULT_FAILBACK_TIMES = 3 - DEFAULT_FAILBACK_TASKS = 100 + DEFAULT_LOADBALANCE = "random" + DEFAULT_RETRIES = "2" + DEFAULT_RETRIES_INT = 2 + DEFAULT_PROTOCOL = "dubbo" + DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_CLUSTER = "failover" + DEFAULT_FAILBACK_TIMES = "3" + DEFAULT_FAILBACK_TIMES_INT = 3 + DEFAULT_FAILBACK_TASKS = 100 ) const ( diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index 1665a7346e09016570dd36c56d231d3706b96a54..bafba60b400ec59d99e2d68ecf4d067c906ba6fb 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -17,10 +17,20 @@ package proxy_factory +import ( + "reflect" + "strings" +) + +import ( + perrors "github.com/pkg/errors" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/proxy" "github.com/apache/dubbo-go/protocol" ) @@ -51,6 +61,86 @@ func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *comm return proxy.NewProxy(invoker, nil, attachments) } func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { - // todo: call service - return protocol.NewBaseInvoker(url) + return &ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + } +} + +type ProxyInvoker struct { + protocol.BaseInvoker +} + +func (pi *ProxyInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + result := &protocol.RPCResult{} + result.SetAttachments(invocation.Attachments()) + + url := pi.GetUrl() + + methodName := invocation.MethodName() + proto := url.Protocol + path := strings.TrimPrefix(url.Path, "/") + args := invocation.Arguments() + + // get service + svc := common.ServiceMap.GetService(proto, path) + if svc == nil { + logger.Errorf("cannot find service [%s] in %s", path, proto) + result.SetError(perrors.Errorf("cannot find service [%s] in %s", path, proto)) + return result + } + + // get method + method := svc.Method()[methodName] + if method == nil { + logger.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto) + result.SetError(perrors.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto)) + return result + } + + in := []reflect.Value{svc.Rcvr()} + if method.CtxType() != nil { + in = append(in, method.SuiteContext(nil)) // todo: ctx will be used later. + } + + // prepare argv + if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { + in = append(in, reflect.ValueOf(args)) + } else { + for i := 0; i < len(args); i++ { + t := reflect.ValueOf(args[i]) + if !t.IsValid() { + at := method.ArgsType()[i] + if at.Kind() == reflect.Ptr { + at = at.Elem() + } + t = reflect.New(at) + } + in = append(in, t) + } + } + + // prepare replyv + var replyv reflect.Value + if method.ReplyType() == nil && len(method.ArgsType()) > 0 { + replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) + in = append(in, replyv) + } + + returnValues := method.Method().Func.Call(in) + + var retErr interface{} + if len(returnValues) == 1 { + retErr = returnValues[0].Interface() + } else { + replyv = returnValues[0] + retErr = returnValues[1].Interface() + } + if retErr != nil { + result.SetError(retErr.(error)) + } else { + if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { + result.SetResult(replyv.Interface()) + } + } + return result } diff --git a/common/url.go b/common/url.go index 8b663051ca9f9f3bd011803b016289bba20e7ee2..9f5b50264afe5f60feff2bbccee6cb894aaa44f3 100644 --- a/common/url.go +++ b/common/url.go @@ -73,7 +73,7 @@ type baseUrl struct { Port string //url.Values is not safe map, add to avoid concurrent map read and map write error paramsLock sync.RWMutex - Params url.Values + params url.Values PrimitiveURL string ctx context.Context } @@ -110,13 +110,13 @@ func WithMethods(methods []string) option { func WithParams(params url.Values) option { return func(url *URL) { - url.Params = params + url.params = params } } func WithParamsValue(key, val string) option { return func(url *URL) { - url.Params.Set(key, val) + url.SetParam(key, val) } } @@ -191,7 +191,7 @@ func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error) return s, perrors.Errorf("url.Parse(url string{%s}), error{%v}", rawUrlString, err) } - s.Params, err = url.ParseQuery(serviceUrl.RawQuery) + s.params, err = url.ParseQuery(serviceUrl.RawQuery) if err != nil { return s, perrors.Errorf("url.ParseQuery(raw url string{%s}), error{%v}", serviceUrl.RawQuery, err) } @@ -263,7 +263,9 @@ func (c URL) String() string { "%s://%s:%s@%s:%s%s?", c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Path) } - buildString += c.Params.Encode() + c.paramsLock.RLock() + buildString += c.params.Encode() + c.paramsLock.RUnlock() return buildString } @@ -322,20 +324,43 @@ func (c URL) Service() string { func (c *URL) AddParam(key string, value string) { c.paramsLock.Lock() - c.Params.Add(key, value) + c.params.Add(key, value) + c.paramsLock.Unlock() +} + +func (c *URL) SetParam(key string, value string) { + c.paramsLock.Lock() + c.params.Set(key, value) c.paramsLock.Unlock() } +func (c *URL) RangeParams(f func(key, value string) bool) { + c.paramsLock.RLock() + defer c.paramsLock.RUnlock() + for k, v := range c.params { + if !f(k, v[0]) { + break + } + } +} + func (c URL) GetParam(s string, d string) string { var r string c.paramsLock.RLock() - if r = c.Params.Get(s); len(r) == 0 { + if r = c.params.Get(s); len(r) == 0 { r = d } c.paramsLock.RUnlock() return r } + +func (c URL) GetParams() url.Values { + return c.params +} + func (c URL) GetParamAndDecoded(key string) (string, error) { + c.paramsLock.RLock() + defer c.paramsLock.RUnlock() ruleDec, err := base64.URLEncoding.DecodeString(c.GetParam(key, "")) value := string(ruleDec) return value, err @@ -356,7 +381,7 @@ func (c URL) GetRawParam(key string) string { case "path": return c.Path default: - return c.Params.Get(key) + return c.GetParam(key, "") } } @@ -365,7 +390,7 @@ func (c URL) GetParamBool(s string, d bool) bool { var r bool var err error - if r, err = strconv.ParseBool(c.Params.Get(s)); err != nil { + if r, err = strconv.ParseBool(c.GetParam(s, "")); err != nil { return d } return r @@ -374,7 +399,8 @@ func (c URL) GetParamBool(s string, d bool) bool { func (c URL) GetParamInt(s string, d int64) int64 { var r int var err error - if r, err = strconv.Atoi(c.Params.Get(s)); r == 0 || err != nil { + + if r, err = strconv.Atoi(c.GetParam(s, "")); r == 0 || err != nil { return d } return int64(r) @@ -383,7 +409,9 @@ func (c URL) GetParamInt(s string, d int64) int64 { func (c URL) GetMethodParamInt(method string, key string, d int64) int64 { var r int var err error - if r, err = strconv.Atoi(c.Params.Get("methods." + method + "." + key)); r == 0 || err != nil { + c.paramsLock.RLock() + defer c.paramsLock.RUnlock() + if r, err = strconv.Atoi(c.GetParam("methods."+method+"."+key, "")); r == 0 || err != nil { return d } return int64(r) @@ -400,21 +428,24 @@ func (c URL) GetMethodParamInt64(method string, key string, d int64) int64 { func (c URL) GetMethodParam(method string, key string, d string) string { var r string - if r = c.Params.Get("methods." + method + "." + key); r == "" { + if r = c.GetParam("methods."+method+"."+key, ""); r == "" { r = d } return r } + func (c *URL) RemoveParams(set *container.HashSet) { - for k, _ := range set.Items { + c.paramsLock.Lock() + defer c.paramsLock.Unlock() + for k := range set.Items { s := k.(string) - delete(c.Params, s) + delete(c.params, s) } } -func (c *URL) SetParams(m url.Values) { - for k, _ := range m { - c.Params.Set(k, m.Get(k)) +func (c *URL) SetParams(m url.Values) { + for k := range m { + c.SetParam(k, m.Get(k)) } } @@ -423,9 +454,11 @@ func (c URL) ToMap() map[string]string { paramsMap := make(map[string]string) - for k, v := range c.Params { - paramsMap[k] = v[0] - } + c.RangeParams(func(key, value string) bool { + paramsMap[key] = value + return true + }) + if c.Protocol != "" { paramsMap["protocol"] = c.Protocol } @@ -460,40 +493,24 @@ func (c URL) ToMap() map[string]string { // configuration > reference config >service config // in this function we should merge the reference local url config into the service url from registry. //TODO configuration merge, in the future , the configuration center's config should merge too. + func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { mergedUrl := serviceUrl.Clone() - var methodConfigMergeFcn = []func(method string){} - //iterator the referenceUrl if serviceUrl not have the key ,merge in - - for k, v := range referenceUrl.Params { - if _, ok := mergedUrl.Params[k]; !ok { - mergedUrl.Params.Set(k, v[0]) - } - } - //loadBalance strategy config - if v := referenceUrl.Params.Get(constant.LOADBALANCE_KEY); v != "" { - mergedUrl.Params.Set(constant.LOADBALANCE_KEY, v) - } - methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) { - if v := referenceUrl.Params.Get(method + "." + constant.LOADBALANCE_KEY); v != "" { - mergedUrl.Params.Set(method+"."+constant.LOADBALANCE_KEY, v) - } - }) - //cluster strategy config - if v := referenceUrl.Params.Get(constant.CLUSTER_KEY); v != "" { - mergedUrl.Params.Set(constant.CLUSTER_KEY, v) - } - methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) { - if v := referenceUrl.Params.Get(method + "." + constant.CLUSTER_KEY); v != "" { - mergedUrl.Params.Set(method+"."+constant.CLUSTER_KEY, v) + //iterator the referenceUrl if serviceUrl not have the key ,merge in + referenceUrl.RangeParams(func(key, value string) bool { + if v := mergedUrl.GetParam(key, ""); len(v) == 0 { + mergedUrl.SetParam(key, value) } + return true }) + //loadBalance,cluster,retries strategy config + methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY}) //remote timestamp - if v := serviceUrl.Params.Get(constant.TIMESTAMP_KEY); v != "" { - mergedUrl.Params.Set(constant.REMOTE_TIMESTAMP_KEY, v) - mergedUrl.Params.Set(constant.TIMESTAMP_KEY, referenceUrl.Params.Get(constant.TIMESTAMP_KEY)) + if v := serviceUrl.GetParam(constant.TIMESTAMP_KEY, ""); len(v) > 0 { + mergedUrl.SetParam(constant.REMOTE_TIMESTAMP_KEY, v) + mergedUrl.SetParam(constant.TIMESTAMP_KEY, referenceUrl.GetParam(constant.TIMESTAMP_KEY, "")) } //finally execute methodConfigMergeFcn @@ -508,9 +525,25 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { func (c *URL) Clone() *URL { newUrl := &URL{} copier.Copy(newUrl, c) - newUrl.Params = url.Values{} - for k, v := range c.Params { - newUrl.Params[k] = v - } + newUrl.params = url.Values{} + c.RangeParams(func(key, value string) bool { + newUrl.SetParam(key, value) + return true + }) return newUrl } + +func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []func(method string) { + var methodConfigMergeFcn = []func(method string){} + for _, paramKey := range paramKeys { + if v := referenceUrl.GetParam(paramKey, ""); len(v) > 0 { + mergedUrl.SetParam(paramKey, v) + } + methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) { + if v := referenceUrl.GetParam(method+"."+paramKey, ""); len(v) > 0 { + mergedUrl.SetParam(method+"."+paramKey, v) + } + }) + } + return methodConfigMergeFcn +} diff --git a/common/url_test.go b/common/url_test.go index 957cf6189682987bd7e1769419eaf50961a4f7a4..41fd374a4d8a4ad3e15de1080fe46d426620909f 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -52,7 +52,7 @@ func TestNewURLWithOptions(t *testing.T) { assert.Equal(t, "127.0.0.1", u.Ip) assert.Equal(t, "8080", u.Port) assert.Equal(t, methods, u.Methods) - assert.Equal(t, params, u.Params) + assert.Equal(t, params, u.params) } func TestURL(t *testing.T) { @@ -74,7 +74,7 @@ func TestURL(t *testing.T) { assert.Equal(t, "anyhost=true&application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-"+ "provider-golang-1.0.0&environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%"+ "2C&module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000&t"+ - "imestamp=1556509797245", u.Params.Encode()) + "imestamp=1556509797245", u.params.Encode()) assert.Equal(t, "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&application=BDTServi"+ "ce&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&environment=dev&interface=com.ikure"+ @@ -101,7 +101,7 @@ func TestURLWithoutSchema(t *testing.T) { assert.Equal(t, "anyhost=true&application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-"+ "provider-golang-1.0.0&environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%"+ "2C&module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000&t"+ - "imestamp=1556509797245", u.Params.Encode()) + "imestamp=1556509797245", u.params.Encode()) assert.Equal(t, "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&application=BDTServi"+ "ce&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&environment=dev&interface=com.ikure"+ @@ -124,7 +124,7 @@ func TestURL_URLEqual(t *testing.T) { func TestURL_GetParam(t *testing.T) { params := url.Values{} params.Set("key", "value") - u := URL{baseUrl: baseUrl{Params: params}} + u := URL{baseUrl: baseUrl{params: params}} v := u.GetParam("key", "default") assert.Equal(t, "value", v) @@ -136,7 +136,7 @@ func TestURL_GetParam(t *testing.T) { func TestURL_GetParamInt(t *testing.T) { params := url.Values{} params.Set("key", "3") - u := URL{baseUrl: baseUrl{Params: params}} + u := URL{baseUrl: baseUrl{params: params}} v := u.GetParamInt("key", 1) assert.Equal(t, int64(3), v) @@ -148,7 +148,7 @@ func TestURL_GetParamInt(t *testing.T) { func TestURL_GetParamBool(t *testing.T) { params := url.Values{} params.Set("force", "true") - u := URL{baseUrl: baseUrl{Params: params}} + u := URL{baseUrl: baseUrl{params: params}} v := u.GetParamBool("force", false) assert.Equal(t, true, v) @@ -161,7 +161,7 @@ func TestURL_GetParamAndDecoded(t *testing.T) { rule := "host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4" params := url.Values{} params.Set("rule", base64.URLEncoding.EncodeToString([]byte(rule))) - u := URL{baseUrl: baseUrl{Params: params}} + u := URL{baseUrl: baseUrl{params: params}} v, _ := u.GetParamAndDecoded("rule") assert.Equal(t, rule, v) } @@ -196,7 +196,7 @@ func TestURL_ToMap(t *testing.T) { func TestURL_GetMethodParamInt(t *testing.T) { params := url.Values{} params.Set("methods.GetValue.timeout", "3") - u := URL{baseUrl: baseUrl{Params: params}} + u := URL{baseUrl: baseUrl{params: params}} v := u.GetMethodParamInt("GetValue", "timeout", 1) assert.Equal(t, int64(3), v) @@ -208,7 +208,7 @@ func TestURL_GetMethodParamInt(t *testing.T) { func TestURL_GetMethodParam(t *testing.T) { params := url.Values{} params.Set("methods.GetValue.timeout", "3s") - u := URL{baseUrl: baseUrl{Params: params}} + u := URL{baseUrl: baseUrl{params: params}} v := u.GetMethodParam("GetValue", "timeout", "1s") assert.Equal(t, "3s", v) @@ -220,17 +220,23 @@ func TestURL_GetMethodParam(t *testing.T) { func TestMergeUrl(t *testing.T) { referenceUrlParams := url.Values{} referenceUrlParams.Set(constant.CLUSTER_KEY, "random") + referenceUrlParams.Set(constant.RETRIES_KEY, "1") referenceUrlParams.Set("test3", "1") + referenceUrlParams.Set("methods.testMethod."+constant.RETRIES_KEY, "1") serviceUrlParams := url.Values{} serviceUrlParams.Set("test2", "1") serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin") - referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams)) + serviceUrlParams.Set(constant.RETRIES_KEY, "2") + serviceUrlParams.Set("methods.testMethod."+constant.RETRIES_KEY, "2") + referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams), WithMethods([]string{"testMethod"})) serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams)) mergedUrl := MergeUrl(&serviceUrl, &referenceUrl) assert.Equal(t, "random", mergedUrl.GetParam(constant.CLUSTER_KEY, "")) assert.Equal(t, "1", mergedUrl.GetParam("test2", "")) assert.Equal(t, "1", mergedUrl.GetParam("test3", "")) + assert.Equal(t, "1", mergedUrl.GetParam(constant.RETRIES_KEY, "")) + assert.Equal(t, "1", mergedUrl.GetParam("methods.testMethod."+constant.RETRIES_KEY, "")) } func TestURL_SetParams(t *testing.T) { @@ -239,8 +245,8 @@ func TestURL_SetParams(t *testing.T) { params := url.Values{} params.Set("key", "3") u1.SetParams(params) - assert.Equal(t, "3", u1.Params.Get("key")) - assert.Equal(t, "2.6.0", u1.Params.Get("version")) + assert.Equal(t, "3", u1.GetParam("key", "")) + assert.Equal(t, "2.6.0", u1.GetParam("version", "")) } func TestClone(t *testing.T) { @@ -248,7 +254,7 @@ func TestClone(t *testing.T) { assert.NoError(t, err) u2 := u1.Clone() assert.Equal(t, u2.Protocol, "dubbo") - assert.Equal(t, "1.0", u2.Params.Get("configVersion")) + assert.Equal(t, "1.0", u2.GetParam("configVersion", "")) u2.Protocol = "provider" assert.Equal(t, u1.Protocol, "dubbo") assert.Equal(t, u2.Protocol, "provider") diff --git a/config/base_config_test.go b/config/base_config_test.go index 4180247637a78473ac455e93c70aebbc6e2d6f8e..6dc3749e55f7efbfb1177079f613360cd0d4cc33 100644 --- a/config/base_config_test.go +++ b/config/base_config_test.go @@ -92,7 +92,7 @@ func Test_refresh(t *testing.T) { Protocol: "mock", Cluster: "failover", Loadbalance: "random", - Retries: 3, + Retries: "3", Group: "huadong_idc", Version: "1.0.0", Methods: []*MethodConfig{ @@ -100,14 +100,14 @@ func Test_refresh(t *testing.T) { InterfaceId: "MockService", InterfaceName: "com.MockService", Name: "GetUser", - Retries: 2, + Retries: "2", Loadbalance: "random", }, { InterfaceId: "MockService", InterfaceName: "com.MockService", Name: "GetUser1", - Retries: 2, + Retries: "2", Loadbalance: "random", }, }, @@ -118,9 +118,9 @@ func Test_refresh(t *testing.T) { c.SetFatherConfig(father) c.fresh() assert.Equal(t, "mock100", father.Registries["shanghai_reg1"].Protocol) - assert.Equal(t, int64(10), father.References["MockService"].Retries) + assert.Equal(t, "10", father.References["MockService"].Retries) - assert.Equal(t, int64(10), father.References["MockService"].Methods[0].Retries) + assert.Equal(t, "10", father.References["MockService"].Methods[0].Retries) assert.Equal(t, &[]bool{false}[0], father.Check) assert.Equal(t, "dubbo", father.ApplicationConfig.Name) } @@ -188,7 +188,7 @@ func Test_appExternal_refresh(t *testing.T) { Protocol: "mock", Cluster: "failover", Loadbalance: "random", - Retries: 3, + Retries: "3", Group: "huadong_idc", Version: "1.0.0", Methods: []*MethodConfig{ @@ -196,14 +196,14 @@ func Test_appExternal_refresh(t *testing.T) { InterfaceId: "MockService", InterfaceName: "com.MockService", Name: "GetUser", - Retries: 2, + Retries: "2", Loadbalance: "random", }, { InterfaceId: "MockService", InterfaceName: "com.MockService", Name: "GetUser1", - Retries: 2, + Retries: "2", Loadbalance: "random", }, }, @@ -214,9 +214,9 @@ func Test_appExternal_refresh(t *testing.T) { c.SetFatherConfig(father) c.fresh() assert.Equal(t, "mock100", father.Registries["shanghai_reg1"].Protocol) - assert.Equal(t, int64(10), father.References["MockService"].Retries) + assert.Equal(t, "10", father.References["MockService"].Retries) - assert.Equal(t, int64(10), father.References["MockService"].Methods[0].Retries) + assert.Equal(t, "10", father.References["MockService"].Methods[0].Retries) assert.Equal(t, &[]bool{true}[0], father.Check) assert.Equal(t, "dubbo", father.ApplicationConfig.Name) } @@ -283,7 +283,7 @@ func Test_appExternalWithoutId_refresh(t *testing.T) { Protocol: "mock", Cluster: "failover", Loadbalance: "random", - Retries: 3, + Retries: "3", Group: "huadong_idc", Version: "1.0.0", Methods: []*MethodConfig{ @@ -291,14 +291,14 @@ func Test_appExternalWithoutId_refresh(t *testing.T) { InterfaceId: "MockService", InterfaceName: "com.MockService", Name: "GetUser", - Retries: 2, + Retries: "3", Loadbalance: "random", }, { InterfaceId: "MockService", InterfaceName: "com.MockService", Name: "GetUser1", - Retries: 2, + Retries: "2", Loadbalance: "random", }, }, @@ -309,9 +309,9 @@ func Test_appExternalWithoutId_refresh(t *testing.T) { c.SetFatherConfig(father) c.fresh() assert.Equal(t, "mock100", father.Registries["shanghai_reg1"].Protocol) - assert.Equal(t, int64(10), father.References["MockService"].Retries) + assert.Equal(t, "10", father.References["MockService"].Retries) - assert.Equal(t, int64(10), father.References["MockService"].Methods[0].Retries) + assert.Equal(t, "10", father.References["MockService"].Methods[0].Retries) assert.Equal(t, &[]bool{true}[0], father.Check) assert.Equal(t, "dubbo", father.ApplicationConfig.Name) } @@ -344,7 +344,7 @@ func Test_refresh_singleRegistry(t *testing.T) { Protocol: "mock", Cluster: "failover", Loadbalance: "random", - Retries: 3, + Retries: "3", Group: "huadong_idc", Version: "1.0.0", Methods: []*MethodConfig{ @@ -352,14 +352,14 @@ func Test_refresh_singleRegistry(t *testing.T) { InterfaceId: "MockService", InterfaceName: "com.MockService", Name: "GetUser", - Retries: 2, + Retries: "2", Loadbalance: "random", }, { InterfaceId: "MockService", InterfaceName: "com.MockService", Name: "GetUser1", - Retries: 2, + Retries: "2", Loadbalance: "random", }, }, @@ -370,9 +370,9 @@ func Test_refresh_singleRegistry(t *testing.T) { c.SetFatherConfig(father) c.fresh() assert.Equal(t, "mock100://127.0.0.1:2181", father.Registry.Address) - assert.Equal(t, int64(10), father.References["MockService"].Retries) + assert.Equal(t, "10", father.References["MockService"].Retries) - assert.Equal(t, int64(10), father.References["MockService"].Methods[0].Retries) + assert.Equal(t, "10", father.References["MockService"].Methods[0].Retries) assert.Equal(t, &[]bool{false}[0], father.Check) assert.Equal(t, "dubbo", father.ApplicationConfig.Name) } @@ -440,7 +440,7 @@ func Test_refreshProvider(t *testing.T) { Protocol: "mock", Cluster: "failover", Loadbalance: "random", - Retries: 3, + Retries: "3", Group: "huadong_idc", Version: "1.0.0", Methods: []*MethodConfig{ @@ -448,13 +448,13 @@ func Test_refreshProvider(t *testing.T) { InterfaceId: "MockService", InterfaceName: "com.MockService", Name: "GetUser", - Retries: 2, + Retries: "2", Loadbalance: "random", }, {InterfaceId: "MockService", InterfaceName: "com.MockService", Name: "GetUser1", - Retries: 2, + Retries: "2", Loadbalance: "random", }, }, @@ -465,9 +465,9 @@ func Test_refreshProvider(t *testing.T) { c.SetFatherConfig(father) c.fresh() assert.Equal(t, "mock100", father.Registries["shanghai_reg1"].Protocol) - assert.Equal(t, int64(10), father.Services["MockService"].Retries) + assert.Equal(t, "10", father.Services["MockService"].Retries) - assert.Equal(t, int64(10), father.Services["MockService"].Methods[0].Retries) + assert.Equal(t, "10", father.Services["MockService"].Methods[0].Retries) assert.Equal(t, "dubbo", father.ApplicationConfig.Name) assert.Equal(t, "20001", father.Protocols["jsonrpc1"].Port) } diff --git a/config/method_config.go b/config/method_config.go index aff322535a0fe50bf3a6e2fbf5099ec81fdbcdcd..ac9242a230816711ab84a074473890b6f96c2b11 100644 --- a/config/method_config.go +++ b/config/method_config.go @@ -28,7 +28,7 @@ type MethodConfig struct { InterfaceId string InterfaceName string Name string `yaml:"name" json:"name,omitempty" property:"name"` - Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"` + Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"` } diff --git a/config/reference_config.go b/config/reference_config.go index fb854a14d570288a7d41b3637991ffbdcc952596..26976f1ccfad2185a4d3fe6b51dad411eb176099 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -51,7 +51,7 @@ type ReferenceConfig struct { Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` - Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"` + Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` Group string `yaml:"group" json:"group,omitempty" property:"group"` Version string `yaml:"version" json:"version,omitempty" property:"version"` Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` @@ -163,7 +163,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10)) urlMap.Set(constant.CLUSTER_KEY, refconfig.Cluster) urlMap.Set(constant.LOADBALANCE_KEY, refconfig.Loadbalance) - urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(refconfig.Retries, 10)) + urlMap.Set(constant.RETRIES_KEY, refconfig.Retries) urlMap.Set(constant.GROUP_KEY, refconfig.Group) urlMap.Set(constant.VERSION_KEY, refconfig.Version) urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) @@ -189,7 +189,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { for _, v := range refconfig.Methods { urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance) - urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, strconv.FormatInt(v.Retries, 10)) + urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries) } return urlMap diff --git a/config/reference_config_test.go b/config/reference_config_test.go index 4dbdd31d8f0697e1a8288ea29b488772a802b87b..a81dbf06cef7d275cf6af4a7f651ff8d1600a3c9 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -90,18 +90,18 @@ func doInitConsumer() { Protocol: "mock", Cluster: "failover", Loadbalance: "random", - Retries: 3, + Retries: "3", Group: "huadong_idc", Version: "1.0.0", Methods: []*MethodConfig{ { Name: "GetUser", - Retries: 2, + Retries: "2", Loadbalance: "random", }, { Name: "GetUser1", - Retries: 2, + Retries: "2", Loadbalance: "random", }, }, @@ -135,18 +135,18 @@ func doInitConsumerWithSingleRegistry() { Protocol: "mock", Cluster: "failover", Loadbalance: "random", - Retries: 3, + Retries: "3", Group: "huadong_idc", Version: "1.0.0", Methods: []*MethodConfig{ { Name: "GetUser", - Retries: 2, + Retries: "2", Loadbalance: "random", }, { Name: "GetUser1", - Retries: 2, + Retries: "2", Loadbalance: "random", }, }, diff --git a/config/service_config.go b/config/service_config.go index c41509917dc1a79f6b9daade3fbda693f385c610..df6b8d183907a79768f855c2e33bc888a19fa9e3 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -55,7 +55,7 @@ type ServiceConfig struct { Version string `yaml:"version" json:"version,omitempty" property:"version" ` Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"` - Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"` + Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` unexported *atomic.Bool exported *atomic.Bool @@ -168,7 +168,7 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { urlMap.Set(constant.CLUSTER_KEY, srvconfig.Cluster) urlMap.Set(constant.LOADBALANCE_KEY, srvconfig.Loadbalance) urlMap.Set(constant.WARMUP_KEY, srvconfig.Warmup) - urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(srvconfig.Retries, 10)) + urlMap.Set(constant.RETRIES_KEY, srvconfig.Retries) urlMap.Set(constant.GROUP_KEY, srvconfig.Group) urlMap.Set(constant.VERSION_KEY, srvconfig.Version) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) @@ -186,7 +186,7 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { for _, v := range srvconfig.Methods { urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance) - urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, strconv.FormatInt(v.Retries, 10)) + urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries) urlMap.Set("methods."+v.Name+"."+constant.WEIGHT_KEY, strconv.FormatInt(v.Weight, 10)) } diff --git a/config/service_config_test.go b/config/service_config_test.go index d229ce4d2f92ff06d0554b0189a34781ea2fa64a..8ae67533bd1cdd1f9170efd762de51d371d0ad38 100644 --- a/config/service_config_test.go +++ b/config/service_config_test.go @@ -75,19 +75,19 @@ func doInitProvider() { Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2", Cluster: "failover", Loadbalance: "random", - Retries: 3, + Retries: "3", Group: "huadong_idc", Version: "1.0.0", Methods: []*MethodConfig{ { Name: "GetUser", - Retries: 2, + Retries: "2", Loadbalance: "random", Weight: 200, }, { Name: "GetUser1", - Retries: 2, + Retries: "2", Loadbalance: "random", Weight: 200, }, @@ -125,19 +125,19 @@ func doInitProviderWithSingleRegistry() { Protocol: "mock", Cluster: "failover", Loadbalance: "random", - Retries: 3, + Retries: "3", Group: "huadong_idc", Version: "1.0.0", Methods: []*MethodConfig{ { Name: "GetUser", - Retries: 2, + Retries: "2", Loadbalance: "random", Weight: 200, }, { Name: "GetUser1", - Retries: 2, + Retries: "2", Loadbalance: "random", Weight: 200, }, diff --git a/config/testdata/consumer_config.yml b/config/testdata/consumer_config.yml index a13ad86faace9f43e3cb05ca0e68115f0e532270..9fd50bb4d35a40d8532c9a644a86ad6834f8e89b 100644 --- a/config/testdata/consumer_config.yml +++ b/config/testdata/consumer_config.yml @@ -43,7 +43,7 @@ references: cluster: "failover" methods : - name: "GetUser" - retries: 3 + retries: "3" params: "serviceid": "soa.com.ikurento.user.UserProvider" diff --git a/config/testdata/consumer_config_with_configcenter.yml b/config/testdata/consumer_config_with_configcenter.yml index 7ee4ce7029dc4acc7132efdf9c779df8c3939f6e..0550cc89741b6a490aaba9ff8906d7dda1b3ed49 100644 --- a/config/testdata/consumer_config_with_configcenter.yml +++ b/config/testdata/consumer_config_with_configcenter.yml @@ -15,7 +15,7 @@ references: cluster: "failover" methods : - name: "GetUser" - retries: 3 + retries: "3" protocol_conf: dubbo: diff --git a/config/testdata/consumer_config_withoutProtocol.yml b/config/testdata/consumer_config_withoutProtocol.yml index e4393063ea9e2d7664b64a04c65d775527714ecc..5e57c7ddf6e82152e4f207b2d06df1443766717c 100644 --- a/config/testdata/consumer_config_withoutProtocol.yml +++ b/config/testdata/consumer_config_withoutProtocol.yml @@ -1,77 +1,78 @@ -# dubbo client yaml configure file - -filter: "" - -# client -request_timeout : "100ms" -# connect timeout -connect_timeout : "100ms" -check: true -# application config -application: - organization : "ikurento.com" - name : "BDTService" - module : "dubbogo user-info client" - version : "0.0.1" - owner : "ZX" - environment : "dev" - -registries : - - "hangzhouzk": - protocol: "zookeeper" - timeout : "3s" - address: "127.0.0.1:2181" - username: "" - password: "" - "shanghaizk": - protocol: "zookeeper" - timeout : "3s" - address: "127.0.0.1:2182" - username: "" - password: "" - -references: - "UserProvider": - registry: "hangzhouzk,shanghaizk" - filter: "" - version: "1.0" - group: "as" - interface : "com.ikurento.user.UserProvider" - url: "dubbo://127.0.0.1:20000/UserProvider" - cluster: "failover" - methods : - - name: "GetUser" - retries: 3 - params: - "serviceid": - "soa.com.ikurento.user.UserProvider" - "forks": 5 - -protocol_conf: - dubbo: - reconnect_interval: 0 - connection_number: 2 - heartbeat_period: "5s" - session_timeout: "20s" - pool_size: 64 - pool_ttl: 600 - # gr_pool_size is recommended to be set to [cpu core number] * 100 - gr_pool_size: 1200 - # queue_len is recommended to be set to 64 or 128 - queue_len: 64 - # queue_number is recommended to be set to gr_pool_size / 20 - queue_number: 60 - getty_session_param: - compress_encoding: false - tcp_no_delay: true - tcp_keep_alive: true - keep_alive_period: "120s" - tcp_r_buf_size: 262144 - tcp_w_buf_size: 65536 - pkg_wq_size: 512 - tcp_read_timeout: "1s" - tcp_write_timeout: "5s" - wait_timeout: "1s" - max_msg_len: 1024 - session_name: "client" +# dubbo client yaml configure file + +filter: "" + +# client +request_timeout : "100ms" +# connect timeout +connect_timeout : "100ms" +check: true +# application config +application: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info client" + version : "0.0.1" + owner : "ZX" + environment : "dev" + +registries : + + "hangzhouzk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2181" + username: "" + password: "" + "shanghaizk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2182" + username: "" + password: "" + +references: + "UserProvider": + registry: "hangzhouzk,shanghaizk" + filter: "" + version: "1.0" + group: "as" + interface : "com.ikurento.user.UserProvider" + url: "dubbo://127.0.0.1:20000/UserProvider" + cluster: "failover" + methods : + - name: "GetUser" + retries: 3 + params: + "serviceid": + "soa.com.ikurento.user.UserProvider" + "forks": 5 + +protocol_conf: + dubbo: + reconnect_interval: 0 + connection_number: 2 + heartbeat_period: "5s" + session_timeout: "20s" + pool_size: 64 + pool_ttl: 600 + # gr_pool_size is recommended to be set to [cpu core number] * 100 + gr_pool_size: 1200 + # queue_len is recommended to be set to 64 or 128 + queue_len: 64 + # queue_number is recommended to be set to gr_pool_size / 20 + queue_number: 60 + getty_session_param: + compress_encoding: false + tcp_no_delay: true + tcp_keep_alive: true + keep_alive_period: "120s" + tcp_r_buf_size: 262144 + tcp_w_buf_size: 65536 + pkg_wq_size: 512 + tcp_read_timeout: "1s" + tcp_write_timeout: "5s" + wait_timeout: "1s" + max_msg_len: 1024 + session_name: "client" + diff --git a/config/testdata/provider_config_withoutProtocol.yml b/config/testdata/provider_config_withoutProtocol.yml index 9aa36d4677c70ef44b22a4ef193e67caa678af0f..2f65868d4948db9a8b99c500014ea1307569d86f 100644 --- a/config/testdata/provider_config_withoutProtocol.yml +++ b/config/testdata/provider_config_withoutProtocol.yml @@ -1,76 +1,76 @@ -# dubbo server yaml configure file - -filter: "" -# application config -application: - organization : "ikurento.com" - name : "BDTService" - module : "dubbogo user-info server" - version : "0.0.1" - owner : "ZX" - environment : "dev" - -registries : - "hangzhouzk": - protocol: "zookeeper" - timeout : "3s" - address: "127.0.0.1:2181" - username: "" - password: "" - "shanghaizk": - protocol: "zookeeper" - timeout : "3s" - address: "127.0.0.1:2182" - username: "" - password: "" - - -services: - "UserProvider": - registry: "hangzhouzk,shanghaizk" - filter: "" - # equivalent to interface of dubbo.xml - interface : "com.ikurento.user.UserProvider" - loadbalance: "random" - version: "1.0" - group: "as" - warmup: "100" - cluster: "failover" - methods: - - name: "GetUser" - retries: 1 - loadbalance: "random" - -protocols: - "dubbo": - name: "dubbo" - # while using dubbo protocol, ip cannot is 127.0.0.1, because client of java-dubbo will get 'connection refuse' - ip : "127.0.0.1" - port : 20000 - #- name: "jsonrpc" - # ip: "127.0.0.1" - # port: 20001 - -protocol_conf: - dubbo: - session_number: 700 - session_timeout: "20s" - # gr_pool_size is recommended to be set to [cpu core number] * 10 - gr_pool_size: 120 - # queue_len is recommended to be set to 64 or 128 - queue_len: 64 - # queue_number is recommended to be set to gr_pool_size / 20 - queue_number: 6 - getty_session_param: - compress_encoding: false - tcp_no_delay: true - tcp_keep_alive: true - keep_alive_period: "120s" - tcp_r_buf_size: 262144 - tcp_w_buf_size: 65536 - pkg_wq_size: 512 - tcp_read_timeout: "1s" - tcp_write_timeout: "5s" - wait_timeout: "1s" - max_msg_len: 1024 - session_name: "server" +# dubbo server yaml configure file + +filter: "" +# application config +application: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info server" + version : "0.0.1" + owner : "ZX" + environment : "dev" + +registries : + "hangzhouzk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2181" + username: "" + password: "" + "shanghaizk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2182" + username: "" + password: "" + + +services: + "UserProvider": + registry: "hangzhouzk,shanghaizk" + filter: "" + # equivalent to interface of dubbo.xml + interface : "com.ikurento.user.UserProvider" + loadbalance: "random" + version: "1.0" + group: "as" + warmup: "100" + cluster: "failover" + methods: + - name: "GetUser" + retries: 1 + loadbalance: "random" + +protocols: + "dubbo": + name: "dubbo" + # while using dubbo protocol, ip cannot is 127.0.0.1, because client of java-dubbo will get 'connection refuse' + ip : "127.0.0.1" + port : 20000 + #- name: "jsonrpc" + # ip: "127.0.0.1" + # port: 20001 + +protocol_conf: + dubbo: + session_number: 700 + session_timeout: "20s" + # gr_pool_size is recommended to be set to [cpu core number] * 10 + gr_pool_size: 120 + # queue_len is recommended to be set to 64 or 128 + queue_len: 64 + # queue_number is recommended to be set to gr_pool_size / 20 + queue_number: 6 + getty_session_param: + compress_encoding: false + tcp_no_delay: true + tcp_keep_alive: true + keep_alive_period: "120s" + tcp_r_buf_size: 262144 + tcp_w_buf_size: 65536 + pkg_wq_size: 512 + tcp_read_timeout: "1s" + tcp_write_timeout: "5s" + wait_timeout: "1s" + max_msg_len: 1024 + session_name: "server" diff --git a/config_center/configurator/mock.go b/config_center/configurator/mock.go index 4ccfcbd6379f54b0d4c43333b0d537a00f7a014d..1f03d107c8f588cfd4c23c9086bb0fbe42e05fff 100644 --- a/config_center/configurator/mock.go +++ b/config_center/configurator/mock.go @@ -36,6 +36,6 @@ func (c *mockConfigurator) GetUrl() *common.URL { func (c *mockConfigurator) Configure(url *common.URL) { if cluster := c.GetUrl().GetParam(constant.CLUSTER_KEY, ""); cluster != "" { - url.Params.Set(constant.CLUSTER_KEY, cluster) + url.SetParam(constant.CLUSTER_KEY, cluster) } } diff --git a/config_center/configurator/override.go b/config_center/configurator/override.go index 458e2f44e92e17adf6c41786b9876ea7a6253f39..660c6ee315b299a9cf73d9399f572361adbafbd3 100644 --- a/config_center/configurator/override.go +++ b/config_center/configurator/override.go @@ -88,17 +88,24 @@ func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) { conditionKeys.Add(constant.SIDE_KEY) conditionKeys.Add(constant.CONFIG_VERSION_KEY) conditionKeys.Add(constant.COMPATIBLE_CONFIG_KEY) - for k, _ := range c.configuratorUrl.Params { - value := c.configuratorUrl.Params.Get(k) + returnUrl := false + c.configuratorUrl.RangeParams(func(k, v string) bool { + value := c.configuratorUrl.GetParam(k, "") if strings.HasPrefix(k, "~") || k == constant.APPLICATION_KEY || k == constant.SIDE_KEY { conditionKeys.Add(k) - if len(value) != 0 && value != constant.ANY_VALUE && value != c.configuratorUrl.Params.Get(strings.TrimPrefix(k, "~")) { - return + if len(value) != 0 && value != constant.ANY_VALUE && value != url.GetParam(strings.TrimPrefix(k, "~"), "") { + returnUrl = true + return false } } + return true + }) + if returnUrl { + return } - c.configuratorUrl.RemoveParams(conditionKeys) - url.SetParams(c.configuratorUrl.Params) + configUrl := c.configuratorUrl.Clone() + configUrl.RemoveParams(conditionKeys) + url.SetParams(configUrl.GetParams()) } } } diff --git a/config_center/configurator/override_test.go b/config_center/configurator/override_test.go index 2790488f276f7fdf2dde2fc42e0c65e0916d6751..a585f4217f81a5d600ec9a48c12b3b47ff2d5322 100644 --- a/config_center/configurator/override_test.go +++ b/config_center/configurator/override_test.go @@ -39,7 +39,7 @@ func Test_configureVerison2p6(t *testing.T) { providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") configurator.Configure(&providerUrl) - assert.Equal(t, "failfast", providerUrl.Params.Get(constant.CLUSTER_KEY)) + assert.Equal(t, "failfast", providerUrl.GetParam(constant.CLUSTER_KEY, "")) } func Test_configureVerisonOverrideAddr(t *testing.T) { @@ -50,7 +50,7 @@ func Test_configureVerisonOverrideAddr(t *testing.T) { providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") configurator.Configure(&providerUrl) - assert.Equal(t, "failover", providerUrl.Params.Get(constant.CLUSTER_KEY)) + assert.Equal(t, "failover", providerUrl.GetParam(constant.CLUSTER_KEY, "")) } func Test_configureVerison2p6WithIp(t *testing.T) { @@ -61,7 +61,7 @@ func Test_configureVerison2p6WithIp(t *testing.T) { providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") configurator.Configure(&providerUrl) - assert.Equal(t, "failfast", providerUrl.Params.Get(constant.CLUSTER_KEY)) + assert.Equal(t, "failfast", providerUrl.GetParam(constant.CLUSTER_KEY, "")) } @@ -72,6 +72,6 @@ func Test_configureVerison2p7(t *testing.T) { providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") configurator.Configure(&providerUrl) - assert.Equal(t, "failfast", providerUrl.Params.Get(constant.CLUSTER_KEY)) + assert.Equal(t, "failfast", providerUrl.GetParam(constant.CLUSTER_KEY, "")) } diff --git a/examples/README.md b/examples/README.md index 497926f1d3822f7b7d33640fa18cbc2bd65bdbb9..d520c5cceecd30ea6d4cae9bf416ae079b3a3f84 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,74 +1,80 @@ # examples -Examples of go-for-apache-dubbo +Examples of dubbo-go -## dubbo +## What does this contain -#### Build by these command +* helloworld -java server -```bash -cd dubbo/java-server -sh build.sh -``` + A simplest example. It contain 'go-client', 'go-server', 'java-server' of dubbo protocol. -java client -```bash -cd dubbo/java-client -sh build.sh -``` +* general -go server + A general example. It had validated zookeeper registry and different parameter lists of service. + And it has a comprehensive testing with dubbo/jsonrpc protocol. You can refer to it to create your first complete dubbo-go project. -* sh ./assembly/\[os]/\[environment].sh -```bash -cd dubbo/go-server -# $ARCH = [linux, mac, windows] and $ENV = [dev, release, test] -sh ./assembly/$ARCH/$ENV.sh -``` +* generic -go client -```bash -cd dubbo/go-client -# $ARCH = [linux, mac, windows] and $ENV = [dev, release, test] -sh ./assembly/$ARCH/$ENV.sh -``` + A generic reference example. It show how to use generic reference of dubbo-go. + +* configcenter + + Some examples of different config center. There is only one -- zookeeper at present. -#### Run by these command: +## How to build and run + +> Take `helloworld` as an example java server + ```bash -cd dubbo/java-server/target +cd helloworld/dubbo/java-server +sh build.sh + +cd ./target tar -zxvf user-info-server-0.2.0-assembly.tar.gz cd ./user-info-server-0.2.0 sh ./bin/server.sh start ``` java client + ```bash -cd dubbo/java-client/target +cd helloworld/dubbo/java-client +sh build.sh + +cd ./target tar -zxvf user-info-client-0.2.0-assembly.tar.gz cd ./user-info-client-0.2.0 sh ./bin/server.sh start ``` go server + +* $ARCH = [linux, mac, windows] and $ENV = [dev, release, test] + ```bash -cd dubbo/go-server/target/linux/user_info_server-0.3.1-20190517-0930-release -#conf suffix appoint config file, -#such as server_zookeeper.yml when "sh ./bin/load.sh start is zookeeper", -#default server.yml -sh ./bin/load.sh start [conf suffix] +cd helloworld/dubbo/go-server +sh ./assembly/$ARCH/$ENV.sh + +cd ./target/linux/user_info_server-0.3.1-20190517-0930-release +# $SUFFIX is a suffix of config file, +# such as server_zookeeper.yml when $SUFFIX is "zookeeper", +# if $SUFFIX = "", default server.yml +sh ./bin/load.sh start $SUFFIX ``` go client + +* $ARCH = [linux, mac, windows] and $ENV = [dev, release, test] + ```bash -cd dubbo/go-client/target/linux/user_info_client-0.3.1-20190517-0921-release +cd helloworld/dubbo/go-client +sh ./assembly/$ARCH/$ENV.sh + +cd ./target/linux/user_info_client-0.3.1-20190517-0921-release # $SUFFIX is a suffix of config file, # such as client_zookeeper.yml when $SUFFIX = zookeeper", # if $SUFFIX = "", config file is client.yml sh ./bin/load_user_info_client.sh start $SUFFIX ``` - -## jsonrpc -Similar to dubbo diff --git a/examples/configcenter/zookeeper/dubbo/with-configcenter-go-client/profiles/dev/client.yml b/examples/configcenter/zookeeper/dubbo/with-configcenter-go-client/profiles/dev/client.yml new file mode 100644 index 0000000000000000000000000000000000000000..48b7b0ce95f11f21f1c94095c3c4fea3c4232f9d --- /dev/null +++ b/examples/configcenter/zookeeper/dubbo/with-configcenter-go-client/profiles/dev/client.yml @@ -0,0 +1,38 @@ +# dubbo client yaml configure file + +config_center: + protocol: "zookeeper" + address: "127.0.0.1:2181" + group: "dubbo" + config_file: "dubbo.client.properties" + +references: + "UserProvider": + protocol : "dubbo" + interface : "com.ikurento.user.UserProvider" + cluster: "failover" + methods : + - name: "GetUser" + retries: "3" + +protocol_conf: + dubbo: + reconnect_interval: 0 + connection_number: 2 + heartbeat_period: "5s" + session_timeout: "20s" + pool_size: 64 + pool_ttl: 600 + getty_session_param: + compress_encoding: false + tcp_no_delay: true + tcp_keep_alive: true + keep_alive_period: "120s" + tcp_r_buf_size: 262144 + tcp_w_buf_size: 65536 + pkg_wq_size: 512 + tcp_read_timeout: "1s" + tcp_write_timeout: "5s" + wait_timeout: "1s" + max_msg_len: 10240 + session_name: "client" diff --git a/examples/configcenter/zookeeper/dubbo/with-configcenter-go-server/profiles/dev/server.yml b/examples/configcenter/zookeeper/dubbo/with-configcenter-go-server/profiles/dev/server.yml new file mode 100644 index 0000000000000000000000000000000000000000..219b92aebf1262dbbccf6291301387447589e500 --- /dev/null +++ b/examples/configcenter/zookeeper/dubbo/with-configcenter-go-server/profiles/dev/server.yml @@ -0,0 +1,39 @@ +# dubbo server yaml configure file + + +config_center: + protocol: "zookeeper" + address: "127.0.0.1:2181" + group: "dubbo" + config_file: "dubbo.properties" + +services: + "UserProvider": + protocol : "dubbo" + # 鐩稿綋浜巇ubbo.xml涓殑interface + interface : "com.ikurento.user.UserProvider" + loadbalance: "random" + warmup: "100" + cluster: "failover" + methods: + - name: "GetUser" + retries: "1" + loadbalance: "random" + +protocol_conf: + dubbo: + session_number: 700 + session_timeout: "20s" + getty_session_param: + compress_encoding: false + tcp_no_delay: true + tcp_keep_alive: true + keep_alive_period: "120s" + tcp_r_buf_size: 262144 + tcp_w_buf_size: 65536 + pkg_wq_size: 512 + tcp_read_timeout: "1s" + tcp_write_timeout: "5s" + wait_timeout: "1s" + max_msg_len: 1024 + session_name: "server" diff --git a/examples/consul/go-client/config/client.yml b/examples/consul/go-client/config/client.yml index 8a4c088bd669cc311fc16d719cdc672c66db2624..556ac2b1121f40d9fa3774f29beb6bdb510991bf 100644 --- a/examples/consul/go-client/config/client.yml +++ b/examples/consul/go-client/config/client.yml @@ -21,7 +21,7 @@ references: cluster: "failover" methods : - name: "SayHello" - retries: 3 + retries: "3" protocol_conf: dubbo: diff --git a/examples/consul/go-server/config/server.yml b/examples/consul/go-server/config/server.yml index b251860d7a4a902c564de24543e72dfc4e3daa07..f3c169ab782be9d94c7258c65474e2cafde0e1cb 100644 --- a/examples/consul/go-server/config/server.yml +++ b/examples/consul/go-server/config/server.yml @@ -23,7 +23,7 @@ services: cluster: "failover" methods: - name: "SayHello" - retries: 1 + retries: "1" loadbalance: "random" protocols: diff --git a/examples/general/dubbo/go-client/app/client.go b/examples/general/dubbo/go-client/app/client.go index 60adad356b8a8ff843034dd8e6086a2b832c7792..f3ac4b99ab0abdd472056682ce8f4b3c4b41bc8b 100644 --- a/examples/general/dubbo/go-client/app/client.go +++ b/examples/general/dubbo/go-client/app/client.go @@ -33,9 +33,9 @@ import ( "github.com/apache/dubbo-go/common/logger" _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/config" - _ "github.com/apache/dubbo-go/registry/protocol" - _ "github.com/apache/dubbo-go/filter/impl" + _ "github.com/apache/dubbo-go/protocol/dubbo" + _ "github.com/apache/dubbo-go/registry/protocol" _ "github.com/apache/dubbo-go/cluster/cluster_impl" _ "github.com/apache/dubbo-go/cluster/loadbalance" @@ -168,7 +168,7 @@ func test1() { time.Sleep(3e9) - println("\n\n\nstart to test dubbo") + println("\n\n\nstart to test1 dubbo") user := &User{} err = userProvider1.GetUser(context.TODO(), []interface{}{"A003"}, user) if err != nil { @@ -176,7 +176,7 @@ func test1() { } println("response result: %v", user) - println("\n\n\nstart to test dubbo - GetUser0") + println("\n\n\nstart to test1 dubbo - GetUser0") ret, err := userProvider1.GetUser0("A003", "Moorse") if err != nil { panic(err) @@ -190,7 +190,7 @@ func test1() { } println("response result: %v", ret1) - println("\n\n\nstart to test dubbo - getUser") + println("\n\n\nstart to test1 dubbo - getUser") user = &User{} var i int32 = 1 err = userProvider1.GetUser2(context.TODO(), []interface{}{i}, user) @@ -206,7 +206,7 @@ func test1() { } println("succ!") - println("\n\n\nstart to test dubbo - getErr") + println("\n\n\nstart to test1 dubbo - getErr") user = &User{} err = userProvider1.GetErr(context.TODO(), []interface{}{"A003"}, user) if err == nil { @@ -214,7 +214,7 @@ func test1() { } println("getErr - error: %v", err) - println("\n\n\nstart to test dubbo illegal method") + println("\n\n\nstart to test1 dubbo illegal method") err = userProvider1.GetUser1(context.TODO(), []interface{}{"A003"}, user) if err == nil { panic("err is nil") @@ -232,7 +232,7 @@ func test2() { time.Sleep(3e9) - println("\n\n\nstart to test dubbo") + println("\n\n\nstart to test2 dubbo") user := &User{} err = userProvider2.GetUser(context.TODO(), []interface{}{"A003"}, user) if err != nil { @@ -240,21 +240,21 @@ func test2() { } println("response result: %v", user) - println("\n\n\nstart to test dubbo - GetUser0") + println("\n\n\nstart to test2 dubbo - GetUser0") ret, err := userProvider2.GetUser0("A003", "Moorse") if err != nil { panic(err) } println("response result: %v", ret) - println("\n\n\nstart to test dubbo - GetUsers") + println("\n\n\nstart to test2 dubbo - GetUsers") ret1, err := userProvider2.GetUsers([]interface{}{[]interface{}{"A002", "A003"}}) if err != nil { panic(err) } println("response result: %v", ret1) - println("\n\n\nstart to test dubbo - getUser") + println("\n\n\nstart to test2 dubbo - getUser") user = &User{} var i int32 = 1 err = userProvider2.GetUser2(context.TODO(), []interface{}{i}, user) @@ -263,14 +263,14 @@ func test2() { } println("response result: %v", user) - println("\n\n\nstart to test dubbo - GetUser3") + println("\n\n\nstart to test2 dubbo - GetUser3") err = userProvider2.GetUser3() if err != nil { panic(err) } println("succ!") - println("\n\n\nstart to test dubbo - getErr") + println("\n\n\nstart to test2 dubbo - getErr") user = &User{} err = userProvider2.GetErr(context.TODO(), []interface{}{"A003"}, user) if err == nil { @@ -278,7 +278,7 @@ func test2() { } println("getErr - error: %v", err) - println("\n\n\nstart to test dubbo illegal method") + println("\n\n\nstart to test2 dubbo illegal method") err = userProvider2.GetUser1(context.TODO(), []interface{}{"A003"}, user) if err == nil { panic("err is nil") diff --git a/examples/general/dubbo/go-client/profiles/dev/client.yml b/examples/general/dubbo/go-client/profiles/dev/client.yml index 3e22e97c0a2b6f62340bfeb753805ca4d143514b..002da1ddb34f1fe6bfb7122e631153823960f8f5 100644 --- a/examples/general/dubbo/go-client/profiles/dev/client.yml +++ b/examples/general/dubbo/go-client/profiles/dev/client.yml @@ -39,7 +39,7 @@ references: cluster: "failover" methods : - name: "GetUser" - retries: 3 + retries: "3" "UserProvider1": registry: "hangzhouzk" protocol: "dubbo" @@ -48,7 +48,7 @@ references: cluster: "failover" methods: - name: "GetUser" - retries: 3 + retries: "3" "UserProvider2": registry: "hangzhouzk" protocol: "dubbo" @@ -58,7 +58,7 @@ references: cluster: "failover" methods: - name: "GetUser" - retries: 3 + retries: "3" protocol_conf: dubbo: diff --git a/examples/general/dubbo/go-server/profiles/dev/server.yml b/examples/general/dubbo/go-server/profiles/dev/server.yml index c4aeedf8eb03d513267cdd6216ce7f5a4e994bac..c936e12828e3850f23a7a20e5e5361f820e04676 100644 --- a/examples/general/dubbo/go-server/profiles/dev/server.yml +++ b/examples/general/dubbo/go-server/profiles/dev/server.yml @@ -38,7 +38,7 @@ services: cluster: "failover" methods: - name: "GetUser" - retries: 1 + retries: "1" loadbalance: "random" "UserProvider1": registry: "hangzhouzk" @@ -50,7 +50,7 @@ services: cluster: "failover" methods: - name: "GetUser" - retries: 1 + retries: "1" loadbalance: "random" "UserProvider2": registry: "hangzhouzk" @@ -63,7 +63,7 @@ services: cluster: "failover" methods: - name: "GetUser" - retries: 1 + retries: "1" loadbalance: "random" protocols: diff --git a/examples/general/jsonrpc/go-client/profiles/dev/client.yml b/examples/general/jsonrpc/go-client/profiles/dev/client.yml index ed757d7eca15f8f4e5548886f679997cf7f675a0..98c5c820e1af2d96702dbae27abf9af1d5129497 100644 --- a/examples/general/jsonrpc/go-client/profiles/dev/client.yml +++ b/examples/general/jsonrpc/go-client/profiles/dev/client.yml @@ -38,7 +38,7 @@ references: cluster: "failover" methods : - name: "GetUser" - retries: 3 + retries: "3" "UserProvider1": registry: "hangzhouzk" protocol: "jsonrpc" @@ -47,7 +47,7 @@ references: cluster: "failover" methods: - name: "GetUser" - retries: 3 + retries: "3" "UserProvider2": registry: "hangzhouzk" protocol: "jsonrpc" @@ -57,4 +57,4 @@ references: cluster: "failover" methods: - name: "GetUser" - retries: 3 + retries: "3" diff --git a/examples/general/jsonrpc/go-server/profiles/dev/server.yml b/examples/general/jsonrpc/go-server/profiles/dev/server.yml index e5983dcfc21bcfee445f84523cc37f6a84e10fa2..f4a2766f48309ec9191810fd60b393d472574ce8 100644 --- a/examples/general/jsonrpc/go-server/profiles/dev/server.yml +++ b/examples/general/jsonrpc/go-server/profiles/dev/server.yml @@ -36,7 +36,7 @@ services: cluster: "failover" methods: - name: "GetUser" - retries: 1 + retries: "1" loadbalance: "random" "UserProvider1": registry: "hangzhouzk" @@ -48,7 +48,7 @@ services: cluster: "failover" methods: - name: "GetUser" - retries: 1 + retries: "1" loadbalance: "random" "UserProvider2": registry: "hangzhouzk" @@ -61,7 +61,7 @@ services: cluster: "failover" methods: - name: "GetUser" - retries: 1 + retries: "1" loadbalance: "random" protocols: diff --git a/examples/helloworld/dubbo/go-client/profiles/dev/client.yml b/examples/helloworld/dubbo/go-client/profiles/dev/client.yml index 8a318fd040e6960702ede165a7b4be407ecba241..6fcc678342d17c6a58ad0a11754fb396514f501b 100644 --- a/examples/helloworld/dubbo/go-client/profiles/dev/client.yml +++ b/examples/helloworld/dubbo/go-client/profiles/dev/client.yml @@ -34,7 +34,7 @@ references: cluster: "failover" methods : - name: "GetUser" - retries: 3 + retries: "3" protocol_conf: diff --git a/examples/helloworld/dubbo/go-server/profiles/dev/server.yml b/examples/helloworld/dubbo/go-server/profiles/dev/server.yml index ab7f63d7e4f93876e56f775a994ed81614e6dfd7..27e9d55c8dd7f50320f90a38ac6b501425d5ba83 100644 --- a/examples/helloworld/dubbo/go-server/profiles/dev/server.yml +++ b/examples/helloworld/dubbo/go-server/profiles/dev/server.yml @@ -28,7 +28,7 @@ services: cluster: "failover" methods: - name: "GetUser" - retries: 1 + retries: "1" loadbalance: "random" protocols: diff --git a/examples/hystrixfilter/dubbo/with-hystrix-go-client/profiles/dev/client.yml b/examples/hystrixfilter/dubbo/with-hystrix-go-client/profiles/dev/client.yml index 71b0c2bf1c5620046a9c9d8c85fbbe4c95867225..8602b3a190aa6b89497b6d0095334ef6ad9ea216 100644 --- a/examples/hystrixfilter/dubbo/with-hystrix-go-client/profiles/dev/client.yml +++ b/examples/hystrixfilter/dubbo/with-hystrix-go-client/profiles/dev/client.yml @@ -41,7 +41,7 @@ references: cluster: "failover" methods : - name: "GetUser" - retries: 3 + retries: "3" protocol_conf: dubbo: diff --git a/filter/impl/echo_filter.go b/filter/impl/echo_filter.go index 5eb5a37fa500bd8c180d879240d1c1e367df31ce..18e42c8cb2b15acb27573c5e24f11a8b69e0d496 100644 --- a/filter/impl/echo_filter.go +++ b/filter/impl/echo_filter.go @@ -43,7 +43,8 @@ func (ef *EchoFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invoc logger.Debugf("%v,%v", invocation.MethodName(), len(invocation.Arguments())) if invocation.MethodName() == constant.ECHO && len(invocation.Arguments()) == 1 { return &protocol.RPCResult{ - Rest: invocation.Arguments()[0], + Rest: invocation.Arguments()[0], + Attrs: invocation.Attachments(), } } diff --git a/go.mod b/go.mod index fa66c6a6473e4b1d3c1f6757d6465d0426af2407..ec3c24c278712e7aaf80638b67971bf6f2ced516 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/Workiva/go-datastructures v1.0.50 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect - github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 + github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.13+incompatible diff --git a/go.sum b/go.sum index 4add9fcf790eb8c97ef41c131b06a2dc794dc0ca..1452a762985e615d49a40df33d8ee8166940a4c7 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= -github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 h1:7zJlM+8bpCAUhv03TZnXkT4MLlLWng1s7An8CLuN73E= -github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= +github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22 h1:Ku+3LFRYVelgo/INS9893QOUeIiKNeNKzK3CzDcqt/4= +github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 6ac5e7215429338aa7cabb646b3afd3f1a136b68..4927f51cf7c964ac9bb26c1089231b4bdcc65157 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -59,7 +59,7 @@ func init() { return } dubboConf := protocolConf.(map[interface{}]interface{})[DUBBO] - if protocolConf == nil { + if dubboConf == nil { logger.Warnf("dubboConf is nil") return } @@ -150,46 +150,74 @@ func NewClient(opt Options) *Client { return c } +type Request struct { + addr string + svcUrl common.URL + method string + args interface{} + atta map[string]string +} + +func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request { + return &Request{ + addr: addr, + svcUrl: svcUrl, + method: method, + args: args, + atta: atta, + } +} + +type Response struct { + reply interface{} + atta map[string]string +} + +func NewResponse(reply interface{}, atta map[string]string) *Response { + return &Response{ + reply: reply, + atta: atta, + } +} + // call one way -func (c *Client) CallOneway(addr string, svcUrl common.URL, method string, args interface{}) error { +func (c *Client) CallOneway(request *Request) error { - return perrors.WithStack(c.call(CT_OneWay, addr, svcUrl, method, args, nil, nil)) + return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil)) } -// if @reply is nil, the transport layer will get the response without notify the invoker. -func (c *Client) Call(addr string, svcUrl common.URL, method string, args, reply interface{}) error { +// if @response is nil, the transport layer will get the response without notify the invoker. +func (c *Client) Call(request *Request, response *Response) error { ct := CT_TwoWay - if reply == nil { + if response.reply == nil { ct = CT_OneWay } - return perrors.WithStack(c.call(ct, addr, svcUrl, method, args, reply, nil)) + return perrors.WithStack(c.call(ct, request, response, nil)) } -func (c *Client) AsyncCall(addr string, svcUrl common.URL, method string, args interface{}, - callback AsyncCallback, reply interface{}) error { +func (c *Client) AsyncCall(request *Request, callback AsyncCallback, response *Response) error { - return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback)) + return perrors.WithStack(c.call(CT_TwoWay, request, response, callback)) } -func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string, - args, reply interface{}, callback AsyncCallback) error { +func (c *Client) call(ct CallType, request *Request, response *Response, callback AsyncCallback) error { p := &DubboPackage{} - p.Service.Path = strings.TrimPrefix(svcUrl.Path, "/") - p.Service.Interface = svcUrl.GetParam(constant.INTERFACE_KEY, "") - p.Service.Version = svcUrl.GetParam(constant.VERSION_KEY, "") - p.Service.Method = method + p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/") + p.Service.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "") + p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "") + p.Service.Method = request.method p.Service.Timeout = c.opts.RequestTimeout p.Header.SerialID = byte(S_Dubbo) - p.Body = args + p.Body = hessian.NewRequest(request.args, request.atta) var rsp *PendingResponse if ct != CT_OneWay { p.Header.Type = hessian.PackageRequest_TwoWay rsp = NewPendingResponse() - rsp.reply = reply + rsp.response = response rsp.callback = callback } else { p.Header.Type = hessian.PackageRequest @@ -200,7 +228,7 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string session getty.Session conn *gettyRPCClient ) - conn, session, err = c.selectSession(addr) + conn, session, err = c.selectSession(request.addr) if err != nil { return perrors.WithStack(err) } @@ -259,6 +287,7 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage, if pkg == nil { pkg = &DubboPackage{} + pkg.Body = hessian.NewRequest([]interface{}{}, nil) pkg.Body = []interface{}{} pkg.Header.Type = hessian.PackageHeartbeat pkg.Header.SerialID = byte(S_Dubbo) diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index cd961d382933443e37a08c21b4e4de5edb971860..eb1f15c862a910120e118c06bf9b572e93f58832 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -33,6 +33,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" ) @@ -50,7 +51,7 @@ func TestClient_CallOneway(t *testing.T) { c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) //user := &User{} - err := c.CallOneway("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}) + err := c.CallOneway(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil)) assert.NoError(t, err) // destroy @@ -70,51 +71,56 @@ func TestClient_Call(t *testing.T) { } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) - user := &User{} - err := c.Call("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, user) + var ( + user *User + err error + ) + + user = &User{} + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, nil), NewResponse(user, nil)) assert.NoError(t, err) assert.NotEqual(t, "", user.Id) assert.NotEqual(t, "", user.Name) user = &User{} - err = c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), NewResponse(user, nil)) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) user = &User{} - err = c.Call("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", nil, "username"}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", nil, "username"}, nil), NewResponse(user, nil)) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) - err = c.Call("127.0.0.1:20000", url, "GetUser1", []interface{}{}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser1", []interface{}{}, nil), NewResponse(user, nil)) assert.NoError(t, err) - err = c.Call("127.0.0.1:20000", url, "GetUser2", []interface{}{}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser2", []interface{}{}, nil), NewResponse(user, nil)) assert.EqualError(t, err, "error") user2 := []interface{}{} - err = c.Call("127.0.0.1:20000", url, "GetUser3", []interface{}{}, &user2) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser3", []interface{}{}, nil), NewResponse(&user2, nil)) assert.NoError(t, err) assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0]) user2 = []interface{}{} - err = c.Call("127.0.0.1:20000", url, "GetUser4", []interface{}{[]interface{}{"1", "username"}}, &user2) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser4", []interface{}{[]interface{}{"1", "username"}}, nil), NewResponse(&user2, nil)) assert.NoError(t, err) assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0]) user3 := map[interface{}]interface{}{} - err = c.Call("127.0.0.1:20000", url, "GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, &user3) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, nil), NewResponse(&user3, nil)) assert.NoError(t, err) assert.NotNil(t, user3) assert.Equal(t, &User{Id: "1", Name: "username"}, user3["key"]) user = &User{} - err = c.Call("127.0.0.1:20000", url, "GetUser6", []interface{}{0}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser6", []interface{}{0}, nil), NewResponse(user, nil)) assert.NoError(t, err) assert.Equal(t, User{Id: "", Name: ""}, *user) user = &User{} - err = c.Call("127.0.0.1:20000", url, "GetUser6", []interface{}{1}, user) + err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser6", []interface{}{1}, nil), NewResponse(user, nil)) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: ""}, *user) @@ -138,10 +144,10 @@ func TestClient_AsyncCall(t *testing.T) { user := &User{} lock := sync.Mutex{} lock.Lock() - err := c.AsyncCall("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, func(response CallResponse) { - assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*User)) + err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response CallResponse) { + assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) lock.Unlock() - }, user) + }, NewResponse(user, nil)) assert.NoError(t, err) assert.Equal(t, User{}, *user) @@ -209,7 +215,9 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") assert.NoError(t, err) - proto.Export(protocol.NewBaseInvoker(url)) + proto.Export(&proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }) time.Sleep(time.Second * 2) diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 98c29a4e5bc576f7e37f74c0c0abbbab4687717b..a878ffd91e29d6949870ec25fed9481f301b435a 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -91,9 +91,8 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) if !ok { return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) - } else { - p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).reply} } + p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply} } // read body @@ -111,14 +110,15 @@ type PendingResponse struct { start time.Time readStart time.Time callback AsyncCallback - reply interface{} + response *Response done chan struct{} } func NewPendingResponse() *PendingResponse { return &PendingResponse{ - start: time.Now(), - done: make(chan struct{}), + start: time.Now(), + response: &Response{}, + done: make(chan struct{}), } } @@ -127,6 +127,6 @@ func (r PendingResponse) GetCallResponse() CallResponse { Cause: r.err, Start: r.start, ReadStart: r.readStart, - Reply: r.reply, + Reply: r.response, } } diff --git a/protocol/dubbo/codec_test.go b/protocol/dubbo/codec_test.go index 52bb1fc130bb2dad866799f01c43d11ffd10a220..c192c2294db5597517ace011224e34f8affefb1f 100644 --- a/protocol/dubbo/codec_test.go +++ b/protocol/dubbo/codec_test.go @@ -64,11 +64,11 @@ func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) { assert.Equal(t, hessian.PackageRequest, pkgres.Header.Type) assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID) assert.Equal(t, int64(10086), pkgres.Header.ID) - assert.Equal(t, "2.5.4", pkgres.Body.([]interface{})[0]) + assert.Equal(t, "2.0.2", pkgres.Body.([]interface{})[0]) assert.Equal(t, "path", pkgres.Body.([]interface{})[1]) assert.Equal(t, "2.6", pkgres.Body.([]interface{})[2]) assert.Equal(t, "Method", pkgres.Body.([]interface{})[3]) assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4]) assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5]) - assert.Equal(t, map[interface{}]interface{}{"group": "", "interface": "Service", "path": "path", "timeout": "1000"}, pkgres.Body.([]interface{})[6]) + assert.Equal(t, map[string]string{"dubbo": "2.0.2", "group": "", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, pkgres.Body.([]interface{})[6]) } diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 46b3dcc8463d8406c494d80149b28f0dba7444d0..4bfc1324cf9dcdb2412e7026214f6a3951a9c7c1 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -34,7 +34,7 @@ import ( invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) -var Err_No_Reply = perrors.New("request need @reply") +var Err_No_Reply = perrors.New("request need @response") type DubboInvoker struct { protocol.BaseInvoker @@ -64,21 +64,23 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { logger.Errorf("ParseBool - error: %v", err) async = false } + response := NewResponse(inv.Reply(), nil) if async { if callBack, ok := inv.CallBack().(func(response CallResponse)); ok { - result.Err = di.client.AsyncCall(url.Location, url, inv.MethodName(), inv.Arguments(), callBack, inv.Reply()) + result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) } else { - result.Err = di.client.CallOneway(url.Location, url, inv.MethodName(), inv.Arguments()) + result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())) } } else { if inv.Reply() == nil { result.Err = Err_No_Reply } else { - result.Err = di.client.Call(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Reply()) + result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response) } } if result.Err == nil { result.Rest = inv.Reply() + result.Attrs = response.atta } logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest) diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 09a4c128b600e605de616a65027da9b2ce6fcb20..0a765356f7353829c8486fddba986e3a444441a1 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -49,12 +49,13 @@ func TestDubboInvoker_Invoke(t *testing.T) { user := &User{} inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), invocation.WithArguments([]interface{}{"1", "username"}), - invocation.WithReply(user)) + invocation.WithReply(user), invocation.WithAttachments(map[string]string{"test_key": "test_value"})) // Call res := invoker.Invoke(inv) assert.NoError(t, res.Error()) assert.Equal(t, User{Id: "1", Name: "username"}, *res.Result().(*User)) + assert.Equal(t, "test_value", res.Attachments()["test_key"]) // test attachments for request/response // CallOneway inv.SetAttachments(constant.ASYNC_KEY, "true") @@ -65,7 +66,7 @@ func TestDubboInvoker_Invoke(t *testing.T) { lock := sync.Mutex{} lock.Lock() inv.SetCallBack(func(response CallResponse) { - assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*User)) + assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) lock.Unlock() }) res = invoker.Invoke(inv) @@ -75,7 +76,7 @@ func TestDubboInvoker_Invoke(t *testing.T) { inv.SetAttachments(constant.ASYNC_KEY, "false") inv.SetReply(nil) res = invoker.Invoke(inv) - assert.EqualError(t, res.Error(), "request need @reply") + assert.EqualError(t, res.Error(), "request need @response") // destroy lock.Lock() diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 4438a0b3d0e32127536b818806d190a2d2a5a2ba..59d1ea05160696754b46dfead5713684aa7a94f7 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -17,13 +17,16 @@ package dubbo +import ( + "sync" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/protocol" - "sync" ) const ( diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go index 26ce4a1906d5d6fe425f23984586914c293f47a4..a6b0bc1df3cf2eb46e07c9dab149d04f62f78012 100644 --- a/protocol/dubbo/dubbo_protocol_test.go +++ b/protocol/dubbo/dubbo_protocol_test.go @@ -19,7 +19,6 @@ package dubbo import ( "context" - "github.com/apache/dubbo-go/common/constant" "testing" ) @@ -29,6 +28,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" ) diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index aa208284825665dae4c23f871117a7f34c548d16..1a7b002819271d7841e4749881d2ab379f9a7240 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -18,10 +18,8 @@ package dubbo import ( - "context" "fmt" "net/url" - "reflect" "sync" "time" ) @@ -107,6 +105,8 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { pendingResponse.err = p.Err } + pendingResponse.response.atta = p.Body.(*Response).atta + if pendingResponse.callback == nil { pendingResponse.done <- struct{}{} } else { @@ -209,6 +209,28 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { twoway = false } + defer func() { + if e := recover(); e != nil { + p.Header.ResponseStatus = hessian.Response_SERVER_ERROR + if err, ok := e.(error); ok { + logger.Errorf("OnMessage panic: %+v", perrors.WithStack(err)) + p.Body = perrors.WithStack(err) + } else if err, ok := e.(string); ok { + logger.Errorf("OnMessage panic: %+v", perrors.New(err)) + p.Body = perrors.New(err) + } else { + logger.Errorf("OnMessage panic: %+v, this is impossible.", e) + p.Body = e + } + + if !twoway { + return + } + h.reply(session, p, hessian.PackageResponse) + } + + }() + u := common.NewURLWithOptions(common.WithPath(p.Service.Path), common.WithParams(url.Values{}), common.WithParamsValue(constant.GROUP_KEY, p.Service.Group), common.WithParamsValue(constant.INTERFACE_KEY, p.Service.Interface), @@ -224,27 +246,18 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { } invoker := exporter.(protocol.Exporter).GetInvoker() if invoker != nil { - result := invoker.Invoke(invocation.NewRPCInvocation(p.Service.Method, p.Body.(map[string]interface{})["args"].([]interface{}), map[string]string{ - constant.PATH_KEY: p.Service.Path, - constant.GROUP_KEY: p.Service.Group, - constant.INTERFACE_KEY: p.Service.Interface, - constant.VERSION_KEY: p.Service.Version, - })) + result := invoker.Invoke(invocation.NewRPCInvocation(p.Service.Method, p.Body.(map[string]interface{})["args"].([]interface{}), + p.Body.(map[string]interface{})["attachments"].(map[string]string))) if err := result.Error(); err != nil { p.Header.ResponseStatus = hessian.Response_OK - p.Body = err - h.reply(session, p, hessian.PackageResponse) - return - } - if res := result.Result(); res != nil { + p.Body = hessian.NewResponse(nil, err, result.Attachments()) + } else { + res := result.Result() p.Header.ResponseStatus = hessian.Response_OK - p.Body = res - h.reply(session, p, hessian.PackageResponse) - return + p.Body = hessian.NewResponse(res, nil, result.Attachments()) } } - h.callService(p, nil) if !twoway { return } @@ -276,91 +289,6 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { } } -func (h *RpcServerHandler) callService(req *DubboPackage, ctx context.Context) { - - defer func() { - if e := recover(); e != nil { - req.Header.ResponseStatus = hessian.Response_SERVER_ERROR - if err, ok := e.(error); ok { - logger.Errorf("callService panic: %+v", perrors.WithStack(err)) - req.Body = perrors.WithStack(err) - } else if err, ok := e.(string); ok { - logger.Errorf("callService panic: %+v", perrors.New(err)) - req.Body = perrors.New(err) - } else { - logger.Errorf("callService panic: %+v, this is impossible.", e) - req.Body = e - } - } - }() - - svcIf := req.Body.(map[string]interface{})["service"] - if svcIf == nil { - logger.Errorf("service not found!") - req.Header.ResponseStatus = hessian.Response_BAD_REQUEST - req.Body = perrors.New("service not found") - return - } - svc := svcIf.(*common.Service) - method := svc.Method()[req.Service.Method] - if method == nil { - logger.Errorf("method not found!") - req.Header.ResponseStatus = hessian.Response_BAD_REQUEST - req.Body = perrors.New("method not found") - return - } - - in := []reflect.Value{svc.Rcvr()} - if method.CtxType() != nil { - in = append(in, method.SuiteContext(ctx)) - } - - // prepare argv - argv := req.Body.(map[string]interface{})["args"] - if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { - in = append(in, reflect.ValueOf(argv)) - } else { - for i := 0; i < len(argv.([]interface{})); i++ { - t := reflect.ValueOf(argv.([]interface{})[i]) - if !t.IsValid() { - at := method.ArgsType()[i] - if at.Kind() == reflect.Ptr { - at = at.Elem() - } - t = reflect.New(at) - } - in = append(in, t) - } - } - - // prepare replyv - var replyv reflect.Value - if method.ReplyType() == nil && len(method.ArgsType()) > 0 { - replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) - in = append(in, replyv) - } - - returnValues := method.Method().Func.Call(in) - - var retErr interface{} - if len(returnValues) == 1 { - retErr = returnValues[0].Interface() - } else { - replyv = returnValues[0] - retErr = returnValues[1].Interface() - } - if retErr != nil { - req.Header.ResponseStatus = hessian.Response_OK - req.Body = retErr - } else { - if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { - req.Body = replyv.Interface() - } else { - req.Body = nil - } - } -} - func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { resp := &DubboPackage{ Header: hessian.DubboHeader{ diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index 8c6c8a5a53af4df9a89eae5db5752eb07f3aa446..a57c29f890cc76aa57b316aba8bead1bb76cf6ff 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -23,7 +23,7 @@ import ( ) import ( - hessian "github.com/apache/dubbo-go-hessian2" + "github.com/apache/dubbo-go-hessian2" "github.com/dubbogo/getty" perrors "github.com/pkg/errors" ) @@ -63,7 +63,7 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface } pkg.Err = pkg.Body.(*hessian.Response).Exception - pkg.Body = pkg.Body.(*hessian.Response).RspObj + pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments) return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } @@ -118,7 +118,7 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface if len(req) > 0 { var dubboVersion, argsTypes string var args []interface{} - var attachments map[interface{}]interface{} + var attachments map[string]string if req[0] != nil { dubboVersion = req[0].(string) } @@ -138,14 +138,18 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface args = req[5].([]interface{}) } if req[6] != nil { - attachments = req[6].(map[interface{}]interface{}) + attachments = req[6].(map[string]string) } - pkg.Service.Interface = attachments[constant.INTERFACE_KEY].(string) - if pkg.Service.Path == "" && attachments[constant.PATH_KEY] != nil { - pkg.Service.Path = attachments[constant.PATH_KEY].(string) + if pkg.Service.Path == "" && len(attachments[constant.PATH_KEY]) > 0 { + pkg.Service.Path = attachments[constant.PATH_KEY] } - if attachments[constant.GROUP_KEY] != nil { - pkg.Service.Group = attachments[constant.GROUP_KEY].(string) + if _, ok := attachments[constant.INTERFACE_KEY]; ok { + pkg.Service.Interface = attachments[constant.INTERFACE_KEY] + } else { + pkg.Service.Interface = pkg.Service.Path + } + if len(attachments[constant.GROUP_KEY]) > 0 { + pkg.Service.Group = attachments[constant.GROUP_KEY] } pkg.Body = map[string]interface{}{ "dubboVersion": dubboVersion, diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go index 8daeee05e2ccc115aad590c2ed80a269360482d5..5f93a794d253bb8a0af3dae42ea7aa627751bbdb 100644 --- a/protocol/dubbo/server.go +++ b/protocol/dubbo/server.go @@ -48,7 +48,7 @@ func init() { return } dubboConf := protocolConf.(map[interface{}]interface{})[DUBBO] - if protocolConf == nil { + if dubboConf == nil { logger.Warnf("dubboConf is nil") return } diff --git a/protocol/jsonrpc/http_test.go b/protocol/jsonrpc/http_test.go index 1f446803fd6c5f174f51e3fe9496c49ae4991691..9be55e247a730460a3adee5622fa978ef2defbfb 100644 --- a/protocol/jsonrpc/http_test.go +++ b/protocol/jsonrpc/http_test.go @@ -32,6 +32,7 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" ) @@ -60,7 +61,9 @@ func TestHTTPClient_Call(t *testing.T) { "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") assert.NoError(t, err) - proto.Export(protocol.NewBaseInvoker(url)) + proto.Export(&proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }) time.Sleep(time.Second * 2) client := NewHTTPClient(&HTTPOptions{}) diff --git a/protocol/jsonrpc/jsonrpc_invoker_test.go b/protocol/jsonrpc/jsonrpc_invoker_test.go index bc88759bf522a35a30e8585429f1db614c3a15ce..8c910339858f4960ad0e394ae6271863d7654adc 100644 --- a/protocol/jsonrpc/jsonrpc_invoker_test.go +++ b/protocol/jsonrpc/jsonrpc_invoker_test.go @@ -29,6 +29,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -47,7 +48,9 @@ func TestJsonrpcInvoker_Invoke(t *testing.T) { "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider") assert.NoError(t, err) - proto.Export(protocol.NewBaseInvoker(url)) + proto.Export(&proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }) time.Sleep(time.Second * 2) client := NewHTTPClient(&HTTPOptions{ diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index 6b3a39c68b4fdb417e8d2efaec4a43806acb2219..dc85e0f5e76fd07dbcd11646ae529c98e5323a15 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -25,7 +25,6 @@ import ( "io/ioutil" "net" "net/http" - "reflect" "runtime" "runtime/debug" "sync" @@ -330,13 +329,16 @@ func serveRequest(ctx context.Context, constant.VERSION_KEY: codec.req.Version, })) if err := result.Error(); err != nil { - if errRsp := sendErrorResp(header, []byte(err.Error())); errRsp != nil { + rspStream, err := codec.Write(err.Error(), invalidRequest) + if err != nil { + return perrors.WithStack(err) + } + if errRsp := sendErrorResp(header, rspStream); errRsp != nil { logger.Warnf("Exporter: sendErrorResp(header:%#v, error:%v) = error:%s", header, err, errRsp) - return perrors.WithStack(errRsp) } - } - if res := result.Result(); res != nil { + } else { + res := result.Result() rspStream, err := codec.Write("", res) if err != nil { return perrors.WithStack(err) @@ -344,102 +346,9 @@ func serveRequest(ctx context.Context, if errRsp := sendResp(header, rspStream); errRsp != nil { logger.Warnf("Exporter: sendResp(header:%#v, error:%v) = error:%s", header, err, errRsp) - return perrors.WithStack(errRsp) - } - } - } - // get method - svc := common.ServiceMap.GetService(JSONRPC, path) - if svc == nil { - return perrors.New("cannot find svc " + path) - } - method := svc.Method()[methodName] - if method == nil { - return perrors.New("cannot find method " + methodName + " of svc " + path) - } - - in := []reflect.Value{svc.Rcvr()} - if method.CtxType() != nil { - in = append(in, method.SuiteContext(ctx)) - } - - // prepare argv - if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { - in = append(in, reflect.ValueOf(args)) - } else { - for i := 0; i < len(args); i++ { - t := reflect.ValueOf(args[i]) - if !t.IsValid() { - at := method.ArgsType()[i] - if at.Kind() == reflect.Ptr { - at = at.Elem() - } - t = reflect.New(at) } - in = append(in, t) } } - // prepare replyv - var replyv reflect.Value - if method.ReplyType() == nil && len(method.ArgsType()) > 0 { - replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) - in = append(in, replyv) - } - - returnValues := method.Method().Func.Call(in) - - var ( - retErr interface{} - errMsg string - ) - if len(returnValues) == 1 { - retErr = returnValues[0].Interface() - } else { - replyv = returnValues[0] - retErr = returnValues[1].Interface() - } - if retErr != nil { - errMsg = retErr.(error).Error() - } - - // write response - code := 200 - var rspReply interface{} - if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) { - rspReply = replyv.Interface() - } - if len(errMsg) != 0 { - code = 500 - rspReply = invalidRequest - } - rspStream, err := codec.Write(errMsg, rspReply) - if err != nil { - return perrors.WithStack(err) - } - rsp := &http.Response{ - StatusCode: code, - ProtoMajor: 1, - ProtoMinor: 1, - Header: make(http.Header), - ContentLength: int64(len(rspStream)), - Body: ioutil.NopCloser(bytes.NewReader(rspStream)), - } - delete(header, "Content-Type") - delete(header, "Content-Length") - delete(header, "Timeout") - for k, v := range header { - rsp.Header.Set(k, v) - } - - rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize)) - rspBuf.Reset() - if err = rsp.Write(rspBuf); err != nil { - logger.Warnf("rsp.Write(rsp:%#v) = error:%s", rsp, err) - return nil - } - if _, err = rspBuf.WriteTo(conn); err != nil { - logger.Warnf("rspBuf.WriteTo(conn:%#v) = error:%s", conn, err) - } return nil } diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index b1392fff511dba2e2cbedf2547d6be2d4276a912..7c58fabea3cccf5a39e1622fedd4a3a297e05983 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -62,7 +62,7 @@ func (pfw *ProtocolFilterWrapper) Destroy() { } func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { - filtName := invoker.GetUrl().Params.Get(key) + filtName := invoker.GetUrl().GetParam(key, "") if filtName == "" { return invoker } diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go index 98bd206325ffd38a325b667f7f0bc2684bbf626f..dc376313549c24da1cc6cb64a42e8445ef4fe346 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper_test.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go @@ -32,7 +32,6 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/filter" - //"github.com/apache/dubbo-go/filter/impl" "github.com/apache/dubbo-go/protocol" ) diff --git a/registry/consul/utils.go b/registry/consul/utils.go index 6ecb0573ec5683d66795f539a8beb2eff5b6be2c..ee17fcc0df43228e26b40f3ac3f992147fc33d6e 100644 --- a/registry/consul/utils.go +++ b/registry/consul/utils.go @@ -62,9 +62,12 @@ func buildService(url common.URL) (*consul.AgentServiceRegistration, error) { // tags tags := make([]string, 0, 8) - for k := range url.Params { - tags = append(tags, k+"="+url.Params.Get(k)) - } + + url.RangeParams(func(key, value string) bool { + tags = append(tags, key+"="+value) + return true + }) + tags = append(tags, "dubbo") // meta diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index e2e18020e2abf6bcffec69d0bef07029f2aa8cf7..b3c1d35aaa66b3437ff89807fba2df0a383921cb 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -74,7 +74,7 @@ func TestSubscribe_Group(t *testing.T) { regurl, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") suburl, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") - suburl.Params.Set(constant.CLUSTER_KEY, "mock") + suburl.SetParam(constant.CLUSTER_KEY, "mock") regurl.SubURL = &suburl mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) registryDirectory, _ := NewRegistryDirectory(®url, mockRegistry) diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index ca02d1f2ef2854e50bb62f1f7c193922852e04d8..8bb1ff430eb8e674666266af5169bba4b4e22d30 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -260,10 +260,11 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error { } params := url.Values{} - for k, v := range svc.Params { - params[k] = v - } + svc.RangeParams(func(key, value string) bool { + params[key] = []string{value} + return true + }) params.Add("pid", processID) params.Add("ip", localIP) params.Add("anyhost", "true") diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go index 4b9fa59afab5776a8f9a979c45f2d1267c522d4c..6da9ad9d7d13bb0b6f8d1dab9b582669735ceec8 100644 --- a/registry/etcdv3/registry_test.go +++ b/registry/etcdv3/registry_test.go @@ -63,7 +63,7 @@ func (suite *RegistryTestSuite) TestSubscribe() { } //consumer register - regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) reg2 := initRegistry(t) reg2.Register(url) diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 3699fab3c561bd8831b35462dc4a92ec27df94db..f1a78264ce431745181ca6f633eda642cf90a31e 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -125,10 +125,13 @@ func appendParam(target *bytes.Buffer, url common.URL, key string) { func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstanceParam { category := getCategory(url) - params := make(map[string]string, len(url.Params)+3) - for k := range url.Params { - params[k] = url.Params.Get(k) - } + params := make(map[string]string) + + url.RangeParams(func(key, value string) bool { + params[key] = value + return true + }) + params[constant.NACOS_CATEGORY_KEY] = category params[constant.NACOS_PROTOCOL_KEY] = url.Protocol params[constant.NACOS_PATH_KEY] = url.Path diff --git a/registry/nacos/registry_test.go b/registry/nacos/registry_test.go index fc9dd25868db7040392b2d84f3b852d5306879c6..023ff788091c0c0f7c83ab213d8ab52006cfdc81 100644 --- a/registry/nacos/registry_test.go +++ b/registry/nacos/registry_test.go @@ -66,7 +66,7 @@ func TestNacosRegistry_Subscribe(t *testing.T) { return } - regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) reg2, _ := newNacosRegistry(®url) listener, err := reg2.(*nacosRegistry).subscribe(&url) assert.Nil(t, err) @@ -111,7 +111,7 @@ func TestNacosRegistry_Subscribe_del(t *testing.T) { return } - regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) reg2, _ := newNacosRegistry(®url) listener, err := reg2.(*nacosRegistry).subscribe(&url1) assert.Nil(t, err) diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 8deb783910307ccd9ec9dc979fea33c68b385a18..d1e02b11b21396d7963d2cfffb8e1211b6ee6666 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -22,11 +22,16 @@ import ( "sync" ) +import ( + "github.com/dubbogo/gost/container" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config_center" _ "github.com/apache/dubbo-go/config_center/configurator" @@ -60,10 +65,11 @@ func init() { func getCacheKey(url *common.URL) string { newUrl := url.Clone() - newUrl.Params.Del("dynamic") - newUrl.Params.Del("enabled") + delKeys := container.NewSet("dynamic", "enabled") + newUrl.RemoveParams(delKeys) return newUrl.String() } + func newRegistryProtocol() *registryProtocol { return ®istryProtocol{ registries: &sync.Map{}, @@ -283,8 +289,8 @@ func isMatchCategory(category string, categories string) bool { func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL { newUrl := providerUrl.Clone() newUrl.Protocol = constant.PROVIDER_PROTOCOL - newUrl.Params.Add(constant.CATEGORY_KEY, constant.CONFIGURATORS_CATEGORY) - newUrl.Params.Add(constant.CHECK_KEY, "false") + newUrl.SetParam(constant.CATEGORY_KEY, constant.CONFIGURATORS_CATEGORY) + newUrl.SetParam(constant.CHECK_KEY, "false") return newUrl } @@ -340,22 +346,20 @@ func GetProtocol() protocol.Protocol { type wrappedInvoker struct { invoker protocol.Invoker - url *common.URL protocol.BaseInvoker } func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoker { return &wrappedInvoker{ invoker: invoker, - url: url, - BaseInvoker: *protocol.NewBaseInvoker(common.URL{}), + BaseInvoker: *protocol.NewBaseInvoker(*url), } } -func (ivk *wrappedInvoker) GetUrl() common.URL { - return *ivk.url -} -func (ivk *wrappedInvoker) getInvoker() protocol.Invoker { - return ivk.invoker + +func (ivk *wrappedInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + // get right url + ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl()) + return ivk.invoker.Invoke(invocation) } type providerConfigurationListener struct { diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index 011fdc257ccbd67b00a387511d3807590cb97082..0c19da59df6e4fd2f663f9e8d541165fe26c3ffa 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -249,7 +249,7 @@ func TestExportWithOverrideListener(t *testing.T) { reg.MockEvent(event) time.Sleep(1e9) newUrl := url.SubURL.Clone() - newUrl.Params.Set(constant.CLUSTER_KEY, "mock1") + newUrl.SetParam(constant.CLUSTER_KEY, "mock1") v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl)) assert.NotNil(t, v2) } @@ -268,7 +268,7 @@ func TestExportWithServiceConfig(t *testing.T) { dc.(*config_center.MockDynamicConfiguration).MockServiceConfigEvent() newUrl := url.SubURL.Clone() - newUrl.Params.Set(constant.CLUSTER_KEY, "mock1") + newUrl.SetParam(constant.CLUSTER_KEY, "mock1") v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl)) assert.NotNil(t, v2) } @@ -287,7 +287,7 @@ func TestExportWithApplicationConfig(t *testing.T) { dc.(*config_center.MockDynamicConfiguration).MockApplicationConfigEvent() newUrl := url.SubURL.Clone() - newUrl.Params.Set(constant.CLUSTER_KEY, "mock1") + newUrl.SetParam(constant.CLUSTER_KEY, "mock1") v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl)) assert.NotNil(t, v2) } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 6f892b8f4be1da4ce9c84a692e79be5036f789ca..e14541dd04691bf89825b1d77d79932ad54f7720 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -271,9 +271,11 @@ func (r *zkRegistry) register(c common.URL) error { return perrors.WithStack(err) } params = url.Values{} - for k, v := range c.Params { - params[k] = v - } + + c.RangeParams(func(key, value string) bool { + params.Add(key, value) + return true + }) params.Add("pid", processID) params.Add("ip", localIP) diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index 9cb7c011f9235f9c4840632605761a8a16278afa..841c38da7fbf1830b6f7c55809fc50d52468ef46 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -60,7 +60,7 @@ func Test_Subscribe(t *testing.T) { } //consumer register - regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) _, reg2, _ := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) reg2.Register(url)