Skip to content
Snippets Groups Projects
Commit 2fe1edfd authored by 邹毅贤's avatar 邹毅贤
Browse files

fix problems

parent f9ed6b53
No related branches found
No related tags found
No related merge requests found
package cluster
import (
"sort"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"sort"
)
type RouterChain struct {
......
......@@ -31,7 +31,6 @@ import (
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -64,8 +63,8 @@ type registryDirectory struct {
consumerConfigurationListener *consumerConfigurationListener
referenceConfigurationListener *referenceConfigurationListener
Options
serviceKey string
forbidden atomic.Bool
serviceKey string
forbidden atomic.Bool
}
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
......@@ -93,38 +92,9 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O
//subscribe from registry
func (dir *registryDirectory) Subscribe(url *common.URL) {
for {
if !dir.registry.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
listener, err := dir.registry.Subscribe(url)
if err != nil {
if !dir.registry.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for {
serviceEvent, err := listener.Next()
if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
return
}
logger.Infof("update begin, service event: %v", serviceEvent.String())
go dir.update(serviceEvent)
}
}
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
dir.registry.Subscribe(url, dir)
}
func (dir *registryDirectory) Notify(event *registry.ServiceEvent) {
......@@ -157,34 +127,34 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
//TODO: router
}
switch res.Action {
case remoting.EventTypeAdd:
url := dir.GetUrl()
var urls []*common.URL
for _, v := range url.GetBackupUrls() {
p := v.Protocol
category := v.GetParam(constant.CATEGORY_KEY, constant.PROVIDERS_CATEGORY)
if strings.EqualFold(category, constant.ROUTERS_CATEGORY) || strings.EqualFold(constant.ROUTE_PROTOCOL, p) {
urls = append(urls, v)
case remoting.EventTypeAdd:
url := dir.GetUrl()
var urls []*common.URL
for _, v := range url.GetBackupUrls() {
p := v.Protocol
category := v.GetParam(constant.CATEGORY_KEY, constant.PROVIDERS_CATEGORY)
if strings.EqualFold(category, constant.ROUTERS_CATEGORY) || strings.EqualFold(constant.ROUTE_PROTOCOL, p) {
urls = append(urls, v)
}
}
}
if len(urls) > 0 {
routers := toRouters(urls)
if len(routers) > 0 {
dir.SetRouters(routers)
if len(urls) > 0 {
routers := toRouters(urls)
if len(routers) > 0 {
dir.SetRouters(routers)
}
}
}
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
dir.cacheInvoker(res.Service)
case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
dir.uncacheInvoker(res.Service)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
}
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
dir.cacheInvoker(&res.Service)
case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
dir.uncacheInvoker(&res.Service)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
}
}
newInvokers := dir.toGroupInvokers()
......@@ -364,16 +334,3 @@ func (l *consumerConfigurationListener) Process(event *config_center.ConfigChang
l.BaseConfigurationListener.Process(event)
l.directory.refreshInvokers(nil)
}
func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigurationListener {
listener := &consumerConfigurationListener{directory: dir}
listener.InitWith(
config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX,
listener,
extension.GetDefaultConfiguratorFunc(),
)
return listener
}
func (l *consumerConfigurationListener) addNotifyListener(listener registry.NotifyListener) {
l.listeners = appe
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment