From b513f2014b77c765eacab66aca72aff6f6056148 Mon Sep 17 00:00:00 2001 From: "vito.he" <hxmhlt@163.com> Date: Tue, 23 Apr 2019 15:27:38 +0800 Subject: [PATCH] Ftr:2.6.0 registry & cluster --- cluster/cluster.go | 7 +++ cluster/directory.go | 3 + cluster/directory/base_directory.go | 26 ++++++++ cluster/directory/service_array.go | 19 +++--- cluster/support/base_cluster_invoker.go | 42 +++++++++++++ cluster/support/failover_cluster.go | 27 ++++++++ cluster/support/failover_cluster_invoker.go | 21 +++++++ common/extension/cluster.go | 18 ++++++ common/extension/registry.go | 13 ++-- common/node.go | 9 +++ config/config_url.go | 3 +- config/consumer_config.yml | 4 ++ config/reference_config.go | 15 +++-- go.mod | 1 + go.sum | 2 + protocol/invocation.go | 8 +++ protocol/invoker.go | 5 +- registry/directory.go | 69 ++++++++++++++------- registry/protocol.go | 27 +++++--- registry/zookeeper/registry.go | 5 +- 20 files changed, 270 insertions(+), 54 deletions(-) create mode 100644 cluster/cluster.go create mode 100644 cluster/directory/base_directory.go create mode 100644 cluster/support/base_cluster_invoker.go create mode 100644 cluster/support/failover_cluster.go create mode 100644 cluster/support/failover_cluster_invoker.go create mode 100644 common/extension/cluster.go create mode 100644 common/node.go create mode 100644 protocol/invocation.go diff --git a/cluster/cluster.go b/cluster/cluster.go new file mode 100644 index 000000000..8e9da2076 --- /dev/null +++ b/cluster/cluster.go @@ -0,0 +1,7 @@ +package cluster + +import "github.com/dubbo/dubbo-go/protocol" + +type Cluster interface { + Join(Directory)protocol.Invoker +} diff --git a/cluster/directory.go b/cluster/directory.go index d8e3be40d..3924971f5 100644 --- a/cluster/directory.go +++ b/cluster/directory.go @@ -1,6 +1,9 @@ package cluster +import "github.com/dubbo/dubbo-go/common" + // Extension - Directory type Directory interface { + common.Node List() } diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go new file mode 100644 index 000000000..ccb97eabc --- /dev/null +++ b/cluster/directory/base_directory.go @@ -0,0 +1,26 @@ +package directory + +import ( + "context" + "github.com/dubbo/dubbo-go/config" +) + +type BaseDirectory struct { + context context.Context + url *config.RegistryURL + destroyed bool +} + +func NewBaseDirectory(ctx context.Context, url *config.RegistryURL) BaseDirectory { + return BaseDirectory{ + context: ctx, + url: url, + } +} +func (dir *BaseDirectory) GetUrl() config.IURL { + return dir.url +} + +func (dir *BaseDirectory) Destroy() { + dir.destroyed = false +} diff --git a/cluster/directory/service_array.go b/cluster/directory/service_array.go index f37966178..9f1661442 100644 --- a/cluster/directory/service_array.go +++ b/cluster/directory/service_array.go @@ -1,6 +1,7 @@ package directory import ( + "context" "fmt" "strings" @@ -24,15 +25,17 @@ var ( ) type ServiceArray struct { - arr []config.URL - birth time.Time - idx int64 + context context.Context + arr []config.URL + birth time.Time + idx int64 } -func NewServiceArray(arr []config.URL) *ServiceArray { +func NewServiceArray(ctx context.Context, arr []config.URL) *ServiceArray { return &ServiceArray{ - arr: arr, - birth: time.Now(), + context: ctx, + arr: arr, + birth: time.Now(), } } @@ -59,12 +62,12 @@ func (s *ServiceArray) String() string { return builder.String() } -func (s *ServiceArray) add(url config.URL, ttl time.Duration) { +func (s *ServiceArray) Add(url config.URL, ttl time.Duration) { s.arr = append(s.arr, url) s.birth = time.Now().Add(ttl) } -func (s *ServiceArray) del(url config.URL, ttl time.Duration) { +func (s *ServiceArray) Del(url config.URL, ttl time.Duration) { for i, svc := range s.arr { if svc.PrimitiveURL == url.PrimitiveURL { s.arr = append(s.arr[:i], s.arr[i+1:]...) diff --git a/cluster/support/base_cluster_invoker.go b/cluster/support/base_cluster_invoker.go new file mode 100644 index 000000000..81b078c11 --- /dev/null +++ b/cluster/support/base_cluster_invoker.go @@ -0,0 +1,42 @@ +package cluster + +import ( + "context" + "github.com/tevino/abool" +) + +import ( + "github.com/dubbo/dubbo-go/cluster" + "github.com/dubbo/dubbo-go/config" +) + +type baseClusterInvoker struct { + context context.Context + directory cluster.Directory + availablecheck bool + destroyed *abool.AtomicBool +} + +func newBaseClusterInvoker(ctx context.Context, directory cluster.Directory) baseClusterInvoker { + return baseClusterInvoker{ + context: ctx, + directory: directory, + availablecheck: false, + destroyed: abool.NewBool(false), + } +} +func (invoker *baseClusterInvoker) GetUrl() config.IURL { + return invoker.directory.GetUrl() +} + +func (invoker *baseClusterInvoker) Destroy() { + //this is must atom operation + if invoker.destroyed.SetToIf(false, true) { + invoker.directory.Destroy() + } +} + +func (invoker *baseClusterInvoker) IsAvailable() bool { + //TODO:不理解java版本中关于stikyInvoker的逻辑所以先不写 + return invoker.directory.IsAvailable() +} diff --git a/cluster/support/failover_cluster.go b/cluster/support/failover_cluster.go new file mode 100644 index 000000000..1f2cfbc59 --- /dev/null +++ b/cluster/support/failover_cluster.go @@ -0,0 +1,27 @@ +package cluster + +import ( + "context" + "github.com/dubbo/dubbo-go/cluster" + "github.com/dubbo/dubbo-go/common/extension" + "github.com/dubbo/dubbo-go/protocol" +) + +type FailoverCluster struct { + context context.Context +} +const name = "failover" + +func init(){ + extension.SetCluster(name,NewFailoverCluster) +} + +func NewFailoverCluster(ctx context.Context) cluster.Cluster { + return &FailoverCluster{ + context: ctx, + } +} + +func (cluster *FailoverCluster) Join(directory cluster.Directory) protocol.Invoker { + return NewFailoverClusterInvoker(cluster.context, directory) +} diff --git a/cluster/support/failover_cluster_invoker.go b/cluster/support/failover_cluster_invoker.go new file mode 100644 index 000000000..c12344792 --- /dev/null +++ b/cluster/support/failover_cluster_invoker.go @@ -0,0 +1,21 @@ +package cluster + +import ( + "context" + "github.com/dubbo/dubbo-go/cluster" + "github.com/dubbo/dubbo-go/protocol" +) + +type failoverClusterInvoker struct { + baseClusterInvoker +} + +func NewFailoverClusterInvoker(ctx context.Context, directory cluster.Directory) protocol.Invoker { + return &failoverClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(ctx, directory), + } +} + +func (invoker *failoverClusterInvoker) Invoke() { + +} diff --git a/common/extension/cluster.go b/common/extension/cluster.go new file mode 100644 index 000000000..cc3f6999f --- /dev/null +++ b/common/extension/cluster.go @@ -0,0 +1,18 @@ +package extension + +import ( + "context" + "github.com/dubbo/dubbo-go/cluster" +) + +var ( + clusters = make(map[string]func(ctx context.Context) cluster.Cluster) +) + +func SetCluster(name string, fcn func(ctx context.Context) cluster.Cluster) { + clusters[name] = fcn +} + +func GetCluster(name string, ctx context.Context) cluster.Cluster { + return clusters[name](ctx) +} diff --git a/common/extension/registry.go b/common/extension/registry.go index 99a7a6a04..5be15ec67 100644 --- a/common/extension/registry.go +++ b/common/extension/registry.go @@ -1,12 +1,13 @@ package extension import ( + "context" "github.com/dubbo/dubbo-go/config" "github.com/dubbo/dubbo-go/registry" ) var ( - registrys map[string]func(config *config.RegistryURL) (registry.Registry,error) + registrys map[string]func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error) ) /* @@ -14,15 +15,13 @@ it must excute first */ func init() { // init map - registrys = make(map[string]func(config *config.RegistryURL) (registry.Registry,error)) + registrys = make(map[string]func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error)) } -func SetRegistry(name string, v func(config *config.RegistryURL) (registry.Registry,error)) { +func SetRegistry(name string, v func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error)) { registrys[name] = v } - - -func GetRegistryExtension(name string, config *config.RegistryURL) (registry.Registry,error) { - return registrys[name](config) +func GetRegistryExtension(name string, ctx context.Context, config *config.RegistryURL) (registry.Registry, error) { + return registrys[name](ctx, config) } diff --git a/common/node.go b/common/node.go new file mode 100644 index 000000000..4b5403b81 --- /dev/null +++ b/common/node.go @@ -0,0 +1,9 @@ +package common + +import "github.com/dubbo/dubbo-go/config" + +type Node interface { + GetUrl() config.IURL + IsAvailable() bool + Destroy() +} diff --git a/config/config_url.go b/config/config_url.go index c0783cc45..a87ccd1a3 100644 --- a/config/config_url.go +++ b/config/config_url.go @@ -13,7 +13,6 @@ import ( jerrors "github.com/juju/errors" ) - type IURL interface { Key() string URLEqual(IURL) bool @@ -38,9 +37,9 @@ type URL struct { Weight int32 Methods string `yaml:"methods" json:"methods,omitempty"` - //both for registry & service & reference Version string `yaml:"version" json:"version,omitempty"` Group string `yaml:"group" json:"group,omitempty"` + Cluster string } func NewURL(urlString string) (*URL, error) { diff --git a/config/consumer_config.yml b/config/consumer_config.yml index e6b3df755..6c43b1808 100644 --- a/config/consumer_config.yml +++ b/config/consumer_config.yml @@ -45,3 +45,7 @@ references: - "shanghaizk" # protocol : "dubbo" interface : "com.ikurento.user.UserProvider" + cluster: "failover" + methods : + - name: "GetUser" + retries: 3 diff --git a/config/reference_config.go b/config/reference_config.go index 209e69dc2..31c087cd4 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -1,8 +1,8 @@ package config import ( + "context" log "github.com/AlexStocks/log4go" - "reflect" ) import "github.com/dubbo/dubbo-go/common/extension" @@ -10,17 +10,24 @@ import "github.com/dubbo/dubbo-go/common/extension" var refprotocol = extension.GetRefProtocol() type ReferenceConfig struct { + context context.Context Interface string `required:"true" yaml:"interface" json:"interface,omitempty"` Registries []referenceConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"` + Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty"` + Methods []method `yaml:"methods" json:"methods,omitempty"` URLs []URL `yaml:"-"` - Type reflect.Type } type referenceConfigRegistry struct { string } -func NewReferenceConfig() *ReferenceConfig { - return &ReferenceConfig{} +type method struct { + name string `yaml:"name" json:"name,omitempty"` + retries int `yaml:"retries" json:"retries,omitempty"` +} + +func NewReferenceConfig(ctx context.Context) *ReferenceConfig { + return &ReferenceConfig{context: ctx} } func (refconfig *ReferenceConfig) CreateProxy() { diff --git a/go.mod b/go.mod index 502b652e8..5d8c0aec0 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223 // indirect github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec + github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 // indirect gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index 38a72632c..9488c4831 100644 --- a/go.sum +++ b/go.sum @@ -121,6 +121,8 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0 github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 h1:hNna6Fi0eP1f2sMBe/rJicDmaHmoXGe1Ta84FPYHLuE= +github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5/go.mod h1:f1SCnEOt6sc3fOJfPQDRDzHOtSXuTtnz0ImG9kPRDV0= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.2/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ= github.com/ugorji/go/codec v0.0.0-20190320090025-2dc34c0b8780/go.mod h1:iT03XoTwV7xq/+UGwKO3UbC1nNNlopQiY61beSdrtOA= diff --git a/protocol/invocation.go b/protocol/invocation.go new file mode 100644 index 000000000..20d033a7b --- /dev/null +++ b/protocol/invocation.go @@ -0,0 +1,8 @@ +package protocol + +import "reflect" + +type Invocation interface { + MethodName() string + Parameters() []reflect.Value +} diff --git a/protocol/invoker.go b/protocol/invoker.go index 33ad2821a..c18042b53 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -1,10 +1,9 @@ package protocol -import "github.com/dubbo/dubbo-go/config" +import "github.com/dubbo/dubbo-go/common" // Extension - Invoker type Invoker interface { + common.Node Invoke() - GetURL() config.URL - Destroy() } diff --git a/registry/directory.go b/registry/directory.go index b3c8e358a..026025078 100644 --- a/registry/directory.go +++ b/registry/directory.go @@ -1,6 +1,7 @@ package registry import ( + "context" "sync" "time" ) @@ -15,21 +16,45 @@ import ( "github.com/dubbo/dubbo-go/config" ) +type Options struct { + serviceTTL time.Duration +} +type Option func(*Options) + +func WithServiceTTL(ttl time.Duration) Option { + return func(o *Options) { + o.serviceTTL = ttl + } +} + type RegistryDirectory struct { + directory.BaseDirectory cacheService *directory.ServiceArray listenerLock sync.Mutex serviceType string registry Registry + Options } -func NewRegistryDirectory(url config.RegistryURL, registry Registry) *RegistryDirectory { +func NewRegistryDirectory(ctx context.Context, url *config.RegistryURL, registry Registry, opts ...Option) *RegistryDirectory { + options := Options{ + //default 300s + serviceTTL: time.Duration(300e9), + } + for _, opt := range opts { + opt(&options) + } + return &RegistryDirectory{ - cacheService: directory.NewServiceArray([]config.URL{}), - serviceType: url.URL.Service, - registry: registry, + BaseDirectory: directory.NewBaseDirectory(ctx, url), + cacheService: directory.NewServiceArray(ctx, []config.URL{}), + serviceType: url.URL.Service, + registry: registry, + Options: options, } } +//subscibe from registry func (dir *RegistryDirectory) subscribe(url config.URL) { for { if dir.registry.IsClosed() { @@ -63,35 +88,37 @@ func (dir *RegistryDirectory) subscribe(url config.URL) { } } +//subscribe service from registry , and update the cacheServices func (dir *RegistryDirectory) update(res *ServiceEvent) { if res == nil { return } log.Debug("registry update, result{%s}", res) - registryKey := res.Service.Key() dir.listenerLock.Lock() defer dir.listenerLock.Unlock() - svcArr, ok := dir.cacheService[registryKey] - log.Debug("registry name:%s, its current member lists:%+v", registryKey, svcArr) + log.Debug("update service name: %s!", res.Service) switch res.Action { case ServiceAdd: - if ok { - svcArr.add(res.Service, ivk.ServiceTTL) - } else { - ivk.cacheServiceMap[registryKey] = newServiceArray([]registry.ServiceURL{res.Service}) - } - case registry.ServiceDel: - if ok { - svcArr.del(res.Service, ivk.ServiceTTL) - if len(svcArr.arr) == 0 { - delete(ivk.cacheServiceMap, registryKey) - log.Warn("delete registry %s from registry map", registryKey) - } - } - log.Error("selector delete registryURL{%s}", res.Service) + dir.cacheService.Add(res.Service, dir.serviceTTL) + + case ServiceDel: + dir.cacheService.Del(res.Service, dir.serviceTTL) + + log.Error("selector delete service url{%s}", res.Service) } } + +func (dir *RegistryDirectory) List(){ + +} +func (dir *RegistryDirectory) IsAvailable() bool { + return true +} + +func (dir *RegistryDirectory) Destroy() { + dir.BaseDirectory.Destroy() +} diff --git a/registry/protocol.go b/registry/protocol.go index fc4fa512c..98a0af35a 100644 --- a/registry/protocol.go +++ b/registry/protocol.go @@ -1,8 +1,7 @@ package registry import ( - "github.com/juju/utils/registry" - "github.com/prometheus/common/log" + "context" "sync" "time" ) @@ -20,6 +19,7 @@ import ( const RegistryConnDelay = 3 type RegistryProtocol struct { + context context.Context // Registry Map<RegistryAddress, Registry> registies map[string]Registry registiesMutex sync.Mutex @@ -29,27 +29,38 @@ func init() { extension.SetRefProtocol(NewRegistryProtocol) } -func NewRegistryProtocol() protocol.Protocol { +func NewRegistryProtocol(ctx context.Context) protocol.Protocol { return &RegistryProtocol{ + context: ctx, registies: make(map[string]Registry), } } -func (protocol *RegistryProtocol) Refer(url config.IURL) (Registry, error) { +func (protocol *RegistryProtocol) Refer(url config.IURL) (protocol.Invoker, error) { var regUrl = url.(*config.RegistryURL) + var serviceUrl = regUrl.URL protocol.registiesMutex.Lock() + defer protocol.registiesMutex.Unlock() var reg Registry - if reg, ok := protocol.registies[url.Key()]; !ok { + var ok bool + if reg, ok = protocol.registies[url.Key()]; !ok { var err error - reg, err = extension.GetRegistryExtension(regUrl.Protocol, regUrl) - protocol.registies[url.Key()] = reg + reg, err = extension.GetRegistryExtension(regUrl.Protocol, protocol.context, regUrl) if err != nil { return nil, err + } else { + protocol.registies[url.Key()] = reg } } - protocol.subscribe(reg, regUrl.URL) + //new registry directory for store service url from registry + directory := NewRegistryDirectory(protocol.context, regUrl, reg) + go directory.subscribe(serviceUrl) + + //new cluster invoker + cluster := extension.GetCluster(serviceUrl.Cluster, protocol.context) + return cluster.Join(directory), nil } func (*RegistryProtocol) Export() { diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 604d756c8..5efb9ee73 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -1,6 +1,7 @@ package zookeeper import ( + "context" "fmt" "github.com/dubbo/dubbo-go/common/extension" "github.com/dubbo/dubbo-go/config" @@ -46,6 +47,7 @@ func init() { ///////////////////////////////////// type ZkRegistry struct { + context context.Context *config.RegistryURL birth int64 // time of file birth, seconds since Epoch; 0 if unknown wg sync.WaitGroup // wg+done for zk restart @@ -62,13 +64,14 @@ type ZkRegistry struct { zkPath map[string]int // key = protocol://ip:port/interface } -func NewZkRegistry(url *config.RegistryURL) (registry.Registry, error) { +func NewZkRegistry(ctx context.Context, url *config.RegistryURL) (registry.Registry, error) { var ( err error r *ZkRegistry ) r = &ZkRegistry{ + context: ctx, RegistryURL: url, birth: time.Now().UnixNano(), done: make(chan struct{}), -- GitLab