diff --git a/config_center/file/factory.go b/config_center/file/factory.go index 098ecaa5486eb16bc0598736f1aa03c8691d538f..2dda900e20cb7476b1d8da95e4b2b26fcb9dcefd 100644 --- a/config_center/file/factory.go +++ b/config_center/file/factory.go @@ -17,6 +17,10 @@ package file +import ( + perrors "github.com/pkg/errors" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -26,17 +30,22 @@ import ( ) func init() { - extension.SetConfigCenterFactory(constant.FILE_KEY, func() config_center.DynamicConfigurationFactory { return &fileDynamicConfigurationFactory{} }) + extension.SetConfigCenterFactory(constant.FILE_KEY, func() config_center.DynamicConfigurationFactory { + return &fileDynamicConfigurationFactory{} + }) } type fileDynamicConfigurationFactory struct { } -func (f *fileDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) { +// GetDynamicConfiguration Get Configuration with URL +func (f *fileDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, + error) { dynamicConfiguration, err := newFileSystemDynamicConfiguration(url) if err != nil { - return nil, err + return nil, perrors.WithStack(err) } + dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{}) return dynamicConfiguration, err } diff --git a/config_center/file/impl.go b/config_center/file/impl.go index 5293b73eb7b03e0f1b752471814e4a3f4d092c4f..f423b259066e97285f455c622d29e5bd276dd6a0 100644 --- a/config_center/file/impl.go +++ b/config_center/file/impl.go @@ -106,7 +106,8 @@ func (fsdc *FileSystemDynamicConfiguration) SetParser(p parser.ConfigurationPars } // AddListener Add listener -func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opts ...config_center.Option) { +func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, + opts ...config_center.Option) { tmpOpts := &config_center.Options{} for _, opt := range opts { opt(tmpOpts) @@ -118,7 +119,8 @@ func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener con } // RemoveListener Remove listener -func (fsdc *FileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opts ...config_center.Option) { +func (fsdc *FileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, + opts ...config_center.Option) { tmpOpts := &config_center.Options{} for _, opt := range opts { opt(tmpOpts) @@ -137,11 +139,12 @@ func (fsdc *FileSystemDynamicConfiguration) GetProperties(key string, opts ...co } path := fsdc.GetPath(key, tmpOpts.Group) - if file, err := ioutil.ReadFile(path); err != nil { + file, err := ioutil.ReadFile(path) + if err != nil { return "", perrors.WithStack(err) - } else { - return string(file), nil } + + return string(file), nil } // GetRule get Router rule properties file @@ -150,7 +153,8 @@ func (fsdc *FileSystemDynamicConfiguration) GetRule(key string, opts ...config_c } // GetInternalProperty get value by key in Default properties file(dubbo.properties) -func (fsdc *FileSystemDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) { +func (fsdc *FileSystemDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, + error) { return fsdc.GetProperties(key, opts...) } diff --git a/config_center/file/impl_test.go b/config_center/file/impl_test.go index 5f5e648789d92d7a689bcf97eed772d0d16c6387..58892953d556512a88689baa5110995091d75f7b 100644 --- a/config_center/file/impl_test.go +++ b/config_center/file/impl_test.go @@ -113,6 +113,8 @@ func TestGetConfigKeysByGroup(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, gs.Size()) assert.Equal(t, key, gs.Values()[0]) + // remove need wait a moment + time.Sleep(time.Second) defer destroy(file.rootPath, file) } diff --git a/config_center/file/listener.go b/config_center/file/listener.go index fc1facf4dfb2c0ec6b357350666783c2ed19b913..d569030e5ac6a127a862c4d22d180f674cadce2d 100644 --- a/config_center/file/listener.go +++ b/config_center/file/listener.go @@ -56,12 +56,14 @@ func NewCacheListener(rootPath string) *CacheListener { logger.Debugf("watcher %s, event %v", cl.rootPath, event) if event.Op&fsnotify.Write == fsnotify.Write { if l, ok := cl.keyListeners.Load(key); ok { - dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeUpdate) + dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, + remoting.EventTypeUpdate) } } if event.Op&fsnotify.Create == fsnotify.Create { if l, ok := cl.keyListeners.Load(key); ok { - dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeAdd) + dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, + remoting.EventTypeAdd) } } if event.Op&fsnotify.Remove == fsnotify.Remove { @@ -89,6 +91,7 @@ func NewCacheListener(rootPath string) *CacheListener { func removeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) { if len(lmap) == 0 { logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event) + return } for l := range lmap { callback(l, key, "", event) @@ -98,6 +101,7 @@ func removeCallback(lmap map[config_center.ConfigurationListener]struct{}, key s func dataChangeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) { if len(lmap) == 0 { logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event) + return } c := getFileContent(key) for l := range lmap { @@ -123,33 +127,36 @@ func (cl *CacheListener) Close() error { func (cl *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) { // reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure // make a map[your type]struct{} like set in java - listeners, loaded := cl.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: {}}) + listeners, loaded := cl.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{ + listener: {}}) if loaded { listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{} cl.keyListeners.Store(key, listeners) - } else { - if err := cl.watch.Add(key); err != nil { - logger.Errorf("watcher add path:%s err:%v", key, err) - } + return + } + if err := cl.watch.Add(key); err != nil { + logger.Errorf("watcher add path:%s err:%v", key, err) } } // RemoveListener will delete a listener if loaded func (cl *CacheListener) RemoveListener(key string, listener config_center.ConfigurationListener) { listeners, loaded := cl.keyListeners.Load(key) - if loaded { - delete(listeners.(map[config_center.ConfigurationListener]struct{}), listener) - if err := cl.watch.Remove(key); err != nil { - logger.Errorf("watcher remove path:%s err:%v", key, err) - } + if !loaded { + return + } + delete(listeners.(map[config_center.ConfigurationListener]struct{}), listener) + if err := cl.watch.Remove(key); err != nil { + logger.Errorf("watcher remove path:%s err:%v", key, err) } } func getFileContent(path string) string { - if c, err := ioutil.ReadFile(path); err != nil { + c, err := ioutil.ReadFile(path) + if err != nil { logger.Errorf("read file path:%s err:%v", path, err) return "" - } else { - return string(c) } + + return string(c) } diff --git a/registry/file/service_discovery.go b/registry/file/service_discovery.go index d10de1818093ba4dffdc9d30a48cbb6ea475bdf2..02792f767aca727dc3c6de8a1d0038428796749c 100644 --- a/registry/file/service_discovery.go +++ b/registry/file/service_discovery.go @@ -82,37 +82,39 @@ func newFileSystemServiceDiscovery(name string) (registry.ServiceDiscovery, erro return nil, perrors.New("could not init the instance because the config is invalid") } - if rp, err := file.Home(); err != nil { + rp, err := file.Home() + if err != nil { return nil, perrors.WithStack(err) - } else { - fdcf := extension.GetConfigCenterFactory(constant.FILE_KEY) - p := path.Join(rp, ".dubbo", "registry") - url, _ := common.NewURL("") - url.AddParamAvoidNil(file.CONFIG_CENTER_DIR_PARAM_NAME, p) - if c, err := fdcf.GetDynamicConfiguration(&url); err != nil { - return nil, perrors.New("could not find the config for name: " + name) - } else { - sd := &fileSystemServiceDiscovery{ - dynamicConfiguration: *c.(*file.FileSystemDynamicConfiguration), - rootPath: p, - fileMap: make(map[string]string), - } - - extension.AddCustomShutdownCallback(func() { - sd.Destroy() - }) - - for _, v := range sd.GetServices().Values() { - for _, i := range sd.GetInstances(v.(string)) { - // like java do nothing - l := &RegistryConfigurationListener{} - sd.dynamicConfiguration.AddListener(getServiceInstanceId(i), l, config_center.WithGroup(getServiceName(i))) - } - } - - return sd, nil + } + + fdcf := extension.GetConfigCenterFactory(constant.FILE_KEY) + p := path.Join(rp, ".dubbo", "registry") + url, _ := common.NewURL("") + url.AddParamAvoidNil(file.CONFIG_CENTER_DIR_PARAM_NAME, p) + c, err := fdcf.GetDynamicConfiguration(&url) + if err != nil { + return nil, perrors.WithStack(err) + } + + sd := &fileSystemServiceDiscovery{ + dynamicConfiguration: *c.(*file.FileSystemDynamicConfiguration), + rootPath: p, + fileMap: make(map[string]string), + } + + extension.AddCustomShutdownCallback(func() { + sd.Destroy() + }) + + for _, v := range sd.GetServices().Values() { + for _, i := range sd.GetInstances(v.(string)) { + // like java do nothing + l := &RegistryConfigurationListener{} + sd.dynamicConfiguration.AddListener(getServiceInstanceId(i), l, config_center.WithGroup(getServiceName(i))) } } + + return sd, nil } // nolint @@ -143,16 +145,19 @@ func (fssd *fileSystemServiceDiscovery) releaseAndRemoveRegistrationFiles(file s func (fssd *fileSystemServiceDiscovery) Register(instance registry.ServiceInstance) error { id := getServiceInstanceId(instance) sn := getServiceName(instance) - if c, err := getContent(instance); err != nil { - return err - } else { - if err := fssd.dynamicConfiguration.PublishConfig(id, sn, c); err != nil { - return perrors.WithStack(err) - } else { - fssd.fileMap[id] = fssd.dynamicConfiguration.GetPath(id, sn) - } + + c, err := toJsonString(instance) + if err != nil { + return perrors.WithStack(err) + } + + err = fssd.dynamicConfiguration.PublishConfig(id, sn, c) + if err != nil { + return perrors.WithStack(err) } + fssd.fileMap[id] = fssd.dynamicConfiguration.GetPath(id, sn) + return nil } @@ -170,13 +175,14 @@ func getServiceName(si registry.ServiceInstance) string { return si.GetServiceName() } -// getContent json string -func getContent(si registry.ServiceInstance) (string, error) { - if bytes, err := json.Marshal(si); err != nil { - return "", err - } else { - return string(bytes), nil +// toJsonString to json string +func toJsonString(si registry.ServiceInstance) (string, error) { + bytes, err := json.Marshal(si) + if err != nil { + return "", perrors.WithStack(err) } + + return string(bytes), nil } // Update will update the data of the instance in registry @@ -188,12 +194,14 @@ func (fssd *fileSystemServiceDiscovery) Update(instance registry.ServiceInstance func (fssd *fileSystemServiceDiscovery) Unregister(instance registry.ServiceInstance) error { id := getServiceInstanceId(instance) sn := getServiceName(instance) - if err := fssd.dynamicConfiguration.RemoveConfig(id, sn); err != nil { - return err - } else { - delete(fssd.fileMap, instance.GetId()) - return nil + + err := fssd.dynamicConfiguration.RemoveConfig(id, sn) + if err != nil { + return perrors.WithStack(err) } + + delete(fssd.fileMap, instance.GetId()) + return nil } // ----------------- discovery ------------------- @@ -219,33 +227,37 @@ func (fssd *fileSystemServiceDiscovery) GetServices() *gxset.HashSet { // GetInstances will return all service instances with serviceName func (fssd *fileSystemServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { - if set, err := fssd.dynamicConfiguration.GetConfigKeysByGroup(serviceName); err != nil { + set, err := fssd.dynamicConfiguration.GetConfigKeysByGroup(serviceName) + if err != nil { logger.Errorf("[FileServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ", serviceName, err) - return make([]registry.ServiceInstance, 0, 0) - } else { - si := make([]registry.ServiceInstance, 0, set.Size()) - for _, v := range set.Values() { - id := v.(string) - if p, err := fssd.dynamicConfiguration.GetProperties(id, config_center.WithGroup(serviceName)); err != nil { - logger.Errorf("[FileServiceDiscovery] Could not get the properties for id{%s}, service{%s}, "+ - "error = err{%v} ", - id, serviceName, err) - } else { - dsi := ®istry.DefaultServiceInstance{} - if err := json.Unmarshal([]byte(p), dsi); err != nil { - logger.Errorf("[FileServiceDiscovery] Could not unmarshal the properties for id{%s}, service{%s}, "+ - "error = err{%v} ", - id, serviceName, err) - } else { - si = append(si, dsi) - } - } + } + + res := make([]registry.ServiceInstance, 0, set.Size()) + for _, v := range set.Values() { + id := v.(string) + p, err := fssd.dynamicConfiguration.GetProperties(id, config_center.WithGroup(serviceName)) + if err != nil { + logger.Errorf("[FileServiceDiscovery] Could not get the properties for id{%s}, service{%s}, "+ + "error = err{%v} ", + id, serviceName, err) + return make([]registry.ServiceInstance, 0, 0) } - return si + dsi := ®istry.DefaultServiceInstance{} + err = json.Unmarshal([]byte(p), dsi) + if err != nil { + logger.Errorf("[FileServiceDiscovery] Could not unmarshal the properties for id{%s}, service{%s}, "+ + "error = err{%v} ", + id, serviceName, err) + return make([]registry.ServiceInstance, 0, 0) + } + + res = append(res, dsi) } + + return res } // GetInstancesByPage will return a page containing instances of ServiceInstance with the serviceName @@ -257,12 +269,14 @@ func (fssd *fileSystemServiceDiscovery) GetInstancesByPage(serviceName string, o // GetHealthyInstancesByPage will return a page containing instances of ServiceInstance. // The param healthy indices that the instance should be healthy or not. // The page will start at offset -func (fssd *fileSystemServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { +func (fssd *fileSystemServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, + healthy bool) gxpage.Pager { return nil } // Batch get all instances by the specified service names -func (fssd *fileSystemServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { +func (fssd *fileSystemServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, + requestedSize int) map[string]gxpage.Pager { return nil } @@ -280,7 +294,8 @@ func (fssd *fileSystemServiceDiscovery) DispatchEventByServiceName(serviceName s } // DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to target instances -func (fssd *fileSystemServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { +func (fssd *fileSystemServiceDiscovery) DispatchEventForInstances(serviceName string, + instances []registry.ServiceInstance) error { return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances)) }