/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package healthcheck

import (
	"math"
)

import (
	"github.com/apache/dubbo-go/cluster/router"
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/protocol"
)

const (
	// HEALTH_CHECKER is not referenced in this file.
	// NOTE(review): presumably the URL parameter key that selects which
	// registered HealthChecker implementation to use — confirm against callers.
	HEALTH_CHECKER = "health.checker"
	// DEFAULT_HEALTH_CHECKER is the name under which init registers
	// NewDefaultHealthChecker.
	DEFAULT_HEALTH_CHECKER = "default"
	// OUTSTANDING_REQUEST_COUNT_LIMIT_KEY is the URL parameter key holding the
	// maximum number of active requests an invoker may have while still being
	// considered healthy.
	OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit"
	// SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY is the URL parameter key holding
	// the successive-failure threshold of the circuit breaker.
	SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold"
	// DEFAULT_SUCCESSIVE_FAILED_THRESHOLD is the default successive-failure
	// threshold.
	DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5
	// CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY is the URL parameter key holding the
	// circuit-tripped timeout factor.
	CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor"
	// DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF caps how far the successive
	// failure count may exceed the threshold when the sleep window is computed,
	// i.e. it bounds the exponent of the back-off.
	DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5
	// DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR is the default multiplier, in
	// milliseconds, applied to the exponential back-off of the sleep window.
	DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000
	// MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS is the upper bound, in milliseconds, of
	// the circuit breaker sleep window.
	MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000
)

// init registers NewDefaultHealthChecker with the extension package under the
// DEFAULT_HEALTH_CHECKER name.
func init() {
	extension.SethealthChecker(DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
}

// DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of
// the invoker based on the number of successive bad requests and the current active requests.
type DefaultHealthChecker struct { // outStandingRequestConutLimit outStandingRequestConutLimit int32 // requestSuccessiveFailureThreshold requestSuccessiveFailureThreshold int32 // requestSuccessiveFailureThreshold circuitTrippedTimeoutFactor int32 } // IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request // and the current active request func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { urlStatus := protocol.GetURLStatus(invoker.GetUrl()) if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.outStandingRequestConutLimit { logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key()) return false } return true } // isCircuitBreakerTripped determine whether the invoker is in the tripped state by the number of successive bad request func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatus) bool { circuitBreakerTimeout := c.getCircuitBreakerTimeout(status) currentTime := protocol.CurrentTimeMillis() if circuitBreakerTimeout <= 0 { return false } return circuitBreakerTimeout > currentTime } // getCircuitBreakerTimeout get the timestamp recovered from tripped state, the unit is millisecond func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 { sleepWindow := c.getCircuitBreakerSleepWindowTime(status) if sleepWindow <= 0 { return 0 } return status.GetLastRequestFailedTimestamp() + sleepWindow } // getCircuitBreakerSleepWindowTime get the sleep window time of invoker, the unit is millisecond func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 { successiveFailureCount := status.GetSuccessiveRequestFailureCount() diff := successiveFailureCount - c.requestSuccessiveFailureThreshold if diff < 0 { return 0 } else if diff > DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { diff = DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF } sleepWindow := (1 << diff) 
* DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS { sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS } return int64(sleepWindow) } // NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url func NewDefaultHealthChecker(url *common.URL) router.HealthChecker { return &DefaultHealthChecker{ outStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), requestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), circuitTrippedTimeoutFactor: int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), } }