Skip to content
Snippets Groups Projects
Commit ad63215e authored by AlexStocks's avatar AlexStocks
Browse files

Merge branch 'master' of https://github.com/apache/dubbo-go

parents 3d3dd8ed edcdbd9d
No related branches found
No related tags found
No related merge requests found
Showing
with 386 additions and 72 deletions
......@@ -4,6 +4,7 @@
*.dll
*.so
*.dylib
*.jar
# Test binary, build with `go test -c`
*.test
......@@ -17,10 +18,15 @@ coverage.txt
target/
classes
# Gopkg.lock
# go mod, go test
vendor/
coverage.txt
logs/
.vscode/
coverage.txt
# unit test
remoting/zookeeper/zookeeper-4unittest/
config_center/zookeeper/zookeeper-4unittest/
registry/zookeeper/zookeeper-4unittest/
# Release Notes
## 1.1.0
### New Features
- Support Java bigdecimal<https://github.com/apache/dubbo-go/pull/126>
- Support all JDK exceptions<https://github.com/apache/dubbo-go/pull/120>
- Support multi-version of service<https://github.com/apache/dubbo-go/pull/119>
- Allow user set custom params for registry<https://github.com/apache/dubbo-go/pull/117>
- Support zookeeper config center<https://github.com/apache/dubbo-go/pull/99>
- Failsafe/Failback Cluster Strategy<https://github.com/apache/dubbo-go/pull/136>;
### Enhancement
- Use time wheel instead of time.After to defeat timer object memory leakage<https://github.com/apache/dubbo-go/pull/130>
### Bugfixes
- Preventing dead loop when got zookeeper unregister event<https://github.com/apache/dubbo-go/pull/129>
- Delete ineffassign<https://github.com/apache/dubbo-go/pull/127>
- Add wg.Done() for mockDataListener<https://github.com/apache/dubbo-go/pull/118>
- Delete wrong spelling words<https://github.com/apache/dubbo-go/pull/107>
- Use sync.Map to defeat from gettyClientPool deadlock<https://github.com/apache/dubbo-go/pull/106>
- Handle panic when function args list is empty<https://github.com/apache/dubbo-go/pull/98>
- url.Values is not safe map<https://github.com/apache/dubbo-go/pull/172>;
......@@ -176,7 +176,6 @@
END OF TERMS AND CONDITIONS
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
......
......@@ -6,9 +6,6 @@
---
Apache Dubbo Go Implementation.
![Apache Dubbo-go](./dubbogo.png "Apache Dubbo-go")
Apache/Dubbo-go image, licensed under [Creative Commons 3.0 Attributions license](https://creativecommons.org/licenses/by/3.0/).
## License
......@@ -34,19 +31,19 @@ Finished List:
- Transport: HTTP, TCP
- Codec: JsonRPC v2, Hessian v2
- Registry: ZooKeeper/[etcd v3](https://github.com/apache/dubbo-go/pull/148)/[nacos](https://github.com/apache/dubbo-go/pull/151)/[consul](https://github.com/apache/dubbo-go/pull/121)
- Configure Center: Zookeeper
- Dynamic Configure Center & Service Management Configurator: Zookeeper
- Cluster Strategy: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/[Available](https://github.com/apache/dubbo-go/pull/155)/[Broadcast](https://github.com/apache/dubbo-go/pull/158)/[Forking](https://github.com/apache/dubbo-go/pull/161)
- Load Balance: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65)
- Filter: Echo Health Check/[Circuit break and service downgrade](https://github.com/apache/dubbo-go/pull/133)
- Filter: Echo Health Check/[Circuit break and service downgrade](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)
- Other feature: [generic invoke](https://github.com/apache/dubbo-go/pull/122)/start check/connecting certain provider/multi-protocols/multi-registries/multi-versions/service group
Working List:
- Load Balance: ConsistentHash
- Filter: TokenFilter/AccessLogFilter/CountFilter/ExecuteLimitFilter/TpsLimitFilter
- Filter: AccessLogFilter/CountFilter/ExecuteLimitFilter/TpsLimitFilter
- Registry: k8s
- Configure Center: apollo
- Dynamic Configuration Center & Metadata Center (dubbo v2.7.x)
- Metadata Center (dubbo v2.7.x)
- Metrics: Promethus(dubbo v2.7.x)
Todo List:
......@@ -67,6 +64,19 @@ The subdirectory examples shows how to use dubbo-go. Please read the [examples/R
## Running unit tests
### Prepare
Mac/Linux
```bash
sh ./before_ut.sh
```
Windows
```bash
before_ut.bat
```
# Run
```bash
go test ./...
......
......@@ -5,9 +5,6 @@
---
Apache Dubbo Go 语言实现
![Apache Dubbo-go](./dubbogo.png "Apache Dubbo-go")
Apache/Dubbo-go image, licensed under [Creative Commons 3.0 Attributions license](https://creativecommons.org/licenses/by/3.0/).
## 证书 ##
......@@ -33,20 +30,20 @@ Apache License, Version 2.0
- 传输协议: HTTP, TCP
- 序列化协议: JsonRPC v2, Hessian v2
- 注册中心: ZooKeeper/[etcd v3](https://github.com/apache/dubbo-go/pull/148)/[nacos](https://github.com/apache/dubbo-go/pull/151)/[consul](https://github.com/apache/dubbo-go/pull/121)
- 配置中心: Zookeeper
- 动态配置中心与服务治理配置器(config center): Zookeeper
- 集群策略: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/[Available](https://github.com/apache/dubbo-go/pull/155)/[Broadcast](https://github.com/apache/dubbo-go/pull/158)/[Forking](https://github.com/apache/dubbo-go/pull/161)
- 负载均衡策略: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65)
- 过滤器: Echo Health Check/[服务熔断&降级](https://github.com/apache/dubbo-go/pull/133)
- 过滤器: Echo Health Check/[服务熔断&降级](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)
- 其他功能支持: [泛化调用](https://github.com/apache/dubbo-go/pull/122)/启动时检查/服务直连/多服务协议/多注册中心/多服务版本/服务分组
开发中列表:
- 集群策略: Forking
- 负载均衡策略: ConsistentHash
- 过滤器: TokenFilter/AccessLogFilter/CountFilter/ExecuteLimitFilter/TpsLimitFilter
- 过滤器: AccessLogFilter/CountFilter/ExecuteLimitFilter/TpsLimitFilter
- 注册中心: k8s
- 配置中心: apollo
- 动态配置中心 & 元数据中心 (dubbo v2.7.x)
- 元数据中心 (dubbo v2.7.x)
- Metrics: Promethus(dubbo v2.7.x)
任务列表:
......@@ -67,10 +64,23 @@ TODO
## 运行单测
### 准备
Mac/Linux
```bash
sh ./before_ut.sh
```
Windows
```bash
before_ut.bat
```
# 执行
```bash
go test ./...
# 覆盖率
# coverage
go test ./... -coverprofile=coverage.txt -covermode=atomic
```
......
set zkJar=zookeeper-3.4.9-fatjar.jar
md remoting\zookeeper\zookeeper-4unittest\contrib\fatjar config_center\zookeeper\zookeeper-4unittest\contrib\fatjar registry\zookeeper\zookeeper-4unittest\contrib\fatjar
curl -L https://github.com/dubbogo/resources/raw/master/zookeeper-4unitest/contrib/fatjar/%zkJar% -o remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%
xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/"
xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "registry/zookeeper/zookeeper-4unittest/contrib/fatjar/"
\ No newline at end of file
mkdir -p remoting/zookeeper/zookeeper-4unittest/contrib/fatjar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar registry/zookeeper/zookeeper-4unittest/contrib/fatjar
wget -P "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar" https://github.com/dubbogo/resources/raw/master/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar
cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/
cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar registry/zookeeper/zookeeper-4unittest/contrib/fatjar/
\ No newline at end of file
......@@ -18,6 +18,7 @@
package cluster_impl
import (
"strconv"
"sync"
"time"
)
......@@ -54,15 +55,18 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoker := &failbackClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
if retriesConfig <= 0 {
retriesConfig = constant.DEFAULT_FAILBACK_TIMES
retriesConfig := invoker.GetUrl().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.")
retries = constant.DEFAULT_FAILBACK_TIMES_INT
}
failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)
if failbackTasksConfig <= 0 {
failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
}
invoker.maxRetries = retriesConfig
invoker.maxRetries = int64(retries)
invoker.failbackTasks = failbackTasksConfig
return invoker
}
......
......@@ -17,6 +17,10 @@
package cluster_impl
import (
"strconv"
)
import (
perrors "github.com/pkg/errors"
)
......@@ -24,6 +28,7 @@ import (
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
)
......@@ -53,16 +58,21 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
url := invokers[0].GetUrl()
//get reties
retries := url.GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
//Get the service method loadbalance config if have
if v := url.GetMethodParamInt(methodName, constant.RETRIES_KEY, 0); v != 0 {
retries = v
if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
retriesConfig = v
}
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.")
retries = constant.DEFAULT_RETRIES_INT
}
invoked := []protocol.Invoker{}
providers := []string{}
var result protocol.Result
for i := int64(0); i < retries; i++ {
for i := 0; i <= retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 {
......
......@@ -118,14 +118,14 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio
}
func Test_FailoverInvokeSuccess(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(t, 2, urlParams)
result := normalInvoke(t, 3, urlParams)
assert.NoError(t, result.Error())
count = 0
}
func Test_FailoverInvokeFail(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(t, 3, urlParams)
result := normalInvoke(t, 4, urlParams)
assert.Errorf(t, result.Error(), "error")
count = 0
}
......@@ -133,7 +133,7 @@ func Test_FailoverInvokeFail(t *testing.T) {
func Test_FailoverInvoke1(t *testing.T) {
urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "3")
result := normalInvoke(t, 3, urlParams)
result := normalInvoke(t, 4, urlParams)
assert.NoError(t, result.Error())
count = 0
}
......@@ -144,7 +144,7 @@ func Test_FailoverInvoke2(t *testing.T) {
urlParams.Set("methods.test."+constant.RETRIES_KEY, "3")
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))
result := normalInvoke(t, 3, urlParams, ivc)
result := normalInvoke(t, 4, urlParams, ivc)
assert.NoError(t, result.Error())
count = 0
}
......
......@@ -42,6 +42,9 @@ func NewBaseDirectory(url *common.URL) BaseDirectory {
func (dir *BaseDirectory) GetUrl() common.URL {
return *dir.url
}
func (dir *BaseDirectory) GetDirectoryUrl() *common.URL {
return dir.url
}
func (dir *BaseDirectory) Destroy(doDestroy func()) {
if dir.destroyed.CAS(false, true) {
......
......@@ -15,7 +15,6 @@
* limitations under the License.
*/
// @author yiji@apache.org
package loadbalance
import (
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package loadbalance
import (
......
......@@ -23,15 +23,21 @@ import (
"sync"
)
import (
"github.com/apache/dubbo-go/config_center"
)
// There is dubbo.properties file and application level config center configuration which higner than normal config center in java. So in java the
// configuration sequence will be config center > application level config center > dubbo.properties > spring bean configuration.
// But in go, neither the dubbo.properties file or application level config center configuration will not support for the time being.
// We just have config center configuration which can override configuration in consumer.yaml & provider.yaml.
// But for add these features in future ,I finish the environment struct following Environment class in java.
type Environment struct {
configCenterFirst bool
externalConfigs sync.Map
externalConfigMap sync.Map
configCenterFirst bool
externalConfigs sync.Map
externalConfigMap sync.Map
appExternalConfigMap sync.Map
dynamicConfiguration config_center.DynamicConfiguration
}
var (
......@@ -45,6 +51,9 @@ func GetEnvInstance() *Environment {
})
return instance
}
func NewEnvInstance() {
instance = &Environment{configCenterFirst: true}
}
//func (env *Environment) SetConfigCenterFirst() {
// env.configCenterFirst = true
......@@ -60,23 +69,34 @@ func (env *Environment) UpdateExternalConfigMap(externalMap map[string]string) {
}
}
func (env *Environment) UpdateAppExternalConfigMap(externalMap map[string]string) {
for k, v := range externalMap {
env.appExternalConfigMap.Store(k, v)
}
}
func (env *Environment) Configuration() *list.List {
list := list.New()
memConf := newInmemoryConfiguration()
memConf.setProperties(&(env.externalConfigMap))
list.PushBack(memConf)
// The sequence would be: SystemConfiguration -> ExternalConfiguration -> AppExternalConfiguration -> AbstractConfig -> PropertiesConfiguration
list.PushFront(newInmemoryConfiguration(&(env.externalConfigMap)))
list.PushFront(newInmemoryConfiguration(&(env.appExternalConfigMap)))
return list
}
func (env *Environment) SetDynamicConfiguration(dc config_center.DynamicConfiguration) {
env.dynamicConfiguration = dc
}
func (env *Environment) GetDynamicConfiguration() config_center.DynamicConfiguration {
return env.dynamicConfiguration
}
type InmemoryConfiguration struct {
store *sync.Map
}
func newInmemoryConfiguration() *InmemoryConfiguration {
return &InmemoryConfiguration{}
}
func (conf *InmemoryConfiguration) setProperties(p *sync.Map) {
conf.store = p
func newInmemoryConfiguration(p *sync.Map) *InmemoryConfiguration {
return &InmemoryConfiguration{store: p}
}
func (conf *InmemoryConfiguration) GetProperty(key string) (bool, string) {
......
......@@ -19,6 +19,7 @@ package config
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
......@@ -38,7 +39,7 @@ func TestEnvironment_UpdateExternalConfigMap(t *testing.T) {
func TestEnvironment_ConfigurationAndGetProperty(t *testing.T) {
GetEnvInstance().UpdateExternalConfigMap(map[string]string{"1": "2"})
list := GetEnvInstance().Configuration()
ok, v := list.Front().Value.(*InmemoryConfiguration).GetProperty("1")
ok, v := list.Back().Value.(*InmemoryConfiguration).GetProperty("1")
assert.True(t, ok)
assert.Equal(t, "2", v)
}
......
......@@ -18,27 +18,35 @@
package constant
const (
DUBBO = "dubbo"
DUBBO = "dubbo"
PROVIDER_PROTOCOL = "provider"
//compatible with 2.6.x
OVERRIDE_PROTOCOL = "override"
EMPTY_PROTOCOL = "empty"
ROUTER_PROTOCOL = "router"
)
const (
DEFAULT_WEIGHT = 100 //
DEFAULT_WARMUP = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second
)
const (
DEFAULT_LOADBALANCE = "random"
DEFAULT_RETRIES = 2
DEFAULT_PROTOCOL = "dubbo"
DEFAULT_REG_TIMEOUT = "10s"
DEFAULT_CLUSTER = "failover"
DEFAULT_FAILBACK_TIMES = 3
DEFAULT_FAILBACK_TASKS = 100
DEFAULT_LOADBALANCE = "random"
DEFAULT_RETRIES = "2"
DEFAULT_RETRIES_INT = 2
DEFAULT_PROTOCOL = "dubbo"
DEFAULT_REG_TIMEOUT = "10s"
DEFAULT_CLUSTER = "failover"
DEFAULT_FAILBACK_TIMES = "3"
DEFAULT_FAILBACK_TIMES_INT = 3
DEFAULT_FAILBACK_TASKS = 100
)
const (
DEFAULT_KEY = "default"
PREFIX_DEFAULT_KEY = "default."
DEFAULT_SERVICE_FILTERS = "echo"
DEFAULT_SERVICE_FILTERS = "echo,token"
DEFAULT_REFERENCE_FILTERS = ""
GENERIC_REFERENCE_FILTERS = "generic"
GENERIC = "$invoke"
......@@ -46,5 +54,16 @@ const (
)
const (
ANY_VALUE = "*"
ANY_VALUE = "*"
ANYHOST_VALUE = "0.0.0.0"
REMOVE_VALUE_PREFIX = "-"
)
const (
CONFIGURATORS_CATEGORY = "configurators"
ROUTER_CATEGORY = "category"
DEFAULT_CATEGORY = PROVIDER_CATEGORY
DYNAMIC_CONFIGURATORS_CATEGORY = "dynamicconfigurators"
APP_DYNAMIC_CONFIGURATORS_CATEGORY = "appdynamicconfigurators"
PROVIDER_CATEGORY = "providers"
)
......@@ -22,15 +22,22 @@ const (
)
const (
GROUP_KEY = "group"
VERSION_KEY = "version"
INTERFACE_KEY = "interface"
PATH_KEY = "path"
SERVICE_KEY = "service"
METHODS_KEY = "methods"
TIMEOUT_KEY = "timeout"
BEAN_NAME_KEY = "bean.name"
GENERIC_KEY = "generic"
GROUP_KEY = "group"
VERSION_KEY = "version"
INTERFACE_KEY = "interface"
PATH_KEY = "path"
SERVICE_KEY = "service"
METHODS_KEY = "methods"
TIMEOUT_KEY = "timeout"
CATEGORY_KEY = "category"
CHECK_KEY = "check"
ENABLED_KEY = "enabled"
SIDE_KEY = "side"
OVERRIDE_PROVIDERS_KEY = "providerAddresses"
BEAN_NAME_KEY = "bean.name"
GENERIC_KEY = "generic"
CLASSIFIER_KEY = "classifier"
TOKEN_KEY = "token"
)
const (
......@@ -79,16 +86,23 @@ const (
)
const (
CONFIG_NAMESPACE_KEY = "config.namespace"
CONFIG_TIMEOUT_KET = "config.timeout"
CONFIG_NAMESPACE_KEY = "config.namespace"
CONFIG_TIMEOUT_KET = "config.timeout"
CONFIG_VERSION_KEY = "configVersion"
COMPATIBLE_CONFIG_KEY = "compatible_config"
)
const (
RegistryConfigPrefix = "dubbo.registries."
ReferenceConfigPrefix = "dubbo.reference."
ServiceConfigPrefix = "dubbo.service."
ProtocolConfigPrefix = "dubbo.protocols."
ProviderConfigPrefix = "dubbo.provider."
ConsumerConfigPrefix = "dubbo.consumer."
RegistryConfigPrefix = "dubbo.registries."
SingleRegistryConfigPrefix = "dubbo.registry."
ReferenceConfigPrefix = "dubbo.reference."
ServiceConfigPrefix = "dubbo.service."
ProtocolConfigPrefix = "dubbo.protocols."
ProviderConfigPrefix = "dubbo.provider."
ConsumerConfigPrefix = "dubbo.consumer."
)
const (
CONFIGURATORS_SUFFIX = ".configurators"
)
const (
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
)
const DefaultKey = "default"
type getConfiguratorFunc func(url *common.URL) config_center.Configurator
var (
configurator = make(map[string]getConfiguratorFunc)
)
func SetConfigurator(name string, v getConfiguratorFunc) {
configurator[name] = v
}
func GetConfigurator(name string, url *common.URL) config_center.Configurator {
if configurator[name] == nil {
panic("configurator for " + name + " is not existing, make sure you have import the package.")
}
return configurator[name](url)
}
func SetDefaultConfigurator(v getConfiguratorFunc) {
configurator[DefaultKey] = v
}
func GetDefaultConfigurator(url *common.URL) config_center.Configurator {
if configurator[DefaultKey] == nil {
panic("configurator for default is not existing, make sure you have import the package.")
}
return configurator[DefaultKey](url)
}
func GetDefaultConfiguratorFunc() getConfiguratorFunc {
if configurator[DefaultKey] == nil {
panic("configurator for default is not existing, make sure you have import the package.")
}
return configurator[DefaultKey]
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
......
......@@ -17,10 +17,20 @@
package proxy_factory
import (
"reflect"
"strings"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/proxy"
"github.com/apache/dubbo-go/protocol"
)
......@@ -51,6 +61,86 @@ func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *comm
return proxy.NewProxy(invoker, nil, attachments)
}
func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker {
// todo: call service
return protocol.NewBaseInvoker(url)
return &ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
}
}
type ProxyInvoker struct {
protocol.BaseInvoker
}
func (pi *ProxyInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
result := &protocol.RPCResult{}
result.SetAttachments(invocation.Attachments())
url := pi.GetUrl()
methodName := invocation.MethodName()
proto := url.Protocol
path := strings.TrimPrefix(url.Path, "/")
args := invocation.Arguments()
// get service
svc := common.ServiceMap.GetService(proto, path)
if svc == nil {
logger.Errorf("cannot find service [%s] in %s", path, proto)
result.SetError(perrors.Errorf("cannot find service [%s] in %s", path, proto))
return result
}
// get method
method := svc.Method()[methodName]
if method == nil {
logger.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto)
result.SetError(perrors.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto))
return result
}
in := []reflect.Value{svc.Rcvr()}
if method.CtxType() != nil {
in = append(in, method.SuiteContext(nil)) // todo: ctx will be used later.
}
// prepare argv
if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" {
in = append(in, reflect.ValueOf(args))
} else {
for i := 0; i < len(args); i++ {
t := reflect.ValueOf(args[i])
if !t.IsValid() {
at := method.ArgsType()[i]
if at.Kind() == reflect.Ptr {
at = at.Elem()
}
t = reflect.New(at)
}
in = append(in, t)
}
}
// prepare replyv
var replyv reflect.Value
if method.ReplyType() == nil && len(method.ArgsType()) > 0 {
replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem())
in = append(in, replyv)
}
returnValues := method.Method().Func.Call(in)
var retErr interface{}
if len(returnValues) == 1 {
retErr = returnValues[0].Interface()
} else {
replyv = returnValues[0]
retErr = returnValues[1].Interface()
}
if retErr != nil {
result.SetError(retErr.(error))
} else {
if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) {
result.SetResult(replyv.Interface())
}
}
return result
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment