diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go
index ed30559ed3c4b3930e65d6e66d5e91d8619516a1..891b189ba02b479a2c1296e43c94ad2e0a269368 100644
--- a/cluster/cluster_impl/base_cluster_invoker.go
+++ b/cluster/cluster_impl/base_cluster_invoker.go
@@ -124,15 +124,23 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
 		logger.Errorf("the invokers of %s is nil. ", invocation.Invoker().GetUrl().ServiceKey())
 		return nil
 	}
+	refreshBlackList()
 	if len(invokers) == 1 {
-		return invokers[0]
+		if invokers[0].IsAvailable() {
+			return invokers[0]
+		}
+		protocol.SetInvokerUnhealthyStatus(invokers[0])
+		logger.Errorf("the only invoker of %s is not available. ", invocation.Invoker().GetUrl().ServiceKey())
+		return nil
 	}
 
 	selectedInvoker := lb.Select(invokers, invocation)
 
-	//judge to if the selectedInvoker is invoked
-
+	// reselect when the selected invoker is unavailable or was already invoked
 	if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
+		// blacklist the invoker that was just selected
+		protocol.SetInvokerUnhealthyStatus(selectedInvoker)
+
 		// do reselect
 		var reslectInvokers []protocol.Invoker
 
@@ -140,6 +148,8 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
 			if !invoker.IsAvailable() {
 				logger.Infof("the invoker of %s is not available, maybe some network error happened or the server is shutdown.",
 					invoker.GetUrl().Ip)
+				// blacklist the unavailable invoker met while reselecting
+				protocol.SetInvokerUnhealthyStatus(invoker)
 				continue
 			}
 
@@ -194,3 +204,16 @@ func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cl
 	}
 	return extension.GetLoadbalance(lb)
 }
