diff --git a/.gitignore b/.gitignore index b8d1f6332ba31faa57814d3e62f869fd42441e8a..6e2013d1b448542ad653bb0febf98a6277f5d864 100644 --- a/.gitignore +++ b/.gitignore @@ -18,7 +18,7 @@ classes # Gopkg.lock -# vendor/ +vendor/ logs/ diff --git a/README.md b/README.md index c90eab1ee81be363de4ff46459360c973df50717..a94d510e741f1147c0f05d83319e6196e53cd6ba 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Apache Dubbo Go Implementation. Apache License, Version 2.0 ## Code design ## -Based on dubbo's layered code design (protocol layer,registry layer,cluster layer,config layer and so on), +Based on dubbo's layered code design (protocol layer,registry layer,cluster layer,config layer and so on), you can achieve your needs by invoking 'extension.SetXXX' to extend these layered interfaces instead of modifying go-for-apache-dubbo's source code. And welcome to contribute your awesome extension. About detail design please refer to [code layered design](https://github.com/dubbo/go-for-apache-dubbo/wiki/dubbo-go-V2.6-design) ## Feature list ## @@ -33,8 +33,8 @@ Todo List: - routing rule (dubbo v2.6.x) - monitoring (dubbo v2.6.x) -- metrics (dubbo v2.6.x) - dynamic configuration (dubbo v2.7.x) +- metrics (dubbo v2.7.x) waiting dubbo's quota You can know more about dubbo-go by its [roadmap](https://github.com/dubbo/go-for-apache-dubbo/wiki/Roadmap). diff --git a/common/extension/cluster.go b/common/extension/cluster.go index 1e02530482b7d655cbad580a8d9ea2ec77833dba..54b5ac59d31950c7ac42d81856f11c28f4716a19 100644 --- a/common/extension/cluster.go +++ b/common/extension/cluster.go @@ -28,7 +28,7 @@ func SetCluster(name string, fcn func() cluster.Cluster) { func GetCluster(name string) cluster.Cluster { if clusters[name] == nil { - panic("cluster for " + name + " is not existing, you must import corresponding package.") + panic("cluster for " + name + " is not existing, make sure you have import the package.") } return clusters[name]() } diff --git a/common/extension/filter.go b/common/extension/filter.go index cc39c5fe7e1e4b2519351cd6116044439ca5fca7..263b67eae4023bdca49af4f1197af7f19ba3dab0 100644 --- a/common/extension/filter.go +++ b/common/extension/filter.go @@ -19,20 +19,16 @@ import ( ) var ( - filters map[string]func() filter.Filter -) - -func init() { filters = make(map[string]func() filter.Filter) -} +) func SetFilter(name string, v func() filter.Filter) { filters[name] = v } -func GetFilterExtension(name string) filter.Filter { +func GetFilter(name string) filter.Filter { if filters[name] == nil { - panic("filter for " + name + " is not existing, you must import corresponding package.") + panic("filter for " + name + " is not existing, make sure you have import the package.") } return filters[name]() } diff --git a/common/extension/loadbalance.go b/common/extension/loadbalance.go index bc1139bdcb18558587f6a5531a192f2a287f365c..bb46c8283386351bfb8f789bedabd0fbd5e7b43d 100644 --- a/common/extension/loadbalance.go +++ b/common/extension/loadbalance.go @@ -26,7 +26,7 @@ func SetLoadbalance(name string, fcn func() cluster.LoadBalance) { func GetLoadbalance(name string) cluster.LoadBalance { if loadbalances[name] == nil { - panic("loadbalance for " + name + " is not existing, you must import corresponding package.") + panic("loadbalance for " + name + " is not existing, make sure you have import the package.") } return loadbalances[name]() } diff --git a/common/extension/protocol.go b/common/extension/protocol.go index 791db0415f92d12acc92ef7bedcadca563362afe..f1ddbdeef8bb3de0207befaf3ec7063571723513 100644 --- a/common/extension/protocol.go +++ b/common/extension/protocol.go @@ -19,20 +19,16 @@ import ( ) var ( - protocols map[string]func() protocol.Protocol -) - -func init() { protocols = make(map[string]func() protocol.Protocol) -} +) func SetProtocol(name string, v func() protocol.Protocol) { protocols[name] = v } -func GetProtocolExtension(name string) protocol.Protocol { +func GetProtocol(name string) protocol.Protocol { if protocols[name] == nil { - panic("protocol for " + name + " is not existing, you must import corresponding package.") + panic("protocol for " + name + " is not existing, make sure you have import the package.") } return protocols[name]() } diff --git a/common/extension/proxy_factory.go b/common/extension/proxy_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..8795ea8f975237c4d263053fb67052d5b98cff84 --- /dev/null +++ b/common/extension/proxy_factory.go @@ -0,0 +1,20 @@ +package extension + +import "github.com/dubbo/go-for-apache-dubbo/common/proxy" + +var ( + proxy_factories = make(map[string]func(...proxy.Option) proxy.ProxyFactory) +) + +func SetProxyFactory(name string, f func(...proxy.Option) proxy.ProxyFactory) { + proxy_factories[name] = f +} +func GetProxyFactory(name string) proxy.ProxyFactory { + if name == "" { + name = "default" + } + if proxy_factories[name] == nil { + panic("proxy factory for " + name + " is not existing, make sure you have import the package.") + } + return proxy_factories[name]() +} diff --git a/common/extension/registry.go b/common/extension/registry.go index bfbefa5b8bc29021782c86551a03d69835e8d2b6..d6cacd88028bf96697e11525098454d741784351 100644 --- a/common/extension/registry.go +++ b/common/extension/registry.go @@ -20,24 +20,16 @@ import ( ) var ( - registrys map[string]func(config *common.URL) (registry.Registry, error) -) - -/* -it must excute first -*/ -func init() { - // init map registrys = make(map[string]func(config *common.URL) (registry.Registry, error)) -} +) func SetRegistry(name string, v func(config *common.URL) (registry.Registry, error)) { registrys[name] = v } -func GetRegistryExtension(name string, config *common.URL) (registry.Registry, error) { +func GetRegistry(name string, config *common.URL) (registry.Registry, error) { if registrys[name] == nil { - panic("registry for " + name + " is not existing, you must import corresponding package.") + panic("registry for " + name + " is not existing, make sure you have import the package.") } return registrys[name](config) diff --git a/common/proxy/proxy_factory.go b/common/proxy/proxy_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..4d7fa52b7451b0c014d2ffdc3f0383acb6c777d4 --- /dev/null +++ b/common/proxy/proxy_factory.go @@ -0,0 +1,26 @@ +// Copyright 2016-2019 hxmhlt +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package proxy + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +type ProxyFactory interface { + GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy + GetInvoker(url common.URL) protocol.Invoker +} + +type Option func(ProxyFactory) diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go new file mode 100644 index 0000000000000000000000000000000000000000..54ba45392bea7bcbb578178845ca781dbffe3cb5 --- /dev/null +++ b/common/proxy/proxy_factory/default.go @@ -0,0 +1,52 @@ +// Copyright 2016-2019 hxmhlt +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package proxy_factory + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/common/extension" + "github.com/dubbo/go-for-apache-dubbo/common/proxy" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +func init() { + extension.SetProxyFactory("default", NewDefaultProxyFactory) +} + +type DefaultProxyFactory struct { + //delegate ProxyFactory +} + +//you can rewrite DefaultProxyFactory in extension and delegate the default proxy factory like below + +//func WithDelegate(delegateProxyFactory ProxyFactory) Option { +// return func(proxy ProxyFactory) { +// proxy.(*DefaultProxyFactory).delegate = delegateProxyFactory +// } +//} + +func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory { + return &DefaultProxyFactory{} +} +func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { + //create proxy + attachments := map[string]string{} + attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") + return proxy.NewProxy(invoker, nil, attachments) +} +func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { + //TODO:yincheng need to do the service invoker refactor + return protocol.NewBaseInvoker(url) +} diff --git a/common/proxy/proxy_factory/default_test.go b/common/proxy/proxy_factory/default_test.go new file mode 100644 index 0000000000000000000000000000000000000000..385dfee636a5422ca81deee39b1195201c6edd3c --- /dev/null +++ b/common/proxy/proxy_factory/default_test.go @@ -0,0 +1,35 @@ +// Copyright 2016-2019 hxmhlt +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package proxy_factory + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_GetProxy(t *testing.T) { + proxyFactory := NewDefaultProxyFactory() + url := common.NewURLWithOptions("testservice") + proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), url) + assert.NotNil(t, proxy) +} + +func Test_GetInvoker(t *testing.T) { + proxyFactory := NewDefaultProxyFactory() + url := common.NewURLWithOptions("testservice") + invoker := proxyFactory.GetInvoker(*url) + assert.True(t, invoker.IsAvailable()) +} diff --git a/config/config_loader.go b/config/config_loader.go index 92fff0ba7caf1428459bb8f8a3cbc5abcdf53dde..dd1c60d632390838f8c0e34af377e4f6d9800327 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -154,7 +154,8 @@ type ConsumerConfig struct { Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"` RequestTimeout time.Duration - Check *bool `yaml:"check" json:"check,omitempty"` + ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"` + Check *bool `yaml:"check" json:"check,omitempty"` // application ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"` @@ -188,7 +189,8 @@ type ProviderConfig struct { Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"` Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"` - Filter string `yaml:"filter" json:"filter,omitempty"` + Filter string `yaml:"filter" json:"filter,omitempty"` + ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"` ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"` diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 4496671bd5e7d803402c3e954491255e1be337c1..9464fd570e1651f75d7d8df8ae4a3250f049b168 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -15,6 +15,7 @@ package config import ( + "github.com/dubbo/go-for-apache-dubbo/common/proxy/proxy_factory" "path/filepath" "testing" ) @@ -60,6 +61,7 @@ func TestLoad(t *testing.T) { extension.SetProtocol("registry", GetProtocol) extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) consumerConfig.References[0].Registries = []ConfigRegistry{"shanghai_reg1"} refConfigs, svcConfigs := Load() diff --git a/config/reference_config.go b/config/reference_config.go index a52cb4334534656ea24542aab90e21e98a8f0a59..af57cc5114a744c417848500bfd25784880f33c7 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -80,21 +80,19 @@ func (refconfig *ReferenceConfig) Refer() { } if len(regUrls) == 1 { - refconfig.invoker = extension.GetProtocolExtension("registry").Refer(*regUrls[0]) + refconfig.invoker = extension.GetProtocol("registry").Refer(*regUrls[0]) } else { invokers := []protocol.Invoker{} for _, regUrl := range regUrls { - invokers = append(invokers, extension.GetProtocolExtension("registry").Refer(*regUrl)) + invokers = append(invokers, extension.GetProtocol("registry").Refer(*regUrl)) } cluster := extension.GetCluster("registryAware") refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } //create proxy - attachments := map[string]string{} - attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") - refconfig.pxy = proxy.NewProxy(refconfig.invoker, nil, attachments) + refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url) } // @v is service provider implemented RPCService diff --git a/config/service_config.go b/config/service_config.go index 3c9df84843196f9fa24bcc29a0774036e44445a4..46584b7c28ce177385624f2e903bb6e21ce93de3 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -106,13 +106,15 @@ func (srvconfig *ServiceConfig) Export() error { for _, regUrl := range regUrls { regUrl.SubURL = url - invoker := protocol.NewBaseInvoker(*regUrl) + srvconfig.cacheMutex.Lock() if srvconfig.cacheProtocol == nil { log.Info("First load the registry protocol!") - srvconfig.cacheProtocol = extension.GetProtocolExtension("registry") + srvconfig.cacheProtocol = extension.GetProtocol("registry") } srvconfig.cacheMutex.Unlock() + + invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl) exporter := srvconfig.cacheProtocol.Export(invoker) if exporter == nil { panic(errors.New("New exporter error")) diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 10ac57ba5163b416eb738fb1550501a74630ce14..e7cb227d51215b7a0bfc8e56fde86f1a5bf18777 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -41,7 +41,7 @@ type ProtocolFilterWrapper struct { func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Exporter { if pfw.protocol == nil { - pfw.protocol = extension.GetProtocolExtension(invoker.GetUrl().Protocol) + pfw.protocol = extension.GetProtocol(invoker.GetUrl().Protocol) } invoker = buildInvokerChain(invoker, constant.SERVICE_FILTER_KEY) return pfw.protocol.Export(invoker) @@ -49,7 +49,7 @@ func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Expo func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker { if pfw.protocol == nil { - pfw.protocol = extension.GetProtocolExtension(url.Protocol) + pfw.protocol = extension.GetProtocol(url.Protocol) } return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY) } @@ -67,8 +67,9 @@ func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { next := invoker // The order of filters is from left to right, so loading from right to left + for i := len(filtNames) - 1; i >= 0; i-- { - filter := extension.GetFilterExtension(filtNames[i]) + filter := extension.GetFilter(filtNames[i]) fi := &FilterInvoker{next: next, invoker: invoker, filter: filter} next = fi } diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go index f3602cc3291b2235272cab98cb68b18b01908f13..49015cdc4b0ce813f7d46d34b3e8ff2a52db80b9 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper_test.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go @@ -32,7 +32,7 @@ import ( ) func TestProtocolFilterWrapper_Export(t *testing.T) { - filtProto := extension.GetProtocolExtension(FILTER) + filtProto := extension.GetProtocol(FILTER) filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} u := common.NewURLWithOptions("Service", @@ -44,7 +44,7 @@ func TestProtocolFilterWrapper_Export(t *testing.T) { } func TestProtocolFilterWrapper_Refer(t *testing.T) { - filtProto := extension.GetProtocolExtension(FILTER) + filtProto := extension.GetProtocol(FILTER) filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} u := common.NewURLWithOptions("Service", diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 8122e0155ad910f7bf705cd973ce8b9a5015ac24..9b143a19ebcddc143a117bb283d7ebf6f08952f6 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -189,7 +189,7 @@ func (dir *registryDirectory) cacheInvoker(url common.URL) { if _, ok := dir.cacheInvokersMap.Load(url.Key()); !ok { log.Debug("service will be added in cache invokers: invokers key is %s!", url.Key()) - newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(url) + newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(url) if newInvoker != nil { dir.cacheInvokersMap.Store(url.Key(), newInvoker) } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 7c7de267f589e3516298f2e2bfd984a4ea36400a..e914490973865a3a9c6c9d17cd098728bdf08d97 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -54,7 +54,7 @@ func newRegistryProtocol() *registryProtocol { } } func getRegistry(regUrl *common.URL) registry.Registry { - reg, err := extension.GetRegistryExtension(regUrl.Protocol, regUrl) + reg, err := extension.GetRegistry(regUrl.Protocol, regUrl) if err != nil { log.Error("Registry can not connect success, program is going to panic.Error message is %s", err.Error()) panic(err.Error()) @@ -124,7 +124,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte log.Info("The exporter has been cached, and will return cached exporter!") } else { wrappedInvoker := newWrappedInvoker(invoker, providerUrl) - cachedExporter = extension.GetProtocolExtension(protocolwrapper.FILTER).Export(wrappedInvoker) + cachedExporter = extension.GetProtocol(protocolwrapper.FILTER).Export(wrappedInvoker) proto.bounds.Store(key, cachedExporter) log.Info("The exporter has not been cached, and will return a new exporter!") }