Skip to content
Snippets Groups Projects
Commit 5de3b9a4 authored by vito.he's avatar vito.he
Browse files

Add:2.6.0 random loadbalance & failover cluster finish

parent 4fb0a65d
No related branches found
No related tags found
No related merge requests found
......@@ -5,3 +5,4 @@ import "github.com/dubbo/dubbo-go/protocol"
type Cluster interface {
Join(Directory) protocol.Invoker
}
package cluster
import (
"github.com/dubbo/dubbo-go/common/constant"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
"time"
)
// Extension - LoadBalance
type LoadBalance interface {
Select()
Select([]protocol.Invoker, config.URL, protocol.Invocation) protocol.Invoker
}
func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 {
url := invoker.GetUrl().(*config.URL)
weight := url.GetMethodParamInt(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
if weight > 0 {
//get service register time an do warm up time
now := time.Now().Unix()
timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now)
if uptime := now - timestamp; uptime > 0 {
warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP)
if uptime < warmup {
if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 {
weight = 1
} else if int64(ww) <= weight {
weight = int64(ww)
}
}
}
}
return weight
}
package loadbalance
import (
"github.com/dubbo/dubbo-go/cluster"
"github.com/dubbo/dubbo-go/common/extension"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
"math/rand"
)
const name = "random"
func init() {
extension.SetLoadbalance(name, NewRandomLoadBalance)
}
type RandomLoadBalance struct {
}
func NewRandomLoadBalance() cluster.LoadBalance {
return &RandomLoadBalance{}
}
func (lb *RandomLoadBalance) Select(invokers []protocol.Invoker, url config.URL, invocation protocol.Invocation) protocol.Invoker {
var length int
if length = len(invokers); length == 1 {
return invokers[0]
}
sameWeight := true
weights := make([]int64, length)
firstWeight := cluster.GetWeight(invokers[0], invocation)
totalWeight := firstWeight
weights[0] = firstWeight
for i := 1; i < length; i++ {
weight := cluster.GetWeight(invokers[i], invocation)
weights[i] = weight
totalWeight += weight
if sameWeight && weight != firstWeight {
sameWeight = false
}
}
if totalWeight > 0 && !sameWeight {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
offset := rand.Int63n(totalWeight)
for i := 0; i < length; i++ {
offset -= weights[i]
if offset < 0 {
return invokers[i]
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers[rand.Intn(length)]
}
package cluster
import (
gxnet "github.com/AlexStocks/goext/net"
"github.com/dubbo/dubbo-go/version"
jerrors "github.com/juju/errors"
"github.com/tevino/abool"
)
import (
"github.com/dubbo/dubbo-go/cluster"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
)
type baseClusterInvoker struct {
......@@ -18,7 +22,7 @@ type baseClusterInvoker struct {
func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
return baseClusterInvoker{
directory: directory,
availablecheck: false,
availablecheck: true,
destroyed: abool.NewBool(false),
}
}
......@@ -37,3 +41,69 @@ func (invoker *baseClusterInvoker) IsAvailable() bool {
//TODO:不理解java版本中关于stikyInvoker的逻辑所以先不写
return invoker.directory.IsAvailable()
}
//check invokers availables
func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error {
if len(invokers) == 0 {
ip, _ := gxnet.GetLocalIP()
return jerrors.Errorf("Failed to invoke the method %v . No provider available for the service %v from"+
"registry %v on the consumer %v using the dubbo version %v .Please check if the providers have been started and registered.",
invocation.MethodName(), invoker.directory.GetUrl().Key(), invoker.directory.GetUrl().String(), ip, version.Version)
}
return nil
}
//check cluster invoker is destroyed or not
func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
if invoker.destroyed.IsSet() {
ip, _ := gxnet.GetLocalIP()
return jerrors.Errorf("Rpc cluster invoker for %v on consumer %v use dubbo version %v is now destroyed! can not invoke any more. ",
invoker.directory.GetUrl().(*config.URL).Service, ip, version.Version)
}
return nil
}
func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
//todo:ticky connect 粘纸连接
if len(invokers) == 1 {
return invokers[0]
}
selectedInvoker := lb.Select(invokers, *invoker.GetUrl().(*config.URL), invocation)
//judge to if the selectedInvoker is invoked
if !selectedInvoker.IsAvailable() || !invoker.availablecheck || isInvoked(selectedInvoker, invoked) {
// do reselect
var reslectInvokers []protocol.Invoker
for _, invoker := range invokers {
if !invoker.IsAvailable() {
continue
}
if !isInvoked(invoker, invoked) {
reslectInvokers = append(reslectInvokers, invoker)
}
}
if len(reslectInvokers) > 0 {
return lb.Select(reslectInvokers, *invoker.GetUrl().(*config.URL), invocation)
} else {
return nil
}
}
return selectedInvoker
}
func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
if len(invoked) > 0 {
for _, i := range invoked {
if i == selectedInvoker {
return true
}
}
}
return false
}
package cluster
import (
gxnet "github.com/AlexStocks/goext/net"
"github.com/dubbo/dubbo-go/cluster"
"github.com/dubbo/dubbo-go/common/constant"
"github.com/dubbo/dubbo-go/common/extension"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
"github.com/dubbo/dubbo-go/version"
jerrors "github.com/juju/errors"
)
type failoverClusterInvoker struct {
......@@ -16,7 +22,64 @@ func NewFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
invokers[0].GetUrl()
return &protocol.RPCResult{}
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}
url := invokers[0].GetUrl().(*config.URL)
methodName := invocation.MethodName()
//Get the service loadbalance config
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE); v != "" {
lb = v
}
loadbalance := extension.GetLoadbalance(lb)
//get reties
retries := url.GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
//Get the service method loadbalance config if have
if v := url.GetMethodParamInt(methodName, constant.RETRIES_KEY, constant.DEFAULT_RETRIES); v != 0 {
retries = v
}
invoked := []protocol.Invoker{}
providers := []string{}
var result protocol.Result
for i := int64(0); i < retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 {
err := invoker.checkWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
}
invokers = invoker.directory.List(invocation)
err = invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}
}
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
invoked = append(invoked, ivk)
//DO INVOKE
result = ivk.Invoke(invocation)
if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key())
continue
} else {
return result
}
}
ip, _ := gxnet.GetLocalIP()
return &protocol.RPCResult{Err: jerrors.Errorf("Failed to invoke the method %v in the service %v . Tried %v times of "+
"the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invocation, invoker.GetUrl().(*config.URL).Service, retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, version.Version, result.Error().Error(),
)}
}
package constant
const (
DEFAULT_WEIGHT = 100 //
DEFAULT_WARMUP = 10 * 60 * 1000
)
const (
DEFAULT_LOADBALANCE = "random"
DEFAULT_RETRIES = 2
)
......@@ -5,14 +5,25 @@ const (
)
const (
GROUP_KEY = "group"
VERSION_KEY = "Version"
INTERFACE_KEY = "interface"
PATH_KEY = "path"
SERVICE_KEY = "service"
GROUP_KEY = "group"
VERSION_KEY = "version"
INTERFACE_KEY = "interface"
PATH_KEY = "path"
SERVICE_KEY = "service"
METHODS_KEY = "methods"
TIMESTAMP_KEY = "timestamp"
REMOTE_TIMESTAMP_KEY = "remote.timestamp"
)
const (
SERVICE_FILTER_KEY = "service.filter"
REFERENCE_FILTER_KEY = "reference.filter"
)
const (
CLUSTER_KEY = "cluster"
LOADBALANCE_KEY = "loadbalance"
WEIGHT_KEY = "weight"
WARMUP_KEY = "warmup"
RETRIES_KEY = "retries"
)
package extension
import "github.com/dubbo/dubbo-go/cluster"
var (
loadbalances = make(map[string]func() cluster.LoadBalance)
)
func SetLoadbalance(name string, fcn func() cluster.LoadBalance) {
loadbalances[name] = fcn
}
func GetLoadbalance(name string) cluster.LoadBalance {
return loadbalances[name]()
}
......@@ -18,6 +18,9 @@ type IURL interface {
Key() string
URLEqual(IURL) bool
Context() context.Context
GetParam(string, string) string
GetParamInt(string, int64) int64
String() string
}
type baseUrl struct {
......@@ -37,22 +40,13 @@ type URL struct {
Path string `yaml:"path" json:"path,omitempty"` // like /com.ikurento.dubbo.UserProvider3
Weight int32
Methods string `yaml:"methods" json:"methods,omitempty"`
//Weight int32
Version string `yaml:"version" json:"version,omitempty"`
Group string `yaml:"group" json:"group,omitempty"`
Username string
Password string
//reference only
Cluster string
}
type method struct {
Name string
Retries int
Methods []string
}
func NewURL(ctx context.Context, urlString string) (*URL, error) {
......@@ -111,7 +105,7 @@ func NewURL(ctx context.Context, urlString string) (*URL, error) {
}
func (c *URL) Key() string {
return fmt.Sprintf("%s@%s-%s-%s-%s-%s", c.Service, c.Protocol, c.Group, c.Location, c.Version, c.Methods)
return fmt.Sprintf("%s@%s-%s-%s-%s", c.Service, c.Protocol, c.Group, c.Location, c.Version)
}
func (c *URL) URLEqual(url IURL) bool {
......@@ -124,17 +118,51 @@ func (c *URL) URLEqual(url IURL) bool {
func (c URL) String() string {
return fmt.Sprintf(
"DefaultServiceURL{Protocol:%s, Location:%s, Path:%s, Ip:%s, Port:%s, "+
"Timeout:%s, Version:%s, Group:%s, Weight_:%d, Params:%+v}",
"Timeout:%s, Version:%s, Group:%s, Params:%+v}",
c.Protocol, c.Location, c.Path, c.Ip, c.Port,
c.Timeout, c.Version, c.Group, c.Weight, c.Params)
c.Timeout, c.Version, c.Group, c.Params)
}
func (c *URL) ToFullString() string {
return fmt.Sprintf(
"%s://%s:%s@%s:%s/%s?%s&%s&%s&%s",
c.Protocol, c.Password, c.Username, c.Ip, c.Port, c.Path, c.Methods, c.Version, c.Group, c.Params)
"%s://%s:%s@%s:%s/%s?%s&%s&%s",
c.Protocol, c.Password, c.Username, c.Ip, c.Port, c.Path, c.Version, c.Group, c.Params)
}
func (c *URL) Context() context.Context {
return c.ctx
}
func (c *URL) GetParam(s string, d string) string {
var r string
if r = c.Params.Get(s); r == "" {
r = d
}
return r
}
func (c *URL) GetParamInt(s string, d int64) int64 {
var r int
var err error
if r, err = strconv.Atoi(c.Params.Get(s)); r == 0 || err != nil {
return d
}
return int64(r)
}
func (c *URL) GetMethodParamInt(method string, key string, d int64) int64 {
var r int
var err error
if r, err = strconv.Atoi(c.Params.Get("methods." + method + "." + key)); r == 0 || err != nil {
return d
}
return int64(r)
}
func (c *URL) GetMethodParam(method string, key string, d string) string {
var r string
if r = c.Params.Get(c.Params.Get("methods." + method + "." + key)); r == "" {
r = d
}
return r
}
......@@ -127,3 +127,24 @@ func (c *RegistryURL) URLEqual(url IURL) bool {
func (c *RegistryURL) Context() context.Context {
return c.ctx
}
func (c *RegistryURL) GetParam(s string, d string) string {
var r string
if r = c.Params.Get(s); r == "" {
r = d
}
return r
}
func (c *RegistryURL) GetParamInt(s string, d int64) int64 {
var r int
var err error
if r, err = strconv.Atoi(c.Params.Get(s)); r == 0 || err != nil {
return d
}
return int64(r)
}
func (c *RegistryURL) String() string {
return ""
}
......@@ -3,6 +3,7 @@ package support
import (
"context"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
)
import (
......@@ -21,7 +22,8 @@ type ReferenceConfig struct {
Registries []referenceConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty"`
Methods []method `yaml:"methods" json:"methods,omitempty"`
URLs []config.URL `yaml:"-"`
URLs []config.URL `yaml:"-"`
invoker protocol.Invoker
}
type referenceConfigRegistry struct {
string
......@@ -43,10 +45,11 @@ func (refconfig *ReferenceConfig) CreateProxy() {
urls := refconfig.loadRegistries()
if len(urls) == 1 {
refprotocol.Refer(urls[0])
refconfig.invoker = refprotocol.Refer(urls[0])
} else {
//TODO:multi registries
}
//TODO:invoker yincheng 's proxy
}
func (refconfig *ReferenceConfig) loadRegistries() []*config.RegistryURL {
......
package protocol
package protocolwrapper
import (
"strings"
......
package directory
import (
"github.com/dubbo/dubbo-go/common/constant"
"github.com/dubbo/dubbo-go/protocol/protocolwrapper"
"github.com/dubbo/dubbo-go/registry"
protocol2 "github.com/dubbo/dubbo-go/registry/protocol"
"sync"
......@@ -155,7 +157,7 @@ func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap sync.Map) []protoco
} else {
for _, invokers := range groupInvokersMap {
staticDir := directory.NewStaticDirectory(invokers)
cluster := extension.GetCluster(dir.GetUrl().(*config.RegistryURL).URL.Cluster)
cluster := extension.GetCluster(dir.GetUrl().(*config.RegistryURL).URL.Params.Get(constant.CLUSTER_KEY))
groupInvokersList = append(groupInvokersList, cluster.Join(staticDir))
}
}
......@@ -180,7 +182,7 @@ func (dir *RegistryDirectory) cacheInvoker(url config.URL) sync.Map {
if _, ok := newCacheInvokers.Load(url.ToFullString()); !ok {
log.Debug("service will be added in cache invokers: invokers key is %s!", url.ToFullString())
newInvoker := extension.GetProtocolExtension(url.Protocol).Refer(&url)
newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(&url)
newCacheInvokers.Store(url.ToFullString(), newInvoker)
}
}
......@@ -205,9 +207,41 @@ func (dir *RegistryDirectory) Destroy() {
// in this function we should merge the reference local url config into the service url from registry.
//TODO configuration merge, in the future , the configuration center's config should merge too.
func mergeUrl(serviceUrl config.URL, referenceUrl config.URL) config.URL {
mergedUrl := serviceUrl
var methodConfigMergeFcn = []func(method string){}
//loadBalance strategy config
if v := referenceUrl.Params.Get(constant.LOADBALANCE_KEY); v != "" {
mergedUrl.Params.Set(constant.LOADBALANCE_KEY, v)
}
methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) {
if v := referenceUrl.Params.Get(method + "." + constant.LOADBALANCE_KEY); v != "" {
mergedUrl.Params.Set(method+"."+constant.LOADBALANCE_KEY, v)
}
})
//cluster strategy config
if v := referenceUrl.Params.Get(constant.CLUSTER_KEY); v != "" {
mergedUrl.Params.Set(constant.CLUSTER_KEY, v)
}
methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) {
if v := referenceUrl.Params.Get(method + "." + constant.CLUSTER_KEY); v != "" {
mergedUrl.Params.Set(method+"."+constant.CLUSTER_KEY, v)
}
})
//remote timestamp
if v := serviceUrl.Params.Get(constant.TIMESTAMP_KEY); v != "" {
mergedUrl.Params.Set(constant.REMOTE_TIMESTAMP_KEY, v)
mergedUrl.Params.Set(constant.TIMESTAMP_KEY, referenceUrl.Params.Get(constant.TIMESTAMP_KEY))
}
//finally execute methodConfigMergeFcn
for _, method := range referenceUrl.Methods {
for _, fcn := range methodConfigMergeFcn {
fcn("methods." + method)
}
}
return serviceUrl
}
package protocol
import (
"github.com/dubbo/dubbo-go/common/constant"
"github.com/dubbo/dubbo-go/registry"
directory2 "github.com/dubbo/dubbo-go/registry/directory"
"sync"
......@@ -58,7 +59,7 @@ func (protocol *RegistryProtocol) Refer(url config.IURL) protocol.Invoker {
go directory.Subscribe(serviceUrl)
//new cluster invoker
cluster := extension.GetCluster(serviceUrl.Cluster)
cluster := extension.GetCluster(serviceUrl.Params.Get(constant.CLUSTER_KEY))
return cluster.Join(directory)
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment