Skip to content
Snippets Groups Projects
Commit b2e91522 authored by pantianying's avatar pantianying
Browse files

Merge branch 'develop' of https://github.com/apache/dubbo-go into apache-develop

parents d3036ed0 77e5aac4
No related branches found
No related tags found
No related merge requests found
......@@ -20,11 +20,17 @@
// Package filter is a generated GoMock package.
package impl
import (
reflect "reflect"
)
import (
gomock "github.com/golang/mock/gomock"
)
import (
common "github.com/apache/dubbo-go/common"
protocol "github.com/apache/dubbo-go/protocol"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)
// MockTpsLimiter is a mock of TpsLimiter interface
......
......@@ -136,10 +136,10 @@ func NewClient(opt Options) *Client {
switch {
case opt.ConnectTimeout == 0:
opt.ConnectTimeout = 3e9
opt.ConnectTimeout = 3 * time.Second
fallthrough
case opt.RequestTimeout == 0:
opt.RequestTimeout = 3e9
opt.RequestTimeout = 3 * time.Second
}
c := &Client{
......
......@@ -42,8 +42,8 @@ var (
type DubboInvoker struct {
protocol.BaseInvoker
client *Client
destroyLock sync.Mutex
client *Client
quitOnce sync.Once
}
func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
......@@ -97,19 +97,11 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
}
func (di *DubboInvoker) Destroy() {
if di.IsDestroyed() {
return
}
di.destroyLock.Lock()
defer di.destroyLock.Unlock()
if di.IsDestroyed() {
return
}
di.quitOnce.Do(func() {
di.BaseInvoker.Destroy()
di.BaseInvoker.Destroy()
if di.client != nil {
di.client.Close() // close client
}
if di.client != nil {
di.client.Close()
}
})
}
......@@ -47,7 +47,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente
}
bcl.defaultConfiguratorFunc = f
bcl.dynamicConfiguration.AddListener(key, listener)
if rawConfig, err := bcl.dynamicConfiguration.GetConfig(key, config_center.WithGroup(constant.DUBBO)); err != nil {
if rawConfig, err := bcl.dynamicConfiguration.GetInternalProperty(key, config_center.WithGroup(constant.DUBBO)); err != nil {
//set configurators to empty
bcl.configurators = []config_center.Configurator{}
return
......
......@@ -74,11 +74,12 @@ type RegistryConfigurationListener struct {
client *zk.ZookeeperClient
registry *zkRegistry
events chan *config_center.ConfigChangeEvent
isClosed bool
}
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
reg.wg.Add(1)
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false}
}
func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
......@@ -114,6 +115,7 @@ func (l *RegistryConfigurationListener) Close() {
* if the registry is not available, it means that the registry has been destroy
* so we don't need to call Done(), or it will cause the negative count panic for registry.wg
*/
l.isClosed = true
l.registry.wg.Done()
}
}
......
......@@ -46,6 +46,7 @@ import (
const (
RegistryZkClient = "zk registry"
RegistryConnDelay = 3
MaxWaitInterval = time.Duration(3e9)
)
var (
......@@ -403,6 +404,13 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
return r.getListener(conf)
}
func sleepWait(n int) {
wait := time.Duration((n + 1) * 2e8)
if wait > MaxWaitInterval {
wait = MaxWaitInterval
}
time.Sleep(wait)
}
//subscribe from registry
func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
......@@ -424,13 +432,11 @@ func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyLi
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for i := 0; ; i++ {
for {
if serviceEvent, err := listener.Next(); err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
if i == 0 {
listener.Close()
break
}
listener.Close()
break
} else {
logger.Infof("update begin, service event: %v", serviceEvent.String())
......@@ -438,7 +444,6 @@ func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyLi
}
}
logger.Infof("wait for get subscribe listener, key{%v}", url.Key())
sleepWait(n)
}
}
......@@ -449,6 +454,10 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
)
r.listenerLock.Lock()
if r.configListener.isClosed {
r.listenerLock.Unlock()
return nil, perrors.New("configListener already been closed")
}
zkListener = r.configListener
r.listenerLock.Unlock()
if r.listener == nil {
......@@ -494,10 +503,3 @@ func (r *zkRegistry) IsAvailable() bool {
return true
}
}
func sleepWait(n int) {
wait := time.Duration(200*n) * time.Millisecond
if wait > 3*time.Second {
wait = 3 * time.Second
}
time.Sleep(wait)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment