diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index 0a34c26a8006d72fa2886d1308ae04e854cdb625..697be614f89cd324d4691d5ea29b1e8a8a3c07c2 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -1,20 +1,17 @@ package directory import ( - "context" "github.com/dubbo/dubbo-go/config" ) type BaseDirectory struct { - context context.Context url *config.RegistryURL destroyed bool } -func NewBaseDirectory(ctx context.Context, url *config.RegistryURL) BaseDirectory { +func NewBaseDirectory(url *config.RegistryURL) BaseDirectory { return BaseDirectory{ - context: ctx, - url: url, + url: url, } } func (dir *BaseDirectory) GetUrl() config.IURL { @@ -24,7 +21,3 @@ func (dir *BaseDirectory) GetUrl() config.IURL { func (dir *BaseDirectory) Destroy() { dir.destroyed = false } - -func (dir *BaseDirectory) Context() context.Context { - return dir.context -} diff --git a/cluster/directory/static_directory.go b/cluster/directory/static_directory.go index f5a7a20f801f518270b6751fdb96a8e4603e184e..e2b73b319af5dd70eb2be28ec895d7c042935135 100644 --- a/cluster/directory/static_directory.go +++ b/cluster/directory/static_directory.go @@ -1,7 +1,6 @@ package directory import ( - "context" "github.com/dubbo/dubbo-go/protocol" ) @@ -10,9 +9,9 @@ type StaticDirectory struct { invokers []protocol.Invoker } -func NewStaticDirectory(context context.Context, invokers []protocol.Invoker) *StaticDirectory { +func NewStaticDirectory(invokers []protocol.Invoker) *StaticDirectory { return &StaticDirectory{ - BaseDirectory: NewBaseDirectory(context, nil), + BaseDirectory: NewBaseDirectory(nil), invokers: invokers, } } diff --git a/cluster/support/base_cluster_invoker.go b/cluster/support/base_cluster_invoker.go index 81b078c115b57ef0754a875d5fdf56d58676b155..7d432327e3a07f389283b0c11638d77ad8d15cc1 100644 --- a/cluster/support/base_cluster_invoker.go +++ b/cluster/support/base_cluster_invoker.go @@ -1,7 +1,6 @@ package cluster import ( - "context" "github.com/tevino/abool" ) @@ -11,15 +10,13 @@ import ( ) type baseClusterInvoker struct { - context context.Context directory cluster.Directory availablecheck bool destroyed *abool.AtomicBool } -func newBaseClusterInvoker(ctx context.Context, directory cluster.Directory) baseClusterInvoker { +func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker { return baseClusterInvoker{ - context: ctx, directory: directory, availablecheck: false, destroyed: abool.NewBool(false), diff --git a/cluster/support/failover_cluster.go b/cluster/support/failover_cluster.go index e920051683da68d0fe215ba6d4b5eca2ea0b519c..585f5c4125831d96cdf8a147b0e386e4ac7a5fd9 100644 --- a/cluster/support/failover_cluster.go +++ b/cluster/support/failover_cluster.go @@ -17,12 +17,10 @@ func init() { extension.SetCluster(name, NewFailoverCluster) } -func NewFailoverCluster(ctx context.Context) cluster.Cluster { - return &FailoverCluster{ - context: ctx, - } +func NewFailoverCluster() cluster.Cluster { + return &FailoverCluster{} } func (cluster *FailoverCluster) Join(directory cluster.Directory) protocol.Invoker { - return NewFailoverClusterInvoker(cluster.context, directory) + return NewFailoverClusterInvoker( directory) } diff --git a/cluster/support/failover_cluster_invoker.go b/cluster/support/failover_cluster_invoker.go index c12344792d0b630cbf00f0c8b0cffdfcd4fe7c65..c0c0238cb0184683cd64c023f8820d1ce983e6fb 100644 --- a/cluster/support/failover_cluster_invoker.go +++ b/cluster/support/failover_cluster_invoker.go @@ -1,7 +1,6 @@ package cluster import ( - "context" "github.com/dubbo/dubbo-go/cluster" "github.com/dubbo/dubbo-go/protocol" ) @@ -10,12 +9,14 @@ type failoverClusterInvoker struct { baseClusterInvoker } -func NewFailoverClusterInvoker(ctx context.Context, directory cluster.Directory) protocol.Invoker { +func NewFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { return &failoverClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(ctx, directory), + baseClusterInvoker: newBaseClusterInvoker(directory), } } -func (invoker *failoverClusterInvoker) Invoke() { - +func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + invokers[0].GetUrl() + return } diff --git a/common/extension/cluster.go b/common/extension/cluster.go index cc3f6999f2743abde0583584eddfa88a57c43019..6c2fdcebc07d5ff2cbb1a1ca508b596505b8d7f4 100644 --- a/common/extension/cluster.go +++ b/common/extension/cluster.go @@ -1,18 +1,17 @@ package extension import ( - "context" "github.com/dubbo/dubbo-go/cluster" ) var ( - clusters = make(map[string]func(ctx context.Context) cluster.Cluster) + clusters = make(map[string]func() cluster.Cluster) ) -func SetCluster(name string, fcn func(ctx context.Context) cluster.Cluster) { +func SetCluster(name string, fcn func() cluster.Cluster) { clusters[name] = fcn } -func GetCluster(name string, ctx context.Context) cluster.Cluster { - return clusters[name](ctx) +func GetCluster(name string) cluster.Cluster { + return clusters[name]() } diff --git a/common/extension/registry.go b/common/extension/registry.go index fbd80cea1f1c6d72230e21e991b7b0fcc83add55..4e3280ca50e92174ee616a6ba1411eb0875f13e6 100644 --- a/common/extension/registry.go +++ b/common/extension/registry.go @@ -1,13 +1,12 @@ package extension import ( - "context" "github.com/dubbo/dubbo-go/config" "github.com/dubbo/dubbo-go/registry" ) var ( - registrys map[string]func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error) + registrys map[string]func(config *config.RegistryURL) (registry.Registry, error) ) /* @@ -15,14 +14,14 @@ it must excute first */ func init() { // init map - registrys = make(map[string]func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error)) + registrys = make(map[string]func(config *config.RegistryURL) (registry.Registry, error)) } -func SetRegistry(name string, v func(ctx context.Context, config *config.RegistryURL) (registry.Registry, error)) { +func SetRegistry(name string, v func(config *config.RegistryURL) (registry.Registry, error)) { registrys[name] = v } -func GetRegistryExtension(name string, ctx context.Context, config *config.RegistryURL) (registry.Registry, error) { - return registrys[name](ctx, config) +func GetRegistryExtension(name string, config *config.RegistryURL) (registry.Registry, error) { + return registrys[name](config) } diff --git a/config/registry_url.go b/config/registry_url.go index 208991d2619b8e7dd2ae5a26aad1df006d5fb199..3f2f37481285e310935388ed0e1792a141e47981 100644 --- a/config/registry_url.go +++ b/config/registry_url.go @@ -58,7 +58,7 @@ type RegistryURL struct { Address string `yaml:"address" json:"address,omitempty"` } -func NewRegistryURL(urlString string) (*RegistryURL, error) { +func NewRegistryURL(context context.Context, urlString string) (*RegistryURL, error) { var ( err error @@ -108,7 +108,7 @@ func NewRegistryURL(urlString string) (*RegistryURL, error) { s.Timeout = time.Duration(timeout * 1e6) // timeout unit is millisecond } } - + s.ctx = context return s, nil } diff --git a/config/application_config.go b/config/support/application_config.go similarity index 96% rename from config/application_config.go rename to config/support/application_config.go index 81d255a345541e5cb414f23c41e93340e0ae010a..367bc20468c19f4d3e70eb83aa06ab60d7b6688e 100644 --- a/config/application_config.go +++ b/config/support/application_config.go @@ -1,4 +1,4 @@ -package config +package support type ApplicationConfig struct { Organization string `yaml:"organization" json:"organization,omitempty"` diff --git a/config/config_loader.go b/config/support/config_loader.go similarity index 94% rename from config/config_loader.go rename to config/support/config_loader.go index 54d5419aefcd1cab2a654d0a415fc5e9f2b02c5c..57931d7574ab5b347a3714f4bc11dc7591cf5b13 100644 --- a/config/config_loader.go +++ b/config/support/config_loader.go @@ -1,4 +1,4 @@ -package config +package support import ( "fmt" @@ -108,8 +108,8 @@ type ConsumerConfig struct { } type ReferenceConfigTmp struct { - Service string `required:"true" yaml:"service" json:"service,omitempty"` - Registries []referenceConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"` + Service string `required:"true" yaml:"service" json:"service,omitempty"` + Registries []RegistryConfig `required:"true" yaml:"registries" json:"registries,omitempty"` URLs []map[string]string } diff --git a/config/consumer_config.yml b/config/support/consumer_config.yml similarity index 100% rename from config/consumer_config.yml rename to config/support/consumer_config.yml diff --git a/config/reference_config.go b/config/support/reference_config.go similarity index 80% rename from config/reference_config.go rename to config/support/reference_config.go index fb02fe7f3905feba43bee7d869a97ee5f273a47b..11428ae7c3fa2c670d2c9680c83044a6049a77ed 100644 --- a/config/reference_config.go +++ b/config/support/reference_config.go @@ -1,11 +1,17 @@ -package config +package support import ( "context" + "github.com/dubbo/dubbo-go/config" +) + +import ( log "github.com/AlexStocks/log4go" ) -import "github.com/dubbo/dubbo-go/common/extension" +import ( + "github.com/dubbo/dubbo-go/common/extension" +) var refprotocol = extension.GetProtocolExtension("registry") @@ -15,7 +21,7 @@ type ReferenceConfig struct { Registries []referenceConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"` Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty"` Methods []method `yaml:"methods" json:"methods,omitempty"` - URLs []URL `yaml:"-"` + URLs []config.URL `yaml:"-"` } type referenceConfigRegistry struct { string @@ -43,12 +49,12 @@ func (refconfig *ReferenceConfig) CreateProxy() { } } -func (refconfig *ReferenceConfig) loadRegistries() []*RegistryURL { - var urls []*RegistryURL +func (refconfig *ReferenceConfig) loadRegistries() []*config.RegistryURL { + var urls []*config.RegistryURL for _, registry := range refconfig.Registries { for _, registryConf := range consumerConfig.Registries { if registry.string == registryConf.Id { - url, err := NewRegistryURL(registryConf.Address) + url, err := config.NewRegistryURL(refconfig.context, registryConf.Address) if err != nil { log.Error("The registry id:%s url is invalid ,and will skip the registry", registryConf.Id) } else { diff --git a/config/registry_config.go b/config/support/registry_config.go similarity index 70% rename from config/registry_config.go rename to config/support/registry_config.go index 5e0edc5de13515dec348ba711a3f9e59069bff36..7fbd2b4099f2a7fe981079f14dcc17bb7b08aebe 100644 --- a/config/registry_config.go +++ b/config/support/registry_config.go @@ -1,7 +1,9 @@ -package config +package support + +import "github.com/dubbo/dubbo-go/config" type RegistryConfig struct { Id string `required:"true" yaml:"id" json:"id,omitempty"` TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty"` // unit: second - RegistryURL + config.RegistryURL } diff --git a/config/service_config.go b/config/support/service_config.go similarity index 79% rename from config/service_config.go rename to config/support/service_config.go index 3e5febebed4e19aa6a7f43ef0e1cf8913751abc9..d76f97393b383446fa988149b7e28c07694b576e 100644 --- a/config/service_config.go +++ b/config/support/service_config.go @@ -1,9 +1,11 @@ -package config +package support + +import "github.com/dubbo/dubbo-go/config" type ServiceConfig struct { Service string `required:"true" yaml:"service" json:"service,omitempty"` - URLs []URL - rpcService RPCService + URLs []config.URL + rpcService config.RPCService } func NewDefaultProviderServiceConfig() *ServiceConfig { diff --git a/registry/directory.go b/registry/directory.go index 9e9fd9b936beb647812f86dfc4a4d8b0d9ea5f61..f9cfa12c06aafaf897d2ce38e5be14e6cfcde3a2 100644 --- a/registry/directory.go +++ b/registry/directory.go @@ -1,7 +1,6 @@ package registry import ( - "context" "sync" "time" ) @@ -40,7 +39,7 @@ type RegistryDirectory struct { Options } -func NewRegistryDirectory(ctx context.Context, url *config.RegistryURL, registry Registry, opts ...Option) *RegistryDirectory { +func NewRegistryDirectory(url *config.RegistryURL, registry Registry, opts ...Option) *RegistryDirectory { options := Options{ //default 300s serviceTTL: time.Duration(300e9), @@ -50,7 +49,7 @@ func NewRegistryDirectory(ctx context.Context, url *config.RegistryURL, registry } return &RegistryDirectory{ - BaseDirectory: directory.NewBaseDirectory(ctx, url), + BaseDirectory: directory.NewBaseDirectory(url), cacheInvokers: []protocol.Invoker{}, cacheInvokersMap: sync.Map{}, serviceType: url.URL.Service, @@ -153,8 +152,8 @@ func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap sync.Map) []protoco groupInvokersList = groupInvokersMap[""] } else { for _, invokers := range groupInvokersMap { - staticDir := directory.NewStaticDirectory(dir.Context(), invokers) - cluster := extension.GetCluster(dir.GetUrl().(*config.RegistryURL).URL.Cluster, dir.Context()) + staticDir := directory.NewStaticDirectory(invokers) + cluster := extension.GetCluster(dir.GetUrl().(*config.RegistryURL).URL.Cluster) groupInvokersList = append(groupInvokersList, cluster.Join(staticDir)) } } @@ -200,8 +199,13 @@ func (dir *RegistryDirectory) Destroy() { dir.BaseDirectory.Destroy() } +// configuration > reference config >service config // in this function we should merge the reference local url config into the service url from registry. -//for some reason(I have not finish the service module, so this function marked as TODO) +//TODO configuration merge, in the future , the configuration center's config should merge too. func mergeUrl(serviceUrl config.URL, referenceUrl config.URL) config.URL { + //loadBalance strategy config + + //cluster strategy config + return serviceUrl } diff --git a/registry/protocol.go b/registry/protocol.go index 7f3744c8a75531e5d6d0fc86acb8d8b4fb7c7f09..957be807c565f4a50ccafbae44e53b98ae8cdb0d 100644 --- a/registry/protocol.go +++ b/registry/protocol.go @@ -43,7 +43,7 @@ func (protocol *RegistryProtocol) Refer(url config.IURL) protocol.Invoker { if reg, ok = protocol.registies[url.Key()]; !ok { var err error - reg, err = extension.GetRegistryExtension(regUrl.Protocol, url.Context(), regUrl) + reg, err = extension.GetRegistryExtension(regUrl.Protocol, regUrl) if err != nil { log.Error("Registry can not connect success, program is going to panic.Error message is %s", err.Error()) panic(err.Error()) @@ -52,11 +52,11 @@ func (protocol *RegistryProtocol) Refer(url config.IURL) protocol.Invoker { } } //new registry directory for store service url from registry - directory := NewRegistryDirectory(url.Context(), regUrl, reg) + directory := NewRegistryDirectory(regUrl, reg) go directory.subscribe(serviceUrl) //new cluster invoker - cluster := extension.GetCluster(serviceUrl.Cluster, url.Context()) + cluster := extension.GetCluster(serviceUrl.Cluster) return cluster.Join(directory) } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 8ce8da96e5bfdd23b2c1b6965b2c29d4e4825639..8336a5352455211ae337a4b35ce702d34c18d672 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -1,6 +1,7 @@ package zookeeper import ( + "context" "fmt" "github.com/dubbo/dubbo-go/config" "path" @@ -112,7 +113,8 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co newNode = path.Join(zkPath, n) log.Info("add zkNode{%s}", newNode) - &serviceURL, err = config.NewURL(n) + //context.TODO + &serviceURL, err = config.NewURL(context.TODO(), n) if err != nil { log.Error("NewURL(%s) = error{%v}", n, jerrors.ErrorStack(err)) continue @@ -143,7 +145,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co oldNode = path.Join(zkPath, n) log.Warn("delete zkPath{%s}", oldNode) - &serviceURL, err = config.NewURL(n) + &serviceURL, err = config.NewURL(context.TODO(), n) if !conf.URLEqual(&serviceURL) { log.Warn("serviceURL{%s} has been deleted is not compatible with URL{%#v}", serviceURL, conf) continue @@ -255,7 +257,7 @@ func (l *zkEventListener) listenServiceEvent(conf config.URL) { for _, c := range children { - &serviceURL, err = config.NewURL(c) + &serviceURL, err = config.NewURL(context.TODO(), c) if err != nil { log.Error("NewURL(r{%s}) = error{%v}", c, err) continue diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 5efb9ee73bb29a0455d1f2f78ce4e4fb7017b86d..6aa3a1ca8e1a0fa1fb14158b87fb94ff27cc4c82 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -64,14 +64,13 @@ type ZkRegistry struct { zkPath map[string]int // key = protocol://ip:port/interface } -func NewZkRegistry(ctx context.Context, url *config.RegistryURL) (registry.Registry, error) { +func NewZkRegistry( url *config.RegistryURL) (registry.Registry, error) { var ( err error r *ZkRegistry ) r = &ZkRegistry{ - context: ctx, RegistryURL: url, birth: time.Now().UnixNano(), done: make(chan struct{}),