Skip to content
Snippets Groups Projects
Commit deec46e4 authored by Ming Deng's avatar Ming Deng
Browse files

Fix BUG & DEBUG

parent 7aa4218a
No related branches found
No related tags found
No related merge requests found
......@@ -71,6 +71,8 @@ const (
EXECUTE_LIMIT_KEY = "execute.limit"
DEFAULT_EXECUTE_LIMIT = "-1"
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
PROVIDER_SHUTDOWN_FILTER = "pshutdown"
CONSUMER_SHUTDOWN_FILTER = "cshutdown"
)
const (
......
......@@ -149,7 +149,8 @@ func Load() {
}
}
}
GracefulShutdownInit()
// init the shutdown callback
// GracefulShutdownInit()
}
// get rpc service for consumer
......
......@@ -20,7 +20,7 @@ package config
import (
"os"
"os/signal"
"strconv"
"runtime/debug"
"sync"
"syscall"
"time"
......@@ -28,7 +28,6 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
/*
......@@ -56,44 +55,96 @@ func GracefulShutdownInit() {
syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP,
syscall.SIGABRT, syscall.SIGEMT, syscall.SIGSYS,
)
for {
sig := <-signals
gracefulShutdownOnce.Do(func() {
logger.Infof("get signal %s, application is shutdown", sig.String())
go func() {
select {
case sig := <-signals:
logger.Infof("get signal %s, application will shutdown.", sig.String())
// gracefulShutdownOnce.Do(func() {
BeforeShutdown()
switch sig {
// the default behavior of these signals is to exit and dump the stack, so we try to keep that behavior
case syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP,
syscall.SIGABRT, syscall.SIGEMT, syscall.SIGSYS:
debug.WriteHeapDump(os.Stdout.Fd())
default:
time.AfterFunc(totalTimeout(), func() {
logger.Warn("Shutdown gracefully timeout, application will shutdown immediately. ")
os.Exit(0)
})
}
os.Exit(0)
}
}()
}
destroyAllRegistries()
// waiting for a short time so that the clients have enough time to get the notification that server shutdowns
// The value of configuration depends on how long the clients will get notification.
waitAndAcceptNewRequests()
// totalTimeout returns the larger of the provider's and the consumer's
// ShutdownConfig.GetTimeout() values. A side whose config or ShutdownConfig
// is nil contributes 0, so the result is 0 when neither side is configured.
func totalTimeout() time.Duration {
var providerShutdown time.Duration = 0
if providerConfig != nil && providerConfig.ShutdownConfig != nil {
providerShutdown = providerConfig.ShutdownConfig.GetTimeout()
}
// after this step, the new request will be rejected because the server is shutdown.
// NOTE(review): the call below (and the comment above it) is unrelated to computing a
// timeout; it looks like a removed line from the rendered diff that ended up inside
// this function — confirm against the real file.
destroyProviderProtocols()
var consumerShutdown time.Duration = 0
if consumerConfig != nil && consumerConfig.ShutdownConfig != nil {
consumerShutdown = consumerConfig.ShutdownConfig.GetTimeout()
}
// pick the larger of the two timeouts
var timeout = providerShutdown
if consumerShutdown > providerShutdown {
timeout = consumerShutdown
}
return timeout
}
func BeforeShutdown() {
logger.Infof("Execute the custom callbacks.")
customCallbacks := extension.GetAllCustomShutdownCallbacks()
for callback := customCallbacks.Front(); callback != nil; callback = callback.Next() {
callback.Value.(func())()
}
})
destroyAllRegistries()
// waiting for a short time so that the clients have enough time to get the notification that server shutdowns
// The value of configuration depends on how long the clients will get notification.
waitAndAcceptNewRequests()
// reject the new request, but keeping waiting for accepting requests
waitForReceivingRequests()
// If this application is not the provider, it will do nothing
destroyProviderProtocols()
// waiting for accepted requests to be processed.
// after this step, the response from other providers will be rejected.
// If this application is not the consumer, it will do nothing
destroyConsumerProtocols()
logger.Infof("Execute the custom callbacks.")
customCallbacks := extension.GetAllCustomShutdownCallbacks()
for callback := customCallbacks.Front(); callback != nil; callback = callback.Next() {
callback.Value.(func())()
}
}
// destroyAllRegistries logs the step, then fetches the protocol registered
// under constant.REGISTRY_KEY from the extension registry and destroys it.
func destroyAllRegistries() {
	logger.Infof("Graceful shutdown --- Destroy all registries. ")
	extension.GetProtocol(constant.REGISTRY_KEY).Destroy()
}
// destroyConsumerProtocols logs the step and hands the consumer's protocol
// configuration to destroyProtocols. It does nothing beyond logging when the
// consumer config or its ProtocolConf is nil.
func destroyConsumerProtocols() {
	logger.Info("Graceful shutdown --- Destroy consumer's protocols. ")
	if consumerConfig != nil && consumerConfig.ProtocolConf != nil {
		destroyProtocols(consumerConfig.ProtocolConf)
	}
}
/**
* destroy the provider's protocol.
* if the protocol is consumer's protocol too, we will keep it.
*/
func destroyProviderProtocols() {
logger.Info("Graceful shutdown --- Destroy provider's protocols. ")
if providerConfig == nil || providerConfig.ProtocolConf == nil {
return
}
......@@ -103,82 +154,53 @@ func destroyProviderProtocols() {
// destroyProtocols asserts protocolConf to a map[interface{}]interface{},
// asserts each key to a string, and calls Destroy on the protocol that
// extension.GetProtocol returns for that key. Both type assertions are
// unchecked, so a value of any other shape panics.
func destroyProtocols(protocolConf interface{}) {
protocols := protocolConf.(map[interface{}]interface{})
for name, _ := range protocols {
// NOTE(review): the two-line form and the one-line form below perform the same
// lookup-and-Destroy; they look like the removed and added sides of a rendered
// diff — confirm which one the real file keeps.
protocol := extension.GetProtocol(name.(string))
protocol.Destroy()
extension.GetProtocol(name.(string)).Destroy()
}
}
func waitAndAcceptNewRequests() {
logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ")
if providerConfig == nil || providerConfig.ShutdownConfig == nil {
return
}
shutdownConfig := providerConfig.ShutdownConfig
timeout, err := strconv.ParseInt(shutdownConfig.AcceptNewRequestsTimeout, 0, 0)
if err != nil {
logger.Errorf("The timeout configuration of keeping accept new requests is invalid. Go next step!", err)
return
}
timeout := providerConfig.ShutdownConfig.GetStepTimeout()
// ignore this phase
// ignore this step
if timeout < 0 {
return
}
var duration = time.Duration(timeout) * time.Millisecond
time.Sleep(duration)
time.Sleep(timeout)
}
/**
* this method will wait a short time until timeout or all requests have been processed.
* this implementation use the active filter, so you need to add the filter into your application configuration
* for example:
* server.yml or client.yml
*
* filter: "active",
*
* see the ActiveFilter for more detail.
* We use the bigger value between consumer's config and provider's config
* if the application is both consumer and provider.
* This method's behavior is a little bit complicated.
*/
func waitForProcessingRequest() {
var timeout int64 = 0
if providerConfig != nil && providerConfig.ShutdownConfig != nil {
timeout = waitingProcessedTimeout(providerConfig.ShutdownConfig)
// for provider. It will wait for processing receiving requests
func waitForReceivingRequests() {
logger.Info("Graceful shutdown --- Keep waiting until accepting requests finish or timeout. ")
if providerConfig == nil || providerConfig.ShutdownConfig == nil {
// ignore this step
return
}
waitingProcessedTimeout(providerConfig.ShutdownConfig)
}
if consumerConfig != nil && consumerConfig.ShutdownConfig != nil {
consumerTimeout := waitingProcessedTimeout(consumerConfig.ShutdownConfig)
if consumerTimeout > timeout {
timeout = consumerTimeout
}
}
if timeout <= 0{
// for consumer. It will wait for the response of sending requests
func waitForSendingRequests() {
logger.Info("Graceful shutdown --- Keep waiting until sending requests getting response or timeout ")
if consumerConfig == nil || consumerConfig.ShutdownConfig == nil {
// ignore this step
return
}
}
timeout = timeout * time.Millisecond.Nanoseconds()
func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
timeout := shutdownConfig.GetStepTimeout()
if timeout <= 0 {
return
}
start := time.Now().UnixNano()
for time.Now().UnixNano() - start < timeout && protocol.GetTotalActive() > 0 {
for time.Now().UnixNano()-start < timeout.Nanoseconds() && !shutdownConfig.RequestsFinished {
// sleep 10 ms and then we check it again
time.Sleep(10 * time.Millisecond)
}
}
func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) int64 {
if len(shutdownConfig.WaitingProcessRequestsTimeout) <=0 {
return 0
}
config, err := strconv.ParseInt(shutdownConfig.WaitingProcessRequestsTimeout, 0, 0)
if err != nil {
logger.Errorf("The configuration of shutdownConfig.WaitingProcessRequestsTimeout is invalid: %s",
shutdownConfig.WaitingProcessRequestsTimeout)
return 0
}
return config
}
......@@ -17,10 +17,59 @@
package config
import (
"strconv"
"time"
"github.com/apache/dubbo-go/common/logger"
)
const (
defaultTimeout = 60 * time.Second
defaultStepTimeout = 10 * time.Second
)
type ShutdownConfig struct {
// it's the total timeout configuration. After that, the application will exit.
Timeout string `yaml:"timeout" json:"timeout,omitempty" property:"timout"`
AcceptNewRequestsTimeout string `yaml:"requests.accept.timeout" json:"requests.accept.timeout,omitempty" property:"requests.accept.timeout"`
WaitingProcessRequestsTimeout string `yaml:"requests.reject.timeout" json:"requests.reject.timeout,omitempty" property:"requests.reject.timeout"`
/*
* Total timeout. Even if not all resources have been released,
* the application will shut down once the elapsed time exceeds this value. The unit is ms.
* The default value is 60 * 1000 ms = 1 minute.
* In general, it should be bigger than 3 * StepTimeout.
*/
Timeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
/*
* The timeout of each step. You should evaluate the response time of requests
* and the time it takes clients to notice that the server is shutting down.
* For example, if your clients receive the notification within 10s after you start to close the server,
* and 99.9% of requests return a response within 2s, then StepTimeout should be bigger than (10+2) * 1000ms;
* (10 + 2*3) * 1000ms may be a good choice.
*/
StepTimeout string `yaml:"step_timeout" json:"step.timeout,omitempty" property:"step.timeout"`
// when we try to shutdown the application, we will reject the new requests. In most cases, you don't need to configure this.
RejectRequestHandler string `yaml:"reject_handler" json:"reject_handler,omitempty" property:"reject_handler"`
// true -> new request will be rejected.
RejectRequest bool
// true -> all requests had been processed. In provider side it means that all requests are returned response to clients
// In consumer side, it means that all requests getting response from servers
RequestsFinished bool
}
// GetTimeout returns the total shutdown timeout.
//
// The Timeout field is parsed with strconv.ParseInt (base auto-detected from
// the prefix) and interpreted as a number of milliseconds. When the field
// cannot be parsed, including when it is empty, the problem is logged together
// with the parse error and defaultTimeout is returned instead.
func (config *ShutdownConfig) GetTimeout() time.Duration {
	result, err := strconv.ParseInt(config.Timeout, 0, 0)
	if err != nil {
		// every argument needs its own verb, otherwise the error is printed as %!(EXTRA ...)
		logger.Errorf("The Timeout configuration is invalid: %s, and we will use the default value: %s, err: %v",
			config.Timeout, defaultTimeout.String(), err)
		return defaultTimeout
	}
	return time.Millisecond * time.Duration(result)
}
// GetStepTimeout returns the timeout applied to each graceful-shutdown step.
//
// The StepTimeout field is parsed with strconv.ParseInt (base auto-detected
// from the prefix) and interpreted as a number of milliseconds. When the field
// cannot be parsed, including when it is empty, the problem is logged together
// with the parse error and defaultStepTimeout is returned instead.
func (config *ShutdownConfig) GetStepTimeout() time.Duration {
	result, err := strconv.ParseInt(config.StepTimeout, 0, 0)
	if err != nil {
		// report the value that actually failed to parse (StepTimeout, not Timeout),
		// and give the error its own verb so it is not printed as %!(EXTRA ...)
		logger.Errorf("The StepTimeout configuration is invalid: %s, and we will use the default value: %s, err: %v",
			config.StepTimeout, defaultStepTimeout.String(), err)
		return defaultStepTimeout
	}
	return time.Millisecond * time.Duration(result)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"testing"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
func TestBeforeShutdown(t *testing.T) {
extension.SetProtocol("registry", func() protocol.Protocol {
return &mockRegistryProtocol{}
})
extension.SetProtocol(constant.DUBBO, func() protocol.Protocol {
return &mockRegistryProtocol{}
})
protocolConfigs := make(map[interface{}]interface{})
protocolConfigs[constant.DUBBO] = "aaa"
providerConfig = &ProviderConfig{
ShutdownConfig: &ShutdownConfig{
Timeout: "1",
AcceptNewRequestsTimeout: "1",
WaitingProcessRequestsTimeout: "1",
},
ProtocolConf: protocolConfigs,
}
consumerConfig = &ConsumerConfig{
ProtocolConf: protocolConfigs,
ShutdownConfig: &ShutdownConfig{
Timeout: "1",
AcceptNewRequestsTimeout: "1",
WaitingProcessRequestsTimeout: "1",
},
}
BeforeShutdown()
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
import (
"sync/atomic"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/common"
"github.com/apache/dubbo-go/protocol"
)
// init builds one shared gracefulShutdownFilter for the consumer side and one
// for the provider side, and registers factories that always hand back those
// same instances under constant.CONSUMER_SHUTDOWN_FILTER and
// constant.PROVIDER_SHUTDOWN_FILTER.
func init() {
	consumerFilter := &gracefulShutdownFilter{
		shutdownConfig: config.GetConsumerConfig().ShutdownConfig,
	}
	// NOTE(review): the provider filter carries no shutdownConfig, so
	// rejectNewRequest always reports false for it — confirm this is intended.
	providerFilter := &gracefulShutdownFilter{}

	extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
		return consumerFilter
	})
	extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter {
		return providerFilter
	})
}
// gracefulShutdownFilter is the filter registered for the consumer and
// provider shutdown filter keys. It counts in-flight invocations and consults
// a ShutdownConfig to decide whether new requests are rejected.
type gracefulShutdownFilter struct {
// activeCount is incremented in Invoke before delegating to the invoker and
// decremented in OnResponse, both through sync/atomic.
activeCount int32
// shutdownConfig may be nil; rejectNewRequest and getRejectHandler treat a nil
// config as "do not reject" and "use the default handler" respectively.
shutdownConfig *config.ShutdownConfig
}
// Invoke forwards the invocation to invoker and counts it as active, unless
// the filter is currently rejecting new requests; in that case the invocation
// is handed to the configured rejected-execution handler and its result is
// returned instead.
func (gf *gracefulShutdownFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
	if !gf.rejectNewRequest() {
		atomic.AddInt32(&gf.activeCount, 1)
		return invoker.Invoke(invocation)
	}

	logger.Info("The application is closing, new request will be rejected.")
	rejectHandler := gf.getRejectHandler()
	return rejectHandler.RejectedExecution(invoker.GetUrl(), invocation)
}
// OnResponse marks one invocation as finished and returns result unchanged.
//
// The active counter is decremented atomically. When the decremented value is
// zero or below and a shutdown config is attached, the config's
// RequestsFinished flag is set. A filter without a shutdown config only
// updates its counter.
func (gf *gracefulShutdownFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
	// use the value returned by the atomic decrement instead of re-reading the
	// field, so the check sees exactly the count this call produced
	remaining := atomic.AddInt32(&gf.activeCount, -1)

	// shutdownConfig can be nil, as the sibling methods already assume
	// the write to RequestsFinished itself is still unsynchronized; as before,
	// that is only harmless once new requests are being rejected
	if remaining <= 0 && gf.shutdownConfig != nil {
		gf.shutdownConfig.RequestsFinished = true
	}
	return result
}
// rejectNewRequest reports whether new requests must be turned away: true only
// when a shutdown config is attached and its RejectRequest flag is set.
func (gf *gracefulShutdownFilter) rejectNewRequest() bool {
	return gf.shutdownConfig != nil && gf.shutdownConfig.RejectRequest
}
// getRejectHandler resolves the rejected-execution handler from the extension
// registry. The name comes from shutdownConfig.RejectRequestHandler when a
// config is attached and the name is non-empty; otherwise constant.DEFAULT_KEY
// is used.
func (gf *gracefulShutdownFilter) getRejectHandler() common.RejectedExecutionHandler {
	if cfg := gf.shutdownConfig; cfg != nil && cfg.RejectRequestHandler != "" {
		return extension.GetRejectedExecutionHandler(cfg.RejectRequestHandler)
	}
	return extension.GetRejectedExecutionHandler(constant.DEFAULT_KEY)
}
......@@ -92,7 +92,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
return nil, perrors.New("listener stopped")
case <-l.registry.done:
logger.Warnf("zk consumer register has quit, so zk event listener exit asap now.")
logger.Warnf("zk consumer register has quit, so zk event listener exit now.")
return nil, perrors.New("listener stopped")
case e := <-l.events:
......@@ -109,13 +109,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
}
}
func (l *RegistryConfigurationListener) Close() {
if l.registry.IsAvailable() {
/**
* if the registry is not available, it means that the registry has been destroy
* so we don't need to call Done(), or it will cause the negative count panic for registry.wg
*/
l.registry.wg.Done()
}
l.registry.wg.Done()
}
func (l *RegistryConfigurationListener) valid() bool {
......
......@@ -173,9 +173,14 @@ func (r *zkRegistry) GetUrl() common.URL {
}
func (r *zkRegistry) Destroy() {
if r.configListener != nil {
r.configListener.Close()
}
/**
* Don't call r.listener.Close() here.
* We don't close the listener because
* it will be closed in Subscribe().
* If we closed it here as well,
* a negative-count error on r.wg would occur because
* the listener would be closed twice.
*/
close(r.done)
r.wg.Wait()
r.closeRegisters()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment