diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index d9ded5e8ab856eea0476e0865ba04dd1c85ed440..6b43afae2199a810d49f30112d51073caeaf67bb 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -34,7 +34,6 @@ type BaseDirectory struct { destroyed *atomic.Bool routers []cluster.Router mutex sync.Mutex - once sync.Once } func NewBaseDirectory(url *common.URL) BaseDirectory { @@ -53,17 +52,15 @@ func (dir *BaseDirectory) SetRouters(routers []cluster.Router) { dir.mutex.Lock() defer dir.mutex.Unlock() - dir.once.Do(func() { - routerKey := dir.GetUrl().GetParam(constant.ROUTER_KEY, "") - if len(routerKey) > 0 { - factory := extension.GetRouterFactory(dir.GetUrl().Protocol) - url := dir.GetUrl() - router, err := factory.Router(&url) - if err == nil { - routers = append(routers, router) - } + routerKey := dir.GetUrl().GetParam(constant.ROUTER_KEY, "") + if len(routerKey) > 0 { + factory := extension.GetRouterFactory(dir.GetUrl().Protocol) + url := dir.GetUrl() + router, err := factory.Router(&url) + if err == nil { + routers = append(routers, router) } - }) + } dir.routers = routers } diff --git a/common/url.go b/common/url.go index 63cd2a44cbccf30865dd408474428713319e0b83..ef0c68295fe4a96c17262bb2755ca6dd7f45e4ec 100644 --- a/common/url.go +++ b/common/url.go @@ -50,7 +50,7 @@ const ( var ( DubboNodes = [...]string{"consumers", "configurators", "routers", "providers"} - DubboRole = [...]string{"consumer", "", "", "provider"} + DubboRole = [...]string{"consumer", "", "routers", "provider"} ) type RoleType int diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index e2fca972306aa380e3f584b093e1315e1468a45e..422cd896caad2fe2bb0e5e9a054775b420d694f6 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -80,7 +80,6 @@ type zkRegistry struct { configListener *RegistryConfigurationListener //for provider zkPath map[string]int // key = protocol://ip:port/interface - } func newZkRegistry(url *common.URL) (registry.Registry, error) { @@ -225,7 +224,7 @@ func (r *zkRegistry) Register(conf common.URL) error { r.cltLock.Lock() r.services[conf.Key()] = conf r.cltLock.Unlock() - logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf) + logger.Debugf("(ZkConsumerRegistry)Register(conf{%#v})", conf) case common.PROVIDER: @@ -250,6 +249,25 @@ func (r *zkRegistry) Register(conf common.URL) error { r.cltLock.Unlock() logger.Debugf("(ZkProviderRegistry)Register(conf{%#v})", conf) + + case common.ROUTER: + + key := conf.String() + r.cltLock.Lock() + _, ok = r.services[key] + r.cltLock.Unlock() + if ok { + return perrors.Errorf("Path{%s} has been registered", conf.Path) + } + err = r.register(conf) + if err != nil { + return perrors.WithMessagef(err, "register(conf:%+v)", conf) + } + r.cltLock.Lock() + r.services[conf.Key()] = conf + r.cltLock.Unlock() + logger.Debugf("(ZkRouterRegistry)Register(conf{%#v})", conf) + } return nil @@ -354,6 +372,23 @@ func (r *zkRegistry) register(c common.URL) error { dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), (common.RoleType(common.CONSUMER)).String()) logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL) + case common.ROUTER: + //todo + dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.ROUTER]) + r.cltLock.Lock() + err = r.client.Create(dubboPath) + r.cltLock.Unlock() + if err != nil { + logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) + return perrors.WithStack(err) + } + params.Add("protocol", c.Protocol) + params.Add("category", (common.RoleType(common.ROUTER)).String()) + + rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, c.Location, c.Path, params.Encode()) + encodedURL = url.QueryEscape(rawURL) + logger.Debugf("router path:%s, url:%s", dubboPath, rawURL) + default: return perrors.Errorf("@c{%v} type is not referencer or provider", c) }