From 3d3942358a3b558c7aecada98204fcbf75904cf5 Mon Sep 17 00:00:00 2001 From: "vito.he" <hxmhlt@163.com> Date: Wed, 22 May 2019 15:24:55 +0800 Subject: [PATCH] Add & Rft: add proxy_factory & refactor the extension --- .gitignore | 2 +- common/extension/cluster.go | 2 +- common/extension/filter.go | 9 ++-- common/extension/loadbalance.go | 2 +- common/extension/protocol.go | 10 ++-- common/extension/proxy_factory.go | 20 +++++++ common/extension/registry.go | 14 ++--- common/proxy/proxy_factory.go | 26 ++++++++++ common/proxy/proxy_factory/default.go | 52 +++++++++++++++++++ common/proxy/proxy_factory/default_test.go | 35 +++++++++++++ config/config_loader.go | 9 ++-- config/config_loader_test.go | 2 + config/reference_config.go | 8 ++- config/service_config.go | 6 ++- .../protocol_filter_wrapper.go | 6 +-- .../protocol_filter_wrapper_test.go | 4 +- registry/directory/directory.go | 2 +- registry/protocol/protocol.go | 4 +- 18 files changed, 167 insertions(+), 46 deletions(-) create mode 100644 common/extension/proxy_factory.go create mode 100644 common/proxy/proxy_factory.go create mode 100644 common/proxy/proxy_factory/default.go create mode 100644 common/proxy/proxy_factory/default_test.go diff --git a/.gitignore b/.gitignore index b8d1f6332..6e2013d1b 100644 --- a/.gitignore +++ b/.gitignore @@ -18,7 +18,7 @@ classes # Gopkg.lock -# vendor/ +vendor/ logs/ diff --git a/common/extension/cluster.go b/common/extension/cluster.go index 1e0253048..54b5ac59d 100644 --- a/common/extension/cluster.go +++ b/common/extension/cluster.go @@ -28,7 +28,7 @@ func SetCluster(name string, fcn func() cluster.Cluster) { func GetCluster(name string) cluster.Cluster { if clusters[name] == nil { - panic("cluster for " + name + " is not existing, you must import corresponding package.") + panic("cluster for " + name + " is not existing, make sure you have import the package.") } return clusters[name]() } diff --git a/common/extension/filter.go b/common/extension/filter.go index cc39c5fe7..323d7ea65 100644 --- a/common/extension/filter.go +++ b/common/extension/filter.go @@ -19,20 +19,17 @@ import ( ) var ( - filters map[string]func() filter.Filter + filters = make(map[string]func() filter.Filter) ) -func init() { - filters = make(map[string]func() filter.Filter) -} func SetFilter(name string, v func() filter.Filter) { filters[name] = v } -func GetFilterExtension(name string) filter.Filter { +func GetFilter(name string) filter.Filter { if filters[name] == nil { - panic("filter for " + name + " is not existing, you must import corresponding package.") + panic("filter for " + name + " is not existing, make sure you have import the package.") } return filters[name]() } diff --git a/common/extension/loadbalance.go b/common/extension/loadbalance.go index bc1139bdc..bb46c8283 100644 --- a/common/extension/loadbalance.go +++ b/common/extension/loadbalance.go @@ -26,7 +26,7 @@ func SetLoadbalance(name string, fcn func() cluster.LoadBalance) { func GetLoadbalance(name string) cluster.LoadBalance { if loadbalances[name] == nil { - panic("loadbalance for " + name + " is not existing, you must import corresponding package.") + panic("loadbalance for " + name + " is not existing, make sure you have import the package.") } return loadbalances[name]() } diff --git a/common/extension/protocol.go b/common/extension/protocol.go index 791db0415..f1ddbdeef 100644 --- a/common/extension/protocol.go +++ b/common/extension/protocol.go @@ -19,20 +19,16 @@ import ( ) var ( - protocols map[string]func() protocol.Protocol -) - -func init() { protocols = make(map[string]func() protocol.Protocol) -} +) func SetProtocol(name string, v func() protocol.Protocol) { protocols[name] = v } -func GetProtocolExtension(name string) protocol.Protocol { +func GetProtocol(name string) protocol.Protocol { if protocols[name] == nil { - panic("protocol for " + name + " is not existing, you must import corresponding package.") + panic("protocol for " + name + " is not existing, make sure you have import the package.") } return protocols[name]() } diff --git a/common/extension/proxy_factory.go b/common/extension/proxy_factory.go new file mode 100644 index 000000000..8795ea8f9 --- /dev/null +++ b/common/extension/proxy_factory.go @@ -0,0 +1,20 @@ +package extension + +import "github.com/dubbo/go-for-apache-dubbo/common/proxy" + +var ( + proxy_factories = make(map[string]func(...proxy.Option) proxy.ProxyFactory) +) + +func SetProxyFactory(name string, f func(...proxy.Option) proxy.ProxyFactory) { + proxy_factories[name] = f +} +func GetProxyFactory(name string) proxy.ProxyFactory { + if name == "" { + name = "default" + } + if proxy_factories[name] == nil { + panic("proxy factory for " + name + " is not existing, make sure you have import the package.") + } + return proxy_factories[name]() +} diff --git a/common/extension/registry.go b/common/extension/registry.go index bfbefa5b8..d6cacd880 100644 --- a/common/extension/registry.go +++ b/common/extension/registry.go @@ -20,24 +20,16 @@ import ( ) var ( - registrys map[string]func(config *common.URL) (registry.Registry, error) -) - -/* -it must excute first -*/ -func init() { - // init map registrys = make(map[string]func(config *common.URL) (registry.Registry, error)) -} +) func SetRegistry(name string, v func(config *common.URL) (registry.Registry, error)) { registrys[name] = v } -func GetRegistryExtension(name string, config *common.URL) (registry.Registry, error) { +func GetRegistry(name string, config *common.URL) (registry.Registry, error) { if registrys[name] == nil { - panic("registry for " + name + " is not existing, you must import corresponding package.") + panic("registry for " + name + " is not existing, make sure you have import the package.") } return registrys[name](config) diff --git a/common/proxy/proxy_factory.go b/common/proxy/proxy_factory.go new file mode 100644 index 000000000..4d7fa52b7 --- /dev/null +++ b/common/proxy/proxy_factory.go @@ -0,0 +1,26 @@ +// Copyright 2016-2019 hxmhlt +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package proxy + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +type ProxyFactory interface { + GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy + GetInvoker(url common.URL) protocol.Invoker +} + +type Option func(ProxyFactory) diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go new file mode 100644 index 000000000..54ba45392 --- /dev/null +++ b/common/proxy/proxy_factory/default.go @@ -0,0 +1,52 @@ +// Copyright 2016-2019 hxmhlt +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package proxy_factory + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/common/extension" + "github.com/dubbo/go-for-apache-dubbo/common/proxy" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +func init() { + extension.SetProxyFactory("default", NewDefaultProxyFactory) +} + +type DefaultProxyFactory struct { + //delegate ProxyFactory +} + +//you can rewrite DefaultProxyFactory in extension and delegate the default proxy factory like below + +//func WithDelegate(delegateProxyFactory ProxyFactory) Option { +// return func(proxy ProxyFactory) { +// proxy.(*DefaultProxyFactory).delegate = delegateProxyFactory +// } +//} + +func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory { + return &DefaultProxyFactory{} +} +func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { + //create proxy + attachments := map[string]string{} + attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") + return proxy.NewProxy(invoker, nil, attachments) +} +func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { + //TODO:yincheng need to do the service invoker refactor + return protocol.NewBaseInvoker(url) +} diff --git a/common/proxy/proxy_factory/default_test.go b/common/proxy/proxy_factory/default_test.go new file mode 100644 index 000000000..385dfee63 --- /dev/null +++ b/common/proxy/proxy_factory/default_test.go @@ -0,0 +1,35 @@ +// Copyright 2016-2019 hxmhlt +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package proxy_factory + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_GetProxy(t *testing.T) { + proxyFactory := NewDefaultProxyFactory() + url := common.NewURLWithOptions("testservice") + proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), url) + assert.NotNil(t, proxy) +} + +func Test_GetInvoker(t *testing.T) { + proxyFactory := NewDefaultProxyFactory() + url := common.NewURLWithOptions("testservice") + invoker := proxyFactory.GetInvoker(*url) + assert.True(t, invoker.IsAvailable()) +} diff --git a/config/config_loader.go b/config/config_loader.go index 511ccfbba..06d90208a 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -152,7 +152,8 @@ type ConsumerConfig struct { Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"` RequestTimeout time.Duration - Check *bool `yaml:"check" json:"check,omitempty"` + ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"` + Check *bool `yaml:"check" json:"check,omitempty"` // application ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"` @@ -183,9 +184,9 @@ func GetConsumerConfig() ConsumerConfig { type ProviderConfig struct { // pprof - Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"` - Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"` - + Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"` + Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"` + ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"` ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"` Services []ServiceConfig `yaml:"services" json:"services,omitempty"` diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 4496671bd..9464fd570 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -15,6 +15,7 @@ package config import ( + "github.com/dubbo/go-for-apache-dubbo/common/proxy/proxy_factory" "path/filepath" "testing" ) @@ -60,6 +61,7 @@ func TestLoad(t *testing.T) { extension.SetProtocol("registry", GetProtocol) extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) consumerConfig.References[0].Registries = []ConfigRegistry{"shanghai_reg1"} refConfigs, svcConfigs := Load() diff --git a/config/reference_config.go b/config/reference_config.go index ef161ee87..d5f782d26 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -79,21 +79,19 @@ func (refconfig *ReferenceConfig) Refer() { } if len(regUrls) == 1 { - refconfig.invoker = extension.GetProtocolExtension("registry").Refer(*regUrls[0]) + refconfig.invoker = extension.GetProtocol("registry").Refer(*regUrls[0]) } else { invokers := []protocol.Invoker{} for _, regUrl := range regUrls { - invokers = append(invokers, extension.GetProtocolExtension("registry").Refer(*regUrl)) + invokers = append(invokers, extension.GetProtocol("registry").Refer(*regUrl)) } cluster := extension.GetCluster("registryAware") refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } //create proxy - attachments := map[string]string{} - attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") - refconfig.pxy = proxy.NewProxy(refconfig.invoker, nil, attachments) + refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker,url) } // @v is service provider implemented RPCService diff --git a/config/service_config.go b/config/service_config.go index 40f15e992..eff93a144 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -105,13 +105,15 @@ func (srvconfig *ServiceConfig) Export() error { for _, regUrl := range regUrls { regUrl.SubURL = url - invoker := protocol.NewBaseInvoker(*regUrl) + srvconfig.cacheMutex.Lock() if srvconfig.cacheProtocol == nil { log.Info("First load the registry protocol!") - srvconfig.cacheProtocol = extension.GetProtocolExtension("registry") + srvconfig.cacheProtocol = extension.GetProtocol("registry") } srvconfig.cacheMutex.Unlock() + + invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl) exporter := srvconfig.cacheProtocol.Export(invoker) if exporter == nil { panic(errors.New("New exporter error")) diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 61d515ed3..275431613 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -40,7 +40,7 @@ type ProtocolFilterWrapper struct { func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Exporter { if pfw.protocol == nil { - pfw.protocol = extension.GetProtocolExtension(invoker.GetUrl().Protocol) + pfw.protocol = extension.GetProtocol(invoker.GetUrl().Protocol) } invoker = buildInvokerChain(invoker, constant.SERVICE_FILTER_KEY) return pfw.protocol.Export(invoker) @@ -48,7 +48,7 @@ func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Expo func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker { if pfw.protocol == nil { - pfw.protocol = extension.GetProtocolExtension(url.Protocol) + pfw.protocol = extension.GetProtocol(url.Protocol) } return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY) } @@ -69,7 +69,7 @@ func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { next := invoker // The order of filters is from left to right, so loading from right to left for i := len(filtNames) - 2; i >= 0; i-- { - filter := extension.GetFilterExtension(filtNames[i]) + filter := extension.GetFilter(filtNames[i]) fi := &FilterInvoker{next: next, invoker: invoker, filter: filter} next = fi } diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go index f3602cc32..49015cdc4 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper_test.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go @@ -32,7 +32,7 @@ import ( ) func TestProtocolFilterWrapper_Export(t *testing.T) { - filtProto := extension.GetProtocolExtension(FILTER) + filtProto := extension.GetProtocol(FILTER) filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} u := common.NewURLWithOptions("Service", @@ -44,7 +44,7 @@ func TestProtocolFilterWrapper_Export(t *testing.T) { } func TestProtocolFilterWrapper_Refer(t *testing.T) { - filtProto := extension.GetProtocolExtension(FILTER) + filtProto := extension.GetProtocol(FILTER) filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} u := common.NewURLWithOptions("Service", diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 8122e0155..9b143a19e 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -189,7 +189,7 @@ func (dir *registryDirectory) cacheInvoker(url common.URL) { if _, ok := dir.cacheInvokersMap.Load(url.Key()); !ok { log.Debug("service will be added in cache invokers: invokers key is %s!", url.Key()) - newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(url) + newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(url) if newInvoker != nil { dir.cacheInvokersMap.Store(url.Key(), newInvoker) } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 7c7de267f..e91449097 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -54,7 +54,7 @@ func newRegistryProtocol() *registryProtocol { } } func getRegistry(regUrl *common.URL) registry.Registry { - reg, err := extension.GetRegistryExtension(regUrl.Protocol, regUrl) + reg, err := extension.GetRegistry(regUrl.Protocol, regUrl) if err != nil { log.Error("Registry can not connect success, program is going to panic.Error message is %s", err.Error()) panic(err.Error()) @@ -124,7 +124,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte log.Info("The exporter has been cached, and will return cached exporter!") } else { wrappedInvoker := newWrappedInvoker(invoker, providerUrl) - cachedExporter = extension.GetProtocolExtension(protocolwrapper.FILTER).Export(wrappedInvoker) + cachedExporter = extension.GetProtocol(protocolwrapper.FILTER).Export(wrappedInvoker) proto.bounds.Store(key, cachedExporter) log.Info("The exporter has not been cached, and will return a new exporter!") } -- GitLab