diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index 8237495f55f3cf9444058712d3c2fa23471377bf..d9ded5e8ab856eea0476e0865ba04dd1c85ed440 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -19,6 +19,8 @@ package directory import ( "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" "go.uber.org/atomic" "sync" ) @@ -32,6 +34,7 @@ type BaseDirectory struct { destroyed *atomic.Bool routers []cluster.Router mutex sync.Mutex + once sync.Once } func NewBaseDirectory(url *common.URL) BaseDirectory { @@ -47,6 +50,21 @@ func (dir *BaseDirectory) GetUrl() common.URL { return *dir.url } func (dir *BaseDirectory) SetRouters(routers []cluster.Router) { + dir.mutex.Lock() + defer dir.mutex.Unlock() + + dir.once.Do(func() { + routerKey := dir.GetUrl().GetParam(constant.ROUTER_KEY, "") + if len(routerKey) > 0 { + factory := extension.GetRouterFactory(dir.GetUrl().Protocol) + url := dir.GetUrl() + router, err := factory.Router(&url) + if err == nil { + routers = append(routers, router) + } + } + }) + dir.routers = routers } func (dir *BaseDirectory) Routers() []cluster.Router { diff --git a/cluster/directory/static_directory_test.go b/cluster/directory/static_directory_test.go index 3ce772e8a6287aebef3fdad039e2b12be421c4e3..76beebc944145274f07f99b7151ca5655f23a1a5 100644 --- a/cluster/directory/static_directory_test.go +++ b/cluster/directory/static_directory_test.go @@ -41,7 +41,9 @@ func Test_StaticDirList(t *testing.T) { } staticDir := NewStaticDirectory(invokers) - assert.Len(t, staticDir.List(&invocation.RPCInvocation{}), 10) + list, _ := staticDir.List(&invocation.RPCInvocation{}) + + assert.Len(t, list, 10) } func Test_StaticDirDestroy(t *testing.T) { diff --git a/common/constant/key.go b/common/constant/key.go index f2e9c222d1c72b34e55fc0ec869869cb1badf1da..1045e90959b39edcba08da5df67c498be1109306 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -63,18 +63,24 @@ const ( ) const ( - APPLICATION_KEY = "application" - ORGANIZATION_KEY = "organization" - NAME_KEY = "name" - MODULE_KEY = "module" - APP_VERSION_KEY = "app.version" - OWNER_KEY = "owner" - ENVIRONMENT_KEY = "environment" - METHOD_KEY = "method" - METHOD_KEYS = "methods" - RULE_KEY = "rule" - RUNTIME_KEY = "runtime" - BACKUP_KEY = "backup" + APPLICATION_KEY = "application" + ORGANIZATION_KEY = "organization" + NAME_KEY = "name" + MODULE_KEY = "module" + APP_VERSION_KEY = "app.version" + OWNER_KEY = "owner" + ENVIRONMENT_KEY = "environment" + METHOD_KEY = "method" + METHOD_KEYS = "methods" + RULE_KEY = "rule" + RUNTIME_KEY = "runtime" + BACKUP_KEY = "backup" + ROUTERS_CATEGORY = "routers" + ROUTE_PROTOCOL = "route" + CATEGORY_KEY = "category" + PROVIDERS_CATEGORY = "providers" + EMPTY_PROTOCOL = "empty" + ROUTER_KEY = "router" ) const ( diff --git a/common/url.go b/common/url.go index 7c38f8869db3739e3ed8a0199e44dae6828e1397..63cd2a44cbccf30865dd408474428713319e0b83 100644 --- a/common/url.go +++ b/common/url.go @@ -243,8 +243,8 @@ func (c URL) Key() string { } //todo -func (c URL) GetBackupUrls() []URL { - var urls []URL +func (c *URL) GetBackupUrls() []*URL { + var urls []*URL var host string urls = append(urls, c) backups := strings.Split(c.GetParam(constant.BACKUP_KEY, ""), "") @@ -255,10 +255,18 @@ func (c URL) GetBackupUrls() []URL { host = address[:index] port = address[index+1:] } else { - host = string(append([]byte(host), []byte(port)...)) + host = address } //todo - newURL, _ := NewURL(c.ctx, address) + newURL := NewURLWithOptions( + WithProtocol(c.Protocol), + WithPath(c.Path), + WithIp(host), + WithUsername(c.Username), + WithPassword(c.Password), + WithPort(port), + WithParams(c.Params)) + urls = append(urls, newURL) } return urls diff --git a/registry/directory/directory.go b/registry/directory/directory.go index b2421d7ec188c9a5da75a9a1bb99b242ac5211b5..6d93445fcbfed3e27cc1a9bb4391c32243b5bb8d 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -20,6 +20,7 @@ package directory import ( "fmt" "reflect" + "strings" "sync" "time" ) @@ -30,6 +31,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -101,22 +103,23 @@ func (dir *registryDirectory) Subscribe(url common.URL) { logger.Warnf("event listener game over.") return } + logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) time.Sleep(time.Duration(RegistryConnDelay) * time.Second) continue } for { - if serviceEvent, err := listener.Next(); err != nil { + serviceEvent, err := listener.Next() + if err != nil { logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) listener.Close() time.Sleep(time.Duration(RegistryConnDelay) * time.Second) return - } else { - logger.Infof("update begin, service event: %v", serviceEvent.String()) - go dir.update(serviceEvent) } + logger.Infof("update begin, service event: %v", serviceEvent.String()) + go dir.update(serviceEvent) } } @@ -132,9 +135,22 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { logger.Debugf("update service name: %s!", res.Service) //todo - _ = dir.GetUrl() + url := dir.GetUrl() + var urls []*common.URL + + for _, v := range url.GetBackupUrls() { + p := v.Protocol - if len(dir.Routers()) > 0 { + category := v.GetParam(constant.CATEGORY_KEY, constant.PROVIDERS_CATEGORY) + if strings.EqualFold(category, constant.ROUTERS_CATEGORY) || strings.EqualFold(constant.ROUTE_PROTOCOL, p) { + urls = append(urls, v) + } + } + if len(urls) > 0 { + routers := toRouters(urls) + if len(routers) > 0 { + dir.SetRouters(routers) + } } dir.refreshInvokers(res) } @@ -222,8 +238,8 @@ func (dir *registryDirectory) List(invocation protocol.Invocation) ([]protocol.I if dir.Destroyed() { return nil, fmt.Errorf("directory already destroyed .url: %s", dir.GetUrl().String()) } + if dir.forbidden.Load() { - //todo error localIP, _ := utils.GetLocalIP() return nil, fmt.Errorf("no provider available from registry %s for service %s on consumer %s use dubbo version %s, please check status of providers(disabled, not registered or in blacklist)", dir.GetUrl().Location, dir.ConsumerUrl.ServiceKey(), localIP, version.Version) } @@ -238,6 +254,7 @@ func (dir *registryDirectory) List(invocation protocol.Invocation) ([]protocol.I } } } + return invokers, nil } @@ -264,3 +281,34 @@ func (dir *registryDirectory) Destroy() { dir.cacheInvokers = []protocol.Invoker{} }) } + +func toRouters(urls []*common.URL) []cluster.Router { + var routers = []cluster.Router{} + if len(urls) == 0 { + return routers + } + for _, url := range urls { + if strings.EqualFold(constant.EMPTY_PROTOCOL, url.Protocol) { + continue + } + routerType := url.GetParam(constant.ROUTER_KEY, "") + if len(routerType) > 0 { + url = common.NewURLWithOptions( + common.WithProtocol(routerType), + common.WithPath(url.Path), + common.WithIp(url.Ip), + common.WithUsername(url.Username), + common.WithPassword(url.Password), + common.WithPort(url.Port), + common.WithParams(url.Params)) + routerFactory := extension.GetRouterFactory(url.Protocol) + router, err := routerFactory.Router(url) + if err != nil { + logger.Errorf("convert router url to router error, url:%s", url.String()) + continue + } + routers = append(routers, router) + } + } + return routers +}