Skip to content
Snippets Groups Projects
Commit 87033905 authored by vito.he's avatar vito.he
Browse files

Merge branch 'master' into develop

parents 0add5c98 39f90526
No related branches found
No related tags found
No related merge requests found
......@@ -15,6 +15,10 @@ Apache License, Version 2.0
[v1.0.0 - May 29, 2019 compatible with dubbo v2.6.5](https://github.com/apache/dubbo-go/releases/tag/v1.0.0)
[v1.1.0 - Sep 7, 2019 the first release after transferred to apache](https://github.com/apache/dubbo-go/releases/tag/v1.1.0)
[v1.2.0 - Nov 15, 2019](https://github.com/apache/dubbo-go/releases/tag/v1.2.0)
## Project Architecture ##
Both extension module and layered project architecture is according to Apache Dubbo (including protocol layer, registry layer, cluster layer, config layer and so on), the advantage of this arch is as following: you can implement these layered interfaces in your own way, override the default implementation of dubbo-go by calling 'extension.SetXXX' of extension, complete your special needs without modifying the source code. At the same time, you are welcome to contribute implementation of useful extension to the community.
......@@ -90,6 +94,8 @@ Working List:
You can know more about dubbo-go by its [roadmap](https://github.com/apache/dubbo-go/wiki/Roadmap).
![feature](https://raw.githubusercontent.com/wiki/apache/dubbo-go/arch.png)
## Document
TODO
......
......@@ -14,6 +14,10 @@ Apache License, Version 2.0
[v1.0.0 - 2019年5月29日 兼容dubbo v2.6.5 版本](https://github.com/apache/dubbo-go/releases/tag/v1.0.0)
[v1.1.0 - 2019年9月7日 捐献给Apache之后的第一次release](https://github.com/apache/dubbo-go/releases/tag/v1.1.0)
[v1.2.0 - 2019年11月15日](https://github.com/apache/dubbo-go/releases/tag/v1.2.0)
## 工程架构 ##
基于dubbo的extension模块和分层的代码设计(包括 protocol layer, registry layer, cluster layer, config 等等)。我们的目标是:你可以对这些分层接口进行新的实现,并通过调用 extension 模块的“ extension.SetXXX ”方法来覆盖 dubbo-go [同 go-for-apache-dubbo ]的默认实现,以完成自己的特殊需求而无需修改源代码。同时,欢迎你为社区贡献有用的拓展实现。
......@@ -89,6 +93,8 @@ Apache License, Version 2.0
你可以通过访问 [roadmap](https://github.com/apache/dubbo-go/wiki/Roadmap) 知道更多关于 dubbo-go 的信息。
![feature](https://raw.githubusercontent.com/wiki/apache/dubbo-go/arch.png)
## 文档
TODO
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package loadbalance
import (
"crypto/md5"
"encoding/json"
"fmt"
"hash/crc32"
"regexp"
"sort"
"strconv"
"strings"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
const (
ConsistentHash = "consistenthash"
HashNodes = "hash.nodes"
HashArguments = "hash.arguments"
)
var (
selectors = make(map[string]*ConsistentHashSelector)
re = regexp.MustCompile(constant.COMMA_SPLIT_PATTERN)
)
func init() {
extension.SetLoadbalance(ConsistentHash, NewConsistentHashLoadBalance)
}
type ConsistentHashLoadBalance struct {
}
func NewConsistentHashLoadBalance() cluster.LoadBalance {
return &ConsistentHashLoadBalance{}
}
func (lb *ConsistentHashLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
methodName := invocation.MethodName()
key := invokers[0].GetUrl().ServiceKey() + "." + methodName
// hash the invokers
bs := make([]byte, 0)
for _, invoker := range invokers {
b, err := json.Marshal(invoker)
if err != nil {
return nil
}
bs = append(bs, b...)
}
hashCode := crc32.ChecksumIEEE(bs)
selector, ok := selectors[key]
if !ok || selector.hashCode != hashCode {
selectors[key] = newConsistentHashSelector(invokers, methodName, hashCode)
selector = selectors[key]
}
return selector.Select(invocation)
}
type Uint32Slice []uint32
func (s Uint32Slice) Len() int {
return len(s)
}
func (s Uint32Slice) Less(i, j int) bool {
return s[i] < s[j]
}
func (s Uint32Slice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
type ConsistentHashSelector struct {
hashCode uint32
replicaNum int
virtualInvokers map[uint32]protocol.Invoker
keys Uint32Slice
argumentIndex []int
}
func newConsistentHashSelector(invokers []protocol.Invoker, methodName string,
hashCode uint32) *ConsistentHashSelector {
selector := &ConsistentHashSelector{}
selector.virtualInvokers = make(map[uint32]protocol.Invoker)
selector.hashCode = hashCode
url := invokers[0].GetUrl()
selector.replicaNum = int(url.GetMethodParamInt(methodName, HashNodes, 160))
indices := re.Split(url.GetMethodParam(methodName, HashArguments, "0"), -1)
for _, index := range indices {
i, err := strconv.Atoi(index)
if err != nil {
return nil
}
selector.argumentIndex = append(selector.argumentIndex, i)
}
for _, invoker := range invokers {
u := invoker.GetUrl()
address := u.Ip + ":" + u.Port
for i := 0; i < selector.replicaNum/4; i++ {
digest := md5.Sum([]byte(address + strconv.Itoa(i)))
for j := 0; j < 4; j++ {
key := selector.hash(digest, j)
selector.keys = append(selector.keys, key)
selector.virtualInvokers[key] = invoker
}
}
}
sort.Sort(selector.keys)
return selector
}
func (c *ConsistentHashSelector) Select(invocation protocol.Invocation) protocol.Invoker {
key := c.toKey(invocation.Arguments())
digest := md5.Sum([]byte(key))
return c.selectForKey(c.hash(digest, 0))
}
func (c *ConsistentHashSelector) toKey(args []interface{}) string {
var sb strings.Builder
for i := range c.argumentIndex {
if i >= 0 && i < len(args) {
fmt.Fprint(&sb, args[i].(string))
}
}
return sb.String()
}
func (c *ConsistentHashSelector) selectForKey(hash uint32) protocol.Invoker {
idx := sort.Search(len(c.keys), func(i int) bool {
return c.keys[i] >= hash
})
if idx == len(c.keys) {
idx = 0
}
return c.virtualInvokers[c.keys[idx]]
}
func (c *ConsistentHashSelector) hash(digest [16]byte, i int) uint32 {
return uint32((digest[3+i*4]&0xFF)<<24) | uint32((digest[2+i*4]&0xFF)<<16) |
uint32((digest[1+i*4]&0xFF)<<8) | uint32(digest[i*4]&0xFF)&0xFFFFFFF
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package loadbalance
import (
"context"
"testing"
)
import (
"github.com/stretchr/testify/suite"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestConsistentHashSelectorSuite(t *testing.T) {
suite.Run(t, new(consistentHashSelectorSuite))
}
type consistentHashSelectorSuite struct {
suite.Suite
selector *ConsistentHashSelector
}
func (s *consistentHashSelectorSuite) SetupTest() {
var invokers []protocol.Invoker
url, _ := common.NewURL(context.TODO(),
"dubbo://192.168.1.0:20000/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
invokers = append(invokers, protocol.NewBaseInvoker(url))
s.selector = newConsistentHashSelector(invokers, "echo", 999944)
}
func (s *consistentHashSelectorSuite) TestToKey() {
result := s.selector.toKey([]interface{}{"username", "age"})
s.Equal(result, "usernameage")
}
func (s *consistentHashSelectorSuite) TestSelectForKey() {
url1, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:8080")
url2, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:8081")
s.selector.virtualInvokers = make(map[uint32]protocol.Invoker)
s.selector.virtualInvokers[99874] = protocol.NewBaseInvoker(url1)
s.selector.virtualInvokers[9999945] = protocol.NewBaseInvoker(url2)
s.selector.keys = []uint32{99874, 9999945}
result := s.selector.selectForKey(9999944)
s.Equal(result.GetUrl().String(), "dubbo://192.168.1.0:8081?")
}
func TestConsistentHashLoadBalanceSuite(t *testing.T) {
suite.Run(t, new(consistentHashLoadBalanceSuite))
}
type consistentHashLoadBalanceSuite struct {
suite.Suite
url1 common.URL
url2 common.URL
url3 common.URL
invokers []protocol.Invoker
invoker1 protocol.Invoker
invoker2 protocol.Invoker
invoker3 protocol.Invoker
lb cluster.LoadBalance
}
func (s *consistentHashLoadBalanceSuite) SetupTest() {
var err error
s.url1, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8080/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.NoError(err)
s.url2, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8081/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.NoError(err)
s.url3, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8082/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.NoError(err)
s.invoker1 = protocol.NewBaseInvoker(s.url1)
s.invoker2 = protocol.NewBaseInvoker(s.url2)
s.invoker3 = protocol.NewBaseInvoker(s.url3)
s.invokers = append(s.invokers, s.invoker1, s.invoker2, s.invoker3)
s.lb = NewConsistentHashLoadBalance()
}
func (s *consistentHashLoadBalanceSuite) TestSelect() {
args := []interface{}{"name", "password", "age"}
invoker := s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil))
s.Equal(invoker.GetUrl().Location, "192.168.1.0:8080")
args = []interface{}{"ok", "abc"}
invoker = s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil))
s.Equal(invoker.GetUrl().Location, "192.168.1.0:8082")
}
......@@ -67,3 +67,7 @@ const (
APP_DYNAMIC_CONFIGURATORS_CATEGORY = "appdynamicconfigurators"
PROVIDER_CATEGORY = "providers"
)
const (
COMMA_SPLIT_PATTERN = "\\s*[,]+\\s*"
)
......@@ -134,14 +134,14 @@ func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.UR
newUrlStr = newUrlStr + v
url, err := common.NewURL(context.Background(), newUrlStr)
if err != nil {
perrors.WithStack(err)
return nil, perrors.WithStack(err)
}
urls = append(urls, &url)
}
} else {
url, err := common.NewURL(context.Background(), urlStr)
if err != nil {
perrors.WithStack(err)
return nil, perrors.WithStack(err)
}
urls = append(urls, &url)
}
......
......@@ -52,3 +52,5 @@ require (
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
)
go 1.13
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment