Skip to content
Snippets Groups Projects
Commit 967f1fdd authored by flycash's avatar flycash
Browse files

Fix ZK BUG

parent 8b065479
No related branches found
No related tags found
No related merge requests found
......@@ -29,8 +29,10 @@ var (
metadataServiceProxyFactoryMap = make(map[string]func() service.MetadataServiceProxyFactory)
)
type MetadataServiceProxyFactoryFunc func() service.MetadataServiceProxyFactory
// SetMetadataServiceProxyFactory store the name-creator pair
func SetMetadataServiceProxyFactory(name string, creator func() service.MetadataServiceProxyFactory) {
func SetMetadataServiceProxyFactory(name string, creator MetadataServiceProxyFactoryFunc) {
metadataServiceProxyFactoryMap[name] = creator
}
......
......@@ -116,12 +116,12 @@ func TestRefresh(t *testing.T) {
config.GetEnvInstance().UpdateExternalConfigMap(mockMap)
father := &ConsumerConfig{
Check: &[]bool{true}[0],
Check: &[]bool{true}[0],
BaseConfig: BaseConfig{
ApplicationConfig:baseAppConfig,
ApplicationConfig: baseAppConfig,
},
Registries: baseRegistries,
References: baseMockRef,
Registries: baseRegistries,
References: baseMockRef,
ShutdownConfig: &ShutdownConfig{
Timeout: "12s",
StepTimeout: "2s",
......@@ -150,12 +150,12 @@ func TestAppExternalRefresh(t *testing.T) {
mockMap["dubbo.consumer.check"] = "true"
config.GetEnvInstance().UpdateExternalConfigMap(mockMap)
father := &ConsumerConfig{
Check: &[]bool{true}[0],
Check: &[]bool{true}[0],
BaseConfig: BaseConfig{
ApplicationConfig:baseAppConfig,
ApplicationConfig: baseAppConfig,
},
Registries: baseRegistries,
References: baseMockRef,
Registries: baseRegistries,
References: baseMockRef,
}
c.SetFatherConfig(father)
......@@ -178,12 +178,12 @@ func TestAppExternalWithoutIDRefresh(t *testing.T) {
mockMap["dubbo.consumer.check"] = "true"
config.GetEnvInstance().UpdateExternalConfigMap(mockMap)
father := &ConsumerConfig{
Check: &[]bool{true}[0],
Check: &[]bool{true}[0],
BaseConfig: BaseConfig{
ApplicationConfig:baseAppConfig,
ApplicationConfig: baseAppConfig,
},
Registries: baseRegistries,
References: baseMockRef,
Registries: baseRegistries,
References: baseMockRef,
}
c.SetFatherConfig(father)
......@@ -208,13 +208,13 @@ func TestRefreshSingleRegistry(t *testing.T) {
config.GetEnvInstance().UpdateExternalConfigMap(mockMap)
father := &ConsumerConfig{
Check: &[]bool{true}[0],
Check: &[]bool{true}[0],
BaseConfig: BaseConfig{
ApplicationConfig: baseAppConfig,
ApplicationConfig: baseAppConfig,
},
Registries: map[string]*RegistryConfig{},
Registry: &RegistryConfig{},
References: baseMockRef,
Registries: map[string]*RegistryConfig{},
Registry: &RegistryConfig{},
References: baseMockRef,
}
c.SetFatherConfig(father)
......@@ -242,7 +242,7 @@ func TestRefreshProvider(t *testing.T) {
BaseConfig: BaseConfig{
ApplicationConfig: baseAppConfig,
},
Registries: baseRegistries,
Registries: baseRegistries,
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "com.MockService",
......
......@@ -148,9 +148,9 @@ func loadConsumerConfig() {
checkok = false
count++
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
panic(errMsg)
// errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
// logger.Error(errMsg)
// panic(errMsg)
}
time.Sleep(time.Second * 1)
break
......
......@@ -158,7 +158,7 @@ func (e *etcdV3ServiceDiscovery) GetInstances(serviceName string) []registry.Ser
}
return serviceInstances
}
perrors.New(fmt.Sprintf("could not getChildrenKVList the err is:%v", err))
logger.Infof("could not getChildrenKVList the err is:%v", err)
}
return make([]registry.ServiceInstance, 0, 0)
......
......@@ -20,8 +20,6 @@ package event
import (
"reflect"
"testing"
"github.com/apache/dubbo-go/metadata/mapping"
)
import (
......@@ -36,6 +34,7 @@ import (
"github.com/apache/dubbo-go/common/observer"
dispatcher2 "github.com/apache/dubbo-go/common/observer/dispatcher"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/mapping"
_ "github.com/apache/dubbo-go/metadata/service/inmemory"
"github.com/apache/dubbo-go/registry"
)
......
......@@ -300,6 +300,7 @@ func (zksd *zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool
err := zksd.DispatchEventByServiceName(serviceName)
if err != nil {
logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", serviceName, err)
return false
}
return true
}
......
......@@ -18,8 +18,6 @@
package zookeeper
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
"strconv"
"sync"
"testing"
......@@ -31,6 +29,8 @@ import (
)
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
)
......
......@@ -60,11 +60,5 @@ func (consulAgent *ConsulAgent) Close() error {
if err != nil {
return err
}
err = os.RemoveAll(consulAgent.dataDir)
if err != nil {
return err
}
return nil
return os.RemoveAll(consulAgent.dataDir)
}
......@@ -156,7 +156,7 @@ func ValidateZookeeperClient(container ZkClientFacade, opts ...Option) error {
}
if connected {
logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", options.zkName, url.Location)
logger.Infof("Connect to zookeeper successfully, name{%s}, zk address{%v}", options.zkName, url.Location)
container.WaitGroup().Add(1) // zk client start successful, then registry wg +1
}
......@@ -433,6 +433,7 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {
// CreateTempWithValue will create the node recursively, which means that if the parent node is absent,
// it will create parent node first,and set value in last child path
// If the path exist, it will update data
func (z *ZookeeperClient) CreateTempWithValue(basePath string, value []byte) error {
var (
err error
......@@ -453,6 +454,9 @@ func (z *ZookeeperClient) CreateTempWithValue(basePath string, value []byte) err
// last child need be ephemeral
if i == length-1 {
_, err = conn.Create(tmpPath, value, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err == zk.ErrNodeExists {
return err
}
} else {
_, err = conn.Create(tmpPath, []byte{}, 0, zk.WorldACL(zk.PermAll))
}
......
......@@ -22,6 +22,8 @@ import (
"path"
"strings"
"sync"
"github.com/dubbogo/go-zookeeper/zk"
)
import (
......@@ -71,6 +73,16 @@ func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error {
return err
}
err = sd.client.CreateTempWithValue(path, data)
if err == zk.ErrNodeExists {
_, state, _ := sd.client.GetContent(path)
if state != nil {
_, err = sd.client.SetContent(path, data, state.Version+1)
if err != nil {
logger.Debugf("Try to update the node data failed. In most cases, it's not a problem. ")
}
}
return nil
}
if err != nil {
return err
}
......
......@@ -18,7 +18,6 @@
package zookeeper
import (
"github.com/apache/dubbo-go/common"
"sync"
)
import (
......@@ -27,6 +26,7 @@ import (
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)
......@@ -34,8 +34,8 @@ type ZkClientFacade interface {
ZkClient() *ZookeeperClient
SetZkClient(*ZookeeperClient)
ZkClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
Done() chan struct{} //for zk client control
WaitGroup() *sync.WaitGroup // for wait group control, zk client listener & zk client container
Done() chan struct{} // for zk client control
RestartCallBack() bool
GetUrl() common.URL
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment