diff --git a/cluster/support/failover_cluster_invoker.go b/cluster/support/failover_cluster_invoker.go index c0c0238cb0184683cd64c023f8820d1ce983e6fb..7a9aa10c6aa96f0ef87e315db944b0075764459a 100644 --- a/cluster/support/failover_cluster_invoker.go +++ b/cluster/support/failover_cluster_invoker.go @@ -18,5 +18,5 @@ func NewFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) invokers[0].GetUrl() - return + return &protocol.RPCResult{} } diff --git a/config/config_url.go b/config/config_url.go index 4a2e4f5570f5ff139abbb5268f34c06815fefb6c..e605d6b5155a22b818af28b214ea0fe34ac93524 100644 --- a/config/config_url.go +++ b/config/config_url.go @@ -50,6 +50,11 @@ type URL struct { Cluster string } +type method struct { + Name string + Retries int +} + func NewURL(ctx context.Context, urlString string) (*URL, error) { var ( diff --git a/registry/directory.go b/registry/directory/directory.go similarity index 88% rename from registry/directory.go rename to registry/directory/directory.go index f9cfa12c06aafaf897d2ce38e5be14e6cfcde3a2..3474aebef2a3aa7e3a5c0be190220d00467ecba5 100644 --- a/registry/directory.go +++ b/registry/directory/directory.go @@ -1,6 +1,8 @@ -package registry +package directory import ( + "github.com/dubbo/dubbo-go/registry" + protocol2 "github.com/dubbo/dubbo-go/registry/protocol" "sync" "time" ) @@ -33,13 +35,13 @@ type RegistryDirectory struct { cacheInvokers []protocol.Invoker listenerLock sync.Mutex serviceType string - registry Registry + registry registry.Registry cacheInvokersMap sync.Map //use sync.map //cacheInvokersMap map[string]protocol.Invoker Options } -func NewRegistryDirectory(url *config.RegistryURL, registry Registry, opts ...Option) *RegistryDirectory { +func NewRegistryDirectory(url *config.RegistryURL, registry registry.Registry, opts ...Option) *RegistryDirectory { options := Options{ //default 300s serviceTTL: time.Duration(300e9), @@ -59,7 +61,7 @@ func NewRegistryDirectory(url *config.RegistryURL, registry Registry, opts ...Op } //subscibe from registry -func (dir *RegistryDirectory) subscribe(url config.URL) { +func (dir *RegistryDirectory) Subscribe(url config.URL) { for { if dir.registry.IsClosed() { log.Warn("event listener game over.") @@ -73,7 +75,7 @@ func (dir *RegistryDirectory) subscribe(url config.URL) { return } log.Warn("getListener() = err:%s", jerrors.ErrorStack(err)) - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + time.Sleep(time.Duration(protocol2.RegistryConnDelay) * time.Second) continue } @@ -81,7 +83,7 @@ func (dir *RegistryDirectory) subscribe(url config.URL) { if serviceEvent, err := listener.Next(); err != nil { log.Warn("Selector.watch() = error{%v}", jerrors.ErrorStack(err)) listener.Close() - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + time.Sleep(time.Duration(protocol2.RegistryConnDelay) * time.Second) return } else { go dir.update(serviceEvent) @@ -93,7 +95,7 @@ func (dir *RegistryDirectory) subscribe(url config.URL) { } //subscribe service from registry , and update the cacheServices -func (dir *RegistryDirectory) update(res *ServiceEvent) { +func (dir *RegistryDirectory) update(res *registry.ServiceEvent) { if res == nil { return } @@ -105,14 +107,14 @@ func (dir *RegistryDirectory) update(res *ServiceEvent) { dir.refreshInvokers(res) } -func (dir *RegistryDirectory) refreshInvokers(res *ServiceEvent) { +func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) { var newCacheInvokersMap sync.Map switch res.Action { - case ServiceAdd: + case registry.ServiceAdd: //dir.cacheService.Add(res.Service, dir.serviceTTL) newCacheInvokersMap = dir.cacheInvoker(res.Service) - case ServiceDel: + case registry.ServiceDel: //dir.cacheService.Del(res.Service, dir.serviceTTL) newCacheInvokersMap = dir.uncacheInvoker(res.Service) log.Info("selector delete service url{%s}", res.Service) diff --git a/registry/protocol.go b/registry/protocol/protocol.go similarity index 80% rename from registry/protocol.go rename to registry/protocol/protocol.go index 957be807c565f4a50ccafbae44e53b98ae8cdb0d..2bec716611648cf6c1a22c005b97372630df5a26 100644 --- a/registry/protocol.go +++ b/registry/protocol/protocol.go @@ -1,6 +1,8 @@ -package registry +package protocol import ( + "github.com/dubbo/dubbo-go/registry" + directory2 "github.com/dubbo/dubbo-go/registry/directory" "sync" ) @@ -18,7 +20,7 @@ const RegistryConnDelay = 3 type RegistryProtocol struct { // Registry Map<RegistryAddress, Registry> - registies map[string]Registry + registies map[string]registry.Registry registiesMutex sync.Mutex } @@ -28,7 +30,7 @@ func init() { func NewRegistryProtocol() protocol.Protocol { return &RegistryProtocol{ - registies: make(map[string]Registry), + registies: make(map[string]registry.Registry), } } @@ -38,7 +40,7 @@ func (protocol *RegistryProtocol) Refer(url config.IURL) protocol.Invoker { protocol.registiesMutex.Lock() defer protocol.registiesMutex.Unlock() - var reg Registry + var reg registry.Registry var ok bool if reg, ok = protocol.registies[url.Key()]; !ok { @@ -52,8 +54,8 @@ func (protocol *RegistryProtocol) Refer(url config.IURL) protocol.Invoker { } } //new registry directory for store service url from registry - directory := NewRegistryDirectory(regUrl, reg) - go directory.subscribe(serviceUrl) + directory := directory2.NewRegistryDirectory(regUrl, reg) + go directory.Subscribe(serviceUrl) //new cluster invoker cluster := extension.GetCluster(serviceUrl.Cluster)