Skip to content
Snippets Groups Projects
Commit 4e61cdb9 authored by 邹毅贤's avatar 邹毅贤
Browse files

Merge branch 'develop' into feature/addRouter

# Conflicts:
#	cluster/router/condition/router.go
#	common/constant/env.go
#	common/url.go
parents cd777eea ea95e04b
No related branches found
No related tags found
No related merge requests found
Showing
with 134 additions and 54 deletions
# Release Notes # Release Notes
## 1.2.0
### New Features
- Add etcdv3 registry support<https://github.com/apache/dubbo-go/pull/148>
- Add nacos registry support<https://github.com/apache/dubbo-go/pull/151>
- Add fail fast cluster support<https://github.com/apache/dubbo-go/pull/140>
- Add available cluster support<https://github.com/apache/dubbo-go/pull/155>
- Add broadcast cluster support<https://github.com/apache/dubbo-go/pull/158>
- Add forking cluster support<https://github.com/apache/dubbo-go/pull/161>
- Add service token authorization support<https://github.com/apache/dubbo-go/pull/202>
- Add accessLog filter support<https://github.com/apache/dubbo-go/pull/214>
- Add tps limit support<https://github.com/apache/dubbo-go/pull/237>
- Add execute limit support<https://github.com/apache/dubbo-go/pull/246>
- Move callService to invoker & support attachments<https://github.com/apache/dubbo-go/pull/193>
- Move example in dubbo-go project away<https://github.com/apache/dubbo-go/pull/228>
- Support dynamic config center which compatible with dubbo 2.6.x & 2.7.x and commit the zookeeper impl<https://github.com/apache/dubbo-go/pull/194>
### Enhancement
- Split gettyRPCClient.close and gettyRPCClientPool.remove in protocol/dubbo/pool.go<https://github.com/apache/dubbo-go/pull/186>
- Remove client from pool before closing it<https://github.com/apache/dubbo-go/pull/190>
- Enhance the logic for fetching the local address<https://github.com/apache/dubbo-go/pull/209>
- Add protocol_conf default values<https://github.com/apache/dubbo-go/pull/221>
- Add task pool for getty<https://github.com/apache/dubbo-go/pull/141>
- Update getty: remove read queue<https://github.com/apache/dubbo-go/pull/137>
- Clean heartbeat from PendingResponse<https://github.com/apache/dubbo-go/pull/166>
### Bugfixes
- GettyRPCClientPool remove deadlock<https://github.com/apache/dubbo-go/pull/183/files>
- Fix failover cluster bug and url parameter retries change int to string type<https://github.com/apache/dubbo-go/pull/195>
- Fix url params unsafe map<https://github.com/apache/dubbo-go/pull/201>
- Read protocol config by map key in config yaml instead of protocol name<https://github.com/apache/dubbo-go/pull/218>
- Fix dubbo group issues #238<https://github.com/apache/dubbo-go/pull/243>/<https://github.com/apache/dubbo-go/pull/244>
- Fix bug in reference_config<https://github.com/apache/dubbo-go/pull/157>
- Fix high memory bug in zookeeper listener<https://github.com/apache/dubbo-go/pull/168>
## 1.1.0 ## 1.1.0
### New Features ### New Features
......
...@@ -79,8 +79,10 @@ type rest struct { ...@@ -79,8 +79,10 @@ type rest struct {
func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation) protocol.Result { func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation) protocol.Result {
count++ count++
var success bool var (
var err error = nil success bool
err error
)
if count >= bi.successCount { if count >= bi.successCount {
success = true success = true
} else { } else {
......
...@@ -42,8 +42,8 @@ var ( ...@@ -42,8 +42,8 @@ var (
failsafeUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") failsafeUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
) )
// register_failsafe register failsafeCluster to cluster extension. // registerFailsafe register failsafeCluster to cluster extension.
func register_failsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failsafeCluster := NewFailsafeCluster() failsafeCluster := NewFailsafeCluster()
...@@ -62,7 +62,7 @@ func Test_FailSafeInvokeSuccess(t *testing.T) { ...@@ -62,7 +62,7 @@ func Test_FailSafeInvokeSuccess(t *testing.T) {
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := register_failsafe(t, invoker) clusterInvoker := registerFailsafe(t, invoker)
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
...@@ -81,7 +81,7 @@ func Test_FailSafeInvokeFail(t *testing.T) { ...@@ -81,7 +81,7 @@ func Test_FailSafeInvokeFail(t *testing.T) {
defer ctrl.Finish() defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := register_failsafe(t, invoker) clusterInvoker := registerFailsafe(t, invoker)
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
......
...@@ -19,7 +19,6 @@ package cluster_impl ...@@ -19,7 +19,6 @@ package cluster_impl
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time" "time"
) )
...@@ -45,6 +44,7 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker { ...@@ -45,6 +44,7 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
} }
} }
// Invoke ...
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
err := invoker.checkWhetherDestroyed() err := invoker.checkWhetherDestroyed()
if err != nil { if err != nil {
...@@ -87,14 +87,18 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro ...@@ -87,14 +87,18 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro
rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts)) rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))
if err != nil { if err != nil {
return &protocol.RPCResult{ return &protocol.RPCResult{
Err: errors.New(fmt.Sprintf("failed to forking invoke provider %v, but no luck to perform the invocation. Last error is: %s", selected, err.Error()))} Err: fmt.Errorf("failed to forking invoke provider %v, "+
"but no luck to perform the invocation. Last error is: %v", selected, err),
}
} }
if len(rsps) == 0 { if len(rsps) == 0 {
return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("failed to forking invoke provider %v, but no resp", selected))} return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but no resp", selected)}
} }
result, ok := rsps[0].(protocol.Result) result, ok := rsps[0].(protocol.Result)
if !ok { if !ok {
return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("failed to forking invoke provider %v, but not legal resp", selected))} return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)}
} }
return result return result
} }
...@@ -22,7 +22,8 @@ import ( ...@@ -22,7 +22,8 @@ import (
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
) )
// Extension - Directory // Directory
//Extension - Directory
type Directory interface { type Directory interface {
common.Node common.Node
List(invocation protocol.Invocation) []protocol.Invoker List(invocation protocol.Invocation) []protocol.Invoker
......
...@@ -21,7 +21,8 @@ import ( ...@@ -21,7 +21,8 @@ import (
"github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol"
) )
// Extension - LoadBalance // LoadBalance
//Extension - LoadBalance
type LoadBalance interface { type LoadBalance interface {
Select([]protocol.Invoker, protocol.Invocation) protocol.Invoker Select([]protocol.Invoker, protocol.Invocation) protocol.Invoker
} }
...@@ -36,9 +36,12 @@ import ( ...@@ -36,9 +36,12 @@ import (
) )
const ( const (
// ConsistentHash ...
ConsistentHash = "consistenthash" ConsistentHash = "consistenthash"
HashNodes = "hash.nodes" // HashNodes ...
HashArguments = "hash.arguments" HashNodes = "hash.nodes"
// HashArguments ...
HashArguments = "hash.arguments"
) )
var ( var (
......
...@@ -28,6 +28,7 @@ import ( ...@@ -28,6 +28,7 @@ import (
) )
const ( const (
// LeastActive ...
LeastActive = "leastactive" LeastActive = "leastactive"
) )
...@@ -53,12 +54,12 @@ func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation ...@@ -53,12 +54,12 @@ func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation
} }
var ( var (
leastActive int32 = -1 // The least active value of all invokers leastActive int32 = -1 // The least active value of all invokers
totalWeight int64 = 0 // The number of invokers having the same least active value (LEAST_ACTIVE) totalWeight int64 // The number of invokers having the same least active value (LEAST_ACTIVE)
firstWeight int64 = 0 // Initial value, used for comparison firstWeight int64 // Initial value, used for comparison
leastIndexes = make([]int, count) // The index of invokers having the same least active value (LEAST_ACTIVE) leastCount int // The number of invokers having the same least active value (LEAST_ACTIVE)
leastCount = 0 // The number of invokers having the same least active value (LEAST_ACTIVE) leastIndexes = make([]int, count) // The index of invokers having the same least active value (LEAST_ACTIVE)
sameWeight = true // Every invoker has the same weight value? sameWeight = true // Every invoker has the same weight value?
) )
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
......
...@@ -31,16 +31,19 @@ import ( ...@@ -31,16 +31,19 @@ import (
) )
const ( const (
// RoundRobin ...
RoundRobin = "roundrobin" RoundRobin = "roundrobin"
// COMPLETE ...
COMPLETE = 0 COMPLETE = 0
// UPDATING ...
UPDATING = 1 UPDATING = 1
) )
var ( var (
methodWeightMap sync.Map // [string]invokers methodWeightMap sync.Map // [string]invokers
state int32 = COMPLETE // update lock acquired ? state = int32(COMPLETE) // update lock acquired ?
recyclePeriod int64 = 60 * time.Second.Nanoseconds() recyclePeriod = 60 * time.Second.Nanoseconds()
) )
func init() { func init() {
......
...@@ -93,15 +93,19 @@ type rest struct { ...@@ -93,15 +93,19 @@ type rest struct {
var count int var count int
func (bi *MockInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result {
count++ count++
var success bool
var err error = nil var (
success bool
err error
)
if count >= bi.successCount { if count >= bi.successCount {
success = true success = true
} else { } else {
err = perrors.New("error") err = perrors.New("error")
} }
result := &protocol.RPCResult{Err: err, Rest: rest{tried: count, success: success}} result := &protocol.RPCResult{Err: err, Rest: rest{tried: count, success: success}}
return result return result
} }
......
...@@ -38,9 +38,12 @@ import ( ...@@ -38,9 +38,12 @@ import (
) )
const ( const (
//ROUTE_PATTERN route pattern regex
ROUTE_PATTERN = `([&!=,]*)\\s*([^&!=,\\s]+)` ROUTE_PATTERN = `([&!=,]*)\\s*([^&!=,\\s]+)`
// FORCE ...
FORCE = "force" FORCE = "force"
ENABLED = "enabled" ENABLED = "enabled"
// PRIORITY ...
PRIORITY = "priority" PRIORITY = "priority"
) )
...@@ -289,15 +292,15 @@ func matchCondition(pairs map[string]MatchPair, url *common.URL, param *common.U ...@@ -289,15 +292,15 @@ func matchCondition(pairs map[string]MatchPair, url *common.URL, param *common.U
if len(sampleValue) > 0 { if len(sampleValue) > 0 {
if !matchPair.isMatch(sampleValue, param) { if !matchPair.isMatch(sampleValue, param) {
return false, nil return false, nil
} else {
result = true
} }
result = true
} else { } else {
if !(matchPair.Matches.Empty()) { if !(matchPair.Matches.Empty()) {
return false, nil return false, nil
} else {
result = true
} }
result = true
} }
} }
return result, nil return result, nil
......
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
"github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/config_center"
) )
// Environment
// There is dubbo.properties file and application level config center configuration which higner than normal config center in java. So in java the // There is dubbo.properties file and application level config center configuration which higner than normal config center in java. So in java the
// configuration sequence will be config center > application level config center > dubbo.properties > spring bean configuration. // configuration sequence will be config center > application level config center > dubbo.properties > spring bean configuration.
// But in go, neither the dubbo.properties file or application level config center configuration will not support for the time being. // But in go, neither the dubbo.properties file or application level config center configuration will not support for the time being.
...@@ -82,11 +83,11 @@ func (env *Environment) UpdateAppExternalConfigMap(externalMap map[string]string ...@@ -82,11 +83,11 @@ func (env *Environment) UpdateAppExternalConfigMap(externalMap map[string]string
// Configuration ... // Configuration ...
func (env *Environment) Configuration() *list.List { func (env *Environment) Configuration() *list.List {
list := list.New() cfgList := list.New()
// The sequence would be: SystemConfiguration -> ExternalConfiguration -> AppExternalConfiguration -> AbstractConfig -> PropertiesConfiguration // The sequence would be: SystemConfiguration -> ExternalConfiguration -> AppExternalConfiguration -> AbstractConfig -> PropertiesConfiguration
list.PushFront(newInmemoryConfiguration(&(env.externalConfigMap))) cfgList.PushFront(newInmemoryConfiguration(&(env.externalConfigMap)))
list.PushFront(newInmemoryConfiguration(&(env.appExternalConfigMap))) cfgList.PushFront(newInmemoryConfiguration(&(env.appExternalConfigMap)))
return list return cfgList
} }
// SetDynamicConfiguration ... // SetDynamicConfiguration ...
......
...@@ -18,8 +18,11 @@ ...@@ -18,8 +18,11 @@
package constant package constant
const ( const (
// CONF_CONSUMER_FILE_PATH ...
CONF_CONSUMER_FILE_PATH = "CONF_CONSUMER_FILE_PATH" CONF_CONSUMER_FILE_PATH = "CONF_CONSUMER_FILE_PATH"
// CONF_PROVIDER_FILE_PATH ...
CONF_PROVIDER_FILE_PATH = "CONF_PROVIDER_FILE_PATH" CONF_PROVIDER_FILE_PATH = "CONF_PROVIDER_FILE_PATH"
APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE" // APP_LOG_CONF_FILE ...
APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE"
CONF_ROUTER_FILE_PATH = "CONF_ROUTER_FILE_PATH" CONF_ROUTER_FILE_PATH = "CONF_ROUTER_FILE_PATH"
) )
...@@ -18,7 +18,10 @@ ...@@ -18,7 +18,10 @@
package constant package constant
const ( const (
// Version apache/dubbo-go version
Version = "1.3.0" Version = "1.3.0"
Name = "dubbogo" // Name module name
DATE = "2020/01/12" Name = "dubbogo"
// Date release date
DATE = "2020/01/12"
) )
...@@ -22,7 +22,10 @@ import ( ...@@ -22,7 +22,10 @@ import (
"github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/config_center"
) )
const DefaultKey = "default" const (
// DefaultKey ...
DefaultKey = "default"
)
type getConfiguratorFunc func(url *common.URL) config_center.Configurator type getConfiguratorFunc func(url *common.URL) config_center.Configurator
......
...@@ -26,6 +26,7 @@ var ( ...@@ -26,6 +26,7 @@ var (
) )
/** /**
* AddCustomShutdownCallback
* you should not make any assumption about the order. * you should not make any assumption about the order.
* For example, if you have more than one callbacks, and you wish the order is: * For example, if you have more than one callbacks, and you wish the order is:
* callback1() * callback1()
......
...@@ -22,12 +22,12 @@ import ( ...@@ -22,12 +22,12 @@ import (
) )
var ( var (
proxy_factories = make(map[string]func(...proxy.Option) proxy.ProxyFactory) proxyFactories = make(map[string]func(...proxy.Option) proxy.ProxyFactory)
) )
// SetProxyFactory ... // SetProxyFactory ...
func SetProxyFactory(name string, f func(...proxy.Option) proxy.ProxyFactory) { func SetProxyFactory(name string, f func(...proxy.Option) proxy.ProxyFactory) {
proxy_factories[name] = f proxyFactories[name] = f
} }
// GetProxyFactory ... // GetProxyFactory ...
...@@ -35,8 +35,8 @@ func GetProxyFactory(name string) proxy.ProxyFactory { ...@@ -35,8 +35,8 @@ func GetProxyFactory(name string) proxy.ProxyFactory {
if name == "" { if name == "" {
name = "default" name = "default"
} }
if proxy_factories[name] == nil { if proxyFactories[name] == nil {
panic("proxy factory for " + name + " is not existing, make sure you have import the package.") panic("proxy factory for " + name + " is not existing, make sure you have import the package.")
} }
return proxy_factories[name]() return proxyFactories[name]()
} }
...@@ -40,7 +40,9 @@ type Proxy struct { ...@@ -40,7 +40,9 @@ type Proxy struct {
once sync.Once once sync.Once
} }
var typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type() var (
typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type()
)
// NewProxy ... // NewProxy ...
func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[string]string) *Proxy { func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[string]string) *Proxy {
...@@ -51,6 +53,7 @@ func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[str ...@@ -51,6 +53,7 @@ func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[str
} }
} }
// Implement
// proxy implement // proxy implement
// In consumer, RPCService like: // In consumer, RPCService like:
// type XxxProvider struct { // type XxxProvider struct {
......
...@@ -34,19 +34,22 @@ import ( ...@@ -34,19 +34,22 @@ import (
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
) )
// rpc service interface // RPCService
//rpc service interface
type RPCService interface { type RPCService interface {
Reference() string // rpc service id or reference id // Reference:
// rpc service id or reference id
Reference() string
} }
//AsyncCallbackService callback interface for async //AsyncCallbackService callback interface for async
type AsyncCallbackService interface { type AsyncCallbackService interface {
CallBack(response CallbackResponse) // callback // Callback: callback
CallBack(response CallbackResponse)
} }
//CallbackResponse for different protocol //CallbackResponse for different protocol
type CallbackResponse interface { type CallbackResponse interface{}
}
//AsyncCallback async callback method //AsyncCallback async callback method
type AsyncCallback func(response CallbackResponse) type AsyncCallback func(response CallbackResponse)
...@@ -55,13 +58,17 @@ type AsyncCallback func(response CallbackResponse) ...@@ -55,13 +58,17 @@ type AsyncCallback func(response CallbackResponse)
// func MethodMapper() map[string][string] { // func MethodMapper() map[string][string] {
// return map[string][string]{} // return map[string][string]{}
// } // }
const METHOD_MAPPER = "MethodMapper" const (
// METHOD_MAPPER ...
METHOD_MAPPER = "MethodMapper"
)
var ( var (
// Precompute the reflect type for error. Can't use error directly // Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying. // because Typeof takes an empty interface value. This is annoying.
typeOfError = reflect.TypeOf((*error)(nil)).Elem() typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// ServiceMap ...
// todo: lowerecas? // todo: lowerecas?
ServiceMap = &serviceMap{ ServiceMap = &serviceMap{
serviceMap: make(map[string]map[string]*Service), serviceMap: make(map[string]map[string]*Service),
...@@ -226,8 +233,8 @@ func (sm *serviceMap) UnRegister(protocol, serviceId string) error { ...@@ -226,8 +233,8 @@ func (sm *serviceMap) UnRegister(protocol, serviceId string) error {
// Is this an exported - upper case - name // Is this an exported - upper case - name
func isExported(name string) bool { func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name) s, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune) return unicode.IsUpper(s)
} }
// Is this type exported or a builtin? // Is this type exported or a builtin?
......
...@@ -122,9 +122,8 @@ func TestServiceMap_UnRegister(t *testing.T) { ...@@ -122,9 +122,8 @@ func TestServiceMap_UnRegister(t *testing.T) {
func TestMethodType_SuiteContext(t *testing.T) { func TestMethodType_SuiteContext(t *testing.T) {
mt := &MethodType{ctxType: reflect.TypeOf(context.TODO())} mt := &MethodType{ctxType: reflect.TypeOf(context.TODO())}
c := context.TODO() ctx := context.WithValue(context.Background(), "key", "value")
c = context.WithValue(c, "key", "value") assert.Equal(t, reflect.ValueOf(ctx), mt.SuiteContext(ctx))
assert.Equal(t, reflect.ValueOf(c), mt.SuiteContext(c))
assert.Equal(t, reflect.Zero(mt.ctxType), mt.SuiteContext(nil)) assert.Equal(t, reflect.Zero(mt.ctxType), mt.SuiteContext(nil))
} }
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment