Skip to content
Snippets Groups Projects
Commit 678f5056 authored by tiecheng's avatar tiecheng
Browse files

file system service discovery perfect

parent ce868208
No related branches found
No related tags found
No related merge requests found
Showing with 892 additions and 159 deletions
......@@ -169,6 +169,10 @@ const (
NACOS_USERNAME = "username"
)
const (
FILE_KEY = "file"
)
const (
ZOOKEEPER_KEY = "zookeeper"
)
......
......@@ -396,6 +396,17 @@ func (c *URL) AddParam(key string, value string) {
c.params.Add(key, value)
}
// AddParamAvoidNil will add key-value pair
// Not thread-safe
// think twice before using it.
func (c *URL) AddParamAvoidNil(key string, value string) {
if c.params == nil {
c.params = url.Values{}
}
c.params.Add(key, value)
}
// SetParam will put the key-value pair into url
// it's not thread safe.
// think twice before you want to use this method
......
......@@ -45,6 +45,7 @@ const (
)
type apolloConfiguration struct {
cc.BaseDynamicConfiguration
url *common.URL
listeners sync.Map
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config_center
// BaseDynamicConfiguration will default implementation DynamicConfiguration some method
type BaseDynamicConfiguration struct {
}
// RemoveConfig
func (bdc *BaseDynamicConfiguration) RemoveConfig(string, string) error {
return nil
}
......@@ -57,6 +57,8 @@ type DynamicConfiguration interface {
// PublishConfig will publish the config with the (key, group, value) pair
PublishConfig(string, string, string) error
// PublishConfig will remove the config white the (key, group) pair
RemoveConfig(string, string) error
// GetConfigKeysByGroup will return all keys with the group
GetConfigKeysByGroup(group string) (*gxset.HashSet, error)
......
......@@ -19,13 +19,14 @@ package file
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
func init() {
extension.SetConfigCenterFactory("file", func() config_center.DynamicConfigurationFactory { return &fileDynamicConfigurationFactory{} })
extension.SetConfigCenterFactory(constant.FILE_KEY, func() config_center.DynamicConfigurationFactory { return &fileDynamicConfigurationFactory{} })
}
type fileDynamicConfigurationFactory struct {
......@@ -38,5 +39,4 @@ func (f *fileDynamicConfigurationFactory) GetDynamicConfiguration(url *common.UR
}
dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{})
return dynamicConfiguration, err
}
......@@ -18,109 +18,287 @@
package file
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"bytes"
"errors"
"io/ioutil"
"os"
"os/exec"
"os/user"
"path"
"path/filepath"
"runtime"
"strings"
"sync"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
import (
gxset "github.com/dubbogo/gost/container/set"
perrors "github.com/pkg/errors"
)
type fileSystemDynamicConfiguration struct {
const (
PARAM_NAME_PREFIX = "dubbo.config-center."
CONFIG_CENTER_DIR_PARAM_NAME = PARAM_NAME_PREFIX + "dir"
CONFIG_CENTER_ENCODING_PARAM_NAME = PARAM_NAME_PREFIX + "encoding"
DEFAULT_CONFIG_CENTER_ENCODING = "UTF-8"
)
// FileSystemDynamicConfiguration
type FileSystemDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootDirectory os.File // the root directory for config center
keyListeners sync.Map
encoding string
rootPath string
encoding string
cacheListener *CacheListener
parser parser.ConfigurationParser
}
func newFileSystemDynamicConfiguration(url *common.URL) (*fileSystemDynamicConfiguration, error) {
c := &fileSystemDynamicConfiguration{
func newFileSystemDynamicConfiguration(url *common.URL) (*FileSystemDynamicConfiguration, error) {
encode := url.GetParam(CONFIG_CENTER_ENCODING_PARAM_NAME, DEFAULT_CONFIG_CENTER_ENCODING)
root := url.GetParam(CONFIG_CENTER_DIR_PARAM_NAME, "")
var c *FileSystemDynamicConfiguration
if _, err := os.Stat(root); err != nil {
// not exist, use default, /XXX/xx/.dubbo/config-center
if rp, err := Home(); err != nil {
return nil, perrors.WithStack(err)
} else {
root = path.Join(rp, ".dubbo", "config-center")
}
}
if _, err := os.Stat(root); err != nil {
// it must be dir, if not exist, will create
if err = createDir(root); err != nil {
return nil, perrors.WithStack(err)
}
}
c = &FileSystemDynamicConfiguration{
url: url,
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
rootPath: root,
encoding: encode,
}
c.cacheListener = NewCacheListener(c.rootPath)
return c, nil
}
func (fsdc *fileSystemDynamicConfiguration) Parser() parser.ConfigurationParser {
return nil
func (fsdc *FileSystemDynamicConfiguration) RootPath() string {
return fsdc.rootPath
}
func (fsdc *fileSystemDynamicConfiguration) SetParser(parser.ConfigurationParser) {
// Parser Get Parser
func (fsdc *FileSystemDynamicConfiguration) Parser() parser.ConfigurationParser {
return fsdc.parser
}
func (fsdc *fileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
fsdc.addListener(key, listener)
// SetParser Set Parser
func (fsdc *FileSystemDynamicConfiguration) SetParser(p parser.ConfigurationParser) {
fsdc.parser = p
}
// AddListener Add listener
func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opts ...config_center.Option) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
fsdc.cacheListener.AddListener(path, listener)
}
func (fsdc *fileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
// RemoveListener Remove listener
func (fsdc *FileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opts ...config_center.Option) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
fsdc.cacheListener.RemoveListener(path, listener)
}
// GetProperties get properties file
func (fsdc *fileSystemDynamicConfiguration) GetProperties(string, ...config_center.Option) (string, error) {
return "", nil
func (fsdc *FileSystemDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
if file, err := ioutil.ReadFile(path); err != nil {
return "", perrors.WithStack(err)
} else {
return string(file), nil
}
}
// GetRule get Router rule properties file
func (fsdc *fileSystemDynamicConfiguration) GetRule(string, ...config_center.Option) (string, error) {
return "", nil
func (fsdc *FileSystemDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) {
return fsdc.GetProperties(key, opts...)
}
// GetInternalProperty get value by key in Default properties file(dubbo.properties)
func (fsdc *fileSystemDynamicConfiguration) GetInternalProperty(string, ...config_center.Option) (string, error) {
return "", nil
func (fsdc *FileSystemDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) {
return fsdc.GetProperties(key, opts...)
}
// PublishConfig will publish the config with the (key, group, value) pair
func (fsdc *fileSystemDynamicConfiguration) PublishConfig(key string, group string, value string) error {
path := fsdc.buildPath(key, group)
return write(path, value)
func (fsdc *FileSystemDynamicConfiguration) PublishConfig(key string, group string, value string) error {
path := fsdc.GetPath(key, group)
return fsdc.write2File(path, value)
}
func (fsdc *fileSystemDynamicConfiguration) buildPath(key string, group string) string {
return strings.Join([]string{fsdc.rootPath, group, key}, "/")
// GetConfigKeysByGroup will return all keys with the group
func (fsdc *FileSystemDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
path := fsdc.GetPath("", group)
r := gxset.NewSet()
fileInfo, _ := ioutil.ReadDir(path)
for _, file := range fileInfo {
// list file
if !file.IsDir() {
r.Add(file.Name())
}
}
return r, nil
}
func write(path string, value string) error {
return nil
// RemoveConfig will remove the config whit hte (key, group)
func (fsdc *FileSystemDynamicConfiguration) RemoveConfig(key string, group string) error {
path := fsdc.GetPath(key, group)
_, err := fsdc.deleteDelay(path)
return err
}
// GetConfigKeysByGroup will return all keys with the group
func (fsdc *fileSystemDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
return nil, nil
// Close close file watcher
func (fsdc *FileSystemDynamicConfiguration) Close() error {
return fsdc.cacheListener.watch.Close()
}
// RemoveConfig will remove the config whit hte (key, group)
func (fsdc *fileSystemDynamicConfiguration) RemoveConfig(string, string) error {
return nil
func (fsdc *FileSystemDynamicConfiguration) GetPath(key string, group string) string {
if len(key) == 0 {
return path.Join(fsdc.rootPath, group)
}
if len(group) == 0 {
group = config_center.DEFAULT_GROUP
}
return path.Join(fsdc.rootPath, group, key)
}
func (fsdc *fileSystemDynamicConfiguration) GetConfigGroups() *gxset.HashSet {
var cg []string
filepath.Walk(fsdc.rootDirectory.Name(), func(path string, info os.FileInfo, err error) error {
if info.IsDir() {
cg = append(cg, info.Name())
}
func (fsdc *FileSystemDynamicConfiguration) deleteDelay(path string) (bool, error) {
if path == "" {
return false, nil
}
return nil
})
if err := os.RemoveAll(path); err != nil {
return false, err
}
return gxset.NewSet(cg)
return true, nil
}
func (fsdc *fileSystemDynamicConfiguration) getRootDirectory() os.File {
return fsdc.rootDirectory
func (fsdc *FileSystemDynamicConfiguration) write2File(fp string, value string) error {
if err := forceMkdirParent(fp); err != nil {
return perrors.WithStack(err)
}
return ioutil.WriteFile(fp, []byte(value), os.ModePerm)
}
func (fsdc *fileSystemDynamicConfiguration) ConfigFile(key string, group string) *os.File {
func forceMkdirParent(fp string) error {
pd := getParentDirectory(fp)
return createDir(pd)
}
func createDir(path string) error {
// create dir, chmod is drwxrwxrwx(0777)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return err
}
return nil
}
func getParentDirectory(fp string) string {
return substr(fp, 0, strings.LastIndex(fp, string(filepath.Separator)))
}
func substr(s string, pos, length int) string {
runes := []rune(s)
l := pos + length
if l > len(runes) {
l = len(runes)
}
return string(runes[pos:l])
}
// Home returns the home directory for the executing user.
//
// This uses an OS-specific method for discovering the home directory.
// An error is returned if a home directory cannot be detected.
func Home() (string, error) {
user, err := user.Current()
if nil == err {
return user.HomeDir, nil
}
// cross compile support
if "windows" == runtime.GOOS {
return homeWindows()
}
// Unix-like system, so just assume Unix
return homeUnix()
}
func homeUnix() (string, error) {
// First prefer the HOME environmental variable
if home := os.Getenv("HOME"); home != "" {
return home, nil
}
// If that fails, try the shell
var stdout bytes.Buffer
cmd := exec.Command("sh", "-c", "eval echo ~$USER")
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
return "", err
}
result := strings.TrimSpace(stdout.String())
if result == "" {
return "", errors.New("blank output when reading home directory")
}
return result, nil
}
func homeWindows() (string, error) {
drive := os.Getenv("HOMEDRIVE")
path := os.Getenv("HOMEPATH")
home := drive + path
if drive == "" || path == "" {
home = os.Getenv("USERPROFILE")
}
if home == "" {
return "", errors.New("HOMEDRIVE, HOMEPATH, and USERPROFILE are blank")
}
return home, nil
}
......@@ -19,29 +19,149 @@ package file
import (
"fmt"
"os"
"sync"
"testing"
"time"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
)
import (
"github.com/stretchr/testify/assert"
)
const (
dubboPropertyFileName = "dubbo.properties"
key = "com.dubbo.go"
)
func TestListener(t *testing.T) {
fsdc := &fileSystemDynamicConfiguration{
rootPath: "/Users/tc/Documents/workspace_2020/dubbo/dubbo-common/target/test-classes/config-center",
func initFileData(t *testing.T) (*FileSystemDynamicConfiguration, error) {
urlString := "registry://127.0.0.1:2181"
regurl, err := common.NewURL(urlString)
assert.NoError(t, err)
dc, err := extension.GetConfigCenterFactory("file").GetDynamicConfiguration(&regurl)
assert.NoError(t, err)
return dc.(*FileSystemDynamicConfiguration), err
}
func TestPublishAndGetConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
if err := file.PublishConfig(key, "", "A"); err != nil {
t.Fatal(err)
}
if prop, err := file.GetProperties(key); err != nil {
assert.Equal(t, "A", prop)
}
defer destroy(t, file.rootPath, file)
}
func TestAddListener(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
listener := &mockDataListener{}
file.AddListener(key, listener, config_center.WithGroup(group))
listener.wg.Add(1)
value = "Test Value 2"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
listener.wg.Add(1)
value = "Test Value 3"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
listener.wg.Wait()
time.Sleep(time.Second)
defer destroy(t, file.rootPath, file)
}
func TestAddAndRemoveListener(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
listener := &mockDataListener{}
file.AddListener(key, listener, config_center.WithGroup(group))
listener.wg.Add(1)
fsdc.addListener("abc-def-ghi", listener)
value = "Test Value 2"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
// sleep, make sure callback run success, do `l.wg.Done()`
time.Sleep(time.Second)
file.RemoveListener(key, listener, config_center.WithGroup(group))
listener.wg.Add(1)
value = "Test Value 3"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
listener.wg.Done()
listener.wg.Wait()
fsdc.close()
time.Sleep(time.Second)
defer destroy(t, file.rootPath, file)
}
func TestGetConfigKeysByGroup(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
gs, err := file.GetConfigKeysByGroup(group)
assert.NoError(t, err)
assert.Equal(t, 1, gs.Size())
defer destroy(t, file.rootPath, file)
}
func TestGetConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
prop, err := file.GetProperties(key, config_center.WithGroup(group))
assert.NoError(t, err)
assert.Equal(t, value, prop)
defer destroy(t, file.rootPath, file)
}
func TestPublishConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
prop, err := file.GetInternalProperty(key, config_center.WithGroup(group))
assert.NoError(t, err)
assert.Equal(t, value, prop)
defer destroy(t, file.rootPath, file)
}
func destroy(t *testing.T, path string, fdc *FileSystemDynamicConfiguration) {
if err := os.RemoveAll(path); err != nil {
t.Error(err)
}
fdc.Close()
}
type mockDataListener struct {
......
......@@ -18,144 +18,123 @@
package file
import (
"bytes"
"errors"
"os"
"os/exec"
"os/user"
"runtime"
"strings"
"io/ioutil"
"sync"
)
import (
"github.com/fsnotify/fsnotify"
)
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)
var (
watchStartOnce = sync.Once{}
watch *fsnotify.Watcher
callback map[string]config_center.ConfigurationListener
//path = strings.Join([]string{h, ".dubbo", "registry"}, string(filepath.Separator))
import (
"github.com/fsnotify/fsnotify"
)
func init() {
type CacheListener struct {
watch *fsnotify.Watcher
keyListeners sync.Map
rootPath string
}
// NewCacheListener creates a new CacheListener
func NewCacheListener(rootPath string) *CacheListener {
cl := &CacheListener{rootPath: rootPath}
// start watcher
watch, err := fsnotify.NewWatcher()
if err != nil {
logger.Errorf("file : listen config fail, error:%v ", err)
return
}
watchStartOnce.Do(func() {
go func() {
for {
select {
case event := <-watch.Events:
if event.Op&fsnotify.Write == fsnotify.Write {
go func() {
for {
select {
case event := <-watch.Events:
key := event.Name
logger.Debugf("watcher %s, event %v", cl.rootPath, event)
if event.Op&fsnotify.Write == fsnotify.Write {
if l, ok := cl.keyListeners.Load(key); ok {
allCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeUpdate)
}
if event.Op&fsnotify.Create == fsnotify.Create {
}
if event.Op&fsnotify.Create == fsnotify.Create {
if l, ok := cl.keyListeners.Load(key); ok {
allCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeAdd)
}
if event.Op&fsnotify.Remove == fsnotify.Remove {
}
if event.Op&fsnotify.Remove == fsnotify.Remove {
if l, ok := cl.keyListeners.Load(key); ok {
allCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeDel)
}
case err := <-watch.Errors:
logger.Errorf("file : listen watch fail:", err)
}
case err := <-watch.Errors:
logger.Errorf("file : listen watch fail:", err)
}
}()
})
}
func (fsdc *fileSystemDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) {
path := fsdc.buildPath(key, config_center.DEFAULT_GROUP)
_, loaded := fsdc.keyListeners.Load(path)
if !loaded {
if err := watch.Add(path); err != nil {
logger.Errorf("file : listen watch: %s add fail:", key, err)
} else {
fsdc.keyListeners.Store(key, listener)
}
} else {
logger.Infof("profile:%s. this profile is already listening", key)
}
}
}()
cl.watch = watch
func (fsdc *fileSystemDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) {
path := fsdc.buildPath(key, config_center.DEFAULT_GROUP)
_, loaded := fsdc.keyListeners.Load(path)
if !loaded {
if err := watch.Remove(path); err != nil {
logger.Errorf("file : listen watch: %s remove fail:", key, err)
} else {
fsdc.keyListeners.Delete(path)
}
} else {
logger.Infof("profile:%s. this profile is not exist", key)
}
}
extension.AddCustomShutdownCallback(func() {
cl.watch.Close()
})
func (fsdc *fileSystemDynamicConfiguration) close() {
watch.Close()
return cl
}
// Home returns the home directory for the executing user.
//
// This uses an OS-specific method for discovering the home directory.
// An error is returned if a home directory cannot be detected.
func Home() (string, error) {
user, err := user.Current()
if nil == err {
return user.HomeDir, nil
func allCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) {
if len(lmap) == 0 {
logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event)
}
// cross compile support
if "windows" == runtime.GOOS {
return homeWindows()
c := getFileContent(key)
for l := range lmap {
callback(l, key, c, event)
}
}
// Unix-like system, so just assume Unix
return homeUnix()
func callback(listener config_center.ConfigurationListener, path, data string, event remoting.EventType) {
listener.Process(&config_center.ConfigChangeEvent{Key: path, Value: data, ConfigType: event})
}
func homeUnix() (string, error) {
// First prefer the HOME environmental variable
if home := os.Getenv("HOME"); home != "" {
return home, nil
}
func (cl *CacheListener) Close() {
cl.keyListeners.Range(func(key, value interface{}) bool {
cl.keyListeners.Delete(key)
return true
})
cl.watch.Close()
}
// If that fails, try the shell
var stdout bytes.Buffer
cmd := exec.Command("sh", "-c", "eval echo ~$USER")
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
return "", err
// AddListener will add a listener if loaded
func (cl *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) {
// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
// make a map[your type]struct{} like set in java
listeners, loaded := cl.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: {}})
if loaded {
listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{}
cl.keyListeners.Store(key, listeners)
} else {
if err := cl.watch.Add(key); err != nil {
logger.Errorf("watcher add path:%s err:%v", key, err)
}
}
}
result := strings.TrimSpace(stdout.String())
if result == "" {
return "", errors.New("blank output when reading home directory")
// RemoveListener will delete a listener if loaded
func (cl *CacheListener) RemoveListener(key string, listener config_center.ConfigurationListener) {
listeners, loaded := cl.keyListeners.Load(key)
if loaded {
delete(listeners.(map[config_center.ConfigurationListener]struct{}), listener)
if err := cl.watch.Remove(key); err != nil {
logger.Errorf("watcher remove path:%s err:%v", key, err)
}
}
return result, nil
}
func homeWindows() (string, error) {
drive := os.Getenv("HOMEDRIVE")
path := os.Getenv("HOMEPATH")
home := drive + path
if drive == "" || path == "" {
home = os.Getenv("USERPROFILE")
}
if home == "" {
return "", errors.New("HOMEDRIVE, HOMEPATH, and USERPROFILE are blank")
func getFileContent(path string) string {
if c, err := ioutil.ReadFile(path); err != nil {
logger.Errorf("read file path:%s err:%v", path, err)
return ""
} else {
return string(c)
}
return home, nil
}
......@@ -98,6 +98,7 @@ func (c *MockDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.Ha
// MockDynamicConfiguration uses to parse content and defines listener
type MockDynamicConfiguration struct {
BaseDynamicConfiguration
parser parser.ConfigurationParser
content string
listener map[string]ConfigurationListener
......
......@@ -47,6 +47,7 @@ const (
// nacosDynamicConfiguration is the implementation of DynamicConfiguration based on nacos
type nacosDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
wg sync.WaitGroup
......
......@@ -44,6 +44,7 @@ const (
)
type zookeeperDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
wg sync.WaitGroup
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import "github.com/apache/dubbo-go/config_center"
// RegistryConfigurationListener represent the processor of flie watcher
type RegistryConfigurationListener struct {
}
// Process submit the ConfigChangeEvent to the event chan to notify all observer
func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"strconv"
"sync"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/file"
"github.com/apache/dubbo-go/registry"
)
import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
perrors "github.com/pkg/errors"
)
var (
// 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition
instanceMap = make(map[string]registry.ServiceDiscovery, 16)
initLock sync.Mutex
)
// init will put the service discovery into extension
func init() {
extension.SetServiceDiscovery(constant.FILE_KEY, newFileSystemServiceDiscovery)
}
// fileServiceDiscovery is the implementation of service discovery based on file.
type fileSystemServiceDiscovery struct {
dynamicConfiguration file.FileSystemDynamicConfiguration
rootPath string
fileMap map[string]string
}
func newFileSystemServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
instance, ok := instanceMap[name]
if ok {
return instance, nil
}
initLock.Lock()
defer initLock.Unlock()
// double check
instance, ok = instanceMap[name]
if ok {
return instance, nil
}
sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
if !ok || sdc.Protocol != constant.FILE_KEY {
return nil, perrors.New("could not init the instance because the config is invalid")
}
if rp, err := file.Home(); err != nil {
return nil, perrors.WithStack(err)
} else {
fdcf := extension.GetConfigCenterFactory(constant.FILE_KEY)
p := path.Join(rp, ".dubbo", "registry")
url, _ := common.NewURL("")
url.AddParamAvoidNil(file.CONFIG_CENTER_DIR_PARAM_NAME, p)
if c, err := fdcf.GetDynamicConfiguration(&url); err != nil {
return nil, perrors.New("could not find the config for name: " + name)
} else {
sd := &fileSystemServiceDiscovery{
dynamicConfiguration: *c.(*file.FileSystemDynamicConfiguration),
rootPath: p,
fileMap: make(map[string]string),
}
extension.AddCustomShutdownCallback(func() {
sd.Destroy()
})
for _, v := range sd.GetServices().Values() {
for _, i := range sd.GetInstances(v.(string)) {
// like java do nothing
l := &RegistryConfigurationListener{}
sd.dynamicConfiguration.AddListener(getServiceInstanceId(i), l, config_center.WithGroup(getServiceName(i)))
}
}
return sd, nil
}
}
}
// nolint
func (fssd *fileSystemServiceDiscovery) String() string {
return fmt.Sprintf("file-system-service-discovery")
}
// Destroy will destroy the service discovery.
// If the discovery cannot be destroy, it will return an error.
func (fssd *fileSystemServiceDiscovery) Destroy() error {
fssd.dynamicConfiguration.Close()
for _, f := range fssd.fileMap {
fssd.releaseAndRemoveRegistrationFiles(f)
}
return nil
}
// nolint
func (fssd *fileSystemServiceDiscovery) releaseAndRemoveRegistrationFiles(file string) {
os.Remove(file)
}
// ----------------- registration ----------------
// Register will register an instance of ServiceInstance to registry
func (fssd *fileSystemServiceDiscovery) Register(instance registry.ServiceInstance) error {
id := getServiceInstanceId(instance)
sn := getServiceName(instance)
if c, err := getContent(instance); err != nil {
return err
} else {
if err := fssd.dynamicConfiguration.PublishConfig(id, sn, c); err != nil {
return perrors.WithStack(err)
} else {
fssd.fileMap[id] = fssd.dynamicConfiguration.GetPath(id, sn)
}
}
return nil
}
// nolint
func getServiceInstanceId(si registry.ServiceInstance) string {
if si.GetId() == "" {
return si.GetHost() + "." + strconv.Itoa(si.GetPort())
}
return si.GetId()
}
// nolint
func getServiceName(si registry.ServiceInstance) string {
return si.GetServiceName()
}
// getContent json string
func getContent(si registry.ServiceInstance) (string, error) {
if bytes, err := json.Marshal(si); err != nil {
return "", err
} else {
return string(bytes), nil
}
}
// Update will update the data of the instance in registry
func (fssd *fileSystemServiceDiscovery) Update(instance registry.ServiceInstance) error {
return fssd.Register(instance)
}
// Unregister will unregister this instance from registry
func (fssd *fileSystemServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
id := getServiceInstanceId(instance)
sn := getServiceName(instance)
if err := fssd.dynamicConfiguration.RemoveConfig(id, sn); err != nil {
return err
} else {
delete(fssd.fileMap, instance.GetId())
return nil
}
}
// ----------------- discovery -------------------
// GetDefaultPageSize will return the default page size
func (fssd *fileSystemServiceDiscovery) GetDefaultPageSize() int {
return 100
}
// GetServices will return the all service names.
func (fssd *fileSystemServiceDiscovery) GetServices() *gxset.HashSet {
r := gxset.NewSet()
// dynamicConfiguration root path is the actual root path
fileInfo, _ := ioutil.ReadDir(fssd.dynamicConfiguration.RootPath())
for _, file := range fileInfo {
if file.IsDir() {
r.Add(file.Name())
}
}
return r
}
// GetInstances will return all service instances with serviceName
func (fssd *fileSystemServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
if set, err := fssd.dynamicConfiguration.GetConfigKeysByGroup(serviceName); err != nil {
logger.Errorf("[FileServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ",
serviceName, err)
return make([]registry.ServiceInstance, 0, 0)
} else {
si := make([]registry.ServiceInstance, 0, set.Size())
for _, v := range set.Values() {
id := v.(string)
if p, err := fssd.dynamicConfiguration.GetProperties(id, config_center.WithGroup(serviceName)); err != nil {
logger.Errorf("[FileServiceDiscovery] Could not get the properties for id{%s}, service{%s}, "+
"error = err{%v} ",
id, serviceName, err)
} else {
dsi := &registry.DefaultServiceInstance{}
if err := json.Unmarshal([]byte(p), dsi); err != nil {
logger.Errorf("[FileServiceDiscovery] Could not unmarshal the properties for id{%s}, service{%s}, "+
"error = err{%v} ",
id, serviceName, err)
} else {
si = append(si, dsi)
}
}
}
return si
}
}
// GetInstancesByPage will return a page containing instances of ServiceInstance with the serviceName
// the page will start at offset
func (fssd *fileSystemServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
return nil
}
// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance.
// The param healthy indices that the instance should be healthy or not.
// The page will start at offset
func (fssd *fileSystemServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
return nil
}
// Batch get all instances by the specified service names
func (fssd *fileSystemServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
return nil
}
// ----------------- event ----------------------
// AddListener adds a new ServiceInstancesChangedListener
// client
func (fssd *fileSystemServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
//fssd.dynamicConfiguration.AddListener(listener.ServiceName)
return nil
}
// DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName
func (fssd *fileSystemServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, fssd.GetInstances(serviceName)))
}
// DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to target instances
func (fssd *fileSystemServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
}
// DispatchEvent dispatches the event
func (fssd *fileSystemServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
extension.GetGlobalDispatcher().Dispatch(event)
return nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"math/rand"
"strconv"
"testing"
"time"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
)
import (
"github.com/stretchr/testify/assert"
)
var (
testName = "test"
)
func TestNewFileSystemServiceDiscoveryAndDestroy(t *testing.T) {
serviceDiscovery, err := newFileSystemServiceDiscovery(testName)
assert.NoError(t, err)
assert.NotNil(t, serviceDiscovery)
defer serviceDiscovery.Destroy()
}
func TestCURDFileSystemServiceDiscovery(t *testing.T) {
prepareData()
serviceDiscovery, err := extension.GetServiceDiscovery(constant.FILE_KEY, testName)
assert.NoError(t, err)
md := make(map[string]string)
rand.Seed(time.Now().Unix())
serviceName := "service-name" + strconv.Itoa(rand.Intn(10000))
md["t1"] = "test1"
r1 := &registry.DefaultServiceInstance{
Id: "123456789",
ServiceName: serviceName,
Host: "127.0.0.1",
Port: 2233,
Enable: true,
Healthy: true,
Metadata: md,
}
err = serviceDiscovery.Register(r1)
assert.NoError(t, err)
instances := serviceDiscovery.GetInstances(r1.ServiceName)
assert.Equal(t, 1, len(instances))
assert.Equal(t, r1.Id, instances[0].GetId())
assert.Equal(t, r1.ServiceName, instances[0].GetServiceName())
assert.Equal(t, r1.Port, instances[0].GetPort())
err = serviceDiscovery.Unregister(r1)
assert.NoError(t, err)
err = serviceDiscovery.Register(r1)
defer serviceDiscovery.Destroy()
}
func prepareData() {
config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
Protocol: "file",
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment