/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package healthcheck import ( "math" ) import ( "github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" ) func init() { extension.SetHealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker) } // DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of // the invoker based on the number of successive bad request and the current active request. type DefaultHealthChecker struct { // the limit of outstanding request outStandingRequestConutLimit int32 // the threshold of successive-failure-request requestSuccessiveFailureThreshold int32 // value of circuit-tripped timeout factor circuitTrippedTimeoutFactor int32 } // IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request // and the current active request func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { if !invoker.IsAvailable() { return false } urlStatus := protocol.GetURLStatus(invoker.GetUrl()) if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestCountLimit() { logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key()) return false } return true } // isCircuitBreakerTripped determine whether the invoker is in the tripped state by the number of successive bad request func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatus) bool { circuitBreakerTimeout := c.getCircuitBreakerTimeout(status) currentTime := protocol.CurrentTimeMillis() if circuitBreakerTimeout <= 0 { return false } return circuitBreakerTimeout > currentTime } // getCircuitBreakerTimeout get the timestamp recovered from tripped state, the unit is millisecond func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 { sleepWindow := c.getCircuitBreakerSleepWindowTime(status) if sleepWindow <= 0 { return 0 } return status.GetLastRequestFailedTimestamp() + sleepWindow } // getCircuitBreakerSleepWindowTime get the sleep window time of invoker, the unit is millisecond func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 { successiveFailureCount := status.GetSuccessiveRequestFailureCount() diff := successiveFailureCount - c.GetRequestSuccessiveFailureThreshold() if diff < 0 { return 0 } else if diff > constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { diff = constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF } sleepWindow := (1 << uint(diff)) * c.GetCircuitTrippedTimeoutFactor() if sleepWindow > constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS { sleepWindow = constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS } return int64(sleepWindow) } // GetRequestSuccessiveFailureThreshold return the requestSuccessiveFailureThreshold bound to this DefaultHealthChecker func (c *DefaultHealthChecker) GetRequestSuccessiveFailureThreshold() int32 { return c.requestSuccessiveFailureThreshold } // GetCircuitTrippedTimeoutFactor return the circuitTrippedTimeoutFactor bound to this DefaultHealthChecker func (c *DefaultHealthChecker) GetCircuitTrippedTimeoutFactor() int32 { return c.circuitTrippedTimeoutFactor } // GetOutStandingRequestCountLimit return the outStandingRequestConutLimit bound to this DefaultHealthChecker func (c *DefaultHealthChecker) GetOutStandingRequestCountLimit() int32 { return c.outStandingRequestConutLimit } // NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url func NewDefaultHealthChecker(url *common.URL) router.HealthChecker { return &DefaultHealthChecker{ outStandingRequestConutLimit: url.GetParamInt32(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32), requestSuccessiveFailureThreshold: url.GetParamInt32(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF), circuitTrippedTimeoutFactor: url.GetParamInt32(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR), } }