+
+// refreshBlackList asks the protocol package for up to
+// constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK blacklisted invokers and takes
+// every one that reports itself available again off the blacklist.
+func refreshBlackList() {
+	ivks := protocol.GetBlackListInvokers(constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK)
+	logger.Debug("blackList len = ", len(ivks))
+	for _, ivk := range ivks {
+		if ivk.IsAvailable() {
+			protocol.RemoveInvokerUnhealthyStatus(ivk)
+		}
+	}
+}
diff --git
a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go index 3effd779c989ed8e68c91eb6eab20d10d8b04e8c..a59e58ff61e3602a7581388ca06e6955ea19f436 100644 --- a/cluster/router/healthcheck/default_health_check.go +++ b/cluster/router/healthcheck/default_health_check.go @@ -48,7 +48,7 @@ type DefaultHealthChecker struct { // IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request // and the current active request func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { - if !invoker.IsAvailable() { + if !protocol.GetInvokerHealthyStatus(invoker){ return false } diff --git a/cluster/router/healthcheck/health_check_route.go b/cluster/router/healthcheck/health_check_route.go index 1a878af2127d2c5b979cfdb55ff1c1ec08463fe7..42a802f947fd938c0f0b5a87dea70837b9cd22fa 100644 --- a/cluster/router/healthcheck/health_check_route.go +++ b/cluster/router/healthcheck/health_check_route.go @@ -32,9 +32,8 @@ import ( ) const ( - HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled" - healthy = "healthy" - name = "health-check-router" + healthy = "healthy" + name = "health-check-router" ) // HealthCheckRouter provides a health-first routing mechanism through HealthChecker @@ -48,7 +47,7 @@ type HealthCheckRouter struct { func NewHealthCheckRouter(url *common.URL) (router.PriorityRouter, error) { r := &HealthCheckRouter{ url: url, - enabled: url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false), + enabled: url.GetParamBool(constant.HEALTH_ROUTE_ENABLED_KEY, false), } if r.enabled { checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER) @@ -87,7 +86,6 @@ func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, rb[healthy].Add(uint32(i)) } } - return rb, nil } diff --git a/common/constant/key.go b/common/constant/key.go index 9525511cdaac64083cc595e1c0a6327a4ded6f9c..b10d12d7981d8734610a587a3ffaa36b03dda0e7 100644 --- 
a/common/constant/key.go +++ b/common/constant/key.go @@ -89,6 +89,8 @@ const ( RETRY_PERIOD_KEY = "retry.period" RETRY_TIMES_KEY = "retry.times" CYCLE_REPORT_KEY = "cycle.report" + DEFAULT_BLACK_LIST_RECOVER_BLOCK = 10 + ) const ( @@ -238,6 +240,9 @@ const ( // Attachment key in context in invoker AttachmentKey = "attachment" + + // HEALTH_ROUTE_ENABLED_KEY defines if use health router + HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled" ) const ( diff --git a/config/reference_config.go b/config/reference_config.go index 2e914ac9a718aaef93732fafa18f7d954cc0c5f6..53433100f3c370606beebd9dd29092e022ab3cd6 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -41,30 +41,31 @@ import ( // ReferenceConfig is the configuration of service consumer type ReferenceConfig struct { - context context.Context - pxy *proxy.Proxy - id string - InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` - Check *bool `yaml:"check" json:"check,omitempty" property:"check"` - Url string `yaml:"url" json:"url,omitempty" property:"url"` - Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` - Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` - Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` - Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` - Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` - Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` - Group string `yaml:"group" json:"group,omitempty" property:"group"` - Version string `yaml:"version" json:"version,omitempty" property:"version"` - ProvideBy string `yaml:"provide_by" json:"provide_by,omitempty" property:"provide_by"` - Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - Async bool `yaml:"async" json:"async,omitempty" property:"async"` - 
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` - invoker protocol.Invoker - urls []*common.URL - Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"` - Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` - RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` - ForceTag bool `yaml:"force.tag" json:"force.tag,omitempty" property:"force.tag"` + context context.Context + pxy *proxy.Proxy + id string + InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` + Check *bool `yaml:"check" json:"check,omitempty" property:"check"` + Url string `yaml:"url" json:"url,omitempty" property:"url"` + Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` + Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` + Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` + Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` + Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` + Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` + Group string `yaml:"group" json:"group,omitempty" property:"group"` + Version string `yaml:"version" json:"version,omitempty" property:"version"` + ProvideBy string `yaml:"provide_by" json:"provide_by,omitempty" property:"provide_by"` + Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` + Async bool `yaml:"async" json:"async,omitempty" property:"async"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + invoker protocol.Invoker + urls []*common.URL + Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"` + Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` + RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" 
property:"timeout"` + ForceTag bool `yaml:"force.tag" json:"force.tag,omitempty" property:"force.tag"` + HealthCheckEnable bool `yaml:"health_check" json:"health_check,omitempty" property:"health_check"` } // nolint @@ -104,6 +105,9 @@ func (c *ReferenceConfig) Refer(_ interface{}) { if c.ForceTag { cfgURL.AddParam(constant.ForceUseTag, "true") } + if c.HealthCheckEnable { + cfgURL.AddParam(constant.HEALTH_ROUTE_ENABLED_KEY, "true") + } if c.Url != "" { // 1. user specified URL, could be peer-to-peer address, or register center's address. urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*") diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go index 8d443e84f06b89ed708e080285aee3b054ea02e6..7a915f4f5a90785580663e986a31b389e4d65706 100644 --- a/protocol/rpc_status.go +++ b/protocol/rpc_status.go @@ -30,6 +30,7 @@ import ( var ( methodStatistics sync.Map // url -> { methodName : RPCStatus} serviceStatistic sync.Map // url -> RPCStatus + invokerBlackList sync.Map // store unhealthy url blackList ) // RPCStatus is URL statistics. 
@@ -181,4 +182,43 @@ func CleanAllStatus() {
 		return true
 	}
 	serviceStatistic.Range(delete2)
+	// Empty the invoker blacklist as well.
+	delete3 := func(key, _ interface{}) bool {
+		invokerBlackList.Delete(key)
+		return true
+	}
+	invokerBlackList.Range(delete3)
+}
+
+// GetInvokerHealthyStatus reports whether invoker is healthy, i.e. whether its
+// URL key is absent from the blacklist.
+func GetInvokerHealthyStatus(invoker Invoker) bool {
+	_, found := invokerBlackList.Load(invoker.GetUrl().Key())
+	return !found
+}
+
+// SetInvokerUnhealthyStatus puts invoker on the blacklist under its URL key.
+func SetInvokerUnhealthyStatus(invoker Invoker) {
+	invokerBlackList.Store(invoker.GetUrl().Key(), invoker)
+}
+
+// RemoveInvokerUnhealthyStatus removes invoker's URL key from the blacklist.
+func RemoveInvokerUnhealthyStatus(invoker Invoker) {
+	invokerBlackList.Delete(invoker.GetUrl().Key())
+}
+
+// GetBlackListInvokers returns at most blockSize blacklisted invokers; a
+// non-positive blockSize yields an empty slice. Entries are taken in the order
+// sync.Map.Range visits them, so which ones are picked may differ between calls.
+func GetBlackListInvokers(blockSize int) []Invoker {
+	if blockSize <= 0 {
+		return []Invoker{}
+	}
+	resultIvks := make([]Invoker, 0, 16)
+	invokerBlackList.Range(func(_, v interface{}) bool {
+		resultIvks = append(resultIvks, v.(Invoker))
+		// Returning false stops Range as soon as the block is full.
+		return len(resultIvks) < blockSize
+	})
+	return resultIvks
 }
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index e6ae0f3bb9004f8b933d420cdd2e7561f8232b48..d8ff9375e6200dcdec9f8ed1b73b78698659ac92 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -83,11 +83,13 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.
 
 	dir.consumerURL = dir.getConsumerUrl(url.SubURL)
 
+	logger.Info("in NewRegistryDirectory ")
 	if routerChain, err := chain.NewRouterChain(dir.consumerURL); err == nil {
 		dir.BaseDirectory.SetRouterChain(routerChain)
 	} else {
 		logger.Warnf("fail to create router chain with url: %s, err is: %v", url.SubURL, err)
 	}
+	logger.Info("here")
 
 	dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
 