Skip to content
Snippets Groups Projects
Commit 943e6da6 authored by vito.he's avatar vito.he
Browse files

Mod: based on AlexStock's review

parent 5a634a9d
No related branches found
No related tags found
No related merge requests found
......@@ -34,9 +34,10 @@ type Environment struct {
externalConfigMap sync.Map
}
var instance *Environment
var once sync.Once
var (
instance *Environment
once sync.Once
)
func GetEnvInstance() *Environment {
once.Do(func() {
......@@ -82,9 +83,9 @@ func (conf *InmemoryConfiguration) GetProperty(key string) (bool, string) {
v, ok := conf.store.Load(key)
if ok {
return true, v.(string)
} else {
return false, ""
}
return false, ""
}
func (conf *InmemoryConfiguration) GetSubProperty(subKey string) map[string]struct{} {
......
......@@ -17,7 +17,9 @@
package constant
const DUBBO = "dubbo"
const (
DUBBO = "dubbo"
)
const (
DEFAULT_WEIGHT = 100 //
DEFAULT_WARMUP = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second
......
......@@ -34,5 +34,4 @@ func GetConfigCenterFactory(name string) config_center.DynamicConfigurationFacto
panic("config center for " + name + " is not existing, make sure you have import the package.")
}
return configCenterFactories[name]()
}
......@@ -78,8 +78,10 @@ func (c *BaseConfig) prepareEnvironment() error {
}
func getKeyPrefix(val reflect.Value, id reflect.Value) string {
var prefix string
var idStr string
var (
prefix string
idStr string
)
if id.Kind() == reflect.String {
idStr = id.Interface().(string)
}
......
......@@ -58,8 +58,10 @@ func init() {
// Dubbo Init
func Load() {
var refMap map[string]*ReferenceConfig
var srvMap map[string]*ServiceConfig
var (
refMap map[string]*ReferenceConfig
srvMap map[string]*ServiceConfig
)
// reference config
if consumerConfig == nil {
......
package config_center
import (
"github.com/stretchr/testify/assert"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func TestDefaultConfigurationParser_Parser(t *testing.T) {
parser := &DefaultConfigurationParser{}
......
......@@ -17,7 +17,9 @@
package config_center
import "github.com/apache/dubbo-go/common"
import (
"github.com/apache/dubbo-go/common"
)
type DynamicConfigurationFactory interface {
GetDynamicConfiguration(*common.URL) (DynamicConfiguration, error)
......
......@@ -25,11 +25,12 @@ import (
"github.com/apache/dubbo-go/remoting"
)
type MockDynamicConfigurationFactory struct {
}
type MockDynamicConfigurationFactory struct{}
var once sync.Once
var dynamicConfiguration *mockDynamicConfiguration
var (
once sync.Once
dynamicConfiguration *mockDynamicConfiguration
)
func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (DynamicConfiguration, error) {
var err error
......
......@@ -131,11 +131,11 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action {
case remoting.Add:
//dir.cacheService.Add(res.Path, dir.serviceTTL)
case remoting.EventTypeAdd:
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
dir.cacheInvoker(res.Service)
case remoting.Del:
//dir.cacheService.Del(res.Path, dir.serviceTTL)
case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
dir.uncacheInvoker(res.Service)
logger.Infof("selector delete service url{%s}", res.Service)
default:
......
......@@ -51,7 +51,7 @@ func TestSubscribe_Delete(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir()
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.Del, Service: *common.NewURLWithOptions(common.WithPath("TEST0"), common.WithProtocol("dubbo"))})
mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeDel, Service: *common.NewURLWithOptions(common.WithPath("TEST0"), common.WithProtocol("dubbo"))})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
}
......@@ -81,7 +81,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap.Set(constant.GROUP_KEY, "group1")
urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"),
common.WithParams(urlmap))})
}
//for group2
......@@ -89,7 +89,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap2.Set(constant.GROUP_KEY, "group2")
urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"),
common.WithParams(urlmap2))})
}
......@@ -129,7 +129,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) {
go registryDirectory.Subscribe(*common.NewURLWithOptions(common.WithPath("testservice")))
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"))})
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: *common.NewURLWithOptions(common.WithPath("TEST"+strconv.FormatInt(int64(i), 10)), common.WithProtocol("dubbo"))})
}
return registryDirectory, mockRegistry.(*registry.MockRegistry)
}
......@@ -89,7 +89,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
case e := <-l.events:
logger.Debugf("got zk event %s", e)
if e.ConfigType == remoting.Del && !l.valid() {
if e.ConfigType == remoting.EventTypeDel && !l.valid() {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
continue
}
......
......@@ -44,9 +44,9 @@ func (c ConfigChangeEvent) String() string {
type EventType int
const (
Add = iota
Del
Mod
EventTypeAdd = iota
EventTypeDel
EvnetTypeUpdate
)
var serviceEventTypeStrings = [...]string{
......
......@@ -48,23 +48,6 @@ func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.
}
}
func verifyEventOrder(t *testing.T, c <-chan zk.Event, expectedEvent []zk.EventType, source string) {
for _, e := range expectedEvent {
for {
event, ok := <-c
if !ok {
t.Fatalf("unexpected channel close for %s", source)
}
if event.Type != e {
t.Fatalf("mismatched state order from %s, expected %v, received %v", source, event, event.Type)
}
break
}
}
}
//func Test_newZookeeperClient(t *testing.T) {
// ts, err := zk.StartTestCluster(1, nil, nil)
// if err != nil {
......
......@@ -69,14 +69,14 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remo
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
if len(listener) > 0 {
content, _, _ := l.client.Conn.Get(zkEvent.Path)
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.Mod, Content: string(content)})
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EvnetTypeUpdate, Content: string(content)})
}
case zk.EventNodeCreated:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
if len(listener) > 0 {
content, _, _ := l.client.Conn.Get(zkEvent.Path)
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.Add, Content: string(content)})
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeAdd, Content: string(content)})
}
case zk.EventNotWatching:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
......@@ -125,7 +125,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: string(content)}) {
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
// listen l service node
......@@ -133,7 +133,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
logger.Infof("delete zkNode{%s}", node)
if l.ListenServiceNodeEvent(node, listener) {
logger.Infof("delete content{%s}", n)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del})
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode)
......@@ -153,7 +153,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.Del})
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
......@@ -212,13 +212,13 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
logger.Infof("Get children!{%s}", dubboPath)
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.Add, Content: string(content)}) {
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.Del})
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath)
......@@ -246,7 +246,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
//
//func (l *ZkEventListener) listenFileEvent(zkPath string, listener remoting.DataListener) {
// l.wg.Add(1)
// l.wg.EventTypeAdd(1)
// defer l.wg.Done()
//
// var (
......@@ -347,13 +347,13 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.Da
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.Add, Content: string(content)}) {
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.Del})
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment