diff --git a/client/client_transport.go b/client/client_transport.go deleted file mode 100644 index cee1a1739cee38149d336f40e6ccfdce17900de4..0000000000000000000000000000000000000000 --- a/client/client_transport.go +++ /dev/null @@ -1,22 +0,0 @@ -package client - -import ( - "context" -) - -import ( - "github.com/dubbo/dubbo-go/registry" -) - -type Transport interface { - Call(ctx context.Context, url config.ConfigURL, request Request, resp interface{}) error - NewRequest(conf registry.ReferenceConfig, method string, args interface{}) (Request, error) -} - -////////////////////////////////////////////// -// Request -////////////////////////////////////////////// - -type Request interface { - ServiceConfig() registry.ReferenceConfig -} diff --git a/client/invoker/invoker.go b/client/invoker/invoker.go deleted file mode 100644 index 2e4a5d48842825213c4d60a2aa48ac28ed1d05d0..0000000000000000000000000000000000000000 --- a/client/invoker/invoker.go +++ /dev/null @@ -1,224 +0,0 @@ -package invoker - -import ( - "context" - "sync" - "time" -) - -import ( - log "github.com/AlexStocks/log4go" - jerrors "github.com/juju/errors" -) - -import ( - "github.com/dubbo/dubbo-go/client" - "github.com/dubbo/dubbo-go/client/selector" - "github.com/dubbo/dubbo-go/dubbo" - "github.com/dubbo/dubbo-go/jsonrpc" - "github.com/dubbo/dubbo-go/registry" -) - -const RegistryConnDelay = 3 - -type Options struct { - ServiceTTL time.Duration - selector selector.Selector - //TODO:we should provider a transport client interface - HttpClient *jsonrpc.HTTPClient - DubboClient *dubbo.Client -} -type Option func(*Options) - -func WithServiceTTL(ttl time.Duration) Option { - return func(o *Options) { - o.ServiceTTL = ttl - } -} - -func WithHttpClient(client *jsonrpc.HTTPClient) Option { - return func(o *Options) { - o.HttpClient = client - } -} -func WithDubboClient(client *dubbo.Client) Option { - return func(o *Options) { - o.DubboClient = client - } -} - -func WithLBSelector(selector selector.Selector) Option { - return func(o *Options) { - o.selector = selector - } -} - -type Invoker struct { - Options - cacheServiceMap map[string]*ServiceArray - registry registry.Registry - listenerLock sync.Mutex -} - -func NewInvoker(registry registry.Registry, opts ...Option) (*Invoker, error) { - options := Options{ - //default 300s - ServiceTTL: time.Duration(300e9), - selector: selector.NewRandomSelector(), - } - for _, opt := range opts { - opt(&options) - } - if options.HttpClient == nil && options.DubboClient == nil { - return nil, jerrors.New("Must specify the transport client!") - } - invoker := &Invoker{ - Options: options, - cacheServiceMap: make(map[string]*ServiceArray), - registry: registry, - } - go invoker.listen() - return invoker, nil -} - -func (ivk *Invoker) listen() { - for { - if ivk.registry.IsClosed() { - log.Warn("event listener game over.") - return - } - - listener, err := ivk.registry.Subscribe() - if err != nil { - if ivk.registry.IsClosed() { - log.Warn("event listener game over.") - return - } - log.Warn("getListener() = err:%s", jerrors.ErrorStack(err)) - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) - continue - } - - for { - if serviceEvent, err := listener.Next(); err != nil { - log.Warn("Selector.watch() = error{%v}", jerrors.ErrorStack(err)) - listener.Close() - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) - return - } else { - ivk.update(serviceEvent) - } - - } - - } -} - -func (ivk *Invoker) update(res *registry.ServiceEvent) { - if res == nil || res.Service == nil { - return - } - - log.Debug("registry update, result{%s}", res) - registryKey := res.Service.ServiceConfig().Key() - - ivk.listenerLock.Lock() - defer ivk.listenerLock.Unlock() - - svcArr, ok := ivk.cacheServiceMap[registryKey] - log.Debug("registry name:%s, its current member lists:%+v", registryKey, svcArr) - - switch res.Action { - case registry.ServiceAdd: - if ok { - svcArr.add(res.Service, ivk.ServiceTTL) - } else { - ivk.cacheServiceMap[registryKey] = newServiceArray([]config.ConfigURL{res.Service}) - } - case registry.ServiceDel: - if ok { - svcArr.del(res.Service, ivk.ServiceTTL) - if len(svcArr.arr) == 0 { - delete(ivk.cacheServiceMap, registryKey) - log.Warn("delete registry %s from registry map", registryKey) - } - } - log.Error("selector delete registryURL{%s}", res.Service) - } -} - -func (ivk *Invoker) getService(registryConf registry.ReferenceConfig) (*ServiceArray, error) { - defer ivk.listenerLock.Unlock() - - registryKey := registryConf.Key() - - ivk.listenerLock.Lock() - svcArr, sok := ivk.cacheServiceMap[registryKey] - log.Debug("r.svcArr[registryString{%v}] = svcArr{%s}", registryKey, svcArr) - if sok && time.Since(svcArr.birth) < ivk.Options.ServiceTTL { - return svcArr, nil - } - ivk.listenerLock.Unlock() - - svcs, err := ivk.registry.GetService(registryConf) - ivk.listenerLock.Lock() - - if err != nil { - log.Error("Registry.get(conf:%+v) = {err:%s, svcs:%+v}", - registryConf, jerrors.ErrorStack(err), svcs) - - return nil, jerrors.Trace(err) - } - - newSvcArr := newServiceArray(svcs) - ivk.cacheServiceMap[registryKey] = newSvcArr - return newSvcArr, nil -} - -func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, req client.Request, resp interface{}) error { - - serviceConf := req.ServiceConfig() - registryArray, err := ivk.getService(serviceConf) - if err != nil { - return err - } - if len(registryArray.arr) == 0 { - return jerrors.New("cannot find svc " + serviceConf.String()) - } - url, err := ivk.selector.Select(reqId, registryArray) - if err != nil { - return err - } - if err = ivk.HttpClient.Call(ctx, url, req, resp); err != nil { - log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) - return err - } - log.Info("response result:%s", resp) - return nil -} - -func (ivk *Invoker) DubboCall(reqId int64, registryConf registry.ReferenceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error { - - registryArray, err := ivk.getService(registryConf) - if err != nil { - return err - } - if len(registryArray.arr) == 0 { - return jerrors.New("cannot find svc " + registryConf.String()) - } - url, err := ivk.selector.Select(reqId, registryArray) - if err != nil { - return err - } - //TODO:这里要改一下call方法改为接收指针类型 - if err = ivk.DubboClient.Call(url.Ip()+":"+url.Port(), url, method, args, reply, opts...); err != nil { - log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) - return err - } - log.Info("response result:%s", reply) - return nil -} - -func (ivk *Invoker) Close() { - ivk.DubboClient.Close() -} diff --git a/client/invoker/service_array.go b/client/invoker/service_array.go deleted file mode 100644 index 2bd634cf88ac7f82c7331ed5af4499235398e0b8..0000000000000000000000000000000000000000 --- a/client/invoker/service_array.go +++ /dev/null @@ -1,76 +0,0 @@ -package invoker - -import ( - "fmt" - "strings" - "time" -) - -import ( - jerrors "github.com/juju/errors" -) - -import ( - "github.com/dubbo/dubbo-go/registry" -) - -////////////////////////////////////////// -// registry array -// should be returned by registry ,will be used by client & waiting to selector -////////////////////////////////////////// - -var ( - ErrServiceArrayEmpty = jerrors.New("registryArray empty") - ErrServiceArrayTimeout = jerrors.New("registryArray timeout") -) - -type ServiceArray struct { - arr []config.ConfigURL - birth time.Time - idx int64 -} - -func newServiceArray(arr []config.ConfigURL) *ServiceArray { - return &ServiceArray{ - arr: arr, - birth: time.Now(), - } -} - -func (s *ServiceArray) GetIdx() *int64 { - return &s.idx -} - -func (s *ServiceArray) GetSize() int64 { - return int64(len(s.arr)) -} - -func (s *ServiceArray) GetService(i int64) config.ConfigURL { - return s.arr[i] -} - -func (s *ServiceArray) String() string { - var builder strings.Builder - builder.WriteString(fmt.Sprintf("birth:%s, idx:%d, arr len:%d, arr:{", s.birth, s.idx, len(s.arr))) - for i := range s.arr { - builder.WriteString(fmt.Sprintf("%d:%s, ", i, s.arr[i])) - } - builder.WriteString("}") - - return builder.String() -} - -func (s *ServiceArray) add(registry config.ConfigURL, ttl time.Duration) { - s.arr = append(s.arr, registry) - s.birth = time.Now().Add(ttl) -} - -func (s *ServiceArray) del(registry config.ConfigURL, ttl time.Duration) { - for i, svc := range s.arr { - if svc.PrimitiveURL() == registry.PrimitiveURL() { - s.arr = append(s.arr[:i], s.arr[i+1:]...) - s.birth = time.Now().Add(ttl) - break - } - } -} diff --git a/client/selector/random.go b/client/selector/random.go deleted file mode 100644 index 906e2d7b56efcb7963ebf2d2ddc0c2cc39ad5fda..0000000000000000000000000000000000000000 --- a/client/selector/random.go +++ /dev/null @@ -1,27 +0,0 @@ -package selector - -import ( - "math/rand" - "sync/atomic" -) - -import ( - "github.com/dubbo/dubbo-go/client" - "github.com/dubbo/dubbo-go/registry" -) - -type RandomSelector struct{} - -func NewRandomSelector() Selector { - return &RandomSelector{} -} - -func (s *RandomSelector) Select(ID int64, array client.ServiceArrayIf) (config.ConfigURL, error) { - if array.GetSize() == 0 { - return nil, ServiceArrayEmpty - } - - idx := atomic.AddInt64(array.GetIdx(), 1) - idx = ((int64)(rand.Int()) + ID) % array.GetSize() - return array.GetService(idx), nil -} diff --git a/client/selector/round_robin.go b/client/selector/round_robin.go deleted file mode 100644 index 449d803fe65fec565cd4b5b2971fdbde3e5240cd..0000000000000000000000000000000000000000 --- a/client/selector/round_robin.go +++ /dev/null @@ -1,26 +0,0 @@ -package selector - -import ( - "sync/atomic" -) - -import ( - "github.com/dubbo/dubbo-go/client" - "github.com/dubbo/dubbo-go/registry" -) - -type RoundRobinSelector struct{} - -func NewRoundRobinSelector() Selector { - return &RoundRobinSelector{} -} - -func (s *RoundRobinSelector) Select(ID int64, array client.ServiceArrayIf) (config.ConfigURL, error) { - if array.GetSize() == 0 { - return nil, ServiceArrayEmpty - } - - idx := atomic.AddInt64(array.GetIdx(), 1) - idx = (ID + idx) % array.GetSize() - return array.GetService(idx), nil -} diff --git a/client/selector/selector.go b/client/selector/selector.go deleted file mode 100644 index 3f72b2b31737635fae676b1b4ac930f5e3776778..0000000000000000000000000000000000000000 --- a/client/selector/selector.go +++ /dev/null @@ -1,18 +0,0 @@ -package selector - -import ( - "fmt" -) - -import ( - "github.com/dubbo/dubbo-go/client" - "github.com/dubbo/dubbo-go/registry" -) - -var ( - ServiceArrayEmpty = fmt.Errorf("emtpy service array") -) - -type Selector interface { - Select(ID int64, array client.ServiceArrayIf) (config.ConfigURL, error) -} diff --git a/client/service_array.go b/client/service_array.go deleted file mode 100644 index d6f13e0286bda6483549e3d224f169d1f5b2c58b..0000000000000000000000000000000000000000 --- a/client/service_array.go +++ /dev/null @@ -1,9 +0,0 @@ -package client - -import "github.com/dubbo/dubbo-go/registry" - -type ServiceArrayIf interface { - GetIdx() *int64 - GetSize() int64 - GetService(i int64) config.ConfigURL -} diff --git a/cluster/directory.go b/cluster/directory.go index 3924971f58b55171d5573e6ab2f8b16be3c23fa7..b3cf9a0b25386936ec7f27a4eeceb1d5e31f4a1c 100644 --- a/cluster/directory.go +++ b/cluster/directory.go @@ -1,9 +1,12 @@ package cluster -import "github.com/dubbo/dubbo-go/common" +import ( + "github.com/dubbo/dubbo-go/common" + "github.com/dubbo/dubbo-go/protocol" +) // Extension - Directory type Directory interface { common.Node - List() + List(invocation protocol.Invocation) []protocol.Invoker } diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index ccb97eabca2d0095555bae63e6f5e5f9ace64f9c..0a34c26a8006d72fa2886d1308ae04e854cdb625 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -24,3 +24,7 @@ func (dir *BaseDirectory) GetUrl() config.IURL { func (dir *BaseDirectory) Destroy() { dir.destroyed = false } + +func (dir *BaseDirectory) Context() context.Context { + return dir.context +} diff --git a/cluster/directory/static_directory.go b/cluster/directory/static_directory.go new file mode 100644 index 0000000000000000000000000000000000000000..f5a7a20f801f518270b6751fdb96a8e4603e184e --- /dev/null +++ b/cluster/directory/static_directory.go @@ -0,0 +1,33 @@ +package directory + +import ( + "context" + "github.com/dubbo/dubbo-go/protocol" +) + +type StaticDirectory struct { + BaseDirectory + invokers []protocol.Invoker +} + +func NewStaticDirectory(context context.Context, invokers []protocol.Invoker) *StaticDirectory { + return &StaticDirectory{ + BaseDirectory: NewBaseDirectory(context, nil), + invokers: invokers, + } +} + +//for-loop invokers ,if all invokers is available ,then it means directory is available +func (dir *StaticDirectory) IsAvailable() bool { + for _, invoker := range dir.invokers { + if !invoker.IsAvailable() { + return false + } + } + return true +} + +func (dir *StaticDirectory) List(invocation protocol.Invocation) []protocol.Invoker { + //TODO:Here should add router + return dir.invokers +} diff --git a/cluster/router.go b/cluster/router.go index 4792edbc276b925cdb34856835b33c2a5289bfb9..26de1845a5ef6e0d1867fd77416c87f672b59954 100644 --- a/cluster/router.go +++ b/cluster/router.go @@ -1,6 +1,24 @@ package cluster +import ( + "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/protocol" +) + // Extension - Router + +type RouterFactory interface { + Router(config.IURL) Router +} + type Router interface { - Route() + Route([]protocol.Invoker, config.IURL, protocol.Invocation) []protocol.Invoker +} + +type RouterChain struct { + routers []Router +} + +func NewRouterChain(url config.URL) { + } diff --git a/common/extension/protocol.go b/common/extension/protocol.go index 1183d884010fab6afe91d2b21e5571c3c746c70e..688551b98e7710755425953ab4e88830cb19cb40 100644 --- a/common/extension/protocol.go +++ b/common/extension/protocol.go @@ -1,6 +1,8 @@ package extension -import "github.com/dubbo/dubbo-go/protocol" +import ( + "github.com/dubbo/dubbo-go/protocol" +) var ( protocols map[string]func() protocol.Protocol @@ -10,13 +12,6 @@ func init() { protocols = make(map[string]func() protocol.Protocol) } -func SetRefProtocol(fn func() protocol.Protocol) { - protocols["refProtocol"] = fn -} - -func GetRefProtocol() protocol.Protocol { - return protocols["refProtocol"]() -} func SetProtocol(name string, v func() protocol.Protocol) { protocols[name] = v } diff --git a/config/config_url.go b/config/config_url.go index 75418eb4f77da66848706d171c703947312e354a..0e6c16190dc2ebc0d5192d86d12f8df05fcade6e 100644 --- a/config/config_url.go +++ b/config/config_url.go @@ -1,6 +1,7 @@ package config import ( + "context" "fmt" "net" "net/url" @@ -16,6 +17,7 @@ import ( type IURL interface { Key() string URLEqual(IURL) bool + Context() context.Context } type baseUrl struct { @@ -26,6 +28,7 @@ type baseUrl struct { Timeout time.Duration Params url.Values PrimitiveURL string + ctx context.Context } type URL struct { @@ -39,6 +42,11 @@ type URL struct { Version string `yaml:"version" json:"version,omitempty"` Group string `yaml:"group" json:"group,omitempty"` + + Username string + Password string + + //reference only Cluster string } @@ -115,3 +123,13 @@ func (c URL) String() string { c.Protocol, c.Location, c.Path, c.Ip, c.Port, c.Timeout, c.Version, c.Group, c.Weight, c.Params) } + +func (c *URL) ToFullString() string { + return fmt.Sprintf( + "%s://%s:%s@%s:%s/%s?%s&%s&%s&%s", + c.Protocol, c.Password, c.Username, c.Ip, c.Port, c.Path, c.Methods, c.Version, c.Group, c.Params) +} + +func (c *URL) Context() context.Context { + return c.ctx +} diff --git a/config/reference_config.go b/config/reference_config.go index 31c087cd4a92357f3214f069bc8656a777f1beff..fb02fe7f3905feba43bee7d869a97ee5f273a47b 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -7,7 +7,7 @@ import ( import "github.com/dubbo/dubbo-go/common/extension" -var refprotocol = extension.GetRefProtocol() +var refprotocol = extension.GetProtocolExtension("registry") type ReferenceConfig struct { context context.Context @@ -37,22 +37,22 @@ func (refconfig *ReferenceConfig) CreateProxy() { urls := refconfig.loadRegistries() if len(urls) == 1 { - refprotocol.Export() + refprotocol.Refer(urls[0]) } else { } } -func (refconfig *ReferenceConfig) loadRegistries() []URL { - var urls []URL +func (refconfig *ReferenceConfig) loadRegistries() []*RegistryURL { + var urls []*RegistryURL for _, registry := range refconfig.Registries { for _, registryConf := range consumerConfig.Registries { if registry.string == registryConf.Id { - url, err := NewURL(registryConf.Address) + url, err := NewRegistryURL(registryConf.Address) if err != nil { log.Error("The registry id:%s url is invalid ,and will skip the registry", registryConf.Id) } else { - urls = append(urls, *url) + urls = append(urls, url) } } diff --git a/config/registry_url.go b/config/registry_url.go index 2f6262c802e2d5840d840f186fe7d109b5b13422..208991d2619b8e7dd2ae5a26aad1df006d5fb199 100644 --- a/config/registry_url.go +++ b/config/registry_url.go @@ -1,6 +1,17 @@ package config -import "fmt" +import ( + "context" + "fmt" + "net" + "net/url" + "strconv" + "strings" + "time" +) +import ( + jerrors "github.com/juju/errors" +) ///////////////////////////////// // dubbo role type @@ -47,6 +58,60 @@ type RegistryURL struct { Address string `yaml:"address" json:"address,omitempty"` } +func NewRegistryURL(urlString string) (*RegistryURL, error) { + + var ( + err error + rawUrlString string + serviceUrl *url.URL + s = &RegistryURL{} + ) + + // new a null instance + if urlString == "" { + return s, nil + } + + rawUrlString, err = url.QueryUnescape(urlString) + if err != nil { + return nil, jerrors.Errorf("url.QueryUnescape(%s), error{%v}", urlString, err) + } + + serviceUrl, err = url.Parse(rawUrlString) + if err != nil { + return nil, jerrors.Errorf("url.Parse(url string{%s}), error{%v}", rawUrlString, err) + } + + s.Params, err = url.ParseQuery(serviceUrl.RawQuery) + if err != nil { + return nil, jerrors.Errorf("url.ParseQuery(raw url string{%s}), error{%v}", serviceUrl.RawQuery, err) + } + + s.PrimitiveURL = urlString + s.Protocol = serviceUrl.Scheme + s.Location = serviceUrl.Host + if strings.Contains(s.Location, ":") { + s.Ip, s.Port, err = net.SplitHostPort(s.Location) + if err != nil { + return nil, jerrors.Errorf("net.SplitHostPort(Url.Host{%s}), error{%v}", s.Location, err) + } + } + s.Group = s.Params.Get("group") + s.Version = s.Params.Get("version") + timeoutStr := s.Params.Get("timeout") + if len(timeoutStr) == 0 { + timeoutStr = s.Params.Get("default.timeout") + } + if len(timeoutStr) != 0 { + timeout, err := strconv.Atoi(timeoutStr) + if err == nil && timeout != 0 { + s.Timeout = time.Duration(timeout * 1e6) // timeout unit is millisecond + } + } + + return s, nil +} + func (c *RegistryURL) Key() string { return fmt.Sprintf("%s-%s-%s-%s-%s", c.Protocol, c.Group, c.Location, c.Version, c.DubboType) } @@ -58,3 +123,7 @@ func (c *RegistryURL) URLEqual(url IURL) bool { } return true } + +func (c *RegistryURL) Context() context.Context { + return c.ctx +} diff --git a/plugins/plugins.go b/plugins/plugins.go deleted file mode 100644 index b71588683514a4fe9b91d4b14796ea7bb5777752..0000000000000000000000000000000000000000 --- a/plugins/plugins.go +++ /dev/null @@ -1,49 +0,0 @@ -package plugins - -// -//var PluggableRegistries = map[string]func(...registry.RegistryOption) (registry.Registry, error){} -// -//var PluggableLoadbalance = map[string]func() selector.Selector{ -// "round_robin": selector.NewRoundRobinSelector, -// "random": selector.NewRandomSelector, -//} -// -//// service configuration plugins , related to SeviceConfig for consumer paramters / ProviderSeviceConfig for provider parameters / -// -//// TODO:ServiceEven & URL subscribed by consumer from provider's listener shoud abstract to interface -//var PluggableServiceConfig = map[string]func() registry.ReferenceConfig{ -// "default": registry.NewServiceConfig, -//} -//var PluggableProviderServiceConfig = map[string]func() registry.ProviderServiceConfig{ -// "default": registry.NewDefaultProviderServiceConfig, -//} -// -//var PluggableServiceURL = map[string]func(string) (config.URL, error){ -// "default": registry.NewDefaultServiceURL, -//} -// -//var defaultServiceConfig = registry.NewServiceConfig -//var defaultProviderServiceConfig = registry.NewDefaultProviderServiceConfig -// -//var defaultServiceURL = registry.NewDefaultServiceURL -// -//func SetDefaultServiceConfig(s string) { -// defaultServiceConfig = PluggableServiceConfig[s] -//} -//func DefaultServiceConfig() func() registry.ReferenceConfig { -// return defaultServiceConfig -//} -// -//func SetDefaultProviderServiceConfig(s string) { -// defaultProviderServiceConfig = PluggableProviderServiceConfig[s] -//} -//func DefaultProviderServiceConfig() func() registry.ProviderServiceConfig { -// return defaultProviderServiceConfig -//} -// -//func SetDefaultServiceURL(s string) { -// defaultServiceURL = PluggableServiceURL[s] -//} -//func DefaultServiceURL() func(string) (config.URL, error) { -// return defaultServiceURL -//} diff --git a/protocol/protocol.go b/protocol/protocol.go index 1ce767be1c8d040261c17a9d617e185b7e9b2346..a9da663585e622bdf5f67223e33c3977910e95c8 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -5,7 +5,7 @@ import "github.com/dubbo/dubbo-go/config" // Extension - Protocol type Protocol interface { Export(invoker Invoker) Exporter - Refer(url config.URL) Invoker + Refer(url config.IURL) Invoker Destroy() } diff --git a/registry/directory.go b/registry/directory.go index 026025078bdfe303c50d933cbf8479a2cc4d7245..a2f352a6327581ca21a44b6499d018aec17b67f3 100644 --- a/registry/directory.go +++ b/registry/directory.go @@ -13,7 +13,9 @@ import ( import ( "github.com/dubbo/dubbo-go/cluster/directory" + "github.com/dubbo/dubbo-go/common/extension" "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/protocol" ) type Options struct { @@ -29,10 +31,12 @@ func WithServiceTTL(ttl time.Duration) Option { type RegistryDirectory struct { directory.BaseDirectory - cacheService *directory.ServiceArray - listenerLock sync.Mutex - serviceType string - registry Registry + cacheInvokers []protocol.Invoker + listenerLock sync.Mutex + serviceType string + registry Registry + cacheInvokersMap sync.Map //use sync.map + //cacheInvokersMap map[string]protocol.Invoker Options } @@ -46,11 +50,12 @@ func NewRegistryDirectory(ctx context.Context, url *config.RegistryURL, registry } return &RegistryDirectory{ - BaseDirectory: directory.NewBaseDirectory(ctx, url), - cacheService: directory.NewServiceArray(ctx, []config.URL{}), - serviceType: url.URL.Service, - registry: registry, - Options: options, + BaseDirectory: directory.NewBaseDirectory(ctx, url), + cacheInvokers: []protocol.Invoker{}, + cacheInvokersMap: sync.Map{}, + serviceType: url.URL.Service, + registry: registry, + Options: options, } } @@ -80,7 +85,7 @@ func (dir *RegistryDirectory) subscribe(url config.URL) { time.Sleep(time.Duration(RegistryConnDelay) * time.Second) return } else { - dir.update(serviceEvent) + go dir.update(serviceEvent) } } @@ -96,25 +101,100 @@ func (dir *RegistryDirectory) update(res *ServiceEvent) { log.Debug("registry update, result{%s}", res) - dir.listenerLock.Lock() - defer dir.listenerLock.Unlock() - log.Debug("update service name: %s!", res.Service) + dir.refreshInvokers(res) +} + +func (dir *RegistryDirectory) refreshInvokers(res *ServiceEvent) { + var newCacheInvokersMap sync.Map + switch res.Action { case ServiceAdd: - dir.cacheService.Add(res.Service, dir.serviceTTL) - + //dir.cacheService.Add(res.Service, dir.serviceTTL) + newCacheInvokersMap = dir.cacheInvoker(res.Service) case ServiceDel: - dir.cacheService.Del(res.Service, dir.serviceTTL) + //dir.cacheService.Del(res.Service, dir.serviceTTL) + newCacheInvokersMap = dir.uncacheInvoker(res.Service) + log.Info("selector delete service url{%s}", res.Service) + default: + return + } + + newInvokers := dir.toGroupInvokers(newCacheInvokersMap) + + dir.listenerLock.Lock() + defer dir.listenerLock.Unlock() + dir.cacheInvokers = newInvokers +} + +func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap sync.Map) []protocol.Invoker { + + newInvokersList := []protocol.Invoker{} + groupInvokersMap := make(map[string][]protocol.Invoker) + groupInvokersList := []protocol.Invoker{} + + newInvokersMap.Range(func(key, value interface{}) bool { + newInvokersList = append(newInvokersList, value.(protocol.Invoker)) + return true + }) - log.Error("selector delete service url{%s}", res.Service) + for _, invoker := range newInvokersList { + group := invoker.GetUrl().(*config.URL).Group + + if _, ok := groupInvokersMap[group]; ok { + groupInvokersMap[group] = append(groupInvokersMap[group], invoker) + } else { + groupInvokersMap[group] = []protocol.Invoker{} + } } + if len(groupInvokersMap) == 1 { + //len is 1 it means no group setting ,so do not need cluster again + groupInvokersList = groupInvokersMap[""] + } else { + for _, invokers := range groupInvokersMap { + staticDir := directory.NewStaticDirectory(dir.Context(), invokers) + cluster := extension.GetCluster(dir.GetUrl().(*config.RegistryURL).URL.Cluster, dir.Context()) + groupInvokersList = append(groupInvokersList, cluster.Join(staticDir)) + } + } + + return groupInvokersList } -func (dir *RegistryDirectory) List(){ +func (dir *RegistryDirectory) uncacheInvoker(url config.URL) sync.Map { + log.Debug("service will be deleted in cache invokers: invokers key is %s!", url.ToFullString()) + newCacheInvokers := dir.cacheInvokersMap + newCacheInvokers.Delete(url.ToFullString()) + return newCacheInvokers +} + +func (dir *RegistryDirectory) cacheInvoker(url config.URL) sync.Map { + //check the url's protocol is equal to the protocol which is configured in reference config + referenceUrl := dir.GetUrl().(*config.RegistryURL).URL + newCacheInvokers := dir.cacheInvokersMap + if url.Protocol == referenceUrl.Protocol { + url = mergeUrl(url, referenceUrl) + + if _, ok := newCacheInvokers.Load(url.ToFullString()); !ok { + + log.Debug("service will be added in cache invokers: invokers key is %s!", url.ToFullString()) + newInvoker,err := extension.GetProtocolExtension(url.Protocol).Refer(url) + if err!=nil{ + } + newCacheInvokers.Store(url.ToFullString(), newInvoker) + } + } + return newCacheInvokers +} + +//select the protocol invokers from the directory +func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { + //TODO:router + return dir.cacheInvokers } + func (dir *RegistryDirectory) IsAvailable() bool { return true } @@ -122,3 +202,9 @@ func (dir *RegistryDirectory) IsAvailable() bool { func (dir *RegistryDirectory) Destroy() { dir.BaseDirectory.Destroy() } + +// in this function we should merge the reference local url config into the service url from registry. +//for some reason(I have not finish the service module, so this function marked as TODO) +func mergeUrl(serviceUrl config.URL, referenceUrl config.URL) config.URL { + return serviceUrl +} diff --git a/registry/protocol.go b/registry/protocol.go index 98a0af35a5dc10b49fba2fc01d52a7ed0285f461..7f3744c8a75531e5d6d0fc86acb8d8b4fb7c7f09 100644 --- a/registry/protocol.go +++ b/registry/protocol.go @@ -1,13 +1,11 @@ package registry import ( - "context" "sync" - "time" ) import ( - jerrors "github.com/juju/errors" + log "github.com/AlexStocks/log4go" ) import ( @@ -19,24 +17,22 @@ import ( const RegistryConnDelay = 3 type RegistryProtocol struct { - context context.Context // Registry Map<RegistryAddress, Registry> registies map[string]Registry registiesMutex sync.Mutex } func init() { - extension.SetRefProtocol(NewRegistryProtocol) + extension.SetProtocol("registry", NewRegistryProtocol) } -func NewRegistryProtocol(ctx context.Context) protocol.Protocol { +func NewRegistryProtocol() protocol.Protocol { return &RegistryProtocol{ - context: ctx, registies: make(map[string]Registry), } } -func (protocol *RegistryProtocol) Refer(url config.IURL) (protocol.Invoker, error) { +func (protocol *RegistryProtocol) Refer(url config.IURL) protocol.Invoker { var regUrl = url.(*config.RegistryURL) var serviceUrl = regUrl.URL @@ -47,24 +43,25 @@ func (protocol *RegistryProtocol) Refer(url config.IURL) (protocol.Invoker, erro if reg, ok = protocol.registies[url.Key()]; !ok { var err error - reg, err = extension.GetRegistryExtension(regUrl.Protocol, protocol.context, regUrl) + reg, err = extension.GetRegistryExtension(regUrl.Protocol, url.Context(), regUrl) if err != nil { - return nil, err + log.Error("Registry can not connect success, program is going to panic.Error message is %s", err.Error()) + panic(err.Error()) } else { protocol.registies[url.Key()] = reg } } //new registry directory for store service url from registry - directory := NewRegistryDirectory(protocol.context, regUrl, reg) + directory := NewRegistryDirectory(url.Context(), regUrl, reg) go directory.subscribe(serviceUrl) //new cluster invoker - cluster := extension.GetCluster(serviceUrl.Cluster, protocol.context) - return cluster.Join(directory), nil + cluster := extension.GetCluster(serviceUrl.Cluster, url.Context()) + return cluster.Join(directory) } -func (*RegistryProtocol) Export() { - +func (*RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { + return nil } func (*RegistryProtocol) Destroy() { diff --git a/server/config.go b/server/config.go deleted file mode 100644 index cf6eb1788b754ff6c493e69d9177546940294a3d..0000000000000000000000000000000000000000 --- a/server/config.go +++ /dev/null @@ -1,13 +0,0 @@ -package server - -import "github.com/AlexStocks/goext/net" - -type ServerConfig struct { - Protocol string `required:"true",default:"dubbo" yaml:"protocol" json:"protocol,omitempty"` // codec string, jsonrpc etc - IP string `yaml:"ip" json:"ip,omitempty"` - Port int `required:"true" yaml:"port" json:"port,omitempty"` -} - -func (c *ServerConfig) Address() string { - return gxnet.HostAddress(c.IP, c.Port) -}