Skip to content
Snippets Groups Projects
Commit 157770c1 authored by vito.he's avatar vito.he
Browse files

Mod: merge conflict resolve

parents b1e99e9d 3a6c6829
No related branches found
No related tags found
No related merge requests found
Showing
with 225 additions and 50 deletions
......@@ -4,6 +4,7 @@
*.dll
*.so
*.dylib
*.jar
# Test binary, build with `go test -c`
*.test
......@@ -17,10 +18,15 @@ coverage.txt
target/
classes
# Gopkg.lock
# go mod, go test
vendor/
coverage.txt
logs/
.vscode/
coverage.txt
# unit test
remoting/zookeeper/zookeeper-4unittest/
config_center/zookeeper/zookeeper-4unittest/
registry/zookeeper/zookeeper-4unittest/
\ No newline at end of file
# Release Notes
## 1.1.0
### New Features
- Support Java bigdecimal<https://github.com/apache/dubbo-go/pull/126>
- Support all JDK exceptions<https://github.com/apache/dubbo-go/pull/120>
- Support multi-version of service<https://github.com/apache/dubbo-go/pull/119>
- Allow user set custom params for registry<https://github.com/apache/dubbo-go/pull/117>
- Support zookeeper config center<https://github.com/apache/dubbo-go/pull/99>
- Failsafe/Failback Cluster Strategy<https://github.com/apache/dubbo-go/pull/136>;
### Enhancement
- Use time wheel instead of time.After to defeat timer object memory leakage<https://github.com/apache/dubbo-go/pull/130>
### Bugfixes
- Preventing dead loop when got zookeeper unregister event<https://github.com/apache/dubbo-go/pull/129>
- Delete ineffassign<https://github.com/apache/dubbo-go/pull/127>
- Add wg.Done() for mockDataListener<https://github.com/apache/dubbo-go/pull/118>
- Delete wrong spelling words<https://github.com/apache/dubbo-go/pull/107>
- Use sync.Map to defeat from gettyClientPool deadlock<https://github.com/apache/dubbo-go/pull/106>
- Handle panic when function args list is empty<https://github.com/apache/dubbo-go/pull/98>
- url.Values is not safe map<https://github.com/apache/dubbo-go/pull/172>;
......@@ -176,7 +176,6 @@
END OF TERMS AND CONDITIONS
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
......
......@@ -18,6 +18,7 @@
package cluster_impl
import (
"strconv"
"sync"
"time"
)
......@@ -54,15 +55,18 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoker := &failbackClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
if retriesConfig <= 0 {
retriesConfig = constant.DEFAULT_FAILBACK_TIMES
retriesConfig := invoker.GetUrl().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.")
retries = constant.DEFAULT_FAILBACK_TIMES_INT
}
failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)
if failbackTasksConfig <= 0 {
failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
}
invoker.maxRetries = retriesConfig
invoker.maxRetries = int64(retries)
invoker.failbackTasks = failbackTasksConfig
return invoker
}
......
......@@ -17,6 +17,10 @@
package cluster_impl
import (
"strconv"
)
import (
perrors "github.com/pkg/errors"
)
......@@ -24,6 +28,7 @@ import (
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
)
......@@ -53,16 +58,21 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
url := invokers[0].GetUrl()
//get reties
retries := url.GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
//Get the service method loadbalance config if have
if v := url.GetMethodParamInt(methodName, constant.RETRIES_KEY, 0); v != 0 {
retries = v
if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
retriesConfig = v
}
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default retries configuration instead.")
retries = constant.DEFAULT_RETRIES_INT
}
invoked := []protocol.Invoker{}
providers := []string{}
var result protocol.Result
for i := int64(0); i < retries; i++ {
for i := 0; i <= retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 {
......
......@@ -118,14 +118,14 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio
}
func Test_FailoverInvokeSuccess(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(t, 2, urlParams)
result := normalInvoke(t, 3, urlParams)
assert.NoError(t, result.Error())
count = 0
}
func Test_FailoverInvokeFail(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(t, 3, urlParams)
result := normalInvoke(t, 4, urlParams)
assert.Errorf(t, result.Error(), "error")
count = 0
}
......@@ -133,7 +133,7 @@ func Test_FailoverInvokeFail(t *testing.T) {
func Test_FailoverInvoke1(t *testing.T) {
urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "3")
result := normalInvoke(t, 3, urlParams)
result := normalInvoke(t, 4, urlParams)
assert.NoError(t, result.Error())
count = 0
}
......@@ -144,7 +144,7 @@ func Test_FailoverInvoke2(t *testing.T) {
urlParams.Set("methods.test."+constant.RETRIES_KEY, "3")
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))
result := normalInvoke(t, 3, urlParams, ivc)
result := normalInvoke(t, 4, urlParams, ivc)
assert.NoError(t, result.Error())
count = 0
}
......
......@@ -15,7 +15,6 @@
* limitations under the License.
*/
// @author yiji@apache.org
package loadbalance
import (
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package loadbalance
import (
......
......@@ -26,13 +26,15 @@ const (
)
const (
DEFAULT_LOADBALANCE = "random"
DEFAULT_RETRIES = 2
DEFAULT_PROTOCOL = "dubbo"
DEFAULT_REG_TIMEOUT = "10s"
DEFAULT_CLUSTER = "failover"
DEFAULT_FAILBACK_TIMES = 3
DEFAULT_FAILBACK_TASKS = 100
DEFAULT_LOADBALANCE = "random"
DEFAULT_RETRIES = "2"
DEFAULT_RETRIES_INT = 2
DEFAULT_PROTOCOL = "dubbo"
DEFAULT_REG_TIMEOUT = "10s"
DEFAULT_CLUSTER = "failover"
DEFAULT_FAILBACK_TIMES = "3"
DEFAULT_FAILBACK_TIMES_INT = 3
DEFAULT_FAILBACK_TASKS = 100
)
const (
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
......
......@@ -17,10 +17,20 @@
package proxy_factory
import (
"reflect"
"strings"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/proxy"
"github.com/apache/dubbo-go/protocol"
)
......@@ -51,6 +61,86 @@ func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *comm
return proxy.NewProxy(invoker, nil, attachments)
}
func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker {
// todo: call service
return protocol.NewBaseInvoker(url)
return &ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
}
}
type ProxyInvoker struct {
protocol.BaseInvoker
}
func (pi *ProxyInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
result := &protocol.RPCResult{}
result.SetAttachments(invocation.Attachments())
url := pi.GetUrl()
methodName := invocation.MethodName()
proto := url.Protocol
path := strings.TrimPrefix(url.Path, "/")
args := invocation.Arguments()
// get service
svc := common.ServiceMap.GetService(proto, path)
if svc == nil {
logger.Errorf("cannot find service [%s] in %s", path, proto)
result.SetError(perrors.Errorf("cannot find service [%s] in %s", path, proto))
return result
}
// get method
method := svc.Method()[methodName]
if method == nil {
logger.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto)
result.SetError(perrors.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto))
return result
}
in := []reflect.Value{svc.Rcvr()}
if method.CtxType() != nil {
in = append(in, method.SuiteContext(nil)) // todo: ctx will be used later.
}
// prepare argv
if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" {
in = append(in, reflect.ValueOf(args))
} else {
for i := 0; i < len(args); i++ {
t := reflect.ValueOf(args[i])
if !t.IsValid() {
at := method.ArgsType()[i]
if at.Kind() == reflect.Ptr {
at = at.Elem()
}
t = reflect.New(at)
}
in = append(in, t)
}
}
// prepare replyv
var replyv reflect.Value
if method.ReplyType() == nil && len(method.ArgsType()) > 0 {
replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem())
in = append(in, replyv)
}
returnValues := method.Method().Func.Call(in)
var retErr interface{}
if len(returnValues) == 1 {
retErr = returnValues[0].Interface()
} else {
replyv = returnValues[0]
retErr = returnValues[1].Interface()
}
if retErr != nil {
result.SetError(retErr.(error))
} else {
if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) {
result.SetResult(replyv.Interface())
}
}
return result
}
......@@ -111,16 +111,19 @@ func WithParams(params url.Values) option {
url.Params = params
}
}
func WithParamsValue(key, val string) option {
return func(url *URL) {
url.Params.Set(key, val)
}
}
func WithProtocol(proto string) option {
return func(url *URL) {
url.Protocol = proto
}
}
func WithIp(ip string) option {
return func(url *URL) {
url.Ip = ip
......@@ -144,6 +147,7 @@ func WithLocation(location string) option {
url.Location = location
}
}
func NewURLWithOptions(opts ...option) *URL {
url := &URL{}
for _, opt := range opts {
......
......@@ -92,7 +92,7 @@ func Test_refresh(t *testing.T) {
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
Retries: 3,
Retries: "3",
Group: "huadong_idc",
Version: "1.0.0",
Methods: []*MethodConfig{
......@@ -100,14 +100,14 @@ func Test_refresh(t *testing.T) {
InterfaceId: "MockService",
InterfaceName: "com.MockService",
Name: "GetUser",
Retries: 2,
Retries: "2",
Loadbalance: "random",
},
{
InterfaceId: "MockService",
InterfaceName: "com.MockService",
Name: "GetUser1",
Retries: 2,
Retries: "2",
Loadbalance: "random",
},
},
......@@ -118,9 +118,9 @@ func Test_refresh(t *testing.T) {
c.SetFatherConfig(father)
c.fresh()
assert.Equal(t, "mock100", father.Registries["shanghai_reg1"].Protocol)
assert.Equal(t, int64(10), father.References["MockService"].Retries)
assert.Equal(t, "10", father.References["MockService"].Retries)
assert.Equal(t, int64(10), father.References["MockService"].Methods[0].Retries)
assert.Equal(t, "10", father.References["MockService"].Methods[0].Retries)
assert.Equal(t, &[]bool{false}[0], father.Check)
assert.Equal(t, "dubbo", father.ApplicationConfig.Name)
}
......@@ -188,7 +188,7 @@ func Test_refreshProvider(t *testing.T) {
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
Retries: 3,
Retries: "3",
Group: "huadong_idc",
Version: "1.0.0",
Methods: []*MethodConfig{
......@@ -196,13 +196,13 @@ func Test_refreshProvider(t *testing.T) {
InterfaceId: "MockService",
InterfaceName: "com.MockService",
Name: "GetUser",
Retries: 2,
Retries: "2",
Loadbalance: "random",
},
{InterfaceId: "MockService",
InterfaceName: "com.MockService",
Name: "GetUser1",
Retries: 2,
Retries: "2",
Loadbalance: "random",
},
},
......@@ -213,9 +213,9 @@ func Test_refreshProvider(t *testing.T) {
c.SetFatherConfig(father)
c.fresh()
assert.Equal(t, "mock100", father.Registries["shanghai_reg1"].Protocol)
assert.Equal(t, int64(10), father.Services["MockService"].Retries)
assert.Equal(t, "10", father.Services["MockService"].Retries)
assert.Equal(t, int64(10), father.Services["MockService"].Methods[0].Retries)
assert.Equal(t, "10", father.Services["MockService"].Methods[0].Retries)
assert.Equal(t, "dubbo", father.ApplicationConfig.Name)
assert.Equal(t, "20001", father.Protocols["jsonrpc1"].Port)
}
......
......@@ -52,6 +52,7 @@ type ConsumerConfig struct {
Registries map[string]*RegistryConfig `yaml:"registries" json:"registries,omitempty" property:"registries"`
References map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" `
}
func (*ConsumerConfig) Prefix() string {
......
......@@ -24,7 +24,7 @@ type MethodConfig struct {
InterfaceId string
InterfaceName string
Name string `yaml:"name" json:"name,omitempty" property:"name"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"`
}
......
......@@ -46,6 +46,7 @@ type ProviderConfig struct {
Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"`
Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" `
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" `
}
func (*ProviderConfig) Prefix() string {
......
......@@ -47,7 +47,7 @@ type ReferenceConfig struct {
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
......@@ -154,7 +154,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, refconfig.Cluster)
urlMap.Set(constant.LOADBALANCE_KEY, refconfig.Loadbalance)
urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(refconfig.Retries, 10))
urlMap.Set(constant.RETRIES_KEY, refconfig.Retries)
urlMap.Set(constant.GROUP_KEY, refconfig.Group)
urlMap.Set(constant.VERSION_KEY, refconfig.Version)
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic))
......@@ -180,7 +180,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
for _, v := range refconfig.Methods {
urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, strconv.FormatInt(v.Retries, 10))
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries)
}
return urlMap
......
......@@ -20,8 +20,6 @@ package config
import (
"sync"
"testing"
"github.com/apache/dubbo-go/common/constant"
)
import (
......@@ -31,6 +29,7 @@ import (
import (
"github.com/apache/dubbo-go/cluster/cluster_impl"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
......@@ -91,18 +90,18 @@ func doInit() {
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
Retries: 3,
Retries: "3",
Group: "huadong_idc",
Version: "1.0.0",
Methods: []*MethodConfig{
{
Name: "GetUser",
Retries: 2,
Retries: "2",
Loadbalance: "random",
},
{
Name: "GetUser1",
Retries: 2,
Retries: "2",
Loadbalance: "random",
},
},
......
......@@ -54,7 +54,7 @@ type ServiceConfig struct {
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
unexported *atomic.Bool
exported *atomic.Bool
......@@ -160,7 +160,7 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.CLUSTER_KEY, srvconfig.Cluster)
urlMap.Set(constant.LOADBALANCE_KEY, srvconfig.Loadbalance)
urlMap.Set(constant.WARMUP_KEY, srvconfig.Warmup)
urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(srvconfig.Retries, 10))
urlMap.Set(constant.RETRIES_KEY, srvconfig.Retries)
urlMap.Set(constant.GROUP_KEY, srvconfig.Group)
urlMap.Set(constant.VERSION_KEY, srvconfig.Version)
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
......@@ -178,7 +178,7 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
for _, v := range srvconfig.Methods {
urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, strconv.FormatInt(v.Retries, 10))
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries)
urlMap.Set("methods."+v.Name+"."+constant.WEIGHT_KEY, strconv.FormatInt(v.Weight, 10))
}
......
......@@ -79,19 +79,19 @@ func doinit() {
Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2",
Cluster: "failover",
Loadbalance: "random",
Retries: 3,
Retries: "3",
Group: "huadong_idc",
Version: "1.0.0",
Methods: []*MethodConfig{
{
Name: "GetUser",
Retries: 2,
Retries: "2",
Loadbalance: "random",
Weight: 200,
},
{
Name: "GetUser1",
Retries: 2,
Retries: "2",
Loadbalance: "random",
Weight: 200,
},
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment