diff --git a/common/url.go b/common/url.go index fd233244bb5fe73e70c9a0b4d7ade049ed0967d6..f727f20b681fba70086a38cdebcd7d2c4bef185b 100644 --- a/common/url.go +++ b/common/url.go @@ -25,7 +25,6 @@ import ( "net/url" "strconv" "strings" - "sync" ) import ( @@ -39,9 +38,9 @@ import ( "github.com/apache/dubbo-go/common/constant" ) -///////////////////////////////// +// /////////////////////////////// // dubbo role type -///////////////////////////////// +// /////////////////////////////// // role constant const ( @@ -75,24 +74,26 @@ func (t RoleType) Role() string { } type baseUrl struct { - Protocol string - Location string // ip+port - Ip string - Port string - //url.Values is not safe map, add to avoid concurrent map read and map write error - paramsLock sync.RWMutex + Protocol string + Location string // ip+port + Ip string + Port string params url.Values PrimitiveURL string } -// URL ... +// URL is not thread-safe. +// we fail to define this struct to be immutable object. +// but, those method which will update the URL, including SetParam, SetParams +// are only allowed to be invoked in creating URL instance +// Please keep in mind that this struct is immutable after it has been created and initialized. type URL struct { baseUrl Path string // like /com.ikurento.dubbo.UserProvider3 Username string Password string Methods []string - //special for registry + // special for registry SubURL *URL } @@ -211,7 +212,7 @@ func NewURL(urlString string, opts ...option) (URL, error) { return s, perrors.Errorf("url.QueryUnescape(%s), error{%v}", urlString, err) } - //rawUrlString = "//" + rawUrlString + // rawUrlString = "//" + rawUrlString if strings.Index(rawUrlString, "//") < 0 { t := URL{baseUrl: baseUrl{}} for _, opt := range opts { @@ -274,7 +275,7 @@ func (c URL) URLEqual(url URL) bool { return false } - //TODO :may need add interface key any value condition + // TODO :may need add interface key any value condition return isMatchCategory(url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), c.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY)) } @@ -301,9 +302,7 @@ func (c URL) String() string { "%s://%s:%s@%s:%s%s?", c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Path)) } - c.paramsLock.RLock() buf.WriteString(c.params.Encode()) - c.paramsLock.RUnlock() return buf.String() } @@ -374,31 +373,31 @@ func (c URL) Service() string { return service } else if c.SubURL != nil { service = c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) - if service != "" { //if url.path is "" then return suburl's path, special for registry url + if service != "" { // if url.path is "" then return suburl's path, special for registry url return service } } return "" } -// AddParam ... +// AddParam will add the key-value pair +// Not thread-safe +// think twice before using it. func (c *URL) AddParam(key string, value string) { - c.paramsLock.Lock() c.params.Add(key, value) - c.paramsLock.Unlock() } -// SetParam ... +// SetParam will put the key-value pair into url +// it's not thread safe. +// think twice before you want to use this method +// usually it should only be invoked when you want to initialized an url func (c *URL) SetParam(key string, value string) { - c.paramsLock.Lock() c.params.Set(key, value) - c.paramsLock.Unlock() } -// RangeParams ... +// RangeParams will iterate the params +// it's not thread-safe func (c *URL) RangeParams(f func(key, value string) bool) { - c.paramsLock.RLock() - defer c.paramsLock.RUnlock() for k, v := range c.params { if !f(k, v[0]) { break @@ -408,8 +407,6 @@ func (c *URL) RangeParams(f func(key, value string) bool) { // GetParam ... func (c URL) GetParam(s string, d string) string { - c.paramsLock.RLock() - defer c.paramsLock.RUnlock() r := c.params.Get(s) if len(r) == 0 { r = d @@ -424,8 +421,6 @@ func (c URL) GetParams() url.Values { // GetParamAndDecoded ... func (c URL) GetParamAndDecoded(key string) (string, error) { - c.paramsLock.RLock() - defer c.paramsLock.RUnlock() ruleDec, err := base64.URLEncoding.DecodeString(c.GetParam(key, "")) value := string(ruleDec) return value, err @@ -502,17 +497,10 @@ func (c URL) GetMethodParamBool(method string, key string, d bool) bool { return r } -// RemoveParams ... -func (c *URL) RemoveParams(set *gxset.HashSet) { - c.paramsLock.Lock() - defer c.paramsLock.Unlock() - for k := range set.Items { - s := k.(string) - delete(c.params, s) - } -} - -// SetParams ... +// SetParams will put all key-value pair into url. +// 1. if there already has same key, the value will be override +// 2. it's not thread safe +// 3. think twice when you want to invoke this method func (c *URL) SetParams(m url.Values) { for k := range m { c.SetParam(k, m.Get(k)) @@ -561,29 +549,35 @@ func (c URL) ToMap() map[string]string { // configuration > reference config >service config // in this function we should merge the reference local url config into the service url from registry. -//TODO configuration merge, in the future , the configuration center's config should merge too. - -// MergeUrl ... +// TODO configuration merge, in the future , the configuration center's config should merge too. + +// MergeUrl will merge those two url +// the result is based on serviceUrl, and the key which si only contained in referenceUrl +// will be added into result. +// for example, if serviceUrl contains params (a1->v1, b1->v2) and referenceUrl contains params(a2->v3, b1 -> v4) +// the params of result will be (a1->v1, b1->v2, a2->v3). +// You should notice that the value of b1 is v2, not v4. +// due to URL is not thread-safe, so this method is not thread-safe func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { mergedUrl := serviceUrl.Clone() - //iterator the referenceUrl if serviceUrl not have the key ,merge in + // iterator the referenceUrl if serviceUrl not have the key ,merge in referenceUrl.RangeParams(func(key, value string) bool { if v := mergedUrl.GetParam(key, ""); len(v) == 0 { mergedUrl.SetParam(key, value) } return true }) - //loadBalance,cluster,retries strategy config + // loadBalance,cluster,retries strategy config methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY, constant.TIMEOUT_KEY}) - //remote timestamp + // remote timestamp if v := serviceUrl.GetParam(constant.TIMESTAMP_KEY, ""); len(v) > 0 { mergedUrl.SetParam(constant.REMOTE_TIMESTAMP_KEY, v) mergedUrl.SetParam(constant.TIMESTAMP_KEY, referenceUrl.GetParam(constant.TIMESTAMP_KEY, "")) } - //finally execute methodConfigMergeFcn + // finally execute methodConfigMergeFcn for _, method := range referenceUrl.Methods { for _, fcn := range methodConfigMergeFcn { fcn("methods." + method) @@ -593,7 +587,7 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { return mergedUrl } -// Clone ... +// Clone will copy the url func (c *URL) Clone() *URL { newUrl := &URL{} copier.Copy(newUrl, c) @@ -605,6 +599,19 @@ func (c *URL) Clone() *URL { return newUrl } +func (c *URL) CloneExceptParams(excludeParams *gxset.HashSet) *URL { + newUrl := &URL{} + copier.Copy(newUrl, c) + newUrl.params = url.Values{} + c.RangeParams(func(key, value string) bool { + if !excludeParams.Contains(key) { + newUrl.SetParam(key, value) + } + return true + }) + return newUrl +} + // Copy url based on the reserved parameters' keys. func (c *URL) CloneWithParams(reserveParams []string) *URL { params := url.Values{} diff --git a/config_center/configurator/override.go b/config_center/configurator/override.go index 18415bee3a28b37ffc2f3f73cc7309b685de5408..ebd3dc601b2821f3f4e1e4405720e4ebc55b607e 100644 --- a/config_center/configurator/override.go +++ b/config_center/configurator/override.go @@ -50,12 +50,12 @@ func (c *overrideConfigurator) GetUrl() *common.URL { } func (c *overrideConfigurator) Configure(url *common.URL) { - //remove configuratorUrl some param that can not be configured + // remove configuratorUrl some param that can not be configured if c.configuratorUrl.GetParam(constant.ENABLED_KEY, "true") == "false" || len(c.configuratorUrl.Location) == 0 { return } - //branch for version 2.7.x + // branch for version 2.7.x apiVersion := c.configuratorUrl.GetParam(constant.CONFIG_VERSION_KEY, "") if len(apiVersion) != 0 { currentSide := url.GetParam(constant.SIDE_KEY, "") @@ -67,12 +67,12 @@ func (c *overrideConfigurator) Configure(url *common.URL) { c.configureIfMatch(url.Ip, url) } } else { - //branch for version 2.6.x and less + // branch for version 2.6.x and less c.configureDeprecated(url) } } -//translate from java, compatible rules in java +// configureIfMatch translate from java, compatible rules in java func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) { if constant.ANYHOST_VALUE == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip { providers := c.configuratorUrl.GetParam(constant.OVERRIDE_PROVIDERS_KEY, "") @@ -105,8 +105,7 @@ func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) { if returnUrl { return } - configUrl := c.configuratorUrl.Clone() - configUrl.RemoveParams(conditionKeys) + configUrl := c.configuratorUrl.CloneExceptParams(conditionKeys) url.SetParams(configUrl.GetParams()) } } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index aa8fbcbe7d6eca682892d4627878fe6bfc3756fe..a936db80bf2c3b46ba389142cc40686ed3df17b1 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -56,8 +56,8 @@ type registryProtocol struct { invokers []protocol.Invoker // Registry Map<RegistryAddress, Registry> registries *sync.Map - //To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed. - //providerurl <--> exporter + // To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed. + // providerurl <--> exporter bounds *sync.Map overrideListeners *sync.Map serviceConfigurationListeners *sync.Map @@ -70,10 +70,8 @@ func init() { } func getCacheKey(url *common.URL) string { - newUrl := url.Clone() delKeys := gxset.NewSet("dynamic", "enabled") - newUrl.RemoveParams(delKeys) - return newUrl.String() + return url.CloneExceptParams(delKeys).String() } func newRegistryProtocol() *registryProtocol { @@ -103,16 +101,14 @@ func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common. // filterHideKey filter the parameters that do not need to be output in url(Starting with .) func filterHideKey(url *common.URL) *common.URL { - //be careful params maps in url is map type - cloneURL := url.Clone() + // be careful params maps in url is map type removeSet := gxset.NewSet() - for k, _ := range cloneURL.GetParams() { + for k, _ := range url.GetParams() { if strings.HasPrefix(k, ".") { removeSet.Add(k) } } - cloneURL.RemoveParams(removeSet) - return cloneURL + return url.CloneExceptParams(removeSet) } func (proto *registryProtocol) initConfigurationListeners() { @@ -138,7 +134,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { reg = regI.(registry.Registry) } - //new registry directory for store service url from registry + // new registry directory for store service url from registry directory, err := extension.GetDefaultRegistryDirectory(®istryUrl, reg) if err != nil { logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", @@ -152,7 +148,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { serviceUrl.String(), registryUrl.String(), err.Error()) } - //new cluster invoker + // new cluster invoker cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) invoker := cluster.Join(directory) @@ -217,7 +213,7 @@ func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common oldExporter.(protocol.Exporter).Unexport() proto.bounds.Delete(key) proto.Export(wrappedNewInvoker) - //TODO: unregister & unsubscribe + // TODO: unregister & unsubscribe } } @@ -300,7 +296,7 @@ func isMatched(providerUrl *common.URL, consumerUrl *common.URL) bool { providerGroup := providerUrl.GetParam(constant.GROUP_KEY, "") providerVersion := providerUrl.GetParam(constant.VERSION_KEY, "") providerClassifier := providerUrl.GetParam(constant.CLASSIFIER_KEY, "") - //todo: public static boolean isContains(String values, String value) { + // todo: public static boolean isContains(String values, String value) { // return isNotEmpty(values) && isContains(COMMA_SPLIT_PATTERN.split(values), value); // } return (consumerGroup == constant.ANY_VALUE || consumerGroup == providerGroup || @@ -353,9 +349,9 @@ func (proto *registryProtocol) Destroy() { } func getRegistryUrl(invoker protocol.Invoker) *common.URL { - //here add * for return a new url + // here add * for return a new url url := invoker.GetUrl() - //if the protocol == registry ,set protocol the registry value in url.params + // if the protocol == registry ,set protocol the registry value in url.params if url.Protocol == constant.REGISTRY_PROTOCOL { protocol := url.GetParam(constant.REGISTRY_KEY, "") url.Protocol = protocol @@ -365,7 +361,7 @@ func getRegistryUrl(invoker protocol.Invoker) *common.URL { func getProviderUrl(invoker protocol.Invoker) *common.URL { url := invoker.GetUrl() - //be careful params maps in url is map type + // be careful params maps in url is map type return url.SubURL.Clone() }