Skip to content
Snippets Groups Projects
Commit 116041f9 authored by fangyincheng's avatar fangyincheng
Browse files

Mrg:merge

parents 5ae13d9a a3d43ed6
No related branches found
No related tags found
No related merge requests found
Showing
with 169 additions and 47 deletions
......@@ -18,7 +18,7 @@ classes
# Gopkg.lock
# vendor/
vendor/
logs/
......@@ -7,7 +7,7 @@ Apache Dubbo Go Implementation.
Apache License, Version 2.0
## Code design ##
Based on dubbo's layered code design (protocol layer,registry layer,cluster layer,config layer and so on),
Based on dubbo's layered code design (protocol layer,registry layer,cluster layer,config layer and so on), you can achieve your needs by invoking 'extension.SetXXX' to extend these layered interfaces instead of modifying go-for-apache-dubbo's source code. And welcome to contribute your awesome extension.
About detail design please refer to [code layered design](https://github.com/dubbo/go-for-apache-dubbo/wiki/dubbo-go-V2.6-design)
## Feature list ##
......@@ -33,8 +33,8 @@ Todo List:
- routing rule (dubbo v2.6.x)
- monitoring (dubbo v2.6.x)
- metrics (dubbo v2.6.x)
- dynamic configuration (dubbo v2.7.x)
- metrics (dubbo v2.7.x) waiting dubbo's quota
You can know more about dubbo-go by its [roadmap](https://github.com/dubbo/go-for-apache-dubbo/wiki/Roadmap).
......
......@@ -28,7 +28,7 @@ func SetCluster(name string, fcn func() cluster.Cluster) {
func GetCluster(name string) cluster.Cluster {
if clusters[name] == nil {
panic("cluster for " + name + " is not existing, you must import corresponding package.")
panic("cluster for " + name + " is not existing, make sure you have import the package.")
}
return clusters[name]()
}
......@@ -19,20 +19,16 @@ import (
)
var (
filters map[string]func() filter.Filter
)
func init() {
filters = make(map[string]func() filter.Filter)
}
)
func SetFilter(name string, v func() filter.Filter) {
filters[name] = v
}
func GetFilterExtension(name string) filter.Filter {
func GetFilter(name string) filter.Filter {
if filters[name] == nil {
panic("filter for " + name + " is not existing, you must import corresponding package.")
panic("filter for " + name + " is not existing, make sure you have import the package.")
}
return filters[name]()
}
......@@ -26,7 +26,7 @@ func SetLoadbalance(name string, fcn func() cluster.LoadBalance) {
func GetLoadbalance(name string) cluster.LoadBalance {
if loadbalances[name] == nil {
panic("loadbalance for " + name + " is not existing, you must import corresponding package.")
panic("loadbalance for " + name + " is not existing, make sure you have import the package.")
}
return loadbalances[name]()
}
......@@ -19,20 +19,16 @@ import (
)
var (
protocols map[string]func() protocol.Protocol
)
func init() {
protocols = make(map[string]func() protocol.Protocol)
}
)
func SetProtocol(name string, v func() protocol.Protocol) {
protocols[name] = v
}
func GetProtocolExtension(name string) protocol.Protocol {
func GetProtocol(name string) protocol.Protocol {
if protocols[name] == nil {
panic("protocol for " + name + " is not existing, you must import corresponding package.")
panic("protocol for " + name + " is not existing, make sure you have import the package.")
}
return protocols[name]()
}
package extension
import "github.com/dubbo/go-for-apache-dubbo/common/proxy"
var (
proxy_factories = make(map[string]func(...proxy.Option) proxy.ProxyFactory)
)
func SetProxyFactory(name string, f func(...proxy.Option) proxy.ProxyFactory) {
proxy_factories[name] = f
}
func GetProxyFactory(name string) proxy.ProxyFactory {
if name == "" {
name = "default"
}
if proxy_factories[name] == nil {
panic("proxy factory for " + name + " is not existing, make sure you have import the package.")
}
return proxy_factories[name]()
}
......@@ -20,24 +20,16 @@ import (
)
var (
registrys map[string]func(config *common.URL) (registry.Registry, error)
)
/*
it must excute first
*/
func init() {
// init map
registrys = make(map[string]func(config *common.URL) (registry.Registry, error))
}
)
func SetRegistry(name string, v func(config *common.URL) (registry.Registry, error)) {
registrys[name] = v
}
func GetRegistryExtension(name string, config *common.URL) (registry.Registry, error) {
func GetRegistry(name string, config *common.URL) (registry.Registry, error) {
if registrys[name] == nil {
panic("registry for " + name + " is not existing, you must import corresponding package.")
panic("registry for " + name + " is not existing, make sure you have import the package.")
}
return registrys[name](config)
......
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"github.com/dubbo/go-for-apache-dubbo/common"
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
type ProxyFactory interface {
GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy
GetInvoker(url common.URL) protocol.Invoker
}
type Option func(ProxyFactory)
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy_factory
import (
"github.com/dubbo/go-for-apache-dubbo/common"
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/common/extension"
"github.com/dubbo/go-for-apache-dubbo/common/proxy"
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
func init() {
extension.SetProxyFactory("default", NewDefaultProxyFactory)
}
type DefaultProxyFactory struct {
//delegate ProxyFactory
}
//you can rewrite DefaultProxyFactory in extension and delegate the default proxy factory like below
//func WithDelegate(delegateProxyFactory ProxyFactory) Option {
// return func(proxy ProxyFactory) {
// proxy.(*DefaultProxyFactory).delegate = delegateProxyFactory
// }
//}
func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory {
return &DefaultProxyFactory{}
}
func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy {
//create proxy
attachments := map[string]string{}
attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false")
return proxy.NewProxy(invoker, nil, attachments)
}
func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker {
//TODO:yincheng need to do the service invoker refactor
return protocol.NewBaseInvoker(url)
}
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy_factory
import (
"github.com/dubbo/go-for-apache-dubbo/common"
"github.com/dubbo/go-for-apache-dubbo/protocol"
"github.com/stretchr/testify/assert"
"testing"
)
func Test_GetProxy(t *testing.T) {
proxyFactory := NewDefaultProxyFactory()
url := common.NewURLWithOptions("testservice")
proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), url)
assert.NotNil(t, proxy)
}
func Test_GetInvoker(t *testing.T) {
proxyFactory := NewDefaultProxyFactory()
url := common.NewURLWithOptions("testservice")
invoker := proxyFactory.GetInvoker(*url)
assert.True(t, invoker.IsAvailable())
}
......@@ -154,7 +154,8 @@ type ConsumerConfig struct {
Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"`
RequestTimeout time.Duration
Check *bool `yaml:"check" json:"check,omitempty"`
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"`
Check *bool `yaml:"check" json:"check,omitempty"`
// application
ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"`
......@@ -188,7 +189,8 @@ type ProviderConfig struct {
Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"`
Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"`
Filter string `yaml:"filter" json:"filter,omitempty"`
Filter string `yaml:"filter" json:"filter,omitempty"`
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty"`
ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"`
......
......@@ -15,6 +15,7 @@
package config
import (
"github.com/dubbo/go-for-apache-dubbo/common/proxy/proxy_factory"
"path/filepath"
"testing"
)
......@@ -60,6 +61,7 @@ func TestLoad(t *testing.T) {
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
consumerConfig.References[0].Registries = []ConfigRegistry{"shanghai_reg1"}
refConfigs, svcConfigs := Load()
......
......@@ -80,21 +80,19 @@ func (refconfig *ReferenceConfig) Refer() {
}
if len(regUrls) == 1 {
refconfig.invoker = extension.GetProtocolExtension("registry").Refer(*regUrls[0])
refconfig.invoker = extension.GetProtocol("registry").Refer(*regUrls[0])
} else {
invokers := []protocol.Invoker{}
for _, regUrl := range regUrls {
invokers = append(invokers, extension.GetProtocolExtension("registry").Refer(*regUrl))
invokers = append(invokers, extension.GetProtocol("registry").Refer(*regUrl))
}
cluster := extension.GetCluster("registryAware")
refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}
//create proxy
attachments := map[string]string{}
attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false")
refconfig.pxy = proxy.NewProxy(refconfig.invoker, nil, attachments)
refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url)
}
// @v is service provider implemented RPCService
......
......@@ -106,13 +106,15 @@ func (srvconfig *ServiceConfig) Export() error {
for _, regUrl := range regUrls {
regUrl.SubURL = url
invoker := protocol.NewBaseInvoker(*regUrl)
srvconfig.cacheMutex.Lock()
if srvconfig.cacheProtocol == nil {
log.Info("First load the registry protocol!")
srvconfig.cacheProtocol = extension.GetProtocolExtension("registry")
srvconfig.cacheProtocol = extension.GetProtocol("registry")
}
srvconfig.cacheMutex.Unlock()
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl)
exporter := srvconfig.cacheProtocol.Export(invoker)
if exporter == nil {
panic(errors.New("New exporter error"))
......
......@@ -41,7 +41,7 @@ type ProtocolFilterWrapper struct {
func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Exporter {
if pfw.protocol == nil {
pfw.protocol = extension.GetProtocolExtension(invoker.GetUrl().Protocol)
pfw.protocol = extension.GetProtocol(invoker.GetUrl().Protocol)
}
invoker = buildInvokerChain(invoker, constant.SERVICE_FILTER_KEY)
return pfw.protocol.Export(invoker)
......@@ -49,7 +49,7 @@ func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Expo
func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker {
if pfw.protocol == nil {
pfw.protocol = extension.GetProtocolExtension(url.Protocol)
pfw.protocol = extension.GetProtocol(url.Protocol)
}
return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY)
}
......@@ -67,8 +67,9 @@ func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker {
next := invoker
// The order of filters is from left to right, so loading from right to left
for i := len(filtNames) - 1; i >= 0; i-- {
filter := extension.GetFilterExtension(filtNames[i])
filter := extension.GetFilter(filtNames[i])
fi := &FilterInvoker{next: next, invoker: invoker, filter: filter}
next = fi
}
......
......@@ -32,7 +32,7 @@ import (
)
func TestProtocolFilterWrapper_Export(t *testing.T) {
filtProto := extension.GetProtocolExtension(FILTER)
filtProto := extension.GetProtocol(FILTER)
filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{}
u := common.NewURLWithOptions("Service",
......@@ -44,7 +44,7 @@ func TestProtocolFilterWrapper_Export(t *testing.T) {
}
func TestProtocolFilterWrapper_Refer(t *testing.T) {
filtProto := extension.GetProtocolExtension(FILTER)
filtProto := extension.GetProtocol(FILTER)
filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{}
u := common.NewURLWithOptions("Service",
......
......@@ -189,7 +189,7 @@ func (dir *registryDirectory) cacheInvoker(url common.URL) {
if _, ok := dir.cacheInvokersMap.Load(url.Key()); !ok {
log.Debug("service will be added in cache invokers: invokers key is %s!", url.Key())
newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(url)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(url)
if newInvoker != nil {
dir.cacheInvokersMap.Store(url.Key(), newInvoker)
}
......
......@@ -54,7 +54,7 @@ func newRegistryProtocol() *registryProtocol {
}
}
func getRegistry(regUrl *common.URL) registry.Registry {
reg, err := extension.GetRegistryExtension(regUrl.Protocol, regUrl)
reg, err := extension.GetRegistry(regUrl.Protocol, regUrl)
if err != nil {
log.Error("Registry can not connect success, program is going to panic.Error message is %s", err.Error())
panic(err.Error())
......@@ -124,7 +124,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
log.Info("The exporter has been cached, and will return cached exporter!")
} else {
wrappedInvoker := newWrappedInvoker(invoker, providerUrl)
cachedExporter = extension.GetProtocolExtension(protocolwrapper.FILTER).Export(wrappedInvoker)
cachedExporter = extension.GetProtocol(protocolwrapper.FILTER).Export(wrappedInvoker)
proto.bounds.Store(key, cachedExporter)
log.Info("The exporter has not been cached, and will return a new exporter!")
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment