Skip to content
Snippets Groups Projects
Commit b513f201 authored by vito.he's avatar vito.he
Browse files

Ftr:2.6.0 registry & cluster

parent 5ac82469
No related branches found
No related tags found
No related merge requests found
Showing
with 270 additions and 54 deletions
package cluster
import "github.com/dubbo/dubbo-go/protocol"
type Cluster interface {
Join(Directory)protocol.Invoker
}
package cluster
import "github.com/dubbo/dubbo-go/common"
// Extension - Directory
type Directory interface {
common.Node
List()
}
package directory
import (
"context"
"github.com/dubbo/dubbo-go/config"
)
type BaseDirectory struct {
context context.Context
url *config.RegistryURL
destroyed bool
}
func NewBaseDirectory(ctx context.Context, url *config.RegistryURL) BaseDirectory {
return BaseDirectory{
context: ctx,
url: url,
}
}
func (dir *BaseDirectory) GetUrl() config.IURL {
return dir.url
}
func (dir *BaseDirectory) Destroy() {
dir.destroyed = false
}
package directory
import (
"context"
"fmt"
"strings"
......@@ -24,15 +25,17 @@ var (
)
type ServiceArray struct {
arr []config.URL
birth time.Time
idx int64
context context.Context
arr []config.URL
birth time.Time
idx int64
}
func NewServiceArray(arr []config.URL) *ServiceArray {
func NewServiceArray(ctx context.Context, arr []config.URL) *ServiceArray {
return &ServiceArray{
arr: arr,
birth: time.Now(),
context: ctx,
arr: arr,
birth: time.Now(),
}
}
......@@ -59,12 +62,12 @@ func (s *ServiceArray) String() string {
return builder.String()
}
func (s *ServiceArray) add(url config.URL, ttl time.Duration) {
func (s *ServiceArray) Add(url config.URL, ttl time.Duration) {
s.arr = append(s.arr, url)
s.birth = time.Now().Add(ttl)
}
func (s *ServiceArray) del(url config.URL, ttl time.Duration) {
func (s *ServiceArray) Del(url config.URL, ttl time.Duration) {
for i, svc := range s.arr {
if svc.PrimitiveURL == url.PrimitiveURL {
s.arr = append(s.arr[:i], s.arr[i+1:]...)
......
package cluster
import (
"context"
"github.com/tevino/abool"
)
import (
"github.com/dubbo/dubbo-go/cluster"
"github.com/dubbo/dubbo-go/config"
)
type baseClusterInvoker struct {
context context.Context
directory cluster.Directory
availablecheck bool
destroyed *abool.AtomicBool
}
func newBaseClusterInvoker(ctx context.Context, directory cluster.Directory) baseClusterInvoker {
return baseClusterInvoker{
context: ctx,
directory: directory,
availablecheck: false,
destroyed: abool.NewBool(false),
}
}
func (invoker *baseClusterInvoker) GetUrl() config.IURL {
return invoker.directory.GetUrl()
}
func (invoker *baseClusterInvoker) Destroy() {
//this is must atom operation
if invoker.destroyed.SetToIf(false, true) {
invoker.directory.Destroy()
}
}
func (invoker *baseClusterInvoker) IsAvailable() bool {
//TODO:不理解java版本中关于stikyInvoker的逻辑所以先不写
return invoker.directory.IsAvailable()
}
package cluster
import (
"context"
"github.com/dubbo/dubbo-go/cluster"
"github.com/dubbo/dubbo-go/common/extension"
"github.com/dubbo/dubbo-go/protocol"
)
type FailoverCluster struct {
context context.Context
}
const name = "failover"
func init(){
extension.SetCluster(name,NewFailoverCluster)
}
func NewFailoverCluster(ctx context.Context) cluster.Cluster {
return &FailoverCluster{
context: ctx,
}
}
func (cluster *FailoverCluster) Join(directory cluster.Directory) protocol.Invoker {
return NewFailoverClusterInvoker(cluster.context, directory)
}
package cluster
import (
"context"
"github.com/dubbo/dubbo-go/cluster"
"github.com/dubbo/dubbo-go/protocol"
)
type failoverClusterInvoker struct {
baseClusterInvoker
}
func NewFailoverClusterInvoker(ctx context.Context, directory cluster.Directory) protocol.Invoker {
return &failoverClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(ctx, directory),
}
}
func (invoker *failoverClusterInvoker) Invoke() {
}
package extension
import (
"context"
"github.com/dubbo/dubbo-go/cluster"
)
var (
clusters = make(map[string]func(ctx context.Context) cluster.Cluster)
)
func SetCluster(name string, fcn func(ctx context.Context) cluster.Cluster) {
clusters[name] = fcn
}
func GetCluster(name string, ctx context.Context) cluster.Cluster {
return clusters[name](ctx)
}
package extension
import (
"context"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/registry"
)
var (
registrys map[string]func(config *config.RegistryURL) (registry.Registry,error)
registrys map[string]func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error)
)
/*
......@@ -14,15 +15,13 @@ it must excute first
*/
func init() {
// init map
registrys = make(map[string]func(config *config.RegistryURL) (registry.Registry,error))
registrys = make(map[string]func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error))
}
func SetRegistry(name string, v func(config *config.RegistryURL) (registry.Registry,error)) {
func SetRegistry(name string, v func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error)) {
registrys[name] = v
}
func GetRegistryExtension(name string, config *config.RegistryURL) (registry.Registry,error) {
return registrys[name](config)
func GetRegistryExtension(name string, ctx context.Context, config *config.RegistryURL) (registry.Registry, error) {
return registrys[name](ctx, config)
}
package common
import "github.com/dubbo/dubbo-go/config"
type Node interface {
GetUrl() config.IURL
IsAvailable() bool
Destroy()
}
......@@ -13,7 +13,6 @@ import (
jerrors "github.com/juju/errors"
)
type IURL interface {
Key() string
URLEqual(IURL) bool
......@@ -38,9 +37,9 @@ type URL struct {
Weight int32
Methods string `yaml:"methods" json:"methods,omitempty"`
//both for registry & service & reference
Version string `yaml:"version" json:"version,omitempty"`
Group string `yaml:"group" json:"group,omitempty"`
Cluster string
}
func NewURL(urlString string) (*URL, error) {
......
......@@ -45,3 +45,7 @@ references:
- "shanghaizk"
# protocol : "dubbo"
interface : "com.ikurento.user.UserProvider"
cluster: "failover"
methods :
- name: "GetUser"
retries: 3
package config
import (
"context"
log "github.com/AlexStocks/log4go"
"reflect"
)
import "github.com/dubbo/dubbo-go/common/extension"
......@@ -10,17 +10,24 @@ import "github.com/dubbo/dubbo-go/common/extension"
var refprotocol = extension.GetRefProtocol()
type ReferenceConfig struct {
context context.Context
Interface string `required:"true" yaml:"interface" json:"interface,omitempty"`
Registries []referenceConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty"`
Methods []method `yaml:"methods" json:"methods,omitempty"`
URLs []URL `yaml:"-"`
Type reflect.Type
}
type referenceConfigRegistry struct {
string
}
func NewReferenceConfig() *ReferenceConfig {
return &ReferenceConfig{}
type method struct {
name string `yaml:"name" json:"name,omitempty"`
retries int `yaml:"retries" json:"retries,omitempty"`
}
func NewReferenceConfig(ctx context.Context) *ReferenceConfig {
return &ReferenceConfig{context: ctx}
}
func (refconfig *ReferenceConfig) CreateProxy() {
......
......@@ -12,6 +12,7 @@ require (
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223 // indirect
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
gopkg.in/yaml.v2 v2.2.2
)
package protocol
import "reflect"
type Invocation interface {
MethodName() string
Parameters() []reflect.Value
}
package protocol
import "github.com/dubbo/dubbo-go/config"
import "github.com/dubbo/dubbo-go/common"
// Extension - Invoker
type Invoker interface {
common.Node
Invoke()
GetURL() config.URL
Destroy()
}
package registry
import (
"context"
"sync"
"time"
)
......@@ -15,21 +16,45 @@ import (
"github.com/dubbo/dubbo-go/config"
)
type Options struct {
serviceTTL time.Duration
}
type Option func(*Options)
func WithServiceTTL(ttl time.Duration) Option {
return func(o *Options) {
o.serviceTTL = ttl
}
}
type RegistryDirectory struct {
directory.BaseDirectory
cacheService *directory.ServiceArray
listenerLock sync.Mutex
serviceType string
registry Registry
Options
}
func NewRegistryDirectory(url config.RegistryURL, registry Registry) *RegistryDirectory {
func NewRegistryDirectory(ctx context.Context, url *config.RegistryURL, registry Registry, opts ...Option) *RegistryDirectory {
options := Options{
//default 300s
serviceTTL: time.Duration(300e9),
}
for _, opt := range opts {
opt(&options)
}
return &RegistryDirectory{
cacheService: directory.NewServiceArray([]config.URL{}),
serviceType: url.URL.Service,
registry: registry,
BaseDirectory: directory.NewBaseDirectory(ctx, url),
cacheService: directory.NewServiceArray(ctx, []config.URL{}),
serviceType: url.URL.Service,
registry: registry,
Options: options,
}
}
//subscibe from registry
func (dir *RegistryDirectory) subscribe(url config.URL) {
for {
if dir.registry.IsClosed() {
......@@ -63,35 +88,37 @@ func (dir *RegistryDirectory) subscribe(url config.URL) {
}
}
//subscribe service from registry , and update the cacheServices
func (dir *RegistryDirectory) update(res *ServiceEvent) {
if res == nil {
return
}
log.Debug("registry update, result{%s}", res)
registryKey := res.Service.Key()
dir.listenerLock.Lock()
defer dir.listenerLock.Unlock()
svcArr, ok := dir.cacheService[registryKey]
log.Debug("registry name:%s, its current member lists:%+v", registryKey, svcArr)
log.Debug("update service name: %s!", res.Service)
switch res.Action {
case ServiceAdd:
if ok {
svcArr.add(res.Service, ivk.ServiceTTL)
} else {
ivk.cacheServiceMap[registryKey] = newServiceArray([]registry.ServiceURL{res.Service})
}
case registry.ServiceDel:
if ok {
svcArr.del(res.Service, ivk.ServiceTTL)
if len(svcArr.arr) == 0 {
delete(ivk.cacheServiceMap, registryKey)
log.Warn("delete registry %s from registry map", registryKey)
}
}
log.Error("selector delete registryURL{%s}", res.Service)
dir.cacheService.Add(res.Service, dir.serviceTTL)
case ServiceDel:
dir.cacheService.Del(res.Service, dir.serviceTTL)
log.Error("selector delete service url{%s}", res.Service)
}
}
func (dir *RegistryDirectory) List(){
}
func (dir *RegistryDirectory) IsAvailable() bool {
return true
}
func (dir *RegistryDirectory) Destroy() {
dir.BaseDirectory.Destroy()
}
package registry
import (
"github.com/juju/utils/registry"
"github.com/prometheus/common/log"
"context"
"sync"
"time"
)
......@@ -20,6 +19,7 @@ import (
const RegistryConnDelay = 3
type RegistryProtocol struct {
context context.Context
// Registry Map<RegistryAddress, Registry>
registies map[string]Registry
registiesMutex sync.Mutex
......@@ -29,27 +29,38 @@ func init() {
extension.SetRefProtocol(NewRegistryProtocol)
}
func NewRegistryProtocol() protocol.Protocol {
func NewRegistryProtocol(ctx context.Context) protocol.Protocol {
return &RegistryProtocol{
context: ctx,
registies: make(map[string]Registry),
}
}
func (protocol *RegistryProtocol) Refer(url config.IURL) (Registry, error) {
func (protocol *RegistryProtocol) Refer(url config.IURL) (protocol.Invoker, error) {
var regUrl = url.(*config.RegistryURL)
var serviceUrl = regUrl.URL
protocol.registiesMutex.Lock()
defer protocol.registiesMutex.Unlock()
var reg Registry
if reg, ok := protocol.registies[url.Key()]; !ok {
var ok bool
if reg, ok = protocol.registies[url.Key()]; !ok {
var err error
reg, err = extension.GetRegistryExtension(regUrl.Protocol, regUrl)
protocol.registies[url.Key()] = reg
reg, err = extension.GetRegistryExtension(regUrl.Protocol, protocol.context, regUrl)
if err != nil {
return nil, err
} else {
protocol.registies[url.Key()] = reg
}
}
protocol.subscribe(reg, regUrl.URL)
//new registry directory for store service url from registry
directory := NewRegistryDirectory(protocol.context, regUrl, reg)
go directory.subscribe(serviceUrl)
//new cluster invoker
cluster := extension.GetCluster(serviceUrl.Cluster, protocol.context)
return cluster.Join(directory), nil
}
func (*RegistryProtocol) Export() {
......
package zookeeper
import (
"context"
"fmt"
"github.com/dubbo/dubbo-go/common/extension"
"github.com/dubbo/dubbo-go/config"
......@@ -46,6 +47,7 @@ func init() {
/////////////////////////////////////
type ZkRegistry struct {
context context.Context
*config.RegistryURL
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
wg sync.WaitGroup // wg+done for zk restart
......@@ -62,13 +64,14 @@ type ZkRegistry struct {
zkPath map[string]int // key = protocol://ip:port/interface
}
func NewZkRegistry(url *config.RegistryURL) (registry.Registry, error) {
func NewZkRegistry(ctx context.Context, url *config.RegistryURL) (registry.Registry, error) {
var (
err error
r *ZkRegistry
)
r = &ZkRegistry{
context: ctx,
RegistryURL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment