From 566cb83167c8f36d800359ab9eb0902001739820 Mon Sep 17 00:00:00 2001 From: AlexStocks <alexstocks@foxmail.com> Date: Sun, 31 Mar 2019 20:15:06 +0800 Subject: [PATCH] Rem: del service dir --- client/invoker/invoker.go | 57 ++++++++------- client/invoker/service_array.go | 28 ++++---- client/selector/random.go | 16 ++--- client/selector/round_robin.go | 19 +++-- client/selector/selector.go | 4 +- client/service_array.go | 12 ++-- dubbo/client.go | 10 +-- examples/dubbo/go-client/app/client.go | 2 +- examples/jsonrpc/go-client/app/client.go | 2 +- jsonrpc/http.go | 10 +-- jsonrpc/server.go | 14 ++-- registry/event.go | 32 ++++----- registry/registry.go | 14 ++-- {service => registry}/service.go | 71 ++++++++++++++++--- registry/zookeeper/consumer.go | 55 +++++++------- registry/zookeeper/listener.go | 37 +++++----- registry/zookeeper/provider.go | 27 +++---- .../zookeeper/{zookeeper.go => registry.go} | 70 ++++++++++-------- service/service_config.go | 52 -------------- 19 files changed, 266 insertions(+), 266 deletions(-) rename {service => registry}/service.go (68%) rename registry/zookeeper/{zookeeper.go => registry.go} (89%) delete mode 100644 service/service_config.go diff --git a/client/invoker/invoker.go b/client/invoker/invoker.go index 3aac1702f..beadea380 100644 --- a/client/invoker/invoker.go +++ b/client/invoker/invoker.go @@ -2,7 +2,6 @@ package invoker import ( "context" - "github.com/dubbo/dubbo-go/dubbo" "sync" "time" ) @@ -14,9 +13,9 @@ import ( import ( "github.com/dubbo/dubbo-go/client/selector" + "github.com/dubbo/dubbo-go/dubbo" "github.com/dubbo/dubbo-go/jsonrpc" "github.com/dubbo/dubbo-go/registry" - "github.com/dubbo/dubbo-go/service" ) type Options struct { @@ -99,77 +98,77 @@ func (ivk *Invoker) listen() { } } -func (ivk *Invoker) update(res *registry.ServiceURLEvent) { +func (ivk *Invoker) update(res *registry.ServiceEvent) { if res == nil || res.Service == nil { return } log.Debug("registry update, result{%s}", res) - serviceKey := res.Service.ServiceConfig().Key() + registryKey := res.Service.ServiceConfig().Key() ivk.listenerLock.Lock() defer ivk.listenerLock.Unlock() - svcArr, ok := ivk.cacheServiceMap[serviceKey] - log.Debug("service name:%s, its current member lists:%+v", serviceKey, svcArr) + svcArr, ok := ivk.cacheServiceMap[registryKey] + log.Debug("registry name:%s, its current member lists:%+v", registryKey, svcArr) switch res.Action { - case registry.ServiceURLAdd: + case registry.ServiceAdd: if ok { svcArr.add(res.Service, ivk.ServiceTTL) } else { - ivk.cacheServiceMap[serviceKey] = newServiceArray([]*service.ServiceURL{res.Service}) + ivk.cacheServiceMap[registryKey] = newServiceArray([]*registry.ServiceURL{res.Service}) } - case registry.ServiceURLDel: + case registry.ServiceDel: if ok { svcArr.del(res.Service, ivk.ServiceTTL) if len(svcArr.arr) == 0 { - delete(ivk.cacheServiceMap, serviceKey) - log.Warn("delete service %s from service map", serviceKey) + delete(ivk.cacheServiceMap, registryKey) + log.Warn("delete registry %s from registry map", registryKey) } } - log.Error("selector delete serviceURL{%s}", *res.Service) + log.Error("selector delete registryURL{%s}", *res.Service) } } -func (ivk *Invoker) getService(serviceConf *service.ServiceConfig) (*ServiceArray, error) { +func (ivk *Invoker) getService(registryConf *registry.ServiceConfig) (*ServiceArray, error) { defer ivk.listenerLock.Unlock() - serviceKey := serviceConf.Key() + registryKey := registryConf.Key() ivk.listenerLock.Lock() - svcArr, sok := ivk.cacheServiceMap[serviceKey] - log.Debug("r.svcArr[serviceString{%v}] = svcArr{%s}", serviceKey, svcArr) + svcArr, sok := ivk.cacheServiceMap[registryKey] + log.Debug("r.svcArr[registryString{%v}] = svcArr{%s}", registryKey, svcArr) if sok && time.Since(svcArr.birth) < ivk.Options.ServiceTTL { return svcArr, nil } ivk.listenerLock.Unlock() - svcs, err := ivk.registry.GetService(serviceConf) + svcs, err := ivk.registry.GetService(registryConf) ivk.listenerLock.Lock() if err != nil { log.Error("Registry.get(conf:%+v) = {err:%s, svcs:%+v}", - serviceConf, jerrors.ErrorStack(err), svcs) + registryConf, jerrors.ErrorStack(err), svcs) return nil, jerrors.Trace(err) } newSvcArr := newServiceArray(svcs) - ivk.cacheServiceMap[serviceKey] = newSvcArr + ivk.cacheServiceMap[registryKey] = newSvcArr return newSvcArr, nil } -func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req jsonrpc.Request, resp interface{}) error { +func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, registryConf *registry.ServiceConfig, req jsonrpc.Request, resp interface{}) error { - serviceArray, err := ivk.getService(serviceConf) + registryArray, err := ivk.getService(registryConf) if err != nil { return err } - if len(serviceArray.arr) == 0 { - return jerrors.New("cannot find svc " + serviceConf.String()) + if len(registryArray.arr) == 0 { + return jerrors.New("cannot find svc " + registryConf.String()) } - url, err := ivk.selector.Select(reqId, serviceArray) + url, err := ivk.selector.Select(reqId, registryArray) if err != nil { return err } @@ -181,16 +180,16 @@ func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, serviceConf *serv return nil } -func (ivk *Invoker) DubboCall(reqId int64, serviceConf *service.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error { +func (ivk *Invoker) DubboCall(reqId int64, registryConf *registry.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error { - serviceArray, err := ivk.getService(serviceConf) + registryArray, err := ivk.getService(registryConf) if err != nil { return err } - if len(serviceArray.arr) == 0 { - return jerrors.New("cannot find svc " + serviceConf.String()) + if len(registryArray.arr) == 0 { + return jerrors.New("cannot find svc " + registryConf.String()) } - url, err := ivk.selector.Select(reqId, serviceArray) + url, err := ivk.selector.Select(reqId, registryArray) if err != nil { return err } diff --git a/client/invoker/service_array.go b/client/invoker/service_array.go index a0fd49105..4516360f4 100644 --- a/client/invoker/service_array.go +++ b/client/invoker/service_array.go @@ -11,26 +11,26 @@ import ( ) import ( - "github.com/dubbo/dubbo-go/service" + "github.com/dubbo/dubbo-go/registry" ) ////////////////////////////////////////// -// service array +// registry array // should be returned by registry ,will be used by client & waiting to selector ////////////////////////////////////////// var ( - ErrServiceArrayEmpty = jerrors.New("serviceArray empty") - ErrServiceArrayTimeout = jerrors.New("serviceArray timeout") + ErrServiceArrayEmpty = jerrors.New("registryArray empty") + ErrServiceArrayTimeout = jerrors.New("registryArray timeout") ) type ServiceArray struct { - arr []*service.ServiceURL + arr []*registry.ServiceURL birth time.Time idx int64 } -func newServiceArray(arr []*service.ServiceURL) *ServiceArray { +func newServiceArray(arr []*registry.ServiceURL) *ServiceArray { return &ServiceArray{ arr: arr, birth: time.Now(), @@ -40,10 +40,12 @@ func newServiceArray(arr []*service.ServiceURL) *ServiceArray { func (s *ServiceArray) GetIdx() *int64 { return &s.idx } -func (s *ServiceArray) GetSize() int { - return len(s.arr) + +func (s *ServiceArray) GetSize() int64 { + return int64(len(s.arr)) } -func (s *ServiceArray) GetService(i int) *service.ServiceURL { + +func (s *ServiceArray) GetService(i int64) *registry.ServiceURL { return s.arr[i] } @@ -58,14 +60,14 @@ func (s *ServiceArray) String() string { return builder.String() } -func (s *ServiceArray) add(service *service.ServiceURL, ttl time.Duration) { - s.arr = append(s.arr, service) +func (s *ServiceArray) add(registry *registry.ServiceURL, ttl time.Duration) { + s.arr = append(s.arr, registry) s.birth = time.Now().Add(ttl) } -func (s *ServiceArray) del(service *service.ServiceURL, ttl time.Duration) { +func (s *ServiceArray) del(registry *registry.ServiceURL, ttl time.Duration) { for i, svc := range s.arr { - if svc.PrimitiveURL == service.PrimitiveURL { + if svc.PrimitiveURL == registry.PrimitiveURL { s.arr = append(s.arr[:i], s.arr[i+1:]...) s.birth = time.Now().Add(ttl) break diff --git a/client/selector/random.go b/client/selector/random.go index 3af3cc8e7..0161807c2 100644 --- a/client/selector/random.go +++ b/client/selector/random.go @@ -7,21 +7,21 @@ import ( import ( "github.com/dubbo/dubbo-go/client" - "github.com/dubbo/dubbo-go/service" + "github.com/dubbo/dubbo-go/registry" ) -type RandomSelector struct { -} +type RandomSelector struct{} func NewRandomSelector() Selector { return &RandomSelector{} } -func (s *RandomSelector) Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error) { +func (s *RandomSelector) Select(ID int64, array client.ServiceArrayIf) (*registry.ServiceURL, error) { + if array.GetSize() == 0 { + return nil, ServiceArrayEmpty + } idx := atomic.AddInt64(array.GetIdx(), 1) - - idx = ((int64)(rand.Int()) + ID) % int64(array.GetSize()) - - return array.GetService(int(idx)), nil + idx = ((int64)(rand.Int()) + ID) % array.GetSize() + return array.GetService(idx), nil } diff --git a/client/selector/round_robin.go b/client/selector/round_robin.go index 8dfe0b71a..f9058974b 100644 --- a/client/selector/round_robin.go +++ b/client/selector/round_robin.go @@ -6,24 +6,21 @@ import ( import ( "github.com/dubbo/dubbo-go/client" - "github.com/dubbo/dubbo-go/service" + "github.com/dubbo/dubbo-go/registry" ) -type RoundRobinSelector struct { -} +type RoundRobinSelector struct{} func NewRoundRobinSelector() Selector { return &RoundRobinSelector{} } -func (s *RoundRobinSelector) Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error) { +func (s *RoundRobinSelector) Select(ID int64, array client.ServiceArrayIf) (*registry.ServiceURL, error) { + if array.GetSize() == 0 { + return nil, ServiceArrayEmpty + } idx := atomic.AddInt64(array.GetIdx(), 1) - - idx = (ID + idx) % int64(array.GetSize()) - //default: // random - // idx = ((int64)(rand.Int()) + ID) % int64(arrSize) - //} - - return array.GetService(int(idx)), nil + idx = (ID + idx) % array.GetSize() + return array.GetService(idx), nil } diff --git a/client/selector/selector.go b/client/selector/selector.go index 3881d9d10..56eb82a02 100644 --- a/client/selector/selector.go +++ b/client/selector/selector.go @@ -6,7 +6,7 @@ import ( import ( "github.com/dubbo/dubbo-go/client" - "github.com/dubbo/dubbo-go/service" + "github.com/dubbo/dubbo-go/registry" ) var ( @@ -14,5 +14,5 @@ var ( ) type Selector interface { - Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error) + Select(ID int64, array client.ServiceArrayIf) (*registry.ServiceURL, error) } diff --git a/client/service_array.go b/client/service_array.go index 6b775a91f..e24e4593b 100644 --- a/client/service_array.go +++ b/client/service_array.go @@ -1,9 +1,11 @@ package client -import "github.com/dubbo/dubbo-go/service" +import ( + "github.com/dubbo/dubbo-go/registry" +) -type ServiceArrayIf interface{ - GetIdx()*int64 - GetSize()int - GetService(i int)*service.ServiceURL +type ServiceArrayIf interface { + GetIdx() *int64 + GetSize() int64 + GetService(i int64) *registry.ServiceURL } diff --git a/dubbo/client.go b/dubbo/client.go index 958706a6b..08e0a72a3 100644 --- a/dubbo/client.go +++ b/dubbo/client.go @@ -16,7 +16,7 @@ import ( import ( "github.com/dubbo/dubbo-go/public" - svc "github.com/dubbo/dubbo-go/service" + "github.com/dubbo/dubbo-go/registry" ) var ( @@ -104,7 +104,7 @@ func NewClient(conf *ClientConfig) (*Client, error) { } // call one way -func (c *Client) CallOneway(addr string, svcUrl svc.ServiceURL, method string, args interface{}, opts ...CallOption) error { +func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method string, args interface{}, opts ...CallOption) error { var copts CallOptions for _, o := range opts { @@ -115,7 +115,7 @@ func (c *Client) CallOneway(addr string, svcUrl svc.ServiceURL, method string, a } // if @reply is nil, the transport layer will get the response without notify the invoker. -func (c *Client) Call(addr string, svcUrl svc.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error { +func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error { var copts CallOptions for _, o := range opts { @@ -130,7 +130,7 @@ func (c *Client) Call(addr string, svcUrl svc.ServiceURL, method string, args, r return jerrors.Trace(c.call(ct, addr, svcUrl, method, args, reply, nil, copts)) } -func (c *Client) AsyncCall(addr string, svcUrl svc.ServiceURL, method string, args interface{}, +func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method string, args interface{}, callback AsyncCallback, reply interface{}, opts ...CallOption) error { var copts CallOptions @@ -141,7 +141,7 @@ func (c *Client) AsyncCall(addr string, svcUrl svc.ServiceURL, method string, ar return jerrors.Trace(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts)) } -func (c *Client) call(ct CallType, addr string, svcUrl svc.ServiceURL, method string, +func (c *Client) call(ct CallType, addr string, svcUrl registry.ServiceURL, method string, args, reply interface{}, callback AsyncCallback, opts CallOptions) error { if opts.RequestTimeout == 0 { diff --git a/examples/dubbo/go-client/app/client.go b/examples/dubbo/go-client/app/client.go index 43c3bae1e..b0e1a51f4 100644 --- a/examples/dubbo/go-client/app/client.go +++ b/examples/dubbo/go-client/app/client.go @@ -93,7 +93,7 @@ func initClient(clientConfig *examples.ClientConfig) { } for _, service := range clientConfig.Service_List { - err = clientRegistry.ConsumerRegister(&service) + err = clientRegistry.RegisterConsumer(&service) if err != nil { panic(fmt.Sprintf("registry.Register(service{%#v}) = error{%v}", service, jerrors.ErrorStack(err))) return diff --git a/examples/jsonrpc/go-client/app/client.go b/examples/jsonrpc/go-client/app/client.go index 28f4f267e..bddf1da15 100644 --- a/examples/jsonrpc/go-client/app/client.go +++ b/examples/jsonrpc/go-client/app/client.go @@ -96,7 +96,7 @@ func initClient(clientConfig *examples.ClientConfig) { } for _, service := range clientConfig.Service_List { - err = clientRegistry.ConsumerRegister(&service) + err = clientRegistry.RegisterConsumer(&service) if err != nil { panic(fmt.Sprintf("registry.Register(service{%#v}) = error{%v}", service, jerrors.ErrorStack(err))) return diff --git a/jsonrpc/http.go b/jsonrpc/http.go index 053a135bd..ad52fb664 100644 --- a/jsonrpc/http.go +++ b/jsonrpc/http.go @@ -5,7 +5,6 @@ import ( "bytes" "context" "fmt" - "github.com/dubbo/dubbo-go/service" "io/ioutil" "net" "net/http" @@ -22,6 +21,7 @@ import ( import ( "github.com/dubbo/dubbo-go/public" + "github.com/dubbo/dubbo-go/registry" ) ////////////////////////////////////////////// @@ -39,8 +39,8 @@ type Request struct { contentType string } -func (r *Request) ServiceConfig() service.ServiceConfigIf { - return &service.ServiceConfig{ +func (r *Request) ServiceConfig() registry.ServiceConfigIf { + return ®istry.ServiceConfig{ Protocol: r.protocol, Service: r.service, Group: r.group, @@ -86,7 +86,7 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient { } } -func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args interface{}) Request { +func (c *HTTPClient) NewRequest(conf registry.ServiceConfig, method string, args interface{}) Request { return Request{ ID: atomic.AddInt64(&c.ID, 1), group: conf.Group, @@ -98,7 +98,7 @@ func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args } } -func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req Request, rsp interface{}) error { +func (c *HTTPClient) Call(ctx context.Context, service *registry.ServiceURL, req Request, rsp interface{}) error { // header httpHeader := http.Header{} httpHeader.Set("Content-Type", "application/json") diff --git a/jsonrpc/server.go b/jsonrpc/server.go index 142466074..a38241f1d 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -4,8 +4,6 @@ import ( "bufio" "bytes" "context" - "github.com/dubbo/dubbo-go/server" - "github.com/dubbo/dubbo-go/service" "io/ioutil" "net" "net/http" @@ -22,6 +20,10 @@ import ( jerrors "github.com/juju/errors" ) +import ( + "github.com/dubbo/dubbo-go/server" +) + const ( DefaultMaxSleepTime = 1 * time.Second // accept涓棿鏈€澶leep interval DefaultHTTPRspBufferSize = 1024 @@ -58,7 +60,7 @@ type Option func(*Options) type Options struct { Registry registry.Registry ConfList []server.ServerConfig - ServiceConfList []service.ServiceConfig + ServiceConfList []registry.ServiceConfig Timeout time.Duration } @@ -93,7 +95,7 @@ func ConfList(confList []server.ServerConfig) Option { } } -func ServiceConfList(confList []service.ServiceConfig) Option { +func ServiceConfList(confList []registry.ServiceConfig) Option { return func(o *Options) { o.ServiceConfList = confList } @@ -235,7 +237,7 @@ func (s *Server) Options() Options { func (s *Server) Handle(h Handler) error { var ( err error - serviceConf service.ServiceConfig + serviceConf registry.ProviderServiceConfig ) opts := s.Options() @@ -263,7 +265,7 @@ func (s *Server) Handle(h Handler) error { } serviceConf.Path = opts.ConfList[j].Address() - err = opts.Registry.ProviderRegister(s.opts.Registry.NewProviderServiceConfig(serviceConf)) + err = opts.Registry.RegisterProvider(registry.ProviderServiceConfig{serviceConf}) if err != nil { return err } diff --git a/registry/event.go b/registry/event.go index 97a88bc42..97f9a15c2 100644 --- a/registry/event.go +++ b/registry/event.go @@ -6,10 +6,6 @@ import ( "time" ) -import ( - "github.com/dubbo/dubbo-go/service" -) - func init() { rand.Seed(time.Now().UnixNano()) } @@ -18,31 +14,31 @@ func init() { // service url event type ////////////////////////////////////////// -type ServiceURLEventType int +type ServiceEventType int const ( - ServiceURLAdd = iota - ServiceURLDel + ServiceAdd = iota + ServiceDel ) -var serviceURLEventTypeStrings = [...]string{ - "add service url", - "delete service url", +var serviceEventTypeStrings = [...]string{ + "add service", + "delete service", } -func (t ServiceURLEventType) String() string { - return serviceURLEventTypeStrings[t] +func (t ServiceEventType) String() string { + return serviceEventTypeStrings[t] } ////////////////////////////////////////// -// service url event +// service event ////////////////////////////////////////// -type ServiceURLEvent struct { - Action ServiceURLEventType - Service *service.ServiceURL +type ServiceEvent struct { + Action ServiceEventType + Service *ServiceURL } -func (e ServiceURLEvent) String() string { - return fmt.Sprintf("ServiceURLEvent{Action{%s}, Service{%s}}", e.Action, e.Service) +func (e ServiceEvent) String() string { + return fmt.Sprintf("ServiceEvent{Action{%s}, Service{%s}}", e.Action, e.Service) } diff --git a/registry/registry.go b/registry/registry.go index 4f1e5b913..dc8bcea73 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -1,9 +1,5 @@ package registry -import ( - "github.com/dubbo/dubbo-go/service" -) - ////////////////////////////////////////////// // Registry Interface ////////////////////////////////////////////// @@ -12,16 +8,14 @@ import ( type Registry interface { //used for service provider calling , register services to registry - ProviderRegister(conf service.ServiceConfigIf) error + RegisterProvider(ServiceConfigIf) error //used for service consumer calling , register services cared about ,for dubbo's admin monitoring - ConsumerRegister(conf *service.ServiceConfig) error + RegisterConsumer(ServiceConfigIf) error //used for service consumer ,start listen goroutine - GetListenEvent() chan *ServiceURLEvent + GetListenEvent() chan *ServiceEvent //input the serviceConfig , registry should return serviceUrlArray with multi location(provider nodes) available - GetService(*service.ServiceConfig) ([]*service.ServiceURL, error) + GetService(*ServiceConfig) ([]*ServiceURL, error) Close() - //new Provider conf - NewProviderServiceConfig(service.ServiceConfig) service.ServiceConfigIf } diff --git a/service/service.go b/registry/service.go similarity index 68% rename from service/service.go rename to registry/service.go index a82e0583f..0d5749daa 100644 --- a/service/service.go +++ b/registry/service.go @@ -1,4 +1,4 @@ -package service +package registry import ( "fmt" @@ -12,6 +12,58 @@ import ( import ( jerrors "github.com/juju/errors" ) + +////////////////////////////////////////////// +// service config +////////////////////////////////////////////// + +type ServiceConfigIf interface { + Key() string + String() string + ServiceEqual(url *ServiceURL) bool +} + +type ServiceConfig struct { + Protocol string `required:"true",default:"dubbo" yaml:"protocol" json:"protocol,omitempty"` + Service string `required:"true" yaml:"service" json:"service,omitempty"` + Group string `yaml:"group" json:"group,omitempty"` + Version string `yaml:"version" json:"version,omitempty"` +} + +func (c ServiceConfig) Key() string { + return fmt.Sprintf("%s@%s", c.Service, c.Protocol) +} + +func (c ServiceConfig) String() string { + return fmt.Sprintf("%s@%s-%s-%s", c.Service, c.Protocol, c.Group, c.Version) +} + +func (c ServiceConfig) ServiceEqual(url *ServiceURL) bool { + if c.Protocol != url.Protocol { + return false + } + + if c.Service != url.Query.Get("interface") { + return false + } + + if c.Group != url.Group { + return false + } + + if c.Version != url.Version { + return false + } + + return true +} + +type ProviderServiceConfig struct { + ServiceConfig + Path string `yaml:"path" json:"path,omitempty"` + Methods string `yaml:"methods" json:"methods,omitempty"` +} + ////////////////////////////////////////// // service url ////////////////////////////////////////// @@ -30,14 +82,6 @@ type ServiceURL struct { PrimitiveURL string } -func (s ServiceURL) String() string { - return fmt.Sprintf( - "ServiceURL{Protocol:%s, Location:%s, Path:%s, Ip:%s, Port:%s, "+ - "Timeout:%s, Version:%s, Group:%s, Weight:%d, Query:%+v}", - s.Protocol, s.Location, s.Path, s.Ip, s.Port, - s.Timeout, s.Version, s.Group, s.Weight, s.Query) -} - func NewServiceURL(urlString string) (*ServiceURL, error) { var ( err error @@ -87,6 +131,14 @@ func NewServiceURL(urlString string) (*ServiceURL, error) { return s, nil } +func (s ServiceURL) String() string { + return fmt.Sprintf( + "ServiceURL{Protocol:%s, Location:%s, Path:%s, Ip:%s, Port:%s, "+ + "Timeout:%s, Version:%s, Group:%s, Weight:%d, Query:%+v}", + s.Protocol, s.Location, s.Path, s.Ip, s.Port, + s.Timeout, s.Version, s.Group, s.Weight, s.Query) +} + func (s *ServiceURL) ServiceConfig() ServiceConfig { interfaceName := s.Query.Get("interface") return ServiceConfig{ @@ -111,4 +163,3 @@ func (s *ServiceURL) CheckMethod(method string) bool { return false } - diff --git a/registry/zookeeper/consumer.go b/registry/zookeeper/consumer.go index f91693226..c5073db90 100644 --- a/registry/zookeeper/consumer.go +++ b/registry/zookeeper/consumer.go @@ -12,19 +12,24 @@ import ( import ( "github.com/dubbo/dubbo-go/registry" - "github.com/dubbo/dubbo-go/service" ) -func (r *ZkRegistry) ConsumerRegister(conf *service.ServiceConfig) error { +func (r *ZkRegistry) RegisterConsumer(regConf registry.ServiceConfigIf) error { var ( ok bool err error listener *zkEventListener + conf *registry.ServiceConfig ) + + if conf, ok = regConf.(*registry.ServiceConfig); !ok { + return jerrors.Errorf("the type of @regConf %T is not registry.ServiceConfig", regConf) + } + ok = false - r.Lock() + r.cltLock.Lock() _, ok = r.services[conf.Key()] - r.Unlock() + r.cltLock.Unlock() if ok { return jerrors.Errorf("Service{%s} has been registered", conf.Service) } @@ -34,9 +39,9 @@ func (r *ZkRegistry) ConsumerRegister(conf *service.ServiceConfig) error { return jerrors.Trace(err) } - r.Lock() + r.cltLock.Lock() r.services[conf.Key()] = conf - r.Unlock() + r.cltLock.Unlock() log.Debug("(consumerZkConsumerRegistry)Register(conf{%#v})", conf) r.listenerLock.Lock() @@ -49,21 +54,21 @@ func (r *ZkRegistry) ConsumerRegister(conf *service.ServiceConfig) error { return nil } -func (r *ZkRegistry) GetListenEvent() chan *registry.ServiceURLEvent { +func (r *ZkRegistry) GetListenEvent() chan *registry.ServiceEvent { return r.outerEventCh } // name: service@protocol -func (r *ZkRegistry) GetService(conf *service.ServiceConfig) ([]*service.ServiceURL, error) { +func (r *ZkRegistry) GetService(conf *registry.ServiceConfig) ([]*registry.ServiceURL, error) { var ( ok bool err error dubboPath string nodes []string listener *zkEventListener - serviceURL *service.ServiceURL - serviceConfIf service.ServiceConfigIf - serviceConf *service.ServiceConfig + serviceURL *registry.ServiceURL + serviceConfIf registry.ServiceConfigIf + serviceConf *registry.ServiceConfig ) r.listenerLock.Lock() listener = r.listener @@ -73,13 +78,13 @@ func (r *ZkRegistry) GetService(conf *service.ServiceConfig) ([]*service.Service listener.listenServiceEvent(conf) } - r.Lock() + r.cltLock.Lock() serviceConfIf, ok = r.services[conf.Key()] - r.Unlock() + r.cltLock.Unlock() if !ok { return nil, jerrors.Errorf("Service{%s} has not been registered", conf.Key()) } - serviceConf, ok = serviceConfIf.(*service.ServiceConfig) + serviceConf, ok = serviceConfIf.(*registry.ServiceConfig) if !ok { return nil, jerrors.Errorf("Service{%s}: failed to get serviceConfigIf type", conf.Key()) } @@ -89,17 +94,17 @@ func (r *ZkRegistry) GetService(conf *service.ServiceConfig) ([]*service.Service if err != nil { return nil, jerrors.Trace(err) } - r.Lock() + r.cltLock.Lock() nodes, err = r.client.getChildren(dubboPath) - r.Unlock() + r.cltLock.Unlock() if err != nil { log.Warn("getChildren(dubboPath{%s}) = error{%v}", dubboPath, err) return nil, jerrors.Trace(err) } - var listenerServiceMap = make(map[string]*service.ServiceURL) + var listenerServiceMap = make(map[string]*registry.ServiceURL) for _, n := range nodes { - serviceURL, err = service.NewServiceURL(n) + serviceURL, err = registry.NewServiceURL(n) if err != nil { log.Error("NewServiceURL({%s}) = error{%v}", n, err) continue @@ -116,7 +121,7 @@ func (r *ZkRegistry) GetService(conf *service.ServiceConfig) ([]*service.Service } } - var services []*service.ServiceURL + var services []*registry.ServiceURL for _, service := range listenerServiceMap { services = append(services, service) } @@ -162,7 +167,7 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) { var ( ok bool zkListener *zkEventListener - serviceConf *service.ServiceConfig + serviceConf *registry.ServiceConfig ) r.listenerLock.Lock() @@ -172,9 +177,9 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) { return zkListener, nil } - r.Lock() + r.cltLock.Lock() client := r.client - r.Unlock() + r.cltLock.Unlock() if client == nil { return nil, jerrors.New("zk connection broken") } @@ -187,13 +192,13 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) { r.listenerLock.Unlock() // listen - r.Lock() + r.cltLock.Lock() for _, svs := range r.services { - if serviceConf, ok = svs.(*service.ServiceConfig); ok { + if serviceConf, ok = svs.(*registry.ServiceConfig); ok { go zkListener.listenServiceEvent(serviceConf) } } - r.Unlock() + r.cltLock.Unlock() return zkListener, nil } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 7fb21d52f..145e2e146 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -15,7 +15,6 @@ import ( import ( "github.com/dubbo/dubbo-go/registry" - "github.com/dubbo/dubbo-go/service" ) const ( @@ -23,7 +22,7 @@ const ( ) type zkEvent struct { - res *registry.ServiceURLEvent + res *registry.ServiceEvent err error } @@ -81,7 +80,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool { return false } -func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf *service.ServiceConfig) { +func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf *registry.ServiceConfig) { contains := func(s []string, e string) bool { for _, a := range s { if a == e { @@ -101,7 +100,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co // a node was added -- listen the new node var ( newNode string - serviceURL *service.ServiceURL + serviceURL *registry.ServiceURL ) for _, n := range newChildren { if contains(children, n) { @@ -110,7 +109,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co newNode = path.Join(zkPath, n) log.Info("add zkNode{%s}", newNode) - serviceURL, err = service.NewServiceURL(n) + serviceURL, err = registry.NewServiceURL(n) if err != nil { log.Error("NewServiceURL(%s) = error{%v}", n, jerrors.ErrorStack(err)) continue @@ -120,13 +119,13 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co continue } log.Info("add serviceURL{%s}", serviceURL) - l.events <- zkEvent{®istry.ServiceURLEvent{registry.ServiceURLAdd, serviceURL}, nil} + l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil} // listen l service node - go func(node string, serviceURL *service.ServiceURL) { + go func(node string, serviceURL *registry.ServiceURL) { log.Info("delete zkNode{%s}", node) if l.listenServiceNodeEvent(node) { log.Info("delete serviceURL{%s}", serviceURL) - l.events <- zkEvent{®istry.ServiceURLEvent{registry.ServiceURLDel, serviceURL}, nil} + l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil} } log.Warn("listenSelf(zk path{%s}) goroutine exit now", zkPath) }(newNode, serviceURL) @@ -141,7 +140,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co oldNode = path.Join(zkPath, n) log.Warn("delete zkPath{%s}", oldNode) - serviceURL, err = service.NewServiceURL(n) + serviceURL, err = registry.NewServiceURL(n) if !conf.ServiceEqual(serviceURL) { log.Warn("serviceURL{%s} has been deleted is not compatible with ServiceConfig{%#v}", serviceURL, conf) continue @@ -151,11 +150,11 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co log.Error("NewServiceURL(i{%s}) = error{%v}", n, jerrors.ErrorStack(err)) continue } - l.events <- zkEvent{®istry.ServiceURLEvent{registry.ServiceURLDel, serviceURL}, nil} + l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil} } } -func (l *zkEventListener) listenDirEvent(zkPath string, conf *service.ServiceConfig) { +func (l *zkEventListener) listenDirEvent(zkPath string, conf *registry.ServiceConfig) { l.wg.Add(1) defer l.wg.Done() @@ -221,13 +220,13 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf *service.ServiceCon // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent // | // --------> listenServiceNodeEvent -func (l *zkEventListener) listenServiceEvent(conf *service.ServiceConfig) { +func (l *zkEventListener) listenServiceEvent(conf *registry.ServiceConfig) { var ( err error zkPath string dubboPath string children []string - serviceURL *service.ServiceURL + serviceURL *registry.ServiceURL ) zkPath = fmt.Sprintf("/dubbo/%s/providers", conf.Service) @@ -253,7 +252,7 @@ func (l *zkEventListener) listenServiceEvent(conf *service.ServiceConfig) { for _, c := range children { - serviceURL, err = service.NewServiceURL(c) + serviceURL, err = registry.NewServiceURL(c) if err != nil { log.Error("NewServiceURL(r{%s}) = error{%v}", c, err) continue @@ -263,22 +262,22 @@ func (l *zkEventListener) listenServiceEvent(conf *service.ServiceConfig) { continue } log.Debug("add serviceUrl{%s}", serviceURL) - l.events <- zkEvent{®istry.ServiceURLEvent{registry.ServiceURLAdd, serviceURL}, nil} + l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil} // listen l service node dubboPath = path.Join(zkPath, c) log.Info("listen dubbo service key{%s}", dubboPath) - go func(zkPath string, serviceURL *service.ServiceURL) { + go func(zkPath string, serviceURL *registry.ServiceURL) { if l.listenServiceNodeEvent(dubboPath) { log.Debug("delete serviceUrl{%s}", serviceURL) - l.events <- zkEvent{®istry.ServiceURLEvent{registry.ServiceURLDel, serviceURL}, nil} + l.events <- zkEvent{®istry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil} } log.Warn("listenSelf(zk path{%s}) goroutine exit now", zkPath) }(dubboPath, serviceURL) } log.Info("listen dubbo path{%s}", zkPath) - go func(zkPath string, conf *service.ServiceConfig) { + go func(zkPath string, conf *registry.ServiceConfig) { l.listenDirEvent(zkPath, conf) log.Warn("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) }(zkPath, conf) @@ -302,7 +301,7 @@ func (l *zkEventListener) listenEvent(r *ZkRegistry) error { if e.err != nil { return jerrors.Trace(e.err) } - if e.res.Action == registry.ServiceURLDel && !l.valid() { + if e.res.Action == registry.ServiceDel && !l.valid() { log.Warn("update @result{%s}. But its connection to registry is invalid", e.res) continue } diff --git a/registry/zookeeper/provider.go b/registry/zookeeper/provider.go index 30399e505..ef21783e2 100644 --- a/registry/zookeeper/provider.go +++ b/registry/zookeeper/provider.go @@ -6,37 +6,28 @@ import ( ) import ( - "github.com/dubbo/dubbo-go/service" + "github.com/dubbo/dubbo-go/registry" ) -type ProviderServiceConfig struct { - service.ServiceConfig -} - -func (r *ZkRegistry) NewProviderServiceConfig(config service.ServiceConfig) service.ServiceConfigIf { - return ProviderServiceConfig{ - config, - } -} -func (r *ZkRegistry) ProviderRegister(c service.ServiceConfigIf) error { +func (r *ZkRegistry) RegisterProvider(regConf registry.ServiceConfigIf) error { var ( ok bool err error - conf ProviderServiceConfig + conf registry.ProviderServiceConfig ) - if conf, ok = c.(ProviderServiceConfig); !ok { - return jerrors.Errorf("@c{%v} type is not ServiceConfig", c) + if conf, ok = regConf.(registry.ProviderServiceConfig); !ok { + return jerrors.Errorf("the tyep of @regConf{%v} is not ProviderServiceConfig", regConf) } // 妫€楠屾湇鍔℃槸鍚﹀凡缁忔敞鍐岃繃 ok = false - r.Lock() + r.cltLock.Lock() // 娉ㄦ剰姝ゅ涓巆onsumerZookeeperRegistry鐨勫樊寮傦紝consumer鐢ㄧ殑鏄痗onf.Service锛� // 鍥犱负consumer瑕佹彁渚泈atch鍔熻兘缁檚elector浣跨敤, provider鍏佽娉ㄥ唽鍚屼竴涓猻ervice鐨勫涓猤roup or version _, ok = r.services[conf.String()] - r.Unlock() + r.cltLock.Unlock() if ok { return jerrors.Errorf("Service{%s} has been registered", conf.String()) } @@ -46,9 +37,9 @@ func (r *ZkRegistry) ProviderRegister(c service.ServiceConfigIf) error { return jerrors.Annotatef(err, "register(conf:%+v)", conf) } - r.Lock() + r.cltLock.Lock() r.services[conf.String()] = conf - r.Unlock() + r.cltLock.Unlock() log.Debug("(ZkProviderRegistry)Register(conf{%#v})", conf) diff --git a/registry/zookeeper/zookeeper.go b/registry/zookeeper/registry.go similarity index 89% rename from registry/zookeeper/zookeeper.go rename to registry/zookeeper/registry.go index 659505708..521d8aad3 100644 --- a/registry/zookeeper/zookeeper.go +++ b/registry/zookeeper/registry.go @@ -18,7 +18,6 @@ import ( import ( "github.com/dubbo/dubbo-go/registry" - "github.com/dubbo/dubbo-go/service" "github.com/dubbo/dubbo-go/version" ) @@ -68,20 +67,26 @@ func WithRegistryConf(conf ZkRegistryConfig) Option { } } +///////////////////////////////////// +// zookeeper registry +///////////////////////////////////// + type ZkRegistry struct { Options birth int64 // time of file birth, seconds since Epoch; 0 if unknown wg sync.WaitGroup // wg+done for zk restart done chan struct{} - sync.Mutex - client *zookeeperClient - services map[string]service.ServiceConfigIf // service name + protocol -> service config + cltLock sync.Mutex + client *zookeeperClient + services map[string]registry.ServiceConfigIf // service name + protocol -> service config + listenerLock sync.Mutex listener *zkEventListener + //for provider zkPath map[string]int // key = protocol://ip:port/interface - outerEventCh chan *registry.ServiceURLEvent + outerEventCh chan *registry.ServiceEvent } func NewZkRegistry(opts ...registry.RegistryOption) (registry.Registry, error) { @@ -93,9 +98,9 @@ func NewZkRegistry(opts ...registry.RegistryOption) (registry.Registry, error) { r = &ZkRegistry{ birth: time.Now().UnixNano(), done: make(chan struct{}), - services: make(map[string]service.ServiceConfigIf), + services: make(map[string]registry.ServiceConfigIf), zkPath: make(map[string]int), - outerEventCh: make(chan *registry.ServiceURLEvent), + outerEventCh: make(chan *registry.ServiceEvent), } for _, opt := range opts { @@ -136,6 +141,7 @@ func NewZkRegistry(opts ...registry.RegistryOption) (registry.Registry, error) { return r, nil } + func (r *ZkRegistry) Close() { close(r.done) r.wg.Wait() @@ -148,8 +154,8 @@ func (r *ZkRegistry) validateZookeeperClient() error { ) err = nil - r.Lock() - defer r.Unlock() + r.cltLock.Lock() + defer r.cltLock.Unlock() if r.client == nil { r.client, err = newZookeeperClient(RegistryZkClient, r.Address, r.ZkRegistryConfig.Timeout) if err != nil { @@ -174,8 +180,8 @@ func (r *ZkRegistry) handleZkRestart() { err error flag bool failTimes int - confIf service.ServiceConfigIf - services []service.ServiceConfigIf + confIf registry.ServiceConfigIf + services []registry.ServiceConfigIf ) defer r.wg.Done() @@ -187,10 +193,10 @@ LOOP: break LOOP // re-register all services case <-r.client.done(): - r.Lock() + r.cltLock.Lock() r.client.Close() r.client = nil - r.Unlock() + r.cltLock.Unlock() // 鎺k锛岀洿鑷虫垚鍔� failTimes = 0 @@ -206,11 +212,11 @@ LOOP: r.client.zkAddrs, jerrors.ErrorStack(err)) if err == nil { // copy r.services - r.Lock() + r.cltLock.Lock() for _, confIf = range r.services { services = append(services, confIf) } - r.Unlock() + r.cltLock.Unlock() flag = true for _, confIf = range services { @@ -235,7 +241,7 @@ LOOP: } } -func (r *ZkRegistry) register(c interface{}) error { +func (r *ZkRegistry) register(c registry.ServiceConfigIf) error { var ( err error revision string @@ -245,6 +251,7 @@ func (r *ZkRegistry) register(c interface{}) error { encodedURL string dubboPath string ) + err = r.validateZookeeperClient() if err != nil { return jerrors.Trace(err) @@ -269,16 +276,19 @@ func (r *ZkRegistry) register(c interface{}) error { params.Add("revision", revision) // revision鏄痯ox.xml涓璦pplication鐨剉ersion灞炴€х殑鍊� if r.DubboType == registry.PROVIDER { + conf, ok := c.(*registry.ProviderServiceConfig) + if ok { + return fmt.Errorf("the type of @c:%+v is not *registry.ProviderServiceConfig", c) + } - conf := c.(ProviderServiceConfig) if conf.Service == "" || conf.Methods == "" { return jerrors.Errorf("conf{Service:%s, Methods:%s}", conf.Service, conf.Methods) } // 鍏堝垱寤烘湇鍔′笅闈㈢殑provider node dubboPath = fmt.Sprintf("/dubbo/%s/%s", conf.Service, registry.DubboNodes[registry.PROVIDER]) - r.Lock() + r.cltLock.Lock() err = r.client.Create(dubboPath) - r.Unlock() + r.cltLock.Unlock() if err != nil { log.Error("zkClient.create(path{%s}) = error{%#v}", dubboPath, jerrors.ErrorStack(err)) return jerrors.Annotatef(err, "zkclient.Create(path:%s)", dubboPath) @@ -321,19 +331,23 @@ func (r *ZkRegistry) register(c interface{}) error { log.Debug("provider path:%s, url:%s", dubboPath, rawURL) } else if r.DubboType == registry.CONSUMER { - conf := c.(*service.ServiceConfig) + conf, ok := c.(*registry.ServiceConfig) + if ok { + return fmt.Errorf("the type of @c:%+v is not *registry.ServiceConfig", c) + } + dubboPath = fmt.Sprintf("/dubbo/%s/%s", conf.Service, registry.DubboNodes[registry.CONSUMER]) - r.Lock() + r.cltLock.Lock() err = r.client.Create(dubboPath) - r.Unlock() + r.cltLock.Unlock() if err != nil { log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err)) return jerrors.Trace(err) } dubboPath = fmt.Sprintf("/dubbo/%s/%s", conf.Service, registry.DubboNodes[registry.PROVIDER]) - r.Lock() + r.cltLock.Lock() err = r.client.Create(dubboPath) - r.Unlock() + r.cltLock.Unlock() if err != nil { log.Error("zkClient.create(path{%s}) = error{%v}", dubboPath, jerrors.ErrorStack(err)) return jerrors.Trace(err) @@ -378,8 +392,8 @@ func (r *ZkRegistry) registerTempZookeeperNode(root string, node string) error { zkPath string ) - r.Lock() - defer r.Unlock() + r.cltLock.Lock() + defer r.cltLock.Unlock() err = r.client.Create(root) if err != nil { log.Error("zk.Create(root{%s}) = err{%v}", root, jerrors.ErrorStack(err)) @@ -396,8 +410,8 @@ func (r *ZkRegistry) registerTempZookeeperNode(root string, node string) error { } func (r *ZkRegistry) closeRegisters() { - r.Lock() - defer r.Unlock() + r.cltLock.Lock() + defer r.cltLock.Unlock() log.Info("begin to close provider zk client") // 鍏堝叧闂棫client锛屼互鍏抽棴tmp node r.client.Close() diff --git a/service/service_config.go b/service/service_config.go deleted file mode 100644 index 8f63a151d..000000000 --- a/service/service_config.go +++ /dev/null @@ -1,52 +0,0 @@ -package service - -import "fmt" - -////////////////////////////////////////////// -// service config -////////////////////////////////////////////// - - -type ServiceConfigIf interface { - Key() string - String() string - ServiceEqual(url *ServiceURL) bool -} -type ServiceConfig struct { - Protocol string `required:"true",default:"dubbo" yaml:"protocol" json:"protocol,omitempty"` - Service string `required:"true" yaml:"service" json:"service,omitempty"` - Group string `yaml:"group" json:"group,omitempty"` - Version string `yaml:"version" json:"version,omitempty"` - //add for provider - Path string `yaml:"path" json:"path,omitempty"` - Methods string `yaml:"methods" json:"methods,omitempty"` -} - -func (c ServiceConfig) Key() string { - return fmt.Sprintf("%s@%s", c.Service, c.Protocol) -} - -func (c ServiceConfig) String() string { - return fmt.Sprintf("%s@%s-%s-%s", c.Service, c.Protocol, c.Group, c.Version) -} - -func (c ServiceConfig) ServiceEqual(url *ServiceURL) bool { - if c.Protocol != url.Protocol { - return false - } - - if c.Service != url.Query.Get("interface") { - return false - } - - if c.Group != url.Group { - return false - } - - if c.Version != url.Version { - return false - } - - return true -} - -- GitLab