diff --git a/common/url.go b/common/url.go index 360f0aa4caa185d8086eb07497176ae627f47d47..726a74ffba670c2e0c5300d26c85ff8251508271 100644 --- a/common/url.go +++ b/common/url.go @@ -277,6 +277,7 @@ func (c URL) URLEqual(url URL) bool { } return true } + func isMatchCategory(category1 string, category2 string) bool { if len(category2) == 0 { return category1 == constant.DEFAULT_CATEGORY @@ -288,6 +289,7 @@ func isMatchCategory(category1 string, category2 string) bool { return strings.Contains(category2, category1) } } + func (c URL) String() string { var buildString string if len(c.Username) == 0 && len(c.Password) == 0 { diff --git a/config/protocol_config.go b/config/protocol_config.go index 4828d6e5bd28de19d896340f39c5633d0acd4874..586b53165c5cdd1684dc810f07061d31eef1944f 100644 --- a/config/protocol_config.go +++ b/config/protocol_config.go @@ -38,11 +38,11 @@ func (c *ProtocolConfig) Prefix() string { } func loadProtocol(protocolsIds string, protocols map[string]*ProtocolConfig) []*ProtocolConfig { - returnProtocols := []*ProtocolConfig{} + var returnProtocols []*ProtocolConfig for _, v := range strings.Split(protocolsIds, ",") { - for k, prot := range protocols { + for k, protocol := range protocols { if v == k { - returnProtocols = append(returnProtocols, prot) + returnProtocols = append(returnProtocols, protocol) } } diff --git a/config/reference_config.go b/config/reference_config.go index edfa17a27e88a605b71bc7f6dec1b133bd29abe9..e8e68a0b10a43c988088eac9290fff5eddaabf40 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -77,7 +77,6 @@ func NewReferenceConfig(id string, ctx context.Context) *ReferenceConfig { // UnmarshalYAML ... func (c *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { - type rf ReferenceConfig raw := rf{} // Put your defaults here if err := unmarshal(&raw); err != nil { @@ -101,8 +100,8 @@ func (c *ReferenceConfig) Refer(_ interface{}) { common.WithParamsValue(constant.BEAN_NAME_KEY, c.id), ) - //1. user specified URL, could be peer-to-peer address, or register center's address. if c.Url != "" { + //1. user specified URL, could be peer-to-peer address, or register center's address. urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*") for _, urlStr := range urlStrings { serviceUrl, err := common.NewURL(urlStr) @@ -120,7 +119,6 @@ func (c *ReferenceConfig) Refer(_ interface{}) { newUrl := common.MergeUrl(&serviceUrl, cfgURL) c.urls = append(c.urls, newUrl) } - } } else { //2. assemble SubURL from register center's configuration妯″紡 @@ -131,10 +129,11 @@ func (c *ReferenceConfig) Refer(_ interface{}) { regUrl.SubURL = cfgURL } } + if len(c.urls) == 1 { c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0]) } else { - invokers := []protocol.Invoker{} + var invokers []protocol.Invoker var regUrl *common.URL for _, u := range c.urls { invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u)) @@ -219,7 +218,6 @@ func (c *ReferenceConfig) getUrlMap() url.Values { } return urlMap - } // GenericLoad ... diff --git a/config/service_config.go b/config/service_config.go index 2111838395d507ebac4f72883c99dd2bb1615850..7d97fa4d1e95bd79e051f77deaeafa1afcc58b0f 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -96,14 +96,12 @@ func (c *ServiceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { // NewServiceConfig The only way to get a new ServiceConfig func NewServiceConfig(id string, context context.Context) *ServiceConfig { - return &ServiceConfig{ context: context, id: id, unexported: atomic.NewBool(false), exported: atomic.NewBool(false), } - } // Export ... @@ -171,10 +169,8 @@ func (c *ServiceConfig) Export() error { panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", ivkURL))) } } - } return nil - } // Implement ... @@ -242,5 +238,4 @@ func (c *ServiceConfig) getUrlMap() url.Values { } return urlMap - } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 748b8204d97e60c9803821290184fc5717c41025..a7678ba4e2f38cfeb77f202103e03066a7efdbef 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -77,6 +77,7 @@ func newRegistryProtocol() *registryProtocol { bounds: &sync.Map{}, } } + func getRegistry(regUrl *common.URL) registry.Registry { reg, err := extension.GetRegistry(regUrl.Protocol, regUrl) if err != nil { @@ -85,13 +86,14 @@ func getRegistry(regUrl *common.URL) registry.Registry { } return reg } + func (proto *registryProtocol) initConfigurationListeners() { proto.overrideListeners = &sync.Map{} proto.serviceConfigurationListeners = &sync.Map{} proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners) } -func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { +func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { var registryUrl = url var serviceUrl = registryUrl.SubURL if registryUrl.Protocol == constant.REGISTRY_PROTOCOL { @@ -115,6 +117,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { serviceUrl.String(), err.Error()) return nil } + err = reg.Register(*serviceUrl) if err != nil { logger.Errorf("consumer service %v register registry %v error, error message is %s", @@ -131,7 +134,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { } func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { - proto.once.Do(func() { proto.initConfigurationListeners() }) @@ -172,13 +174,14 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte wrappedInvoker := newWrappedInvoker(invoker, providerUrl) cachedExporter = extension.GetProtocol(protocolwrapper.FILTER).Export(wrappedInvoker) proto.bounds.Store(key, cachedExporter) - logger.Infof("The exporter has not been cached, and will return a new exporter!") + logger.Infof("The exporter has not been cached, and will return a new exporter!") } go reg.Subscribe(overriderUrl, overrideSubscribeListener) return cachedExporter.(protocol.Exporter) } + func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) { url := getProviderUrl(invoker) key := getCacheKey(url) @@ -202,12 +205,14 @@ type overrideSubscribeListener struct { func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Invoker, proto *registryProtocol) *overrideSubscribeListener { return &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto} } + func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) { if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd { nl.configurator = extension.GetDefaultConfigurator(&(event.Service)) nl.doOverrideIfNecessary() } } + func (nl *overrideSubscribeListener) doOverrideIfNecessary() { providerUrl := getProviderUrl(nl.originInvoker) key := getCacheKey(providerUrl) @@ -276,6 +281,7 @@ func isMatched(providerUrl *common.URL, consumerUrl *common.URL) bool { consumerVersion == providerVersion) && (len(consumerClassifier) == 0 || consumerClassifier == constant.ANY_VALUE || consumerClassifier == providerClassifier) } + func isMatchCategory(category string, categories string) bool { if len(categories) == 0 { return category == constant.DEFAULT_CATEGORY @@ -287,6 +293,7 @@ func isMatchCategory(category string, categories string) bool { return strings.Contains(categories, category) } } + func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL { newUrl := providerUrl.Clone() newUrl.Protocol = constant.PROVIDER_PROTOCOL @@ -334,6 +341,7 @@ func getProviderUrl(invoker protocol.Invoker) *common.URL { //be careful params maps in url is map type return url.SubURL.Clone() } + func setProviderUrl(regURL *common.URL, providerURL *common.URL) { regURL.SubURL = providerURL }