From d18be63cb62b1343d17c8f54c50254911aaf354a Mon Sep 17 00:00:00 2001
From: "vito.he" <hxmhlt@163.com>
Date: Sun, 5 May 2019 13:51:58 +0800
Subject: [PATCH] Fix:bounds cache

---
 config/support/service_config.go | 23 ++++++++++++------
 config/url.go                    | 20 ++++++++++++----
 registry/directory/directory.go  | 10 ++++----
 registry/mock_registry.go        | 27 +++++++++++++++++++++
 registry/protocol/protocol.go    | 41 +++++++++++++++++++-------------
 registry/zookeeper/registry.go   |  6 ++---
 6 files changed, 90 insertions(+), 37 deletions(-)
 create mode 100644 registry/mock_registry.go

diff --git a/config/support/service_config.go b/config/support/service_config.go
index e9d514d02..f9a78637f 100644
--- a/config/support/service_config.go
+++ b/config/support/service_config.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"net/url"
 	"strconv"
+	"sync"
 	"time"
 )
 import (
@@ -33,12 +34,14 @@ type ServiceConfig struct {
 		Loadbalance string `yaml:"loadbalance"  json:"loadbalance,omitempty"`
 		Weight      int64  `yaml:"weight"  json:"weight,omitempty"`
 	} `yaml:"methods"  json:"methods,omitempty"`
-	Warmup     string `yaml:"warmup"  json:"warmup,omitempty"`
-	Retries    int64  `yaml:"retries"  json:"retries,omitempty"`
-	unexported *atomic.Bool
-	exported   *atomic.Bool
-	rpcService config.RPCService
-	exporters  []protocol.Exporter
+	Warmup        string `yaml:"warmup"  json:"warmup,omitempty"`
+	Retries       int64  `yaml:"retries"  json:"retries,omitempty"`
+	unexported    *atomic.Bool
+	exported      *atomic.Bool
+	rpcService    config.RPCService
+	exporters     []protocol.Exporter
+	cacheProtocol protocol.Protocol
+	cacheMutex    sync.Mutex
 }
 
 func NewServiceConfig() *ServiceConfig {
@@ -88,7 +91,13 @@ func (srvconfig *ServiceConfig) Export() error {
 		for _, regUrl := range regUrls {
 			regUrl.SubURL = url
 			invoker := protocol.NewBaseInvoker(*regUrl)
-			exporter := extension.GetProtocolExtension("registry").Export(invoker)
+			srvconfig.cacheMutex.Lock()
+			if srvconfig.cacheProtocol == nil {
+				log.Info("First load the registry protocol!")
+				srvconfig.cacheProtocol = extension.GetProtocolExtension("registry")
+			}
+			srvconfig.cacheMutex.Unlock()
+			exporter := srvconfig.cacheProtocol.Export(invoker)
 			srvconfig.exporters = append(srvconfig.exporters, exporter)
 		}
 	}
diff --git a/config/url.go b/config/url.go
index 69dd1c988..3f92085ca 100644
--- a/config/url.go
+++ b/config/url.go
@@ -3,6 +3,7 @@ package config
 import (
 	"context"
 	"fmt"
+	"github.com/dubbo/dubbo-go/common/constant"
 	"net"
 	"net/url"
 	"strconv"
@@ -173,11 +174,12 @@ func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error)
 	return s, nil
 }
 
-func (c URL) Key() string {
-	return fmt.Sprintf(
-		"%s://%s:%s@%s:%s/%s",
-		c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Path)
-}
+//
+//func (c URL) Key() string {
+//	return fmt.Sprintf(
+//		"%s://%s:%s@%s:%s/%s",
+//		c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Path)
+//}
 
 func (c URL) URLEqual(url URL) bool {
 
@@ -205,6 +207,14 @@ func (c URL) String() string {
 	return buildString
 }
 
+func (c URL) Key() string {
+	buildString := fmt.Sprintf(
+		"%s://%s:%s@%s:%s/%s?group=%s&version=%s",
+		c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Path, c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, constant.DEFAULT_VERSION))
+
+	return buildString
+}
+
 func (c URL) Context() context.Context {
 	return c.ctx
 }
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 4e1967f50..72cb2f80a 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -167,9 +167,9 @@ func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap *sync.Map) []protoc
 }
 
 func (dir *RegistryDirectory) uncacheInvoker(url config.URL) *sync.Map {
-	log.Debug("service will be deleted in cache invokers: invokers key is  %s!", url.String())
+	log.Debug("service will be deleted in cache invokers: invokers key is  %s!", url.Key())
 	newCacheInvokers := dir.cacheInvokersMap
-	newCacheInvokers.Delete(url.String())
+	newCacheInvokers.Delete(url.Key())
 	return newCacheInvokers
 }
 
