diff --git a/common/constant/key.go b/common/constant/key.go index 7c45a1397d8767510f1f8b92f4e82f0ece05c810..7a9eb683a8eaafa54967bb0ffac595ce396aee88 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -169,6 +169,10 @@ const ( NACOS_USERNAME = "username" ) +const ( + FILE_KEY = "file" +) + const ( ZOOKEEPER_KEY = "zookeeper" ) diff --git a/common/url.go b/common/url.go index c355857b9f0bd1d003172035bc84c873229442e8..1b6e826a088f3216cac7c5ba55d6b66e1469c096 100644 --- a/common/url.go +++ b/common/url.go @@ -396,6 +396,17 @@ func (c *URL) AddParam(key string, value string) { c.params.Add(key, value) } +// AddParamAvoidNil will add key-value pair +// Not thread-safe +// think twice before using it. +func (c *URL) AddParamAvoidNil(key string, value string) { + if c.params == nil { + c.params = url.Values{} + } + + c.params.Add(key, value) +} + // SetParam will put the key-value pair into url // it's not thread safe. // think twice before you want to use this method diff --git a/config_center/apollo/impl.go b/config_center/apollo/impl.go index 8030a2c800c67d47a27e7aaa5d6f1bb39a83cdc9..ac5328c27a95425333276be58e6dd614e23bb5ac 100644 --- a/config_center/apollo/impl.go +++ b/config_center/apollo/impl.go @@ -45,6 +45,7 @@ const ( ) type apolloConfiguration struct { + cc.BaseDynamicConfiguration url *common.URL listeners sync.Map diff --git a/config_center/base_dynamic_configuration.go b/config_center/base_dynamic_configuration.go new file mode 100644 index 0000000000000000000000000000000000000000..3d6757852ad83d54338b721d0cb617772f40b6b7 --- /dev/null +++ b/config_center/base_dynamic_configuration.go @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config_center + +// BaseDynamicConfiguration will default implementation DynamicConfiguration some method +type BaseDynamicConfiguration struct { +} + +// RemoveConfig +func (bdc *BaseDynamicConfiguration) RemoveConfig(string, string) error { + return nil +} diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index 540febc9d38e164afcc62538478df140b7d671c7..a80a9aeb65a6ae0c2e00c5bff4b1c0752d60e5e4 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -57,6 +57,8 @@ type DynamicConfiguration interface { // PublishConfig will publish the config with the (key, group, value) pair PublishConfig(string, string, string) error + // PublishConfig will remove the config white the (key, group) pair + RemoveConfig(string, string) error // GetConfigKeysByGroup will return all keys with the group GetConfigKeysByGroup(group string) (*gxset.HashSet, error) diff --git a/config_center/file/factory.go b/config_center/file/factory.go index 5cc1e2805c3736bc186c61df2aaaaca55c560a85..098ecaa5486eb16bc0598736f1aa03c8691d538f 100644 --- a/config_center/file/factory.go +++ b/config_center/file/factory.go @@ -19,13 +19,14 @@ package file import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/config_center/parser" ) func init() { - extension.SetConfigCenterFactory("file", func() config_center.DynamicConfigurationFactory { return &fileDynamicConfigurationFactory{} }) + extension.SetConfigCenterFactory(constant.FILE_KEY, func() config_center.DynamicConfigurationFactory { return &fileDynamicConfigurationFactory{} }) } type fileDynamicConfigurationFactory struct { @@ -38,5 +39,4 @@ func (f *fileDynamicConfigurationFactory) GetDynamicConfiguration(url *common.UR } dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{}) return dynamicConfiguration, err - } diff --git a/config_center/file/impl.go b/config_center/file/impl.go index e0469682f1cae5604d85c35692c5f58666169291..defd180c8eb9130ea20770ecd93d78a8fd976d13 100644 --- a/config_center/file/impl.go +++ b/config_center/file/impl.go @@ -18,109 +18,287 @@ package file import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/constant" + "bytes" + "errors" + "io/ioutil" "os" + "os/exec" + "os/user" + "path" "path/filepath" + "runtime" "strings" - "sync" ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/config_center/parser" ) import ( gxset "github.com/dubbogo/gost/container/set" + perrors "github.com/pkg/errors" ) -type fileSystemDynamicConfiguration struct { +const ( + PARAM_NAME_PREFIX = "dubbo.config-center." + CONFIG_CENTER_DIR_PARAM_NAME = PARAM_NAME_PREFIX + "dir" + CONFIG_CENTER_ENCODING_PARAM_NAME = PARAM_NAME_PREFIX + "encoding" + DEFAULT_CONFIG_CENTER_ENCODING = "UTF-8" +) + +// FileSystemDynamicConfiguration +type FileSystemDynamicConfiguration struct { + config_center.BaseDynamicConfiguration url *common.URL - rootDirectory os.File // the root directory for config center - keyListeners sync.Map - encoding string rootPath string + encoding string + cacheListener *CacheListener + parser parser.ConfigurationParser } -func newFileSystemDynamicConfiguration(url *common.URL) (*fileSystemDynamicConfiguration, error) { - c := &fileSystemDynamicConfiguration{ +func newFileSystemDynamicConfiguration(url *common.URL) (*FileSystemDynamicConfiguration, error) { + encode := url.GetParam(CONFIG_CENTER_ENCODING_PARAM_NAME, DEFAULT_CONFIG_CENTER_ENCODING) + + root := url.GetParam(CONFIG_CENTER_DIR_PARAM_NAME, "") + var c *FileSystemDynamicConfiguration + if _, err := os.Stat(root); err != nil { + // not exist, use default, /XXX/xx/.dubbo/config-center + if rp, err := Home(); err != nil { + return nil, perrors.WithStack(err) + } else { + root = path.Join(rp, ".dubbo", "config-center") + } + } + + if _, err := os.Stat(root); err != nil { + // it must be dir, if not exist, will create + if err = createDir(root); err != nil { + return nil, perrors.WithStack(err) + } + } + + c = &FileSystemDynamicConfiguration{ url: url, - rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", + rootPath: root, + encoding: encode, } + c.cacheListener = NewCacheListener(c.rootPath) + return c, nil } -func (fsdc *fileSystemDynamicConfiguration) Parser() parser.ConfigurationParser { - return nil +func (fsdc *FileSystemDynamicConfiguration) RootPath() string { + return fsdc.rootPath } -func (fsdc *fileSystemDynamicConfiguration) SetParser(parser.ConfigurationParser) { +// Parser Get Parser +func (fsdc *FileSystemDynamicConfiguration) Parser() parser.ConfigurationParser { + return fsdc.parser } -func (fsdc *fileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { - fsdc.addListener(key, listener) + +// SetParser Set Parser +func (fsdc *FileSystemDynamicConfiguration) SetParser(p parser.ConfigurationParser) { + fsdc.parser = p +} + +// AddListener Add listener +func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opts ...config_center.Option) { + tmpOpts := &config_center.Options{} + for _, opt := range opts { + opt(tmpOpts) + } + + path := fsdc.GetPath(key, tmpOpts.Group) + + fsdc.cacheListener.AddListener(path, listener) } -func (fsdc *fileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { +// RemoveListener Remove listener +func (fsdc *FileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opts ...config_center.Option) { + tmpOpts := &config_center.Options{} + for _, opt := range opts { + opt(tmpOpts) + } + + path := fsdc.GetPath(key, tmpOpts.Group) + + fsdc.cacheListener.RemoveListener(path, listener) } // GetProperties get properties file -func (fsdc *fileSystemDynamicConfiguration) GetProperties(string, ...config_center.Option) (string, error) { - return "", nil +func (fsdc *FileSystemDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) { + tmpOpts := &config_center.Options{} + for _, opt := range opts { + opt(tmpOpts) + } + + path := fsdc.GetPath(key, tmpOpts.Group) + if file, err := ioutil.ReadFile(path); err != nil { + return "", perrors.WithStack(err) + } else { + return string(file), nil + } } // GetRule get Router rule properties file -func (fsdc *fileSystemDynamicConfiguration) GetRule(string, ...config_center.Option) (string, error) { - return "", nil +func (fsdc *FileSystemDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { + return fsdc.GetProperties(key, opts...) } // GetInternalProperty get value by key in Default properties file(dubbo.properties) -func (fsdc *fileSystemDynamicConfiguration) GetInternalProperty(string, ...config_center.Option) (string, error) { - return "", nil +func (fsdc *FileSystemDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) { + return fsdc.GetProperties(key, opts...) } // PublishConfig will publish the config with the (key, group, value) pair -func (fsdc *fileSystemDynamicConfiguration) PublishConfig(key string, group string, value string) error { - path := fsdc.buildPath(key, group) - return write(path, value) +func (fsdc *FileSystemDynamicConfiguration) PublishConfig(key string, group string, value string) error { + path := fsdc.GetPath(key, group) + return fsdc.write2File(path, value) } -func (fsdc *fileSystemDynamicConfiguration) buildPath(key string, group string) string { - return strings.Join([]string{fsdc.rootPath, group, key}, "/") +// GetConfigKeysByGroup will return all keys with the group +func (fsdc *FileSystemDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) { + path := fsdc.GetPath("", group) + r := gxset.NewSet() + + fileInfo, _ := ioutil.ReadDir(path) + + for _, file := range fileInfo { + // list file + if !file.IsDir() { + r.Add(file.Name()) + } + } + + return r, nil } -func write(path string, value string) error { - return nil +// RemoveConfig will remove the config whit hte (key, group) +func (fsdc *FileSystemDynamicConfiguration) RemoveConfig(key string, group string) error { + path := fsdc.GetPath(key, group) + _, err := fsdc.deleteDelay(path) + return err } -// GetConfigKeysByGroup will return all keys with the group -func (fsdc *fileSystemDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) { - return nil, nil +// Close close file watcher +func (fsdc *FileSystemDynamicConfiguration) Close() error { + return fsdc.cacheListener.watch.Close() } -// RemoveConfig will remove the config whit hte (key, group) -func (fsdc *fileSystemDynamicConfiguration) RemoveConfig(string, string) error { - return nil +func (fsdc *FileSystemDynamicConfiguration) GetPath(key string, group string) string { + if len(key) == 0 { + return path.Join(fsdc.rootPath, group) + } + + if len(group) == 0 { + group = config_center.DEFAULT_GROUP + } + + return path.Join(fsdc.rootPath, group, key) } -func (fsdc *fileSystemDynamicConfiguration) GetConfigGroups() *gxset.HashSet { - var cg []string - filepath.Walk(fsdc.rootDirectory.Name(), func(path string, info os.FileInfo, err error) error { - if info.IsDir() { - cg = append(cg, info.Name()) - } +func (fsdc *FileSystemDynamicConfiguration) deleteDelay(path string) (bool, error) { + if path == "" { + return false, nil + } - return nil - }) + if err := os.RemoveAll(path); err != nil { + return false, err + } - return gxset.NewSet(cg) + return true, nil } -func (fsdc *fileSystemDynamicConfiguration) getRootDirectory() os.File { - return fsdc.rootDirectory +func (fsdc *FileSystemDynamicConfiguration) write2File(fp string, value string) error { + if err := forceMkdirParent(fp); err != nil { + return perrors.WithStack(err) + } + + return ioutil.WriteFile(fp, []byte(value), os.ModePerm) } -func (fsdc *fileSystemDynamicConfiguration) ConfigFile(key string, group string) *os.File { +func forceMkdirParent(fp string) error { + pd := getParentDirectory(fp) + + return createDir(pd) +} + +func createDir(path string) error { + // create dir, chmod is drwxrwxrwx(0777) + if err := os.MkdirAll(path, os.ModePerm); err != nil { + return err + } + return nil } + +func getParentDirectory(fp string) string { + return substr(fp, 0, strings.LastIndex(fp, string(filepath.Separator))) +} + +func substr(s string, pos, length int) string { + runes := []rune(s) + l := pos + length + if l > len(runes) { + l = len(runes) + } + return string(runes[pos:l]) +} + +// Home returns the home directory for the executing user. +// +// This uses an OS-specific method for discovering the home directory. +// An error is returned if a home directory cannot be detected. +func Home() (string, error) { + user, err := user.Current() + if nil == err { + return user.HomeDir, nil + } + + // cross compile support + if "windows" == runtime.GOOS { + return homeWindows() + } + + // Unix-like system, so just assume Unix + return homeUnix() +} + +func homeUnix() (string, error) { + // First prefer the HOME environmental variable + if home := os.Getenv("HOME"); home != "" { + return home, nil + } + + // If that fails, try the shell + var stdout bytes.Buffer + cmd := exec.Command("sh", "-c", "eval echo ~$USER") + cmd.Stdout = &stdout + if err := cmd.Run(); err != nil { + return "", err + } + + result := strings.TrimSpace(stdout.String()) + if result == "" { + return "", errors.New("blank output when reading home directory") + } + + return result, nil +} + +func homeWindows() (string, error) { + drive := os.Getenv("HOMEDRIVE") + path := os.Getenv("HOMEPATH") + home := drive + path + if drive == "" || path == "" { + home = os.Getenv("USERPROFILE") + } + if home == "" { + return "", errors.New("HOMEDRIVE, HOMEPATH, and USERPROFILE are blank") + } + + return home, nil +} diff --git a/config_center/file/impl_test.go b/config_center/file/impl_test.go index e4924db48aa483500205cef3c452a89338adcc8a..63c7d00bca6fa2a0e198520f844785d26e7dbe34 100644 --- a/config_center/file/impl_test.go +++ b/config_center/file/impl_test.go @@ -19,29 +19,149 @@ package file import ( "fmt" + "os" "sync" "testing" + "time" ) import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config_center" ) +import ( + "github.com/stretchr/testify/assert" +) + const ( - dubboPropertyFileName = "dubbo.properties" + key = "com.dubbo.go" ) -func TestListener(t *testing.T) { - fsdc := &fileSystemDynamicConfiguration{ - rootPath: "/Users/tc/Documents/workspace_2020/dubbo/dubbo-common/target/test-classes/config-center", +func initFileData(t *testing.T) (*FileSystemDynamicConfiguration, error) { + urlString := "registry://127.0.0.1:2181" + regurl, err := common.NewURL(urlString) + assert.NoError(t, err) + dc, err := extension.GetConfigCenterFactory("file").GetDynamicConfiguration(®url) + assert.NoError(t, err) + + return dc.(*FileSystemDynamicConfiguration), err +} + +func TestPublishAndGetConfig(t *testing.T) { + file, err := initFileData(t) + assert.NoError(t, err) + if err := file.PublishConfig(key, "", "A"); err != nil { + t.Fatal(err) + } + + if prop, err := file.GetProperties(key); err != nil { + assert.Equal(t, "A", prop) } + defer destroy(t, file.rootPath, file) +} + +func TestAddListener(t *testing.T) { + file, err := initFileData(t) + group := "dubbogo" + value := "Test Value" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + + listener := &mockDataListener{} + file.AddListener(key, listener, config_center.WithGroup(group)) + + listener.wg.Add(1) + value = "Test Value 2" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + + listener.wg.Add(1) + value = "Test Value 3" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + + listener.wg.Wait() + + time.Sleep(time.Second) + defer destroy(t, file.rootPath, file) +} + +func TestAddAndRemoveListener(t *testing.T) { + file, err := initFileData(t) + group := "dubbogo" + value := "Test Value" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + listener := &mockDataListener{} + file.AddListener(key, listener, config_center.WithGroup(group)) + listener.wg.Add(1) - fsdc.addListener("abc-def-ghi", listener) + value = "Test Value 2" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + + // sleep, make sure callback run success, do `l.wg.Done()` + time.Sleep(time.Second) + file.RemoveListener(key, listener, config_center.WithGroup(group)) + + listener.wg.Add(1) + value = "Test Value 3" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + listener.wg.Done() listener.wg.Wait() - fsdc.close() + + time.Sleep(time.Second) + defer destroy(t, file.rootPath, file) +} + +func TestGetConfigKeysByGroup(t *testing.T) { + file, err := initFileData(t) + group := "dubbogo" + value := "Test Value" + err = file.PublishConfig(key, group, value) + gs, err := file.GetConfigKeysByGroup(group) + assert.NoError(t, err) + assert.Equal(t, 1, gs.Size()) + defer destroy(t, file.rootPath, file) +} + +func TestGetConfig(t *testing.T) { + file, err := initFileData(t) + assert.NoError(t, err) + group := "dubbogo" + value := "Test Value" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + prop, err := file.GetProperties(key, config_center.WithGroup(group)) + assert.NoError(t, err) + assert.Equal(t, value, prop) + defer destroy(t, file.rootPath, file) +} + +func TestPublishConfig(t *testing.T) { + file, err := initFileData(t) + assert.NoError(t, err) + group := "dubbogo" + value := "Test Value" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + prop, err := file.GetInternalProperty(key, config_center.WithGroup(group)) + assert.NoError(t, err) + assert.Equal(t, value, prop) + defer destroy(t, file.rootPath, file) +} + +func destroy(t *testing.T, path string, fdc *FileSystemDynamicConfiguration) { + if err := os.RemoveAll(path); err != nil { + t.Error(err) + } + fdc.Close() } type mockDataListener struct { diff --git a/config_center/file/listener.go b/config_center/file/listener.go index 2ca473343a3cbe0aba7b13257c27a9c6bfd2a81f..6a0c012609983d35799b6bd40ca5c35b1f98b012 100644 --- a/config_center/file/listener.go +++ b/config_center/file/listener.go @@ -18,144 +18,123 @@ package file import ( - "bytes" - "errors" - "os" - "os/exec" - "os/user" - "runtime" - "strings" + "io/ioutil" "sync" ) import ( - "github.com/fsnotify/fsnotify" -) - -import ( + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/remoting" ) -var ( - watchStartOnce = sync.Once{} - watch *fsnotify.Watcher - callback map[string]config_center.ConfigurationListener - //path = strings.Join([]string{h, ".dubbo", "registry"}, string(filepath.Separator)) +import ( + "github.com/fsnotify/fsnotify" ) -func init() { +type CacheListener struct { + watch *fsnotify.Watcher + keyListeners sync.Map + rootPath string +} + +// NewCacheListener creates a new CacheListener +func NewCacheListener(rootPath string) *CacheListener { + cl := &CacheListener{rootPath: rootPath} + // start watcher watch, err := fsnotify.NewWatcher() if err != nil { logger.Errorf("file : listen config fail, error:%v ", err) - return } - - watchStartOnce.Do(func() { - go func() { - for { - select { - case event := <-watch.Events: - if event.Op&fsnotify.Write == fsnotify.Write { - + go func() { + for { + select { + case event := <-watch.Events: + key := event.Name + logger.Debugf("watcher %s, event %v", cl.rootPath, event) + if event.Op&fsnotify.Write == fsnotify.Write { + if l, ok := cl.keyListeners.Load(key); ok { + allCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeUpdate) } - if event.Op&fsnotify.Create == fsnotify.Create { - + } + if event.Op&fsnotify.Create == fsnotify.Create { + if l, ok := cl.keyListeners.Load(key); ok { + allCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeAdd) } - if event.Op&fsnotify.Remove == fsnotify.Remove { - + } + if event.Op&fsnotify.Remove == fsnotify.Remove { + if l, ok := cl.keyListeners.Load(key); ok { + allCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeDel) } - case err := <-watch.Errors: - logger.Errorf("file : listen watch fail:", err) } + case err := <-watch.Errors: + logger.Errorf("file : listen watch fail:", err) } - }() - }) -} - -func (fsdc *fileSystemDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) { - path := fsdc.buildPath(key, config_center.DEFAULT_GROUP) - _, loaded := fsdc.keyListeners.Load(path) - if !loaded { - if err := watch.Add(path); err != nil { - logger.Errorf("file : listen watch: %s add fail:", key, err) - } else { - fsdc.keyListeners.Store(key, listener) } - } else { - logger.Infof("profile:%s. this profile is already listening", key) - } -} + }() + cl.watch = watch -func (fsdc *fileSystemDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { - path := fsdc.buildPath(key, config_center.DEFAULT_GROUP) - _, loaded := fsdc.keyListeners.Load(path) - if !loaded { - if err := watch.Remove(path); err != nil { - logger.Errorf("file : listen watch: %s remove fail:", key, err) - } else { - fsdc.keyListeners.Delete(path) - } - } else { - logger.Infof("profile:%s. this profile is not exist", key) - } -} + extension.AddCustomShutdownCallback(func() { + cl.watch.Close() + }) -func (fsdc *fileSystemDynamicConfiguration) close() { - watch.Close() + return cl } -// Home returns the home directory for the executing user. -// -// This uses an OS-specific method for discovering the home directory. -// An error is returned if a home directory cannot be detected. -func Home() (string, error) { - user, err := user.Current() - if nil == err { - return user.HomeDir, nil +func allCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) { + if len(lmap) == 0 { + logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event) } - - // cross compile support - if "windows" == runtime.GOOS { - return homeWindows() + c := getFileContent(key) + for l := range lmap { + callback(l, key, c, event) } +} - // Unix-like system, so just assume Unix - return homeUnix() +func callback(listener config_center.ConfigurationListener, path, data string, event remoting.EventType) { + listener.Process(&config_center.ConfigChangeEvent{Key: path, Value: data, ConfigType: event}) } -func homeUnix() (string, error) { - // First prefer the HOME environmental variable - if home := os.Getenv("HOME"); home != "" { - return home, nil - } +func (cl *CacheListener) Close() { + cl.keyListeners.Range(func(key, value interface{}) bool { + cl.keyListeners.Delete(key) + return true + }) + cl.watch.Close() +} - // If that fails, try the shell - var stdout bytes.Buffer - cmd := exec.Command("sh", "-c", "eval echo ~$USER") - cmd.Stdout = &stdout - if err := cmd.Run(); err != nil { - return "", err +// AddListener will add a listener if loaded +func (cl *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) { + // reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure + // make a map[your type]struct{} like set in java + listeners, loaded := cl.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: {}}) + if loaded { + listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{} + cl.keyListeners.Store(key, listeners) + } else { + if err := cl.watch.Add(key); err != nil { + logger.Errorf("watcher add path:%s err:%v", key, err) + } } +} - result := strings.TrimSpace(stdout.String()) - if result == "" { - return "", errors.New("blank output when reading home directory") +// RemoveListener will delete a listener if loaded +func (cl *CacheListener) RemoveListener(key string, listener config_center.ConfigurationListener) { + listeners, loaded := cl.keyListeners.Load(key) + if loaded { + delete(listeners.(map[config_center.ConfigurationListener]struct{}), listener) + if err := cl.watch.Remove(key); err != nil { + logger.Errorf("watcher remove path:%s err:%v", key, err) + } } - - return result, nil } -func homeWindows() (string, error) { - drive := os.Getenv("HOMEDRIVE") - path := os.Getenv("HOMEPATH") - home := drive + path - if drive == "" || path == "" { - home = os.Getenv("USERPROFILE") - } - if home == "" { - return "", errors.New("HOMEDRIVE, HOMEPATH, and USERPROFILE are blank") +func getFileContent(path string) string { + if c, err := ioutil.ReadFile(path); err != nil { + logger.Errorf("read file path:%s err:%v", path, err) + return "" + } else { + return string(c) } - - return home, nil } diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index 8fe0a251239f7bfc6a3f70c3834da1b3af8484ba..9bebd600c6ba9e09f172f9260a920b6572fa694c 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -98,6 +98,7 @@ func (c *MockDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.Ha // MockDynamicConfiguration uses to parse content and defines listener type MockDynamicConfiguration struct { + BaseDynamicConfiguration parser parser.ConfigurationParser content string listener map[string]ConfigurationListener diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index bbf707b93811663d0a259c6704e1008bfa91c5c1..be94b9a2e356b797b580ef4861895259ca7a4315 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -47,6 +47,7 @@ const ( // nacosDynamicConfiguration is the implementation of DynamicConfiguration based on nacos type nacosDynamicConfiguration struct { + config_center.BaseDynamicConfiguration url *common.URL rootPath string wg sync.WaitGroup diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index ef579eb2d11cf5f5bafb132c3e201c12ee7845c0..485abcb5f0403050f0ed4797d8cce408a23eed62 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -44,6 +44,7 @@ const ( ) type zookeeperDynamicConfiguration struct { + config_center.BaseDynamicConfiguration url *common.URL rootPath string wg sync.WaitGroup diff --git a/registry/file/listener.go b/registry/file/listener.go new file mode 100644 index 0000000000000000000000000000000000000000..3fe7400226067f1232ed7993b1fe1b5575b870df --- /dev/null +++ b/registry/file/listener.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package file + +import "github.com/apache/dubbo-go/config_center" + +// RegistryConfigurationListener represent the processor of flie watcher +type RegistryConfigurationListener struct { +} + +// Process submit the ConfigChangeEvent to the event chan to notify all observer +func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { + +} diff --git a/registry/file/service_discovery.go b/registry/file/service_discovery.go new file mode 100644 index 0000000000000000000000000000000000000000..8c0c2b93a2716490a6edeba849f5efb83af92350 --- /dev/null +++ b/registry/file/service_discovery.go @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package file + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path" + "strconv" + "sync" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/file" + "github.com/apache/dubbo-go/registry" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" + perrors "github.com/pkg/errors" +) + +var ( + // 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition + instanceMap = make(map[string]registry.ServiceDiscovery, 16) + initLock sync.Mutex +) + +// init will put the service discovery into extension +func init() { + extension.SetServiceDiscovery(constant.FILE_KEY, newFileSystemServiceDiscovery) +} + +// fileServiceDiscovery is the implementation of service discovery based on file. +type fileSystemServiceDiscovery struct { + dynamicConfiguration file.FileSystemDynamicConfiguration + rootPath string + fileMap map[string]string +} + +func newFileSystemServiceDiscovery(name string) (registry.ServiceDiscovery, error) { + instance, ok := instanceMap[name] + if ok { + return instance, nil + } + + initLock.Lock() + defer initLock.Unlock() + + // double check + instance, ok = instanceMap[name] + if ok { + return instance, nil + } + + sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name) + if !ok || sdc.Protocol != constant.FILE_KEY { + return nil, perrors.New("could not init the instance because the config is invalid") + } + + if rp, err := file.Home(); err != nil { + return nil, perrors.WithStack(err) + } else { + fdcf := extension.GetConfigCenterFactory(constant.FILE_KEY) + p := path.Join(rp, ".dubbo", "registry") + url, _ := common.NewURL("") + url.AddParamAvoidNil(file.CONFIG_CENTER_DIR_PARAM_NAME, p) + if c, err := fdcf.GetDynamicConfiguration(&url); err != nil { + return nil, perrors.New("could not find the config for name: " + name) + } else { + sd := &fileSystemServiceDiscovery{ + dynamicConfiguration: *c.(*file.FileSystemDynamicConfiguration), + rootPath: p, + fileMap: make(map[string]string), + } + + extension.AddCustomShutdownCallback(func() { + sd.Destroy() + }) + + for _, v := range sd.GetServices().Values() { + for _, i := range sd.GetInstances(v.(string)) { + // like java do nothing + l := &RegistryConfigurationListener{} + sd.dynamicConfiguration.AddListener(getServiceInstanceId(i), l, config_center.WithGroup(getServiceName(i))) + } + } + + return sd, nil + } + } +} + +// nolint +func (fssd *fileSystemServiceDiscovery) String() string { + return fmt.Sprintf("file-system-service-discovery") +} + +// Destroy will destroy the service discovery. +// If the discovery cannot be destroy, it will return an error. +func (fssd *fileSystemServiceDiscovery) Destroy() error { + fssd.dynamicConfiguration.Close() + + for _, f := range fssd.fileMap { + fssd.releaseAndRemoveRegistrationFiles(f) + } + + return nil +} + +// nolint +func (fssd *fileSystemServiceDiscovery) releaseAndRemoveRegistrationFiles(file string) { + os.Remove(file) +} + +// ----------------- registration ---------------- + +// Register will register an instance of ServiceInstance to registry +func (fssd *fileSystemServiceDiscovery) Register(instance registry.ServiceInstance) error { + id := getServiceInstanceId(instance) + sn := getServiceName(instance) + if c, err := getContent(instance); err != nil { + return err + } else { + if err := fssd.dynamicConfiguration.PublishConfig(id, sn, c); err != nil { + return perrors.WithStack(err) + } else { + fssd.fileMap[id] = fssd.dynamicConfiguration.GetPath(id, sn) + } + } + + return nil +} + +// nolint +func getServiceInstanceId(si registry.ServiceInstance) string { + if si.GetId() == "" { + return si.GetHost() + "." + strconv.Itoa(si.GetPort()) + } + + return si.GetId() +} + +// nolint +func getServiceName(si registry.ServiceInstance) string { + return si.GetServiceName() +} + +// getContent json string +func getContent(si registry.ServiceInstance) (string, error) { + if bytes, err := json.Marshal(si); err != nil { + return "", err + } else { + return string(bytes), nil + } +} + +// Update will update the data of the instance in registry +func (fssd *fileSystemServiceDiscovery) Update(instance registry.ServiceInstance) error { + return fssd.Register(instance) +} + +// Unregister will unregister this instance from registry +func (fssd *fileSystemServiceDiscovery) Unregister(instance registry.ServiceInstance) error { + id := getServiceInstanceId(instance) + sn := getServiceName(instance) + if err := fssd.dynamicConfiguration.RemoveConfig(id, sn); err != nil { + return err + } else { + delete(fssd.fileMap, instance.GetId()) + return nil + } +} + +// ----------------- discovery ------------------- +// GetDefaultPageSize will return the default page size +func (fssd *fileSystemServiceDiscovery) GetDefaultPageSize() int { + return 100 +} + +// GetServices will return the all service names. +func (fssd *fileSystemServiceDiscovery) GetServices() *gxset.HashSet { + r := gxset.NewSet() + // dynamicConfiguration root path is the actual root path + fileInfo, _ := ioutil.ReadDir(fssd.dynamicConfiguration.RootPath()) + + for _, file := range fileInfo { + if file.IsDir() { + r.Add(file.Name()) + } + } + + return r +} + +// GetInstances will return all service instances with serviceName +func (fssd *fileSystemServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { + if set, err := fssd.dynamicConfiguration.GetConfigKeysByGroup(serviceName); err != nil { + logger.Errorf("[FileServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ", + serviceName, err) + + return make([]registry.ServiceInstance, 0, 0) + } else { + si := make([]registry.ServiceInstance, 0, set.Size()) + for _, v := range set.Values() { + id := v.(string) + if p, err := fssd.dynamicConfiguration.GetProperties(id, config_center.WithGroup(serviceName)); err != nil { + logger.Errorf("[FileServiceDiscovery] Could not get the properties for id{%s}, service{%s}, "+ + "error = err{%v} ", + id, serviceName, err) + } else { + dsi := ®istry.DefaultServiceInstance{} + if err := json.Unmarshal([]byte(p), dsi); err != nil { + logger.Errorf("[FileServiceDiscovery] Could not unmarshal the properties for id{%s}, service{%s}, "+ + "error = err{%v} ", + id, serviceName, err) + } else { + si = append(si, dsi) + } + } + } + + return si + } +} + +// GetInstancesByPage will return a page containing instances of ServiceInstance with the serviceName +// the page will start at offset +func (fssd *fileSystemServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + return nil +} + +// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance. +// The param healthy indices that the instance should be healthy or not. +// The page will start at offset +func (fssd *fileSystemServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + return nil +} + +// Batch get all instances by the specified service names +func (fssd *fileSystemServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + return nil +} + +// ----------------- event ---------------------- +// AddListener adds a new ServiceInstancesChangedListener +// client +func (fssd *fileSystemServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + //fssd.dynamicConfiguration.AddListener(listener.ServiceName) + return nil +} + +// DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName +func (fssd *fileSystemServiceDiscovery) DispatchEventByServiceName(serviceName string) error { + return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, fssd.GetInstances(serviceName))) +} + +// DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to target instances +func (fssd *fileSystemServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances)) +} + +// DispatchEvent dispatches the event +func (fssd *fileSystemServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + extension.GetGlobalDispatcher().Dispatch(event) + return nil +} diff --git a/registry/file/service_discovery_test.go b/registry/file/service_discovery_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c940e33b157cf2b870250332a1b08aaa275339a8 --- /dev/null +++ b/registry/file/service_discovery_test.go @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package file + +import ( + "math/rand" + "strconv" + "testing" + "time" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/registry" +) + +import ( + "github.com/stretchr/testify/assert" +) + +var ( + testName = "test" +) + +func TestNewFileSystemServiceDiscoveryAndDestroy(t *testing.T) { + serviceDiscovery, err := newFileSystemServiceDiscovery(testName) + assert.NoError(t, err) + assert.NotNil(t, serviceDiscovery) + defer serviceDiscovery.Destroy() +} + +func TestCURDFileSystemServiceDiscovery(t *testing.T) { + prepareData() + serviceDiscovery, err := extension.GetServiceDiscovery(constant.FILE_KEY, testName) + assert.NoError(t, err) + md := make(map[string]string) + + rand.Seed(time.Now().Unix()) + serviceName := "service-name" + strconv.Itoa(rand.Intn(10000)) + md["t1"] = "test1" + r1 := ®istry.DefaultServiceInstance{ + Id: "123456789", + ServiceName: serviceName, + Host: "127.0.0.1", + Port: 2233, + Enable: true, + Healthy: true, + Metadata: md, + } + err = serviceDiscovery.Register(r1) + assert.NoError(t, err) + + instances := serviceDiscovery.GetInstances(r1.ServiceName) + assert.Equal(t, 1, len(instances)) + assert.Equal(t, r1.Id, instances[0].GetId()) + assert.Equal(t, r1.ServiceName, instances[0].GetServiceName()) + assert.Equal(t, r1.Port, instances[0].GetPort()) + + err = serviceDiscovery.Unregister(r1) + assert.NoError(t, err) + + err = serviceDiscovery.Register(r1) + + defer serviceDiscovery.Destroy() +} + +func prepareData() { + config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{ + Protocol: "file", + } +}