diff --git a/cluster/cluster.go b/cluster/cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..8e9da20763347e1500b454e6e3a670cfdaddf4a9 --- /dev/null +++ b/cluster/cluster.go @@ -0,0 +1,7 @@ +package cluster + +import "github.com/dubbo/dubbo-go/protocol" + +type Cluster interface { + Join(Directory)protocol.Invoker +} diff --git a/cluster/directory.go b/cluster/directory.go index d8e3be40d5948af5474da4736d062af6d73c2c5a..3924971f58b55171d5573e6ab2f8b16be3c23fa7 100644 --- a/cluster/directory.go +++ b/cluster/directory.go @@ -1,6 +1,9 @@ package cluster +import "github.com/dubbo/dubbo-go/common" + // Extension - Directory type Directory interface { + common.Node List() } diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go new file mode 100644 index 0000000000000000000000000000000000000000..ccb97eabca2d0095555bae63e6f5e5f9ace64f9c --- /dev/null +++ b/cluster/directory/base_directory.go @@ -0,0 +1,26 @@ +package directory + +import ( + "context" + "github.com/dubbo/dubbo-go/config" +) + +type BaseDirectory struct { + context context.Context + url *config.RegistryURL + destroyed bool +} + +func NewBaseDirectory(ctx context.Context, url *config.RegistryURL) BaseDirectory { + return BaseDirectory{ + context: ctx, + url: url, + } +} +func (dir *BaseDirectory) GetUrl() config.IURL { + return dir.url +} + +func (dir *BaseDirectory) Destroy() { + dir.destroyed = false +} diff --git a/cluster/directory/service_array.go b/cluster/directory/service_array.go index f37966178c179995b3516559f1cd5ab9c04c89ea..9f1661442f771d46e247e32f6353a2e79105a6b2 100644 --- a/cluster/directory/service_array.go +++ b/cluster/directory/service_array.go @@ -1,6 +1,7 @@ package directory import ( + "context" "fmt" "strings" @@ -24,15 +25,17 @@ var ( ) type ServiceArray struct { - arr []config.URL - birth time.Time - idx int64 + context context.Context + arr []config.URL + birth time.Time + idx int64 } -func NewServiceArray(arr []config.URL) *ServiceArray { +func NewServiceArray(ctx context.Context, arr []config.URL) *ServiceArray { return &ServiceArray{ - arr: arr, - birth: time.Now(), + context: ctx, + arr: arr, + birth: time.Now(), } } @@ -59,12 +62,12 @@ func (s *ServiceArray) String() string { return builder.String() } -func (s *ServiceArray) add(url config.URL, ttl time.Duration) { +func (s *ServiceArray) Add(url config.URL, ttl time.Duration) { s.arr = append(s.arr, url) s.birth = time.Now().Add(ttl) } -func (s *ServiceArray) del(url config.URL, ttl time.Duration) { +func (s *ServiceArray) Del(url config.URL, ttl time.Duration) { for i, svc := range s.arr { if svc.PrimitiveURL == url.PrimitiveURL { s.arr = append(s.arr[:i], s.arr[i+1:]...) diff --git a/cluster/support/base_cluster_invoker.go b/cluster/support/base_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..81b078c115b57ef0754a875d5fdf56d58676b155 --- /dev/null +++ b/cluster/support/base_cluster_invoker.go @@ -0,0 +1,42 @@ +package cluster + +import ( + "context" + "github.com/tevino/abool" +) + +import ( + "github.com/dubbo/dubbo-go/cluster" + "github.com/dubbo/dubbo-go/config" +) + +type baseClusterInvoker struct { + context context.Context + directory cluster.Directory + availablecheck bool + destroyed *abool.AtomicBool +} + +func newBaseClusterInvoker(ctx context.Context, directory cluster.Directory) baseClusterInvoker { + return baseClusterInvoker{ + context: ctx, + directory: directory, + availablecheck: false, + destroyed: abool.NewBool(false), + } +} +func (invoker *baseClusterInvoker) GetUrl() config.IURL { + return invoker.directory.GetUrl() +} + +func (invoker *baseClusterInvoker) Destroy() { + //this is must atom operation + if invoker.destroyed.SetToIf(false, true) { + invoker.directory.Destroy() + } +} + +func (invoker *baseClusterInvoker) IsAvailable() bool { + //TODO:不理解java版本中关于stikyInvoker的逻辑所以先不写 + return invoker.directory.IsAvailable() +} diff --git a/cluster/support/failover_cluster.go b/cluster/support/failover_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..1f2cfbc59add599284aa07afa475e82333ce2558 --- /dev/null +++ b/cluster/support/failover_cluster.go @@ -0,0 +1,27 @@ +package cluster + +import ( + "context" + "github.com/dubbo/dubbo-go/cluster" + "github.com/dubbo/dubbo-go/common/extension" + "github.com/dubbo/dubbo-go/protocol" +) + +type FailoverCluster struct { + context context.Context +} +const name = "failover" + +func init(){ + extension.SetCluster(name,NewFailoverCluster) +} + +func NewFailoverCluster(ctx context.Context) cluster.Cluster { + return &FailoverCluster{ + context: ctx, + } +} + +func (cluster *FailoverCluster) Join(directory cluster.Directory) protocol.Invoker { + return NewFailoverClusterInvoker(cluster.context, directory) +} diff --git a/cluster/support/failover_cluster_invoker.go b/cluster/support/failover_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..c12344792d0b630cbf00f0c8b0cffdfcd4fe7c65 --- /dev/null +++ b/cluster/support/failover_cluster_invoker.go @@ -0,0 +1,21 @@ +package cluster + +import ( + "context" + "github.com/dubbo/dubbo-go/cluster" + "github.com/dubbo/dubbo-go/protocol" +) + +type failoverClusterInvoker struct { + baseClusterInvoker +} + +func NewFailoverClusterInvoker(ctx context.Context, directory cluster.Directory) protocol.Invoker { + return &failoverClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(ctx, directory), + } +} + +func (invoker *failoverClusterInvoker) Invoke() { + +} diff --git a/common/extension/cluster.go b/common/extension/cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..cc3f6999f2743abde0583584eddfa88a57c43019 --- /dev/null +++ b/common/extension/cluster.go @@ -0,0 +1,18 @@ +package extension + +import ( + "context" + "github.com/dubbo/dubbo-go/cluster" +) + +var ( + clusters = make(map[string]func(ctx context.Context) cluster.Cluster) +) + +func SetCluster(name string, fcn func(ctx context.Context) cluster.Cluster) { + clusters[name] = fcn +} + +func GetCluster(name string, ctx context.Context) cluster.Cluster { + return clusters[name](ctx) +} diff --git a/common/extension/registry.go b/common/extension/registry.go index 99a7a6a0404f5787a7c07da5a6967c19ba1c2c0a..5be15ec67d2c8ff9804948b3a6f12a944d3220e0 100644 --- a/common/extension/registry.go +++ b/common/extension/registry.go @@ -1,12 +1,13 @@ package extension import ( + "context" "github.com/dubbo/dubbo-go/config" "github.com/dubbo/dubbo-go/registry" ) var ( - registrys map[string]func(config *config.RegistryURL) (registry.Registry,error) + registrys map[string]func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error) ) /* @@ -14,15 +15,13 @@ it must excute first */ func init() { // init map - registrys = make(map[string]func(config *config.RegistryURL) (registry.Registry,error)) + registrys = make(map[string]func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error)) } -func SetRegistry(name string, v func(config *config.RegistryURL) (registry.Registry,error)) { +func SetRegistry(name string, v func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error)) { registrys[name] = v } - - -func GetRegistryExtension(name string, config *config.RegistryURL) (registry.Registry,error) { - return registrys[name](config) +func GetRegistryExtension(name string, ctx context.Context, config *config.RegistryURL) (registry.Registry, error) { + return registrys[name](ctx, config) } diff --git a/common/node.go b/common/node.go new file mode 100644 index 0000000000000000000000000000000000000000..4b5403b81710e0257226055d3f3ef3d72e69a889 --- /dev/null +++ b/common/node.go @@ -0,0 +1,9 @@ +package common + +import "github.com/dubbo/dubbo-go/config" + +type Node interface { + GetUrl() config.IURL + IsAvailable() bool + Destroy() +} diff --git a/config/config_url.go b/config/config_url.go index c0783cc459c76d1903d71dffbaef60b0144f2501..a87ccd1a362182a8eba046d0f68c4f3556c3b259 100644 --- a/config/config_url.go +++ b/config/config_url.go @@ -13,7 +13,6 @@ import ( jerrors "github.com/juju/errors" ) - type IURL interface { Key() string URLEqual(IURL) bool @@ -38,9 +37,9 @@ type URL struct { Weight int32 Methods string `yaml:"methods" json:"methods,omitempty"` - //both for registry & service & reference Version string `yaml:"version" json:"version,omitempty"` Group string `yaml:"group" json:"group,omitempty"` + Cluster string } func NewURL(urlString string) (*URL, error) { diff --git a/config/consumer_config.yml b/config/consumer_config.yml index e6b3df7559f5700476918802a6d0fd9ee3e489dc..6c43b180821aa4a2f07a96c26c4c75846c16b9cb 100644 --- a/config/consumer_config.yml +++ b/config/consumer_config.yml @@ -45,3 +45,7 @@ references: - "shanghaizk" # protocol : "dubbo" interface : "com.ikurento.user.UserProvider" + cluster: "failover" + methods : + - name: "GetUser" + retries: 3 diff --git a/config/reference_config.go b/config/reference_config.go index 209e69dc28640c6f8a933b061f2ed743c21d957c..31c087cd4a92357f3214f069bc8656a777f1beff 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -1,8 +1,8 @@ package config import ( + "context" log "github.com/AlexStocks/log4go" - "reflect" ) import "github.com/dubbo/dubbo-go/common/extension" @@ -10,17 +10,24 @@ import "github.com/dubbo/dubbo-go/common/extension" var refprotocol = extension.GetRefProtocol() type ReferenceConfig struct { + context context.Context Interface string `required:"true" yaml:"interface" json:"interface,omitempty"` Registries []referenceConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"` + Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty"` + Methods []method `yaml:"methods" json:"methods,omitempty"` URLs []URL `yaml:"-"` - Type reflect.Type } type referenceConfigRegistry struct { string } -func NewReferenceConfig() *ReferenceConfig { - return &ReferenceConfig{} +type method struct { + name string `yaml:"name" json:"name,omitempty"` + retries int `yaml:"retries" json:"retries,omitempty"` +} + +func NewReferenceConfig(ctx context.Context) *ReferenceConfig { + return &ReferenceConfig{context: ctx} } func (refconfig *ReferenceConfig) CreateProxy() { diff --git a/go.mod b/go.mod index 502b652e8fefc45864b2354c34eb6e5761c0a7c6..5d8c0aec0c6a7dc54d260299617757ecd4fe2a5b 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223 // indirect github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec + github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 // indirect gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index 38a72632c75cccb81f2ce9397893ca944b8279a5..9488c4831d16511650e4838f786a59692bb65fc0 100644 --- a/go.sum +++ b/go.sum @@ -121,6 +121,8 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0 github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 h1:hNna6Fi0eP1f2sMBe/rJicDmaHmoXGe1Ta84FPYHLuE= +github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5/go.mod h1:f1SCnEOt6sc3fOJfPQDRDzHOtSXuTtnz0ImG9kPRDV0= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.2/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ= github.com/ugorji/go/codec v0.0.0-20190320090025-2dc34c0b8780/go.mod h1:iT03XoTwV7xq/+UGwKO3UbC1nNNlopQiY61beSdrtOA= diff --git a/protocol/invocation.go b/protocol/invocation.go new file mode 100644 index 0000000000000000000000000000000000000000..20d033a7bf9152f2df723efda9883d9244eca5d6 --- /dev/null +++ b/protocol/invocation.go @@ -0,0 +1,8 @@ +package protocol + +import "reflect" + +type Invocation interface { + MethodName() string + Parameters() []reflect.Value +} diff --git a/protocol/invoker.go b/protocol/invoker.go index 33ad2821a0cfd18fe17667d1a93880d244f24ae1..c18042b53c717a8a416f618fb9bffc5325b2cd16 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -1,10 +1,9 @@ package protocol -import "github.com/dubbo/dubbo-go/config" +import "github.com/dubbo/dubbo-go/common" // Extension - Invoker type Invoker interface { + common.Node Invoke() - GetURL() config.URL - Destroy() } diff --git a/registry/directory.go b/registry/directory.go index b3c8e358a61789a2e1bc5026c3748aa5371b8a78..026025078bdfe303c50d933cbf8479a2cc4d7245 100644 --- a/registry/directory.go +++ b/registry/directory.go @@ -1,6 +1,7 @@ package registry import ( + "context" "sync" "time" ) @@ -15,21 +16,45 @@ import ( "github.com/dubbo/dubbo-go/config" ) +type Options struct { + serviceTTL time.Duration +} +type Option func(*Options) + +func WithServiceTTL(ttl time.Duration) Option { + return func(o *Options) { + o.serviceTTL = ttl + } +} + type RegistryDirectory struct { + directory.BaseDirectory cacheService *directory.ServiceArray listenerLock sync.Mutex serviceType string registry Registry + Options } -func NewRegistryDirectory(url config.RegistryURL, registry Registry) *RegistryDirectory { +func NewRegistryDirectory(ctx context.Context, url *config.RegistryURL, registry Registry, opts ...Option) *RegistryDirectory { + options := Options{ + //default 300s + serviceTTL: time.Duration(300e9), + } + for _, opt := range opts { + opt(&options) + } + return &RegistryDirectory{ - cacheService: directory.NewServiceArray([]config.URL{}), - serviceType: url.URL.Service, - registry: registry, + BaseDirectory: directory.NewBaseDirectory(ctx, url), + cacheService: directory.NewServiceArray(ctx, []config.URL{}), + serviceType: url.URL.Service, + registry: registry, + Options: options, } } +//subscibe from registry func (dir *RegistryDirectory) subscribe(url config.URL) { for { if dir.registry.IsClosed() { @@ -63,35 +88,37 @@ func (dir *RegistryDirectory) subscribe(url config.URL) { } } +//subscribe service from registry , and update the cacheServices func (dir *RegistryDirectory) update(res *ServiceEvent) { if res == nil { return } log.Debug("registry update, result{%s}", res) - registryKey := res.Service.Key() dir.listenerLock.Lock() defer dir.listenerLock.Unlock() - svcArr, ok := dir.cacheService[registryKey] - log.Debug("registry name:%s, its current member lists:%+v", registryKey, svcArr) + log.Debug("update service name: %s!", res.Service) switch res.Action { case ServiceAdd: - if ok { - svcArr.add(res.Service, ivk.ServiceTTL) - } else { - ivk.cacheServiceMap[registryKey] = newServiceArray([]registry.ServiceURL{res.Service}) - } - case registry.ServiceDel: - if ok { - svcArr.del(res.Service, ivk.ServiceTTL) - if len(svcArr.arr) == 0 { - delete(ivk.cacheServiceMap, registryKey) - log.Warn("delete registry %s from registry map", registryKey) - } - } - log.Error("selector delete registryURL{%s}", res.Service) + dir.cacheService.Add(res.Service, dir.serviceTTL) + + case ServiceDel: + dir.cacheService.Del(res.Service, dir.serviceTTL) + + log.Error("selector delete service url{%s}", res.Service) } } + +func (dir *RegistryDirectory) List(){ + +} +func (dir *RegistryDirectory) IsAvailable() bool { + return true +} + +func (dir *RegistryDirectory) Destroy() { + dir.BaseDirectory.Destroy() +} diff --git a/registry/protocol.go b/registry/protocol.go index fc4fa512c18a231a20a54086750b895bc14ce450..98a0af35a5dc10b49fba2fc01d52a7ed0285f461 100644 --- a/registry/protocol.go +++ b/registry/protocol.go @@ -1,8 +1,7 @@ package registry import ( - "github.com/juju/utils/registry" - "github.com/prometheus/common/log" + "context" "sync" "time" ) @@ -20,6 +19,7 @@ import ( const RegistryConnDelay = 3 type RegistryProtocol struct { + context context.Context // Registry Map<RegistryAddress, Registry> registies map[string]Registry registiesMutex sync.Mutex @@ -29,27 +29,38 @@ func init() { extension.SetRefProtocol(NewRegistryProtocol) } -func NewRegistryProtocol() protocol.Protocol { +func NewRegistryProtocol(ctx context.Context) protocol.Protocol { return &RegistryProtocol{ + context: ctx, registies: make(map[string]Registry), } } -func (protocol *RegistryProtocol) Refer(url config.IURL) (Registry, error) { +func (protocol *RegistryProtocol) Refer(url config.IURL) (protocol.Invoker, error) { var regUrl = url.(*config.RegistryURL) + var serviceUrl = regUrl.URL protocol.registiesMutex.Lock() + defer protocol.registiesMutex.Unlock() var reg Registry - if reg, ok := protocol.registies[url.Key()]; !ok { + var ok bool + if reg, ok = protocol.registies[url.Key()]; !ok { var err error - reg, err = extension.GetRegistryExtension(regUrl.Protocol, regUrl) - protocol.registies[url.Key()] = reg + reg, err = extension.GetRegistryExtension(regUrl.Protocol, protocol.context, regUrl) if err != nil { return nil, err + } else { + protocol.registies[url.Key()] = reg } } - protocol.subscribe(reg, regUrl.URL) + //new registry directory for store service url from registry + directory := NewRegistryDirectory(protocol.context, regUrl, reg) + go directory.subscribe(serviceUrl) + + //new cluster invoker + cluster := extension.GetCluster(serviceUrl.Cluster, protocol.context) + return cluster.Join(directory), nil } func (*RegistryProtocol) Export() { diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 604d756c8b60544ce7eb78d690fae1a5fcc8bf6d..5efb9ee73bb29a0455d1f2f78ce4e4fb7017b86d 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -1,6 +1,7 @@ package zookeeper import ( + "context" "fmt" "github.com/dubbo/dubbo-go/common/extension" "github.com/dubbo/dubbo-go/config" @@ -46,6 +47,7 @@ func init() { ///////////////////////////////////// type ZkRegistry struct { + context context.Context *config.RegistryURL birth int64 // time of file birth, seconds since Epoch; 0 if unknown wg sync.WaitGroup // wg+done for zk restart @@ -62,13 +64,14 @@ type ZkRegistry struct { zkPath map[string]int // key = protocol://ip:port/interface } -func NewZkRegistry(url *config.RegistryURL) (registry.Registry, error) { +func NewZkRegistry(ctx context.Context, url *config.RegistryURL) (registry.Registry, error) { var ( err error r *ZkRegistry ) r = &ZkRegistry{ + context: ctx, RegistryURL: url, birth: time.Now().UnixNano(), done: make(chan struct{}),