@@ -181,10 +181,10 @@ func (dir *RegistryDirectory) cacheInvoker(url config.URL) *sync.Map {
 	if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
 		url = mergeUrl(url, referenceUrl)
 
-		if _, ok := newCacheInvokers.Load(url.String()); !ok {
-			log.Debug("service will be added in cache invokers: invokers key is  %s!", url.String())
+		if _, ok := newCacheInvokers.Load(url.Key()); !ok {
+			log.Debug("service will be added in cache invokers: invokers key is  %s!", url.Key())
 			newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(url)
-			newCacheInvokers.Store(url.String(), newInvoker)
+			newCacheInvokers.Store(url.Key(), newInvoker)
 		}
 	}
 	return newCacheInvokers
diff --git a/registry/mock_registry.go b/registry/mock_registry.go
new file mode 100644
index 000000000..65f17e0ff
--- /dev/null
+++ b/registry/mock_registry.go
@@ -0,0 +1,27 @@
+package registry
+
+import "github.com/dubbo/dubbo-go/config"
+
+type MockRegistry struct {
+}
+
+func (*MockRegistry) Register(url config.URL) error {
+	return nil
+}
+
+func (*MockRegistry) Close() {
+
+}
+func (*MockRegistry) IsClosed() bool {
+	return false
+}
+
+//func (*MockRegistry) Subscribe(config.URL) (Listener, error) {
+//
+//}
+//
+//type listener struct{}
+//
+//func Next() (*ServiceEvent, error) {
+//
+//}
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index bf8c908b3..915b401b3 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -1,6 +1,7 @@
 package protocol
 
 import (
+	"github.com/dubbo/dubbo-go/protocol/protocolwrapper"
 	"sync"
 )
 
@@ -13,7 +14,6 @@ import (
 	"github.com/dubbo/dubbo-go/common/extension"
 	"github.com/dubbo/dubbo-go/config"
 	"github.com/dubbo/dubbo-go/protocol"
-	"github.com/dubbo/dubbo-go/protocol/protocolwrapper"
 	"github.com/dubbo/dubbo-go/registry"
 	directory2 "github.com/dubbo/dubbo-go/registry/directory"
 )
@@ -22,7 +22,10 @@ var registryProtocol *RegistryProtocol
 
 type RegistryProtocol struct {
 	// Registry  Map<RegistryAddress, Registry>
-	registies sync.Map
+	//registies sync.Map
+	//To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
+	//providerurl <--> exporter
+	bounds sync.Map
 }
 
 func init() {
@@ -31,7 +34,8 @@ func init() {
 
 func NewRegistryProtocol() *RegistryProtocol {
 	return &RegistryProtocol{
-		registies: sync.Map{},
+		//registies: sync.Map{},
+		bounds: sync.Map{},
 	}
 }
 func getRegistry(regUrl *config.URL) registry.Registry {
@@ -46,11 +50,7 @@ func (protocol *RegistryProtocol) Refer(url config.URL) protocol.Invoker {
 	var regUrl = url
 	var serviceUrl = regUrl.SubURL
 
-	var reg registry.Registry
-
-	regI, _ := protocol.registies.LoadOrStore(url.Key(),
-		getRegistry(&regUrl))
-	reg = regI.(registry.Registry)
+	reg := getRegistry(&regUrl)
 
 	//new registry directory for store service url from registry
 	directory := directory2.NewRegistryDirectory(&regUrl, reg)
@@ -61,23 +61,30 @@ func (protocol *RegistryProtocol) Refer(url config.URL) protocol.Invoker {
 	return cluster.Join(directory)
 }
 
-func (protocol *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
-	registryUrl := protocol.getRegistryUrl(invoker)
-	providerUrl := protocol.getProviderUrl(invoker)
+func (proto *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
+	registryUrl := proto.getRegistryUrl(invoker)
+	providerUrl := proto.getProviderUrl(invoker)
 
-	regI, _ := protocol.registies.LoadOrStore(providerUrl.Key(),
-		getRegistry(&registryUrl))
-
-	reg := regI.(registry.Registry)
+	reg := getRegistry(&registryUrl)
 
 	err := reg.Register(providerUrl)
 	if err != nil {
 		log.Error("provider service %v register registry %v error, error message is %v", providerUrl.String(), registryUrl.String(), err.Error())
 	}
 
-	wrappedInvoker := newWrappedInvoker(invoker, providerUrl)
+	key := providerUrl.Key()
+	log.Info("The cached exporter keys is %v !", key)
+	cachedExporter, loaded := proto.bounds.Load(key)
+	if loaded {
+		log.Info("The exporter has been cached, and will return cached exporter!")
+	} else {
+		wrappedInvoker := newWrappedInvoker(invoker, providerUrl)
+		cachedExporter = extension.GetProtocolExtension(protocolwrapper.FILTER).Export(wrappedInvoker)
+		proto.bounds.Store(key, cachedExporter)
+		log.Info("The exporter has not been cached, and will return a new  exporter!")
+	}
 
-	return extension.GetProtocolExtension(protocolwrapper.FILTER).Export(wrappedInvoker)
+	return cachedExporter.(protocol.Exporter)
 
 }
 
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index 19a8990d1..ec57030f9 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -250,10 +250,10 @@ func (r *ZkRegistry) Register(conf config.URL) error {
 		r.cltLock.Lock()
 		// 娉ㄦ剰姝ゅ涓巆onsumerZookeeperRegistry鐨勫樊寮傦紝consumer鐢ㄧ殑鏄痗onf.Service锛�
 		// 鍥犱负consumer瑕佹彁渚泈atch鍔熻兘缁檚elector浣跨敤, provider鍏佽娉ㄥ唽鍚屼竴涓猻ervice鐨勫涓猤roup or version
-		_, ok = r.services[conf.String()]
+		_, ok = r.services[conf.Key()]
 		r.cltLock.Unlock()
 		if ok {
-			return jerrors.Errorf("Service{%s} has been registered", conf.String())
+			return jerrors.Errorf("Service{%s} has been registered", conf.Key())
 		}
 
 		err = r.register(conf)
@@ -262,7 +262,7 @@ func (r *ZkRegistry) Register(conf config.URL) error {
 		}
 
 		r.cltLock.Lock()
-		r.services[conf.String()] = conf
+		r.services[conf.Key()] = conf
 		r.cltLock.Unlock()
 
 		log.Debug("(ZkProviderRegistry)Register(conf{%#v})", conf)
-- 
GitLab