Skip to content
Snippets Groups Projects
Commit 4a4656c0 authored by tiecheng's avatar tiecheng
Browse files

format else and length

parent 40f3a1e4
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,10 @@
package file
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -26,17 +30,22 @@ import (
)
func init() {
extension.SetConfigCenterFactory(constant.FILE_KEY, func() config_center.DynamicConfigurationFactory { return &fileDynamicConfigurationFactory{} })
extension.SetConfigCenterFactory(constant.FILE_KEY, func() config_center.DynamicConfigurationFactory {
return &fileDynamicConfigurationFactory{}
})
}
type fileDynamicConfigurationFactory struct {
}
func (f *fileDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) {
// GetDynamicConfiguration Get Configuration with URL
func (f *fileDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration,
error) {
dynamicConfiguration, err := newFileSystemDynamicConfiguration(url)
if err != nil {
return nil, err
return nil, perrors.WithStack(err)
}
dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{})
return dynamicConfiguration, err
}
......@@ -106,7 +106,8 @@ func (fsdc *FileSystemDynamicConfiguration) SetParser(p parser.ConfigurationPars
}
// AddListener Add listener
func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opts ...config_center.Option) {
func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener,
opts ...config_center.Option) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
......@@ -118,7 +119,8 @@ func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener con
}
// RemoveListener Remove listener
func (fsdc *FileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opts ...config_center.Option) {
func (fsdc *FileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener,
opts ...config_center.Option) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
......@@ -137,11 +139,12 @@ func (fsdc *FileSystemDynamicConfiguration) GetProperties(key string, opts ...co
}
path := fsdc.GetPath(key, tmpOpts.Group)
if file, err := ioutil.ReadFile(path); err != nil {
file, err := ioutil.ReadFile(path)
if err != nil {
return "", perrors.WithStack(err)
} else {
return string(file), nil
}
return string(file), nil
}
// GetRule get Router rule properties file
......@@ -150,7 +153,8 @@ func (fsdc *FileSystemDynamicConfiguration) GetRule(key string, opts ...config_c
}
// GetInternalProperty get value by key in Default properties file(dubbo.properties)
func (fsdc *FileSystemDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) {
func (fsdc *FileSystemDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string,
error) {
return fsdc.GetProperties(key, opts...)
}
......
......@@ -113,6 +113,8 @@ func TestGetConfigKeysByGroup(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, gs.Size())
assert.Equal(t, key, gs.Values()[0])
// remove need wait a moment
time.Sleep(time.Second)
defer destroy(file.rootPath, file)
}
......
......@@ -56,12 +56,14 @@ func NewCacheListener(rootPath string) *CacheListener {
logger.Debugf("watcher %s, event %v", cl.rootPath, event)
if event.Op&fsnotify.Write == fsnotify.Write {
if l, ok := cl.keyListeners.Load(key); ok {
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeUpdate)
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
remoting.EventTypeUpdate)
}
}
if event.Op&fsnotify.Create == fsnotify.Create {
if l, ok := cl.keyListeners.Load(key); ok {
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeAdd)
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
remoting.EventTypeAdd)
}
}
if event.Op&fsnotify.Remove == fsnotify.Remove {
......@@ -89,6 +91,7 @@ func NewCacheListener(rootPath string) *CacheListener {
func removeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) {
if len(lmap) == 0 {
logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event)
return
}
for l := range lmap {
callback(l, key, "", event)
......@@ -98,6 +101,7 @@ func removeCallback(lmap map[config_center.ConfigurationListener]struct{}, key s
func dataChangeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) {
if len(lmap) == 0 {
logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event)
return
}
c := getFileContent(key)
for l := range lmap {
......@@ -123,33 +127,36 @@ func (cl *CacheListener) Close() error {
func (cl *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) {
// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
// make a map[your type]struct{} like set in java
listeners, loaded := cl.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: {}})
listeners, loaded := cl.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{
listener: {}})
if loaded {
listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{}
cl.keyListeners.Store(key, listeners)
} else {
if err := cl.watch.Add(key); err != nil {
logger.Errorf("watcher add path:%s err:%v", key, err)
}
return
}
if err := cl.watch.Add(key); err != nil {
logger.Errorf("watcher add path:%s err:%v", key, err)
}
}
// RemoveListener will delete a listener if loaded
func (cl *CacheListener) RemoveListener(key string, listener config_center.ConfigurationListener) {
listeners, loaded := cl.keyListeners.Load(key)
if loaded {
delete(listeners.(map[config_center.ConfigurationListener]struct{}), listener)
if err := cl.watch.Remove(key); err != nil {
logger.Errorf("watcher remove path:%s err:%v", key, err)
}
if !loaded {
return
}
delete(listeners.(map[config_center.ConfigurationListener]struct{}), listener)
if err := cl.watch.Remove(key); err != nil {
logger.Errorf("watcher remove path:%s err:%v", key, err)
}
}
func getFileContent(path string) string {
if c, err := ioutil.ReadFile(path); err != nil {
c, err := ioutil.ReadFile(path)
if err != nil {
logger.Errorf("read file path:%s err:%v", path, err)
return ""
} else {
return string(c)
}
return string(c)
}
......@@ -82,37 +82,39 @@ func newFileSystemServiceDiscovery(name string) (registry.ServiceDiscovery, erro
return nil, perrors.New("could not init the instance because the config is invalid")
}
if rp, err := file.Home(); err != nil {
rp, err := file.Home()
if err != nil {
return nil, perrors.WithStack(err)
} else {
fdcf := extension.GetConfigCenterFactory(constant.FILE_KEY)
p := path.Join(rp, ".dubbo", "registry")
url, _ := common.NewURL("")
url.AddParamAvoidNil(file.CONFIG_CENTER_DIR_PARAM_NAME, p)
if c, err := fdcf.GetDynamicConfiguration(&url); err != nil {
return nil, perrors.New("could not find the config for name: " + name)
} else {
sd := &fileSystemServiceDiscovery{
dynamicConfiguration: *c.(*file.FileSystemDynamicConfiguration),
rootPath: p,
fileMap: make(map[string]string),
}
extension.AddCustomShutdownCallback(func() {
sd.Destroy()
})
for _, v := range sd.GetServices().Values() {
for _, i := range sd.GetInstances(v.(string)) {
// like java do nothing
l := &RegistryConfigurationListener{}
sd.dynamicConfiguration.AddListener(getServiceInstanceId(i), l, config_center.WithGroup(getServiceName(i)))
}
}
return sd, nil
}
fdcf := extension.GetConfigCenterFactory(constant.FILE_KEY)
p := path.Join(rp, ".dubbo", "registry")
url, _ := common.NewURL("")
url.AddParamAvoidNil(file.CONFIG_CENTER_DIR_PARAM_NAME, p)
c, err := fdcf.GetDynamicConfiguration(&url)
if err != nil {
return nil, perrors.WithStack(err)
}
sd := &fileSystemServiceDiscovery{
dynamicConfiguration: *c.(*file.FileSystemDynamicConfiguration),
rootPath: p,
fileMap: make(map[string]string),
}
extension.AddCustomShutdownCallback(func() {
sd.Destroy()
})
for _, v := range sd.GetServices().Values() {
for _, i := range sd.GetInstances(v.(string)) {
// like java do nothing
l := &RegistryConfigurationListener{}
sd.dynamicConfiguration.AddListener(getServiceInstanceId(i), l, config_center.WithGroup(getServiceName(i)))
}
}
return sd, nil
}
// nolint
......@@ -143,16 +145,19 @@ func (fssd *fileSystemServiceDiscovery) releaseAndRemoveRegistrationFiles(file s
func (fssd *fileSystemServiceDiscovery) Register(instance registry.ServiceInstance) error {
id := getServiceInstanceId(instance)
sn := getServiceName(instance)
if c, err := getContent(instance); err != nil {
return err
} else {
if err := fssd.dynamicConfiguration.PublishConfig(id, sn, c); err != nil {
return perrors.WithStack(err)
} else {
fssd.fileMap[id] = fssd.dynamicConfiguration.GetPath(id, sn)
}
c, err := toJsonString(instance)
if err != nil {
return perrors.WithStack(err)
}
err = fssd.dynamicConfiguration.PublishConfig(id, sn, c)
if err != nil {
return perrors.WithStack(err)
}
fssd.fileMap[id] = fssd.dynamicConfiguration.GetPath(id, sn)
return nil
}
......@@ -170,13 +175,14 @@ func getServiceName(si registry.ServiceInstance) string {
return si.GetServiceName()
}
// getContent json string
func getContent(si registry.ServiceInstance) (string, error) {
if bytes, err := json.Marshal(si); err != nil {
return "", err
} else {
return string(bytes), nil
// toJsonString to json string
func toJsonString(si registry.ServiceInstance) (string, error) {
bytes, err := json.Marshal(si)
if err != nil {
return "", perrors.WithStack(err)
}
return string(bytes), nil
}
// Update will update the data of the instance in registry
......@@ -188,12 +194,14 @@ func (fssd *fileSystemServiceDiscovery) Update(instance registry.ServiceInstance
func (fssd *fileSystemServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
id := getServiceInstanceId(instance)
sn := getServiceName(instance)
if err := fssd.dynamicConfiguration.RemoveConfig(id, sn); err != nil {
return err
} else {
delete(fssd.fileMap, instance.GetId())
return nil
err := fssd.dynamicConfiguration.RemoveConfig(id, sn)
if err != nil {
return perrors.WithStack(err)
}
delete(fssd.fileMap, instance.GetId())
return nil
}
// ----------------- discovery -------------------
......@@ -219,33 +227,37 @@ func (fssd *fileSystemServiceDiscovery) GetServices() *gxset.HashSet {
// GetInstances will return all service instances with serviceName
func (fssd *fileSystemServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
if set, err := fssd.dynamicConfiguration.GetConfigKeysByGroup(serviceName); err != nil {
set, err := fssd.dynamicConfiguration.GetConfigKeysByGroup(serviceName)
if err != nil {
logger.Errorf("[FileServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ",
serviceName, err)
return make([]registry.ServiceInstance, 0, 0)
} else {
si := make([]registry.ServiceInstance, 0, set.Size())
for _, v := range set.Values() {
id := v.(string)
if p, err := fssd.dynamicConfiguration.GetProperties(id, config_center.WithGroup(serviceName)); err != nil {
logger.Errorf("[FileServiceDiscovery] Could not get the properties for id{%s}, service{%s}, "+
"error = err{%v} ",
id, serviceName, err)
} else {
dsi := &registry.DefaultServiceInstance{}
if err := json.Unmarshal([]byte(p), dsi); err != nil {
logger.Errorf("[FileServiceDiscovery] Could not unmarshal the properties for id{%s}, service{%s}, "+
"error = err{%v} ",
id, serviceName, err)
} else {
si = append(si, dsi)
}
}
}
res := make([]registry.ServiceInstance, 0, set.Size())
for _, v := range set.Values() {
id := v.(string)
p, err := fssd.dynamicConfiguration.GetProperties(id, config_center.WithGroup(serviceName))
if err != nil {
logger.Errorf("[FileServiceDiscovery] Could not get the properties for id{%s}, service{%s}, "+
"error = err{%v} ",
id, serviceName, err)
return make([]registry.ServiceInstance, 0, 0)
}
return si
dsi := &registry.DefaultServiceInstance{}
err = json.Unmarshal([]byte(p), dsi)
if err != nil {
logger.Errorf("[FileServiceDiscovery] Could not unmarshal the properties for id{%s}, service{%s}, "+
"error = err{%v} ",
id, serviceName, err)
return make([]registry.ServiceInstance, 0, 0)
}
res = append(res, dsi)
}
return res
}
// GetInstancesByPage will return a page containing instances of ServiceInstance with the serviceName
......@@ -257,12 +269,14 @@ func (fssd *fileSystemServiceDiscovery) GetInstancesByPage(serviceName string, o
// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance.
// The param healthy indices that the instance should be healthy or not.
// The page will start at offset
func (fssd *fileSystemServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
func (fssd *fileSystemServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int,
healthy bool) gxpage.Pager {
return nil
}
// Batch get all instances by the specified service names
func (fssd *fileSystemServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
func (fssd *fileSystemServiceDiscovery) GetRequestInstances(serviceNames []string, offset int,
requestedSize int) map[string]gxpage.Pager {
return nil
}
......@@ -280,7 +294,8 @@ func (fssd *fileSystemServiceDiscovery) DispatchEventByServiceName(serviceName s
}
// DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to target instances
func (fssd *fileSystemServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
func (fssd *fileSystemServiceDiscovery) DispatchEventForInstances(serviceName string,
instances []registry.ServiceInstance) error {
return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment