Skip to content
Snippets Groups Projects
Commit 02716fdd authored by pantianying's avatar pantianying
Browse files

Merge branch 'apache-develop' into develop_req_timeout_config

# Conflicts:
#	config/method_config.go
#	config/reference_config.go
parents 412bafb4 0de1d89d
No related branches found
No related tags found
No related merge requests found
Showing
with 461 additions and 20 deletions
...@@ -15,6 +15,10 @@ Apache License, Version 2.0 ...@@ -15,6 +15,10 @@ Apache License, Version 2.0
[v1.0.0 - May 29, 2019 compatible with dubbo v2.6.5](https://github.com/apache/dubbo-go/releases/tag/v1.0.0) [v1.0.0 - May 29, 2019 compatible with dubbo v2.6.5](https://github.com/apache/dubbo-go/releases/tag/v1.0.0)
[v1.1.0 - Sep 7, 2019 the first release after transferred to apache](https://github.com/apache/dubbo-go/releases/tag/v1.1.0)
[v1.2.0 - Nov 15, 2019](https://github.com/apache/dubbo-go/releases/tag/v1.2.0)
## Project Architecture ## ## Project Architecture ##
Both extension module and layered project architecture is according to Apache Dubbo (including protocol layer, registry layer, cluster layer, config layer and so on), the advantage of this arch is as following: you can implement these layered interfaces in your own way, override the default implementation of dubbo-go by calling 'extension.SetXXX' of extension, complete your special needs without modifying the source code. At the same time, you are welcome to contribute implementation of useful extension to the community. Both extension module and layered project architecture is according to Apache Dubbo (including protocol layer, registry layer, cluster layer, config layer and so on), the advantage of this arch is as following: you can implement these layered interfaces in your own way, override the default implementation of dubbo-go by calling 'extension.SetXXX' of extension, complete your special needs without modifying the source code. At the same time, you are welcome to contribute implementation of useful extension to the community.
...@@ -90,13 +94,15 @@ Working List: ...@@ -90,13 +94,15 @@ Working List:
You can know more about dubbo-go by its [roadmap](https://github.com/apache/dubbo-go/wiki/Roadmap). You can know more about dubbo-go by its [roadmap](https://github.com/apache/dubbo-go/wiki/Roadmap).
![feature](https://raw.githubusercontent.com/wiki/apache/dubbo-go/arch.png)
## Document ## Document
TODO https://dubbogo.github.io/dubbo-go-website(**Improving**)
## Quick Start ## Quick Start
[dubbogo-samples](https://github.com/dubbogo/dubbogo-samples) shows how to use dubbo-go. Please read the [dubbogo-samples/README.md](https://github.com/dubbogo/dubbogo-samples/blob/master/README.md) carefully to learn how to dispose the configuration and compile the program. [dubbo-samples/golang](https://github.com/dubbogo/dubbo-samples) shows how to use dubbo-go. Please read the [dubbo-samples/golang/README.md](https://github.com/dubbogo/dubbo-samples/blob/master/golang/README.md) carefully to learn how to dispose the configuration and compile the program.
## Running unit tests ## Running unit tests
......
...@@ -14,6 +14,10 @@ Apache License, Version 2.0 ...@@ -14,6 +14,10 @@ Apache License, Version 2.0
[v1.0.0 - 2019年5月29日 兼容dubbo v2.6.5 版本](https://github.com/apache/dubbo-go/releases/tag/v1.0.0) [v1.0.0 - 2019年5月29日 兼容dubbo v2.6.5 版本](https://github.com/apache/dubbo-go/releases/tag/v1.0.0)
[v1.1.0 - 2019年9月7日 捐献给Apache之后的第一次release](https://github.com/apache/dubbo-go/releases/tag/v1.1.0)
[v1.2.0 - 2019年11月15日](https://github.com/apache/dubbo-go/releases/tag/v1.2.0)
## 工程架构 ## ## 工程架构 ##
基于dubbo的extension模块和分层的代码设计(包括 protocol layer, registry layer, cluster layer, config 等等)。我们的目标是:你可以对这些分层接口进行新的实现,并通过调用 extension 模块的“ extension.SetXXX ”方法来覆盖 dubbo-go [同 go-for-apache-dubbo ]的默认实现,以完成自己的特殊需求而无需修改源代码。同时,欢迎你为社区贡献有用的拓展实现。 基于dubbo的extension模块和分层的代码设计(包括 protocol layer, registry layer, cluster layer, config 等等)。我们的目标是:你可以对这些分层接口进行新的实现,并通过调用 extension 模块的“ extension.SetXXX ”方法来覆盖 dubbo-go [同 go-for-apache-dubbo ]的默认实现,以完成自己的特殊需求而无需修改源代码。同时,欢迎你为社区贡献有用的拓展实现。
...@@ -89,13 +93,15 @@ Apache License, Version 2.0 ...@@ -89,13 +93,15 @@ Apache License, Version 2.0
你可以通过访问 [roadmap](https://github.com/apache/dubbo-go/wiki/Roadmap) 知道更多关于 dubbo-go 的信息。 你可以通过访问 [roadmap](https://github.com/apache/dubbo-go/wiki/Roadmap) 知道更多关于 dubbo-go 的信息。
![feature](https://raw.githubusercontent.com/wiki/apache/dubbo-go/arch.png)
## 文档 ## 文档
TODO https://dubbogo.github.io/dubbo-go-website(**完善中**)
## 快速开始 ## ## 快速开始 ##
[dubbogo-samples](https://github.com/dubbogo/dubbogo-samples)这个项目的事例展示了如何使用 dubbo-go 。请仔细阅读 [dubbogo-samples/README.md](https://github.com/dubbogo/dubbogo-samples/blob/master/README.md) 学习如何处理配置并编译程序。 [dubbo-samples/golang](https://github.com/dubbogo/dubbo-samples)这个项目的事例展示了如何使用 dubbo-go 。请仔细阅读 [dubbo-samples/golang/README.md](https://github.com/dubbogo/dubbo-samples/blob/master/golang/README.md) 学习如何处理配置并编译程序。
## 运行单测 ## 运行单测
......
...@@ -35,6 +35,7 @@ type baseClusterInvoker struct { ...@@ -35,6 +35,7 @@ type baseClusterInvoker struct {
directory cluster.Directory directory cluster.Directory
availablecheck bool availablecheck bool
destroyed *atomic.Bool destroyed *atomic.Bool
stickyInvoker protocol.Invoker
} }
func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker { func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
...@@ -56,7 +57,9 @@ func (invoker *baseClusterInvoker) Destroy() { ...@@ -56,7 +57,9 @@ func (invoker *baseClusterInvoker) Destroy() {
} }
func (invoker *baseClusterInvoker) IsAvailable() bool { func (invoker *baseClusterInvoker) IsAvailable() bool {
//TODO:sticky connection if invoker.stickyInvoker != nil {
return invoker.stickyInvoker.IsAvailable()
}
return invoker.directory.IsAvailable() return invoker.directory.IsAvailable()
} }
...@@ -83,15 +86,42 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error { ...@@ -83,15 +86,42 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
} }
func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
//todo:sticky connect
var selectedInvoker protocol.Invoker
url := invokers[0].GetUrl()
sticky := url.GetParamBool(constant.STICKY_KEY, false)
//Get the service method sticky config if have
sticky = url.GetMethodParamBool(invocation.MethodName(), constant.STICKY_KEY, sticky)
if invoker.stickyInvoker != nil && !isInvoked(invoker.stickyInvoker, invokers) {
invoker.stickyInvoker = nil
}
if sticky && invoker.stickyInvoker != nil && (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
if invoker.availablecheck && invoker.stickyInvoker.IsAvailable() {
return invoker.stickyInvoker
}
}
selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)
if sticky {
invoker.stickyInvoker = selectedInvoker
}
return selectedInvoker
}
func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
if len(invokers) == 1 { if len(invokers) == 1 {
return invokers[0] return invokers[0]
} }
selectedInvoker := lb.Select(invokers, invocation) selectedInvoker := lb.Select(invokers, invocation)
//judge to if the selectedInvoker is invoked //judge to if the selectedInvoker is invoked
if !selectedInvoker.IsAvailable() || !invoker.availablecheck || isInvoked(selectedInvoker, invoked) { if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
// do reselect // do reselect
var reslectInvokers []protocol.Invoker var reslectInvokers []protocol.Invoker
...@@ -106,13 +136,12 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p ...@@ -106,13 +136,12 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
} }
if len(reslectInvokers) > 0 { if len(reslectInvokers) > 0 {
return lb.Select(reslectInvokers, invocation) selectedInvoker = lb.Select(reslectInvokers, invocation)
} else { } else {
return nil return nil
} }
} }
return selectedInvoker return selectedInvoker
} }
func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool { func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cluster_impl
import (
"context"
"fmt"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func Test_StickyNormal(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
}
base := &baseClusterInvoker{}
base.availablecheck = true
invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
assert.Equal(t, result, result1)
}
func Test_StickyNormalWhenError(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
}
base := &baseClusterInvoker{}
base.availablecheck = true
invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
invoked = append(invoked, result)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
assert.NotEqual(t, result, result1)
}
...@@ -67,7 +67,7 @@ func Test_FailbackSuceess(t *testing.T) { ...@@ -67,7 +67,7 @@ func Test_FailbackSuceess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
invoker.EXPECT().GetUrl().Return(failbackUrl).Times(1) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
......
...@@ -64,7 +64,7 @@ func Test_FailfastInvokeSuccess(t *testing.T) { ...@@ -64,7 +64,7 @@ func Test_FailfastInvokeSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker) clusterInvoker := registerFailfast(t, invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl) invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
...@@ -84,7 +84,7 @@ func Test_FailfastInvokeFail(t *testing.T) { ...@@ -84,7 +84,7 @@ func Test_FailfastInvokeFail(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(t, invoker) clusterInvoker := registerFailfast(t, invoker)
invoker.EXPECT().GetUrl().Return(failfastUrl) invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()
mockResult := &protocol.RPCResult{Err: perrors.New("error")} mockResult := &protocol.RPCResult{Err: perrors.New("error")}
......
...@@ -64,7 +64,7 @@ func Test_FailSafeInvokeSuccess(t *testing.T) { ...@@ -64,7 +64,7 @@ func Test_FailSafeInvokeSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := register_failsafe(t, invoker) clusterInvoker := register_failsafe(t, invoker)
invoker.EXPECT().GetUrl().Return(failsafeUrl) invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
...@@ -83,7 +83,7 @@ func Test_FailSafeInvokeFail(t *testing.T) { ...@@ -83,7 +83,7 @@ func Test_FailSafeInvokeFail(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl) invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := register_failsafe(t, invoker) clusterInvoker := register_failsafe(t, invoker)
invoker.EXPECT().GetUrl().Return(failsafeUrl) invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()
mockResult := &protocol.RPCResult{Err: perrors.New("error")} mockResult := &protocol.RPCResult{Err: perrors.New("error")}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package loadbalance
import (
"crypto/md5"
"encoding/json"
"fmt"
"hash/crc32"
"regexp"
"sort"
"strconv"
"strings"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
const (
ConsistentHash = "consistenthash"
HashNodes = "hash.nodes"
HashArguments = "hash.arguments"
)
var (
selectors = make(map[string]*ConsistentHashSelector)
re = regexp.MustCompile(constant.COMMA_SPLIT_PATTERN)
)
func init() {
extension.SetLoadbalance(ConsistentHash, NewConsistentHashLoadBalance)
}
type ConsistentHashLoadBalance struct {
}
func NewConsistentHashLoadBalance() cluster.LoadBalance {
return &ConsistentHashLoadBalance{}
}
func (lb *ConsistentHashLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
methodName := invocation.MethodName()
key := invokers[0].GetUrl().ServiceKey() + "." + methodName
// hash the invokers
bs := make([]byte, 0)
for _, invoker := range invokers {
b, err := json.Marshal(invoker)
if err != nil {
return nil
}
bs = append(bs, b...)
}
hashCode := crc32.ChecksumIEEE(bs)
selector, ok := selectors[key]
if !ok || selector.hashCode != hashCode {
selectors[key] = newConsistentHashSelector(invokers, methodName, hashCode)
selector = selectors[key]
}
return selector.Select(invocation)
}
type Uint32Slice []uint32
func (s Uint32Slice) Len() int {
return len(s)
}
func (s Uint32Slice) Less(i, j int) bool {
return s[i] < s[j]
}
func (s Uint32Slice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
type ConsistentHashSelector struct {
hashCode uint32
replicaNum int
virtualInvokers map[uint32]protocol.Invoker
keys Uint32Slice
argumentIndex []int
}
func newConsistentHashSelector(invokers []protocol.Invoker, methodName string,
hashCode uint32) *ConsistentHashSelector {
selector := &ConsistentHashSelector{}
selector.virtualInvokers = make(map[uint32]protocol.Invoker)
selector.hashCode = hashCode
url := invokers[0].GetUrl()
selector.replicaNum = int(url.GetMethodParamInt(methodName, HashNodes, 160))
indices := re.Split(url.GetMethodParam(methodName, HashArguments, "0"), -1)
for _, index := range indices {
i, err := strconv.Atoi(index)
if err != nil {
return nil
}
selector.argumentIndex = append(selector.argumentIndex, i)
}
for _, invoker := range invokers {
u := invoker.GetUrl()
address := u.Ip + ":" + u.Port
for i := 0; i < selector.replicaNum/4; i++ {
digest := md5.Sum([]byte(address + strconv.Itoa(i)))
for j := 0; j < 4; j++ {
key := selector.hash(digest, j)
selector.keys = append(selector.keys, key)
selector.virtualInvokers[key] = invoker
}
}
}
sort.Sort(selector.keys)
return selector
}
func (c *ConsistentHashSelector) Select(invocation protocol.Invocation) protocol.Invoker {
key := c.toKey(invocation.Arguments())
digest := md5.Sum([]byte(key))
return c.selectForKey(c.hash(digest, 0))
}
func (c *ConsistentHashSelector) toKey(args []interface{}) string {
var sb strings.Builder
for i := range c.argumentIndex {
if i >= 0 && i < len(args) {
fmt.Fprint(&sb, args[i].(string))
}
}
return sb.String()
}
func (c *ConsistentHashSelector) selectForKey(hash uint32) protocol.Invoker {
idx := sort.Search(len(c.keys), func(i int) bool {
return c.keys[i] >= hash
})
if idx == len(c.keys) {
idx = 0
}
return c.virtualInvokers[c.keys[idx]]
}
func (c *ConsistentHashSelector) hash(digest [16]byte, i int) uint32 {
return uint32((digest[3+i*4]&0xFF)<<24) | uint32((digest[2+i*4]&0xFF)<<16) |
uint32((digest[1+i*4]&0xFF)<<8) | uint32(digest[i*4]&0xFF)&0xFFFFFFF
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package loadbalance
import (
"context"
"testing"
)
import (
"github.com/stretchr/testify/suite"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestConsistentHashSelectorSuite(t *testing.T) {
suite.Run(t, new(consistentHashSelectorSuite))
}
type consistentHashSelectorSuite struct {
suite.Suite
selector *ConsistentHashSelector
}
func (s *consistentHashSelectorSuite) SetupTest() {
var invokers []protocol.Invoker
url, _ := common.NewURL(context.TODO(),
"dubbo://192.168.1.0:20000/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
invokers = append(invokers, protocol.NewBaseInvoker(url))
s.selector = newConsistentHashSelector(invokers, "echo", 999944)
}
func (s *consistentHashSelectorSuite) TestToKey() {
result := s.selector.toKey([]interface{}{"username", "age"})
s.Equal(result, "usernameage")
}
func (s *consistentHashSelectorSuite) TestSelectForKey() {
url1, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:8080")
url2, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:8081")
s.selector.virtualInvokers = make(map[uint32]protocol.Invoker)
s.selector.virtualInvokers[99874] = protocol.NewBaseInvoker(url1)
s.selector.virtualInvokers[9999945] = protocol.NewBaseInvoker(url2)
s.selector.keys = []uint32{99874, 9999945}
result := s.selector.selectForKey(9999944)
s.Equal(result.GetUrl().String(), "dubbo://192.168.1.0:8081?")
}
func TestConsistentHashLoadBalanceSuite(t *testing.T) {
suite.Run(t, new(consistentHashLoadBalanceSuite))
}
type consistentHashLoadBalanceSuite struct {
suite.Suite
url1 common.URL
url2 common.URL
url3 common.URL
invokers []protocol.Invoker
invoker1 protocol.Invoker
invoker2 protocol.Invoker
invoker3 protocol.Invoker
lb cluster.LoadBalance
}
func (s *consistentHashLoadBalanceSuite) SetupTest() {
var err error
s.url1, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8080/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.NoError(err)
s.url2, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8081/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.NoError(err)
s.url3, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8082/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.NoError(err)
s.invoker1 = protocol.NewBaseInvoker(s.url1)
s.invoker2 = protocol.NewBaseInvoker(s.url2)
s.invoker3 = protocol.NewBaseInvoker(s.url3)
s.invokers = append(s.invokers, s.invoker1, s.invoker2, s.invoker3)
s.lb = NewConsistentHashLoadBalance()
}
func (s *consistentHashLoadBalanceSuite) TestSelect() {
args := []interface{}{"name", "password", "age"}
invoker := s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil))
s.Equal(invoker.GetUrl().Location, "192.168.1.0:8080")
args = []interface{}{"ok", "abc"}
invoker = s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil))
s.Equal(invoker.GetUrl().Location, "192.168.1.0:8082")
}
...@@ -67,3 +67,7 @@ const ( ...@@ -67,3 +67,7 @@ const (
APP_DYNAMIC_CONFIGURATORS_CATEGORY = "appdynamicconfigurators" APP_DYNAMIC_CONFIGURATORS_CATEGORY = "appdynamicconfigurators"
PROVIDER_CATEGORY = "providers" PROVIDER_CATEGORY = "providers"
) )
const (
COMMA_SPLIT_PATTERN = "\\s*[,]+\\s*"
)
...@@ -55,6 +55,7 @@ const ( ...@@ -55,6 +55,7 @@ const (
WEIGHT_KEY = "weight" WEIGHT_KEY = "weight"
WARMUP_KEY = "warmup" WARMUP_KEY = "warmup"
RETRIES_KEY = "retries" RETRIES_KEY = "retries"
STICKY_KEY = "sticky"
BEAN_NAME = "bean.name" BEAN_NAME = "bean.name"
FAIL_BACK_TASKS_KEY = "failbacktasks" FAIL_BACK_TASKS_KEY = "failbacktasks"
FORKS_KEY = "forks" FORKS_KEY = "forks"
......
...@@ -181,3 +181,7 @@ func (p *Proxy) Implement(v common.RPCService) { ...@@ -181,3 +181,7 @@ func (p *Proxy) Implement(v common.RPCService) {
func (p *Proxy) Get() common.RPCService { func (p *Proxy) Get() common.RPCService {
return p.rpc return p.rpc
} }
func (p *Proxy) GetCallback() interface{} {
return p.callBack
}
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
type ProxyFactory interface { type ProxyFactory interface {
GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy
GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy
GetInvoker(url common.URL) protocol.Invoker GetInvoker(url common.URL) protocol.Invoker
} }
......
...@@ -55,11 +55,16 @@ func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory { ...@@ -55,11 +55,16 @@ func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory {
return &DefaultProxyFactory{} return &DefaultProxyFactory{}
} }
func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy {
return factory.GetAsyncProxy(invoker, nil, url)
}
func (factory *DefaultProxyFactory) GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy {
//create proxy //create proxy
attachments := map[string]string{} attachments := map[string]string{}
attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false")
return proxy.NewProxy(invoker, nil, attachments) return proxy.NewProxy(invoker, callBack, attachments)
} }
func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker {
return &ProxyInvoker{ return &ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url), BaseInvoker: *protocol.NewBaseInvoker(url),
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package proxy_factory package proxy_factory
import ( import (
"fmt"
"testing" "testing"
) )
...@@ -37,6 +38,21 @@ func Test_GetProxy(t *testing.T) { ...@@ -37,6 +38,21 @@ func Test_GetProxy(t *testing.T) {
assert.NotNil(t, proxy) assert.NotNil(t, proxy)
} }
type TestAsync struct {
}
func (u *TestAsync) CallBack(res common.CallbackResponse) {
fmt.Println("CallBack res:", res)
}
func Test_GetAsyncProxy(t *testing.T) {
proxyFactory := NewDefaultProxyFactory()
url := common.NewURLWithOptions()
async := &TestAsync{}
proxy := proxyFactory.GetAsyncProxy(protocol.NewBaseInvoker(*url), async.CallBack, url)
assert.NotNil(t, proxy)
}
func Test_GetInvoker(t *testing.T) { func Test_GetInvoker(t *testing.T) {
proxyFactory := NewDefaultProxyFactory() proxyFactory := NewDefaultProxyFactory()
url := common.NewURLWithOptions() url := common.NewURLWithOptions()
......
...@@ -39,6 +39,18 @@ type RPCService interface { ...@@ -39,6 +39,18 @@ type RPCService interface {
Reference() string // rpc service id or reference id Reference() string // rpc service id or reference id
} }
//AsyncCallbackService callback interface for async
type AsyncCallbackService interface {
CallBack(response CallbackResponse) // callback
}
//CallbackResponse for different protocol
type CallbackResponse interface {
}
//AsyncCallback async callback method
type AsyncCallback func(response CallbackResponse)
// for lowercase func // for lowercase func
// func MethodMapper() map[string][string] { // func MethodMapper() map[string][string] {
// return map[string][string]{} // return map[string][string]{}
......
...@@ -447,6 +447,11 @@ func (c URL) GetMethodParam(method string, key string, d string) string { ...@@ -447,6 +447,11 @@ func (c URL) GetMethodParam(method string, key string, d string) string {
return r return r
} }
func (c URL) GetMethodParamBool(method string, key string, d bool) bool {
r := c.GetParamBool("methods."+method+"."+key, d)
return r
}
func (c *URL) RemoveParams(set *gxset.HashSet) { func (c *URL) RemoveParams(set *gxset.HashSet) {
c.paramsLock.Lock() c.paramsLock.Lock()
defer c.paramsLock.Unlock() defer c.paramsLock.Unlock()
......
...@@ -217,6 +217,18 @@ func TestURL_GetMethodParam(t *testing.T) { ...@@ -217,6 +217,18 @@ func TestURL_GetMethodParam(t *testing.T) {
assert.Equal(t, "1s", v) assert.Equal(t, "1s", v)
} }
func TestURL_GetMethodParamBool(t *testing.T) {
params := url.Values{}
params.Set("methods.GetValue.async", "true")
u := URL{baseUrl: baseUrl{params: params}}
v := u.GetMethodParamBool("GetValue", "async", false)
assert.Equal(t, true, v)
u = URL{}
v = u.GetMethodParamBool("GetValue2", "async", false)
assert.Equal(t, false, v)
}
func TestMergeUrl(t *testing.T) { func TestMergeUrl(t *testing.T) {
referenceUrlParams := url.Values{} referenceUrlParams := url.Values{}
referenceUrlParams.Set(constant.CLUSTER_KEY, "random") referenceUrlParams.Set(constant.CLUSTER_KEY, "random")
......
...@@ -36,6 +36,7 @@ type MethodConfig struct { ...@@ -36,6 +36,7 @@ type MethodConfig struct {
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"` TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"` ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"` ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
} }
......
...@@ -60,6 +60,7 @@ type ReferenceConfig struct { ...@@ -60,6 +60,7 @@ type ReferenceConfig struct {
invoker protocol.Invoker invoker protocol.Invoker
urls []*common.URL urls []*common.URL
Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"` Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
} }
...@@ -169,10 +170,6 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { ...@@ -169,10 +170,6 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.VERSION_KEY, refconfig.Version) urlMap.Set(constant.VERSION_KEY, refconfig.Version)
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
if len(refconfig.RequestTimeout) != 0 {
urlMap.Set(constant.TIMEOUT_KEY, refconfig.RequestTimeout)
}
//getty invoke async or sync //getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async)) urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async))
...@@ -195,7 +192,8 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { ...@@ -195,7 +192,8 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
for _, v := range refconfig.Methods { for _, v := range refconfig.Methods {
urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance) urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries) urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries)
if v.RequestTimeout != "" { urlMap.Set("methods."+v.Name+"."+constant.STICKY_KEY, strconv.FormatBool(v.Sticky))
if len(v.RequestTimeout) != 0 {
urlMap.Set("methods."+v.Name+"."+constant.TIMEOUT_KEY, v.RequestTimeout) urlMap.Set("methods."+v.Name+"."+constant.TIMEOUT_KEY, v.RequestTimeout)
} }
} }
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment