Skip to content
Snippets Groups Projects
Commit 37144431 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #732 from DogBaoBao/feature/file_service_discovery

Ftr: File system service discovery 
parents 3bf554ac af20d89d
No related branches found
No related tags found
No related merge requests found
Showing with 1134 additions and 0 deletions
......@@ -169,6 +169,10 @@ const (
NACOS_USERNAME = "username"
)
const (
FILE_KEY = "file"
)
const (
ZOOKEEPER_KEY = "zookeeper"
)
......
......@@ -396,6 +396,17 @@ func (c *URL) AddParam(key string, value string) {
c.params.Add(key, value)
}
// AddParamAvoidNil will add key-value pair
// Not thread-safe
// think twice before using it.
func (c *URL) AddParamAvoidNil(key string, value string) {
if c.params == nil {
c.params = url.Values{}
}
c.params.Add(key, value)
}
// SetParam will put the key-value pair into url
// it's not thread safe.
// think twice before you want to use this method
......
......@@ -45,6 +45,7 @@ const (
)
type apolloConfiguration struct {
cc.BaseDynamicConfiguration
url *common.URL
listeners sync.Map
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config_center
// BaseDynamicConfiguration will default implementation DynamicConfiguration some method
type BaseDynamicConfiguration struct {
}
// RemoveConfig
func (bdc *BaseDynamicConfiguration) RemoveConfig(string, string) error {
return nil
}
......@@ -58,6 +58,9 @@ type DynamicConfiguration interface {
// PublishConfig will publish the config with the (key, group, value) pair
PublishConfig(string, string, string) error
// RemoveConfig will remove the config white the (key, group) pair
RemoveConfig(string, string) error
// GetConfigKeysByGroup will return all keys with the group
GetConfigKeysByGroup(group string) (*gxset.HashSet, error)
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
func init() {
extension.SetConfigCenterFactory(constant.FILE_KEY, func() config_center.DynamicConfigurationFactory {
return &fileDynamicConfigurationFactory{}
})
}
type fileDynamicConfigurationFactory struct {
}
// GetDynamicConfiguration Get Configuration with URL
func (f *fileDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration,
error) {
dynamicConfiguration, err := newFileSystemDynamicConfiguration(url)
if err != nil {
return nil, perrors.WithStack(err)
}
dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{})
return dynamicConfiguration, err
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"bytes"
"errors"
"io/ioutil"
"os"
"os/exec"
"os/user"
"path"
"path/filepath"
"runtime"
"strings"
)
import (
gxset "github.com/dubbogo/gost/container/set"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
const (
PARAM_NAME_PREFIX = "dubbo.config-center."
CONFIG_CENTER_DIR_PARAM_NAME = PARAM_NAME_PREFIX + "dir"
CONFIG_CENTER_ENCODING_PARAM_NAME = PARAM_NAME_PREFIX + "encoding"
DEFAULT_CONFIG_CENTER_ENCODING = "UTF-8"
)
// FileSystemDynamicConfiguration
type FileSystemDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
encoding string
cacheListener *CacheListener
parser parser.ConfigurationParser
}
func newFileSystemDynamicConfiguration(url *common.URL) (*FileSystemDynamicConfiguration, error) {
encode := url.GetParam(CONFIG_CENTER_ENCODING_PARAM_NAME, DEFAULT_CONFIG_CENTER_ENCODING)
root := url.GetParam(CONFIG_CENTER_DIR_PARAM_NAME, "")
var c *FileSystemDynamicConfiguration
if _, err := os.Stat(root); err != nil {
// not exist, use default, /XXX/xx/.dubbo/config-center
if rp, err := Home(); err != nil {
return nil, perrors.WithStack(err)
} else {
root = path.Join(rp, ".dubbo", "config-center")
}
}
if _, err := os.Stat(root); err != nil {
// it must be dir, if not exist, will create
if err = createDir(root); err != nil {
return nil, perrors.WithStack(err)
}
}
c = &FileSystemDynamicConfiguration{
url: url,
rootPath: root,
encoding: encode,
}
c.cacheListener = NewCacheListener(c.rootPath)
return c, nil
}
// RootPath get root path
func (fsdc *FileSystemDynamicConfiguration) RootPath() string {
return fsdc.rootPath
}
// Parser Get Parser
func (fsdc *FileSystemDynamicConfiguration) Parser() parser.ConfigurationParser {
return fsdc.parser
}
// SetParser Set Parser
func (fsdc *FileSystemDynamicConfiguration) SetParser(p parser.ConfigurationParser) {
fsdc.parser = p
}
// AddListener Add listener
func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener,
opts ...config_center.Option) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
fsdc.cacheListener.AddListener(path, listener)
}
// RemoveListener Remove listener
func (fsdc *FileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener,
opts ...config_center.Option) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
fsdc.cacheListener.RemoveListener(path, listener)
}
// GetProperties get properties file
func (fsdc *FileSystemDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
file, err := ioutil.ReadFile(path)
if err != nil {
return "", perrors.WithStack(err)
}
return string(file), nil
}
// GetRule get Router rule properties file
func (fsdc *FileSystemDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) {
return fsdc.GetProperties(key, opts...)
}
// GetInternalProperty get value by key in Default properties file(dubbo.properties)
func (fsdc *FileSystemDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string,
error) {
return fsdc.GetProperties(key, opts...)
}
// PublishConfig will publish the config with the (key, group, value) pair
func (fsdc *FileSystemDynamicConfiguration) PublishConfig(key string, group string, value string) error {
path := fsdc.GetPath(key, group)
return fsdc.write2File(path, value)
}
// GetConfigKeysByGroup will return all keys with the group
func (fsdc *FileSystemDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
path := fsdc.GetPath("", group)
r := gxset.NewSet()
fileInfo, _ := ioutil.ReadDir(path)
for _, file := range fileInfo {
// list file
if file.IsDir() {
continue
}
r.Add(file.Name())
}
return r, nil
}
// RemoveConfig will remove the config whit hte (key, group)
func (fsdc *FileSystemDynamicConfiguration) RemoveConfig(key string, group string) error {
path := fsdc.GetPath(key, group)
_, err := fsdc.deleteDelay(path)
return err
}
// Close close file watcher
func (fsdc *FileSystemDynamicConfiguration) Close() error {
return fsdc.cacheListener.Close()
}
// GetPath get path
func (fsdc *FileSystemDynamicConfiguration) GetPath(key string, group string) string {
if len(key) == 0 {
return path.Join(fsdc.rootPath, group)
}
if len(group) == 0 {
group = config_center.DEFAULT_GROUP
}
return path.Join(fsdc.rootPath, group, key)
}
func (fsdc *FileSystemDynamicConfiguration) deleteDelay(path string) (bool, error) {
if path == "" {
return false, nil
}
if err := os.RemoveAll(path); err != nil {
return false, err
}
return true, nil
}
func (fsdc *FileSystemDynamicConfiguration) write2File(fp string, value string) error {
if err := forceMkdirParent(fp); err != nil {
return perrors.WithStack(err)
}
return ioutil.WriteFile(fp, []byte(value), os.ModePerm)
}
func forceMkdirParent(fp string) error {
pd := getParentDirectory(fp)
return createDir(pd)
}
func createDir(path string) error {
// create dir, chmod is drwxrwxrwx(0777)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return err
}
return nil
}
func getParentDirectory(fp string) string {
return substr(fp, 0, strings.LastIndex(fp, string(filepath.Separator)))
}
func substr(s string, pos, length int) string {
runes := []rune(s)
l := pos + length
if l > len(runes) {
l = len(runes)
}
return string(runes[pos:l])
}
// Home returns the home directory for the executing user.
//
// This uses an OS-specific method for discovering the home directory.
// An error is returned if a home directory cannot be detected.
func Home() (string, error) {
user, err := user.Current()
if nil == err {
return user.HomeDir, nil
}
// cross compile support
if "windows" == runtime.GOOS {
return homeWindows()
}
// Unix-like system, so just assume Unix
return homeUnix()
}
func homeUnix() (string, error) {
// First prefer the HOME environmental variable
if home := os.Getenv("HOME"); home != "" {
return home, nil
}
// If that fails, try the shell
var stdout bytes.Buffer
cmd := exec.Command("sh", "-c", "eval echo ~$USER")
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
return "", err
}
result := strings.TrimSpace(stdout.String())
if result == "" {
return "", errors.New("blank output when reading home directory")
}
return result, nil
}
func homeWindows() (string, error) {
drive := os.Getenv("HOMEDRIVE")
path := os.Getenv("HOMEPATH")
home := drive + path
if drive == "" || path == "" {
home = os.Getenv("USERPROFILE")
}
if home == "" {
return "", errors.New("HOMEDRIVE, HOMEPATH, and USERPROFILE are blank")
}
return home, nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"fmt"
"os"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
)
const (
key = "com.dubbo.go"
)
func initFileData(t *testing.T) (*FileSystemDynamicConfiguration, error) {
urlString := "registry://127.0.0.1:2181"
regurl, err := common.NewURL(urlString)
assert.NoError(t, err)
dc, err := extension.GetConfigCenterFactory("file").GetDynamicConfiguration(&regurl)
assert.NoError(t, err)
return dc.(*FileSystemDynamicConfiguration), err
}
func TestPublishAndGetConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
err = file.PublishConfig(key, "", "A")
assert.NoError(t, err)
prop, err := file.GetProperties(key)
assert.NoError(t, err)
assert.Equal(t, "A", prop)
defer destroy(file.rootPath, file)
}
func TestAddListener(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
listener := &mockDataListener{}
file.AddListener(key, listener, config_center.WithGroup(group))
value = "Test Value 2"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
// remove need wait a moment
time.Sleep(time.Second)
defer destroy(file.rootPath, file)
}
func TestRemoveListener(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
listener := &mockDataListener{}
file.AddListener(key, listener, config_center.WithGroup(group))
value = "Test Value 2"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
// make sure callback before RemoveListener
time.Sleep(time.Second)
file.RemoveListener(key, listener, config_center.WithGroup(group))
value = "Test Value 3"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
// remove need wait a moment
time.Sleep(time.Second)
defer destroy(file.rootPath, file)
}
func TestGetConfigKeysByGroup(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
gs, err := file.GetConfigKeysByGroup(group)
assert.NoError(t, err)
assert.Equal(t, 1, gs.Size())
assert.Equal(t, key, gs.Values()[0])
// remove need wait a moment
time.Sleep(time.Second)
defer destroy(file.rootPath, file)
}
func TestGetConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
prop, err := file.GetProperties(key, config_center.WithGroup(group))
assert.NoError(t, err)
assert.Equal(t, value, prop)
defer destroy(file.rootPath, file)
}
func TestPublishConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
prop, err := file.GetInternalProperty(key, config_center.WithGroup(group))
assert.NoError(t, err)
assert.Equal(t, value, prop)
defer destroy(file.rootPath, file)
}
func destroy(path string, fdc *FileSystemDynamicConfiguration) {
fdc.Close()
os.RemoveAll(path)
}
type mockDataListener struct{}
func (l *mockDataListener) Process(configType *config_center.ConfigChangeEvent) {
fmt.Printf("process!!!!! %v", configType)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"io/ioutil"
"sync"
)
import (
"github.com/fsnotify/fsnotify"
)
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)
// CacheListener is file watcher
type CacheListener struct {
watch *fsnotify.Watcher
keyListeners sync.Map
rootPath string
}
// NewCacheListener creates a new CacheListener
func NewCacheListener(rootPath string) *CacheListener {
cl := &CacheListener{rootPath: rootPath}
// start watcher
watch, err := fsnotify.NewWatcher()
if err != nil {
logger.Errorf("file : listen config fail, error:%v ", err)
}
go func() {
for {
select {
case event := <-watch.Events:
key := event.Name
logger.Debugf("watcher %s, event %v", cl.rootPath, event)
if event.Op&fsnotify.Write == fsnotify.Write {
if l, ok := cl.keyListeners.Load(key); ok {
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
remoting.EventTypeUpdate)
}
}
if event.Op&fsnotify.Create == fsnotify.Create {
if l, ok := cl.keyListeners.Load(key); ok {
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
remoting.EventTypeAdd)
}
}
if event.Op&fsnotify.Remove == fsnotify.Remove {
if l, ok := cl.keyListeners.Load(key); ok {
removeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeDel)
}
}
case err := <-watch.Errors:
// err may be nil, ignore
if err != nil {
logger.Warnf("file : listen watch fail:%+v", err)
}
}
}
}()
cl.watch = watch
extension.AddCustomShutdownCallback(func() {
cl.watch.Close()
})
return cl
}
func removeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) {
if len(lmap) == 0 {
logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event)
return
}
for l := range lmap {
callback(l, key, "", event)
}
}
func dataChangeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) {
if len(lmap) == 0 {
logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event)
return
}
c := getFileContent(key)
for l := range lmap {
callback(l, key, c, event)
}
}
func callback(listener config_center.ConfigurationListener, path, data string, event remoting.EventType) {
listener.Process(&config_center.ConfigChangeEvent{Key: path, Value: data, ConfigType: event})
}
// Close will remove key listener and close watcher
func (cl *CacheListener) Close() error {
cl.keyListeners.Range(func(key, value interface{}) bool {
cl.keyListeners.Delete(key)
return true
})
return cl.watch.Close()
}
// AddListener will add a listener if loaded
// if you watcher a file or directory not exist, will error with no such file or directory
func (cl *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) {
// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
// make a map[your type]struct{} like set in java
listeners, loaded := cl.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{
listener: {}})
if loaded {
listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{}
cl.keyListeners.Store(key, listeners)
return
}
if err := cl.watch.Add(key); err != nil {
logger.Errorf("watcher add path:%s err:%v", key, err)
}
}
// RemoveListener will delete a listener if loaded
func (cl *CacheListener) RemoveListener(key string, listener config_center.ConfigurationListener) {
listeners, loaded := cl.keyListeners.Load(key)
if !loaded {
return
}
delete(listeners.(map[config_center.ConfigurationListener]struct{}), listener)
if err := cl.watch.Remove(key); err != nil {
logger.Errorf("watcher remove path:%s err:%v", key, err)
}
}
func getFileContent(path string) string {
c, err := ioutil.ReadFile(path)
if err != nil {
logger.Errorf("read file path:%s err:%v", path, err)
return ""
}
return string(c)
}
......@@ -98,6 +98,7 @@ func (c *MockDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.Ha
// MockDynamicConfiguration uses to parse content and defines listener
type MockDynamicConfiguration struct {
BaseDynamicConfiguration
parser parser.ConfigurationParser
content string
listener map[string]ConfigurationListener
......
......@@ -47,6 +47,7 @@ const (
// nacosDynamicConfiguration is the implementation of DynamicConfiguration based on nacos
type nacosDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
wg sync.WaitGroup
......
......@@ -44,6 +44,7 @@ const (
)
type zookeeperDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
wg sync.WaitGroup
......
......@@ -18,6 +18,7 @@ require (
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.0.0
github.com/frankban/quicktest v1.4.1 // indirect
github.com/fsnotify/fsnotify v1.4.7
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 // indirect
github.com/go-co-op/gocron v0.1.1
github.com/go-resty/resty/v2 v2.1.0
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import "github.com/apache/dubbo-go/config_center"
// RegistryConfigurationListener represent the processor of flie watcher
type RegistryConfigurationListener struct {
}
// Process submit the ConfigChangeEvent to the event chan to notify all observer
func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"strconv"
)
import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/file"
"github.com/apache/dubbo-go/registry"
)
// init will put the service discovery into extension
func init() {
extension.SetServiceDiscovery(constant.FILE_KEY, newFileSystemServiceDiscovery)
}
// fileServiceDiscovery is the implementation of service discovery based on file.
type fileSystemServiceDiscovery struct {
dynamicConfiguration file.FileSystemDynamicConfiguration
rootPath string
fileMap map[string]string
}
func newFileSystemServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
if !ok || sdc.Protocol != constant.FILE_KEY {
return nil, perrors.New("could not init the instance because the config is invalid")
}
rp, err := file.Home()
if err != nil {
return nil, perrors.WithStack(err)
}
fdcf := extension.GetConfigCenterFactory(constant.FILE_KEY)
p := path.Join(rp, ".dubbo", constant.REGISTRY_KEY)
url, _ := common.NewURL("")
url.AddParamAvoidNil(file.CONFIG_CENTER_DIR_PARAM_NAME, p)
c, err := fdcf.GetDynamicConfiguration(&url)
if err != nil {
return nil, perrors.WithStack(err)
}
sd := &fileSystemServiceDiscovery{
dynamicConfiguration: *c.(*file.FileSystemDynamicConfiguration),
rootPath: p,
fileMap: make(map[string]string),
}
extension.AddCustomShutdownCallback(func() {
sd.Destroy()
})
for _, v := range sd.GetServices().Values() {
for _, i := range sd.GetInstances(v.(string)) {
// like java do nothing
l := &RegistryConfigurationListener{}
sd.dynamicConfiguration.AddListener(getServiceInstanceId(i), l, config_center.WithGroup(getServiceName(i)))
}
}
return sd, nil
}
// nolint
func (fssd *fileSystemServiceDiscovery) String() string {
return fmt.Sprintf("file-system-service-discovery")
}
// Destroy will destroy the service discovery.
// If the discovery cannot be destroy, it will return an error.
func (fssd *fileSystemServiceDiscovery) Destroy() error {
fssd.dynamicConfiguration.Close()
for _, f := range fssd.fileMap {
fssd.releaseAndRemoveRegistrationFiles(f)
}
return nil
}
// nolint
func (fssd *fileSystemServiceDiscovery) releaseAndRemoveRegistrationFiles(file string) {
os.RemoveAll(file)
}
// ----------------- registration ----------------
// Register will register an instance of ServiceInstance to registry
func (fssd *fileSystemServiceDiscovery) Register(instance registry.ServiceInstance) error {
id := getServiceInstanceId(instance)
sn := getServiceName(instance)
c, err := toJsonString(instance)
if err != nil {
return perrors.WithStack(err)
}
err = fssd.dynamicConfiguration.PublishConfig(id, sn, c)
if err != nil {
return perrors.WithStack(err)
}
fssd.fileMap[id] = fssd.dynamicConfiguration.GetPath(id, sn)
return nil
}
// nolint
func getServiceInstanceId(si registry.ServiceInstance) string {
if si.GetId() == "" {
return si.GetHost() + "." + strconv.Itoa(si.GetPort())
}
return si.GetId()
}
// nolint
func getServiceName(si registry.ServiceInstance) string {
return si.GetServiceName()
}
// toJsonString to json string
func toJsonString(si registry.ServiceInstance) (string, error) {
bytes, err := json.Marshal(si)
if err != nil {
return "", perrors.WithStack(err)
}
return string(bytes), nil
}
// Update will update the data of the instance in registry
func (fssd *fileSystemServiceDiscovery) Update(instance registry.ServiceInstance) error {
return fssd.Register(instance)
}
// Unregister will unregister this instance from registry
func (fssd *fileSystemServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
id := getServiceInstanceId(instance)
sn := getServiceName(instance)
err := fssd.dynamicConfiguration.RemoveConfig(id, sn)
if err != nil {
return perrors.WithStack(err)
}
delete(fssd.fileMap, instance.GetId())
return nil
}
// ----------------- discovery -------------------
// GetDefaultPageSize will return the default page size
func (fssd *fileSystemServiceDiscovery) GetDefaultPageSize() int {
return 100
}
// GetServices will return the all service names.
func (fssd *fileSystemServiceDiscovery) GetServices() *gxset.HashSet {
r := gxset.NewSet()
// dynamicConfiguration root path is the actual root path
fileInfo, _ := ioutil.ReadDir(fssd.dynamicConfiguration.RootPath())
for _, file := range fileInfo {
if file.IsDir() {
r.Add(file.Name())
}
}
return r
}
// GetInstances will return all service instances with serviceName
func (fssd *fileSystemServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
set, err := fssd.dynamicConfiguration.GetConfigKeysByGroup(serviceName)
if err != nil {
logger.Errorf("[FileServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ",
serviceName, err)
return make([]registry.ServiceInstance, 0, 0)
}
res := make([]registry.ServiceInstance, 0, set.Size())
for _, v := range set.Values() {
id := v.(string)
p, err := fssd.dynamicConfiguration.GetProperties(id, config_center.WithGroup(serviceName))
if err != nil {
logger.Errorf("[FileServiceDiscovery] Could not get the properties for id{%s}, service{%s}, "+
"error = err{%v} ",
id, serviceName, err)
return make([]registry.ServiceInstance, 0, 0)
}
dsi := &registry.DefaultServiceInstance{}
err = json.Unmarshal([]byte(p), dsi)
if err != nil {
logger.Errorf("[FileServiceDiscovery] Could not unmarshal the properties for id{%s}, service{%s}, "+
"error = err{%v} ",
id, serviceName, err)
return make([]registry.ServiceInstance, 0, 0)
}
res = append(res, dsi)
}
return res
}
// GetInstancesByPage will return a page containing instances of ServiceInstance with the serviceName
// the page will start at offset
func (fssd *fileSystemServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
return nil
}
// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance.
// The param healthy indices that the instance should be healthy or not.
// The page will start at offset
func (fssd *fileSystemServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int,
healthy bool) gxpage.Pager {
return nil
}
// Batch get all instances by the specified service names
func (fssd *fileSystemServiceDiscovery) GetRequestInstances(serviceNames []string, offset int,
requestedSize int) map[string]gxpage.Pager {
return nil
}
// ----------------- event ----------------------
// AddListener adds a new ServiceInstancesChangedListener
// client
func (fssd *fileSystemServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
//fssd.dynamicConfiguration.AddListener(listener.ServiceName)
return nil
}
// DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName
func (fssd *fileSystemServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, fssd.GetInstances(serviceName)))
}
// DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to target instances
func (fssd *fileSystemServiceDiscovery) DispatchEventForInstances(serviceName string,
instances []registry.ServiceInstance) error {
return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
}
// DispatchEvent dispatches the event
func (fssd *fileSystemServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
extension.GetGlobalDispatcher().Dispatch(event)
return nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"math/rand"
"strconv"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
)
var (
testName = "test"
)
func TestNewFileSystemServiceDiscoveryAndDestroy(t *testing.T) {
prepareData()
serviceDiscovery, err := newFileSystemServiceDiscovery(testName)
assert.NoError(t, err)
assert.NotNil(t, serviceDiscovery)
defer serviceDiscovery.Destroy()
}
func TestCURDFileSystemServiceDiscovery(t *testing.T) {
prepareData()
serviceDiscovery, err := extension.GetServiceDiscovery(constant.FILE_KEY, testName)
assert.NoError(t, err)
md := make(map[string]string)
rand.Seed(time.Now().Unix())
serviceName := "service-name" + strconv.Itoa(rand.Intn(10000))
md["t1"] = "test1"
r1 := &registry.DefaultServiceInstance{
Id: "123456789",
ServiceName: serviceName,
Host: "127.0.0.1",
Port: 2233,
Enable: true,
Healthy: true,
Metadata: md,
}
err = serviceDiscovery.Register(r1)
assert.NoError(t, err)
instances := serviceDiscovery.GetInstances(r1.ServiceName)
assert.Equal(t, 1, len(instances))
assert.Equal(t, r1.Id, instances[0].GetId())
assert.Equal(t, r1.ServiceName, instances[0].GetServiceName())
assert.Equal(t, r1.Port, instances[0].GetPort())
err = serviceDiscovery.Unregister(r1)
assert.NoError(t, err)
err = serviceDiscovery.Register(r1)
defer serviceDiscovery.Destroy()
}
func prepareData() {
config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
Protocol: "file",
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment