diff --git a/.gitignore b/.gitignore index b8d1f6332ba31faa57814d3e62f869fd42441e8a..6e2013d1b448542ad653bb0febf98a6277f5d864 100644 --- a/.gitignore +++ b/.gitignore @@ -18,7 +18,7 @@ classes # Gopkg.lock -# vendor/ +vendor/ logs/ diff --git a/common/extension/cluster.go b/common/extension/cluster.go index 1e02530482b7d655cbad580a8d9ea2ec77833dba..54b5ac59d31950c7ac42d81856f11c28f4716a19 100644 --- a/common/extension/cluster.go +++ b/common/extension/cluster.go @@ -28,7 +28,7 @@ func SetCluster(name string, fcn func() cluster.Cluster) { func GetCluster(name string) cluster.Cluster { if clusters[name] == nil { - panic("cluster for " + name + " is not existing, you must import corresponding package.") + panic("cluster for " + name + " is not existing, make sure you have import the package.") } return clusters[name]() } diff --git a/common/extension/filter.go b/common/extension/filter.go index cc39c5fe7e1e4b2519351cd6116044439ca5fca7..323d7ea651c9466b411d75550c2151325a015e6e 100644 --- a/common/extension/filter.go +++ b/common/extension/filter.go @@ -19,20 +19,17 @@ import ( ) var ( - filters map[string]func() filter.Filter + filters = make(map[string]func() filter.Filter) ) -func init() { - filters = make(map[string]func() filter.Filter) -} func SetFilter(name string, v func() filter.Filter) { filters[name] = v } -func GetFilterExtension(name string) filter.Filter { +func GetFilter(name string) filter.Filter { if filters[name] == nil { - panic("filter for " + name + " is not existing, you must import corresponding package.") + panic("filter for " + name + " is not existing, make sure you have import the package.") } return filters[name]() } diff --git a/common/extension/loadbalance.go b/common/extension/loadbalance.go index bc1139bdcb18558587f6a5531a192f2a287f365c..bb46c8283386351bfb8f789bedabd0fbd5e7b43d 100644 --- a/common/extension/loadbalance.go +++ b/common/extension/loadbalance.go @@ -26,7 +26,7 @@ func SetLoadbalance(name string, fcn func() cluster.LoadBalance) { func GetLoadbalance(name string) cluster.LoadBalance { if loadbalances[name] == nil { - panic("loadbalance for " + name + " is not existing, you must import corresponding package.") + panic("loadbalance for " + name + " is not existing, make sure you have import the package.") } return loadbalances[name]() } diff --git a/common/extension/protocol.go b/common/extension/protocol.go index 791db0415f92d12acc92ef7bedcadca563362afe..f1ddbdeef8bb3de0207befaf3ec7063571723513 100644 --- a/common/extension/protocol.go +++ b/common/extension/protocol.go @@ -19,20 +19,16 @@ import ( ) var ( - protocols map[string]func() protocol.Protocol -) - -func init() { protocols = make(map[string]func() protocol.Protocol) -} +) func SetProtocol(name string, v func() protocol.Protocol) { protocols[name] = v } -func GetProtocolExtension(name string) protocol.Protocol { +func GetProtocol(name string) protocol.Protocol { if protocols[name] == nil { - panic("protocol for " + name + " is not existing, you must import corresponding package.") + panic("protocol for " + name + " is not existing, make sure you have import the package.") } return protocols[name]() } diff --git a/common/extension/proxy_factory.go b/common/extension/proxy_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..8795ea8f975237c4d263053fb67052d5b98cff84 --- /dev/null +++ b/common/extension/proxy_factory.go @@ -0,0 +1,20 @@ +package extension + +import "github.com/dubbo/go-for-apache-dubbo/common/proxy" + +var ( + proxy_factories = make(map[string]func(...proxy.Option) proxy.ProxyFactory) +) + +func SetProxyFactory(name string, f func(...proxy.Option) proxy.ProxyFactory) { + proxy_factories[name] = f +} +func GetProxyFactory(name string) proxy.ProxyFactory { + if name == "" { + name = "default" + } + if proxy_factories[name] == nil { + panic("proxy factory for " + name + " is not existing, make sure you have import the package.") + } + return proxy_factories[name]() +} diff --git a/common/extension/registry.go b/common/extension/registry.go index bfbefa5b8bc29021782c86551a03d69835e8d2b6..d6cacd88028bf96697e11525098454d741784351 100644 --- a/common/extension/registry.go +++ b/common/extension/registry.go @@ -20,24 +20,16 @@ import ( ) var ( - registrys map[string]func(config *common.URL) (registry.Registry, error) -) - -/* -it must excute first -*/ -func init() { - // init map registrys = make(map[string]func(config *common.URL) (registry.Registry, error)) -} +) func SetRegistry(name string, v func(config *common.URL) (registry.Registry, error)) { registrys[name] = v } -func GetRegistryExtension(name string, config *common.URL) (registry.Registry, error) { +func GetRegistry(name string, config *common.URL) (registry.Registry, error) { if registrys[name] == nil { - panic("registry for " + name + " is not existing, you must import corresponding package.") + panic("registry for " + name + " is not existing, make sure you have import the package.") } return registrys[name](config) diff --git a/common/proxy/proxy_factory.go b/common/proxy/proxy_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..4d7fa52b7451b0c014d2ffdc3f0383acb6c777d4 --- /dev/null +++ b/common/proxy/proxy_factory.go @@ -0,0 +1,26 @@ +// Copyright 2016-2019 hxmhlt +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package proxy + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +type ProxyFactory interface { + GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy + GetInvoker(url common.URL) protocol.Invoker +} + +type Option func(ProxyFactory) diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go new file mode 100644 index 0000000000000000000000000000000000000000..54ba45392bea7bcbb578178845ca781dbffe3cb5 --- /dev/null +++ b/common/proxy/proxy_factory/default.go @@ -0,0 +1,52 @@ +// Copyright 2016-2019 hxmhlt +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package proxy_factory + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/common/extension" + "github.com/dubbo/go-for-apache-dubbo/common/proxy" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +func init() { + extension.SetProxyFactory("default", NewDefaultProxyFactory) +} + +type DefaultProxyFactory struct { + //delegate ProxyFactory +} + +//you can rewrite DefaultProxyFactory in extension and delegate the default proxy factory like below + +//func WithDelegate(delegateProxyFactory ProxyFactory) Option { +// return func(proxy ProxyFactory) { +// proxy.(*DefaultProxyFactory).delegate = delegateProxyFactory +// } +//} + +func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory { + return &DefaultProxyFactory{} +} +func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { + //create proxy + attachments := map[string]string{} + attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") + return proxy.NewProxy(invoker, nil, attachments) +} +func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { + //TODO:yincheng need to do the service invoker refactor + return protocol.NewBaseInvoker(url) +} diff --git a/common/proxy/proxy_factory/default_test.go b/common/proxy/proxy_factory/default_test.go new file mode 100644 index 0000000000000000000000000000000000000000..385dfee636a5422ca81deee39b1195201c6edd3c --- /dev/null +++ b/common/proxy/proxy_factory/default_test.go @@ -0,0 +1,35 @@ +// Copyright 2016-2019 hxmhlt +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package proxy_factory + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_GetProxy(t *testing.T) { + proxyFactory := NewDefaultProxyFactory() + url := common.NewURLWithOptions("testservice") + proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), url) + assert.NotNil(t, proxy) +} + +func Test_GetInvoker(t *testing.T) { + proxyFactory := NewDefaultProxyFactory() + url := common.NewURLWithOptions("testservice") + invoker := proxyFactory.GetInvoker(*url) + assert.True(t, invoker.IsAvailable()) +} diff --git a/config/config_loader.go b/config/config_loader.go index 511ccfbbabb5be56b28bb77fc1d670e44165e462..06d90208ab92b4f084490c7f0eb9102ea754e8d8 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -152,7 +152,8 @@ type ConsumerConfig struct { Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"` RequestTimeout time.Duration - Check *bool `yaml:"check" json:"check,omitempty"` + ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"` + Check *bool `yaml:"check" json:"check,omitempty"` // application ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"` @@ -183,9 +184,9 @@ func GetConsumerConfig() ConsumerConfig { type ProviderConfig struct { // pprof - Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"` - Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"` - + Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"` + Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"` + ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"` ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"` Services []ServiceConfig `yaml:"services" json:"services,omitempty"` diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 4496671bd5e7d803402c3e954491255e1be337c1..9464fd570e1651f75d7d8df8ae4a3250f049b168 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -15,6 +15,7 @@ package config import ( + "github.com/dubbo/go-for-apache-dubbo/common/proxy/proxy_factory" "path/filepath" "testing" ) @@ -60,6 +61,7 @@ func TestLoad(t *testing.T) { extension.SetProtocol("registry", GetProtocol) extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) consumerConfig.References[0].Registries = []ConfigRegistry{"shanghai_reg1"} refConfigs, svcConfigs := Load() diff --git a/config/reference_config.go b/config/reference_config.go index ef161ee8788b15e7d63f0a1e142a52308a6aa04d..d5f782d260ea7e0bae788707caca877a56af30e4 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -79,21 +79,19 @@ func (refconfig *ReferenceConfig) Refer() { } if len(regUrls) == 1 { - refconfig.invoker = extension.GetProtocolExtension("registry").Refer(*regUrls[0]) + refconfig.invoker = extension.GetProtocol("registry").Refer(*regUrls[0]) } else { invokers := []protocol.Invoker{} for _, regUrl := range regUrls { - invokers = append(invokers, extension.GetProtocolExtension("registry").Refer(*regUrl)) + invokers = append(invokers, extension.GetProtocol("registry").Refer(*regUrl)) } cluster := extension.GetCluster("registryAware") refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } //create proxy - attachments := map[string]string{} - attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") - refconfig.pxy = proxy.NewProxy(refconfig.invoker, nil, attachments) + refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker,url) } // @v is service provider implemented RPCService diff --git a/config/service_config.go b/config/service_config.go index 40f15e992cf44e01e3868b203776872a76286c29..eff93a1446d0fd246b7fe7787492982fcb0fa940 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -105,13 +105,15 @@ func (srvconfig *ServiceConfig) Export() error { for _, regUrl := range regUrls { regUrl.SubURL = url - invoker := protocol.NewBaseInvoker(*regUrl) + srvconfig.cacheMutex.Lock() if srvconfig.cacheProtocol == nil { log.Info("First load the registry protocol!") - srvconfig.cacheProtocol = extension.GetProtocolExtension("registry") + srvconfig.cacheProtocol = extension.GetProtocol("registry") } srvconfig.cacheMutex.Unlock() + + invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl) exporter := srvconfig.cacheProtocol.Export(invoker) if exporter == nil { panic(errors.New("New exporter error")) diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 61d515ed39626c56e570b68a92845f00c9b002a1..275431613aca3004a3beeb8408dcd778edca4ed1 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -40,7 +40,7 @@ type ProtocolFilterWrapper struct { func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Exporter { if pfw.protocol == nil { - pfw.protocol = extension.GetProtocolExtension(invoker.GetUrl().Protocol) + pfw.protocol = extension.GetProtocol(invoker.GetUrl().Protocol) } invoker = buildInvokerChain(invoker, constant.SERVICE_FILTER_KEY) return pfw.protocol.Export(invoker) @@ -48,7 +48,7 @@ func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Expo func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker { if pfw.protocol == nil { - pfw.protocol = extension.GetProtocolExtension(url.Protocol) + pfw.protocol = extension.GetProtocol(url.Protocol) } return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY) } @@ -69,7 +69,7 @@ func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { next := invoker // The order of filters is from left to right, so loading from right to left for i := len(filtNames) - 2; i >= 0; i-- { - filter := extension.GetFilterExtension(filtNames[i]) + filter := extension.GetFilter(filtNames[i]) fi := &FilterInvoker{next: next, invoker: invoker, filter: filter} next = fi } diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go index f3602cc3291b2235272cab98cb68b18b01908f13..49015cdc4b0ce813f7d46d34b3e8ff2a52db80b9 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper_test.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go @@ -32,7 +32,7 @@ import ( ) func TestProtocolFilterWrapper_Export(t *testing.T) { - filtProto := extension.GetProtocolExtension(FILTER) + filtProto := extension.GetProtocol(FILTER) filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} u := common.NewURLWithOptions("Service", @@ -44,7 +44,7 @@ func TestProtocolFilterWrapper_Export(t *testing.T) { } func TestProtocolFilterWrapper_Refer(t *testing.T) { - filtProto := extension.GetProtocolExtension(FILTER) + filtProto := extension.GetProtocol(FILTER) filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} u := common.NewURLWithOptions("Service", diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 8122e0155ad910f7bf705cd973ce8b9a5015ac24..9b143a19ebcddc143a117bb283d7ebf6f08952f6 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -189,7 +189,7 @@ func (dir *registryDirectory) cacheInvoker(url common.URL) { if _, ok := dir.cacheInvokersMap.Load(url.Key()); !ok { log.Debug("service will be added in cache invokers: invokers key is %s!", url.Key()) - newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(url) + newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(url) if newInvoker != nil { dir.cacheInvokersMap.Store(url.Key(), newInvoker) } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 7c7de267f589e3516298f2e2bfd984a4ea36400a..e914490973865a3a9c6c9d17cd098728bdf08d97 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -54,7 +54,7 @@ func newRegistryProtocol() *registryProtocol { } } func getRegistry(regUrl *common.URL) registry.Registry { - reg, err := extension.GetRegistryExtension(regUrl.Protocol, regUrl) + reg, err := extension.GetRegistry(regUrl.Protocol, regUrl) if err != nil { log.Error("Registry can not connect success, program is going to panic.Error message is %s", err.Error()) panic(err.Error()) @@ -124,7 +124,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte log.Info("The exporter has been cached, and will return cached exporter!") } else { wrappedInvoker := newWrappedInvoker(invoker, providerUrl) - cachedExporter = extension.GetProtocolExtension(protocolwrapper.FILTER).Export(wrappedInvoker) + cachedExporter = extension.GetProtocol(protocolwrapper.FILTER).Export(wrappedInvoker) proto.bounds.Store(key, cachedExporter) log.Info("The exporter has not been cached, and will return a new exporter!") }