Skip to content
Snippets Groups Projects
Unverified Commit c1343978 authored by ken.lj's avatar ken.lj Committed by GitHub
Browse files

Fix polaris Subscriber #2026

parents 19ddecf8 f5a933d0
No related branches found
No related tags found
No related merge requests found
......@@ -32,13 +32,13 @@ import (
"dubbo.apache.org/dubbo-go/v3/remoting"
)
type subscriber func(remoting.EventType, []model.Instance)
type item func(remoting.EventType, []model.Instance)
type PolarisServiceWatcher struct {
consumer api.ConsumerAPI
subscribeParam *api.WatchServiceRequest
lock *sync.RWMutex
subscribers []subscriber
subscribers []item
execOnce *sync.Once
}
......@@ -48,7 +48,7 @@ func newPolarisWatcher(param *api.WatchServiceRequest, consumer api.ConsumerAPI)
subscribeParam: param,
consumer: consumer,
lock: &sync.RWMutex{},
subscribers: make([]subscriber, 0),
subscribers: make([]item, 0),
execOnce: &sync.Once{},
}
return watcher, nil
......
......@@ -51,6 +51,7 @@ func NewPolarisListener(url *common.URL) (*polarisListener, error) {
events: gxchan.NewUnboundedChan(32),
closeCh: make(chan struct{}),
}
return listener, nil
}
......
......@@ -150,6 +150,11 @@ func (pr *polarisRegistry) UnRegister(conf *common.URL) error {
// Subscribe returns nil if subscribing registry successfully. If not returns an error.
func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
var (
newParam api.WatchServiceRequest
newConsumer api.ConsumerAPI
)
role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
if role != common.CONSUMER {
return nil
......@@ -163,8 +168,17 @@ func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.No
continue
}
watcher, err := newPolarisWatcher(&newParam, newConsumer)
if err != nil {
logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err))
timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second)
timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for {
serviceEvent, err := listener.Next()
if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
......@@ -172,6 +186,7 @@ func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.No
}
logger.Infof("update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
watcher.startWatch()
}
}
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment