diff --git a/plugins/plugins.go b/plugins/plugins.go index 82c7983db26f7ea686a39103616156aaec0368f2..740a1e9fa407b94d30fe554c5bbb84ae27010a71 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -6,11 +6,11 @@ import ( "github.com/dubbo/dubbo-go/registry/zookeeper" ) -var PluggableRegistries = map[string]func(...registry.OptionInf) (registry.Registry,error){ - "zookeeper":zookeeper.NewZkRegistry, +var PluggableRegistries = map[string]func(...registry.RegistryOption) (registry.Registry, error){ + "zookeeper": zookeeper.NewZkRegistry, } -var PluggableLoadbalance = map[string]func()loadBalance.Selector{ - "round_robin":loadBalance.NewRoundRobinSelector, - "random":loadBalance.NewRandomSelector, +var PluggableLoadbalance = map[string]func() loadBalance.Selector{ + "round_robin": loadBalance.NewRoundRobinSelector, + "random": loadBalance.NewRandomSelector, } diff --git a/registry/event.go b/registry/event.go index 95e381e00662d509c2a2d68970e9afdb0fc80ca6..97a88bc42a69c85a771f0e3b712b913dc80e32c1 100644 --- a/registry/event.go +++ b/registry/event.go @@ -44,5 +44,5 @@ type ServiceURLEvent struct { } func (e ServiceURLEvent) String() string { - return fmt.Sprintf("ServiceURLEvent{Action{%s}, Service{%s}}", e.Action.String(), e.Service) + return fmt.Sprintf("ServiceURLEvent{Action{%s}, Service{%s}}", e.Action, e.Service) } diff --git a/registry/options.go b/registry/options.go index d286d48c5aeb90efd2fbbd407a53a502d3fce3ac..28197a7af04972c0893d3ee2d685d9b1eaab77b2 100644 --- a/registry/options.go +++ b/registry/options.go @@ -1,5 +1,13 @@ package registry +import ( + "fmt" +) + +///////////////////////////////// +// dubbo role type +///////////////////////////////// + const ( CONSUMER = iota CONFIGURATOR @@ -22,6 +30,14 @@ func (t DubboType) Role() string { return DubboRole[t] } +///////////////////////////////// +// dubbo config & options +///////////////////////////////// + +type RegistryOption interface { + Name() string +} + type ApplicationConfig struct { Organization string `yaml:"organization" json:"organization,omitempty"` Name string `yaml:"name" json:"name,omitempty"` @@ -31,28 +47,25 @@ type ApplicationConfig struct { Environment string `yaml:"environment" json:"environment,omitempty"` } -type OptionInf interface { - OptionName() string -} type Options struct { ApplicationConfig DubboType DubboType } -//func (c *ApplicationConfig) ToString() string { -// return fmt.Sprintf("ApplicationConfig is {name:%s, version:%s, owner:%s, module:%s, organization:%s}", -// c.Name, c.Version, c.Owner, c.Module, c.Organization) -//} +func (o *Options) String() string { + return fmt.Sprintf("name:%s, version:%s, owner:%s, module:%s, organization:%s, type:%s", + o.Name, o.Version, o.Owner, o.Module, o.Organization, o.DubboType) +} type Option func(*Options) -func (Option) OptionName() string { - return "Abstact option func" +func (Option) Name() string { + return "dubbogo-registry-option" } -func WithDubboType(tp DubboType) Option { +func WithDubboType(typ DubboType) Option { return func(o *Options) { - o.DubboType = tp + o.DubboType = typ } } diff --git a/registry/registry.go b/registry/registry.go index 6e8531c4792f92401b943bf7b104da7eddd0c6ac..4f1e5b9132309c74f1797898c25ed4fe313c1c5d 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -8,8 +8,6 @@ import ( // Registry Interface ////////////////////////////////////////////// - - // for service discovery/registry type Registry interface { @@ -17,19 +15,13 @@ type Registry interface { ProviderRegister(conf service.ServiceConfigIf) error //used for service consumer calling , register services cared about ,for dubbo's admin monitoring ConsumerRegister(conf *service.ServiceConfig) error - //unregister service for service provider - //Unregister(conf interface{}) error //used for service consumer ,start listen goroutine - GetListenEvent()(chan *ServiceURLEvent) + GetListenEvent() chan *ServiceURLEvent //input the serviceConfig , registry should return serviceUrlArray with multi location(provider nodes) available GetService(*service.ServiceConfig) ([]*service.ServiceURL, error) - //input service config & request id, should return url which registry used - //Filter(ServiceConfigIf, int64) (*ServiceURL, error) Close() //new Provider conf - NewProviderServiceConfig(service.ServiceConfig)service.ServiceConfigIf + NewProviderServiceConfig(service.ServiceConfig) service.ServiceConfigIf } - - diff --git a/registry/zookeeper/consumer.go b/registry/zookeeper/consumer.go index 1bbcfbf234579b89ca0c9468c9ded8cc0f8dd427..f91693226b89fe92c1129c04b752d876ea516a0d 100644 --- a/registry/zookeeper/consumer.go +++ b/registry/zookeeper/consumer.go @@ -140,7 +140,7 @@ func (r *ZkRegistry) listen() { return } log.Warn("getListener() = err:%s", jerrors.ErrorStack(err)) - time.Sleep(timeSecondDuration(REGISTRY_CONN_DELAY)) + time.Sleep(timeSecondDuration(RegistryConnDelay)) continue } if err = listener.listenEvent(r); err != nil { @@ -152,7 +152,7 @@ func (r *ZkRegistry) listen() { listener.close() - time.Sleep(timeSecondDuration(REGISTRY_CONN_DELAY)) + time.Sleep(timeSecondDuration(RegistryConnDelay)) continue } } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index ec3000be2634fe4885204e849e893c48ed8b2f90..7fb21d52f0db1e64a22b4d1a38c382119d63a81f 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -19,9 +19,7 @@ import ( ) const ( - MAX_TIMES = 15 - ZkEvent_Channel_Size = 32 - ZKCLIENT_EVENT_CHANNEL_SIZE = 4 + MaxFailTimes = 15 ) type zkEvent struct { @@ -44,7 +42,7 @@ type zkEventListener struct { func newZkEventListener(client *zookeeperClient) *zkEventListener { return &zkEventListener{ client: client, - events: make(chan zkEvent, ZkEvent_Channel_Size), + events: make(chan zkEvent, 32), serviceMap: make(map[string]struct{}), } } @@ -166,15 +164,15 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf *service.ServiceCon event chan struct{} zkEvent zk.Event ) - event = make(chan struct{}, ZKCLIENT_EVENT_CHANNEL_SIZE) + event = make(chan struct{}, 4) defer close(event) for { // get current children for a zkPath children, childEventCh, err := l.client.getChildrenW(zkPath) if err != nil { failTimes++ - if MAX_TIMES <= failTimes { - failTimes = MAX_TIMES + if MaxFailTimes <= failTimes { + failTimes = MaxFailTimes } log.Error("listenDirEvent(path{%s}) = error{%v}", zkPath, err) // clear the event channel @@ -188,7 +186,7 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf *service.ServiceCon } l.client.registerEvent(zkPath, &event) select { - case <-time.After(timeSecondDuration(failTimes * REGISTRY_CONN_DELAY)): + case <-time.After(timeSecondDuration(failTimes * RegistryConnDelay)): l.client.unregisterEvent(zkPath, &event) continue case <-l.client.done(): diff --git a/registry/zookeeper/zookeeper.go b/registry/zookeeper/zookeeper.go index 2fc7f92decb00bf9ffb957241ea621a81dcb9162..659505708ce623e66c978a0d38783b34e8b27212 100644 --- a/registry/zookeeper/zookeeper.go +++ b/registry/zookeeper/zookeeper.go @@ -23,10 +23,9 @@ import ( ) const ( - defaultTimeout = int64(10e9) - RegistryZkClient = "zk registry" - DEFAULT_REGISTRY_TIMEOUT = 1 * time.Second - REGISTRY_CONN_DELAY = 3 + defaultTimeout = int64(10e9) + RegistryZkClient = "zk registry" + RegistryConnDelay = 3 ) var ( @@ -34,6 +33,11 @@ var ( localIP = "" ) +func init() { + processID = fmt.Sprintf("%d", os.Getpid()) + localIP, _ = gxnet.GetLocalIP() +} + type ZkRegistryConfig struct { Address []string `required:"true" yaml:"address" json:"address,omitempty"` UserName string `yaml:"user_name" json:"user_name,omitempty"` @@ -47,10 +51,21 @@ type Options struct { ZkRegistryConfig } +func (o Options) ToString() string { + return fmt.Sprintf("%s, address:%+v, user:%s, password:%s, conn-timeout:%s", + o.Options, o.Address, o.UserName, o.Password, o.Timeout) +} + type Option func(*Options) -func (Option) OptionName() string { - return "zk's option func" +func (Option) Name() string { + return "dubbogo-zookeeper-registry-option" +} + +func WithRegistryConf(conf ZkRegistryConfig) Option { + return func(o *Options) { + o.ZkRegistryConfig = conf + } } type ZkRegistry struct { @@ -69,18 +84,7 @@ type ZkRegistry struct { outerEventCh chan *registry.ServiceURLEvent } -func init() { - processID = fmt.Sprintf("%d", os.Getpid()) - localIP, _ = gxnet.GetLocalIP() -} - -func WithRegistryConf(conf ZkRegistryConfig) Option { - return func(o *Options) { - o.ZkRegistryConfig = conf - } -} - -func NewZkRegistry(opts ...registry.OptionInf) (registry.Registry, error) { +func NewZkRegistry(opts ...registry.RegistryOption) (registry.Registry, error) { var ( err error r *ZkRegistry @@ -115,7 +119,7 @@ func NewZkRegistry(opts ...registry.OptionInf) (registry.Registry, error) { } if r.ZkRegistryConfig.Timeout == 0 { - r.ZkRegistryConfig.Timeout = DEFAULT_REGISTRY_TIMEOUT + r.ZkRegistryConfig.Timeout = 1e9 } err = r.validateZookeeperClient() if err != nil { @@ -195,7 +199,7 @@ LOOP: case <-r.done: log.Warn("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") break LOOP - case <-time.After(time.Duration(1e9 * failTimes * REGISTRY_CONN_DELAY)): // 闃叉鐤媯閲嶈繛zk + case <-time.After(time.Duration(1e9 * failTimes * RegistryConnDelay)): // 闃叉鐤媯閲嶈繛zk } err = r.validateZookeeperClient() log.Info("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}", @@ -223,8 +227,8 @@ LOOP: } } failTimes++ - if MAX_TIMES <= failTimes { - failTimes = MAX_TIMES + if MaxFailTimes <= failTimes { + failTimes = MaxFailTimes } } }