diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go index dc715d5ed2778ed46ec49b08da1a2e6c96e3a84a..d12c8dfb1fecbff64a303d2b54557a8b84acd298 100644 --- a/config/graceful_shutdown.go +++ b/config/graceful_shutdown.go @@ -25,6 +25,10 @@ import ( "time" ) +import ( + "github.com/dubbogo/gost/container/gxset" +) + import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -60,7 +64,7 @@ func GracefulShutdownInit() { go func() { select { case sig := <-signals: - logger.Infof("get signal %s, application will shutdown.", sig.String()) + logger.Infof("get signal %s, application will shutdown.", sig) // gracefulShutdownOnce.Do(func() { BeforeShutdown() @@ -87,18 +91,20 @@ func BeforeShutdown() { // The value of configuration depends on how long the clients will get notification. waitAndAcceptNewRequests() - time.Sleep(1 * time.Minute) // reject the new request, but keeping waiting for accepting requests waitForReceivingRequests() + // we fetch the protocols from Consumer.References. Consumer.ProtocolConfig doesn't contains all protocol, like jsonrpc + consumerProtocols := getConsumerProtocols() + // If this application is not the provider, it will do nothing - destroyProviderProtocols() + destroyProviderProtocols(consumerProtocols) // reject sending the new request, and waiting for response of sending requests waitForSendingRequests() // If this application is not the consumer, it will do nothing - destroyConsumerProtocols() + destroyConsumerProtocols(consumerProtocols) logger.Info("Graceful shutdown --- Execute the custom callbacks.") customCallbacks := extension.GetAllCustomShutdownCallbacks() @@ -113,13 +119,9 @@ func destroyAllRegistries() { registryProtocol.Destroy() } -func destroyConsumerProtocols() { +func destroyConsumerProtocols(consumerProtocols *gxset.HashSet) { logger.Info("Graceful shutdown --- Destroy consumer's protocols. ") - if consumerConfig == nil || consumerConfig.ProtocolConf == nil { - return - } - protocols := consumerConfig.ProtocolConf.(map[interface{}]interface{}) - for name, _ := range protocols { + for name := range consumerProtocols.Items { extension.GetProtocol(name.(string)).Destroy() } } @@ -128,7 +130,7 @@ func destroyConsumerProtocols() { * destroy the provider's protocol. * if the protocol is consumer's protocol too, we will keep it. */ -func destroyProviderProtocols() { +func destroyProviderProtocols(consumerProtocols *gxset.HashSet) { logger.Info("Graceful shutdown --- Destroy provider's protocols. ") @@ -136,17 +138,11 @@ func destroyProviderProtocols() { return } - consumerProtocol := make(map[interface{}]interface{}, 0) - if consumerConfig != nil && consumerConfig.ProtocolConf != nil { - consumerProtocol = consumerConfig.ProtocolConf.(map[interface{}]interface{}) - } - protocols := providerConfig.ProtocolConf.(map[interface{}]interface{}) - for name, _ := range protocols { - _, found := consumerProtocol[name] + for name := range protocols { - // the protocol is the consumer's protocol, we can not destroy it. - if found { + // the protocol is the consumer's protocol too, we can not destroy it. + if consumerProtocols.Contains(name) { continue } extension.GetProtocol(name.(string)).Destroy() @@ -194,20 +190,21 @@ func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) { if timeout <= 0 { return } - start := time.Now().UnixNano() - for time.Now().UnixNano()-start < timeout.Nanoseconds() && !shutdownConfig.RequestsFinished { + start := time.Now() + + for time.Now().After(start.Add(timeout)) && !shutdownConfig.RequestsFinished { // sleep 10 ms and then we check it again time.Sleep(10 * time.Millisecond) } } func totalTimeout() time.Duration { - var providerShutdown time.Duration = 0 + var providerShutdown time.Duration if providerConfig != nil && providerConfig.ShutdownConfig != nil { providerShutdown = providerConfig.ShutdownConfig.GetTimeout() } - var consumerShutdown time.Duration = 0 + var consumerShutdown time.Duration if consumerConfig != nil && consumerConfig.ShutdownConfig != nil { consumerShutdown = consumerConfig.ShutdownConfig.GetTimeout() } @@ -218,3 +215,18 @@ func totalTimeout() time.Duration { } return timeout } + +/* + * we can not get the protocols from consumerConfig because some protocol don't have configuration, like jsonrpc. + */ +func getConsumerProtocols() *gxset.HashSet { + result := gxset.NewSet() + if consumerConfig == nil || consumerConfig.References == nil { + return result + } + + for _, reference := range consumerConfig.References { + result.Add(reference.Protocol) + } + return result +} diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go index 9077752ce27b5480dc5a96d34a5f63e838003c26..df55728565f6cf14ce4357f8c9c7927c30d80e40 100644 --- a/config/graceful_shutdown_config.go +++ b/config/graceful_shutdown_config.go @@ -20,6 +20,7 @@ package config import ( "time" ) + import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" @@ -63,7 +64,7 @@ func (config *ShutdownConfig) Prefix() string { func (config *ShutdownConfig) GetTimeout() time.Duration { result, err := time.ParseDuration(config.Timeout) if err != nil { - logger.Errorf("The Timeout configuration is invalid: %s, and we will use the default value: %s", + logger.Errorf("The Timeout configuration is invalid: %s, and we will use the default value: %s, err: %v", config.Timeout, defaultTimeout.String(), err) return defaultTimeout } @@ -73,8 +74,8 @@ func (config *ShutdownConfig) GetTimeout() time.Duration { func (config *ShutdownConfig) GetStepTimeout() time.Duration { result, err := time.ParseDuration(config.StepTimeout) if err != nil { - logger.Errorf("The StepTimeout configuration is invalid: %s, and we will use the default value: %s", - config.Timeout, defaultStepTimeout.String(), err) + logger.Errorf("The StepTimeout configuration is invalid: %s, and we will use the default value: %s, err: %v", + config.StepTimeout, defaultStepTimeout.String(), err) return defaultStepTimeout } return result diff --git a/config/graceful_shutdown_config_test.go b/config/graceful_shutdown_config_test.go index 4b6645db03902df4345ed82048724cb706bdff3c..583ed70b838a8271a47e180ee3c6eb32cbb46984 100644 --- a/config/graceful_shutdown_config_test.go +++ b/config/graceful_shutdown_config_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" ) + import ( "github.com/stretchr/testify/assert" ) diff --git a/config/graceful_shutdown_test.go b/config/graceful_shutdown_test.go index ca01f68142944afa33f5fa5e7541d5e6e8107869..0c4960152d0562eca85b5c49abaf06f33a73f458 100644 --- a/config/graceful_shutdown_test.go +++ b/config/graceful_shutdown_test.go @@ -20,6 +20,7 @@ package config import ( "testing" ) + import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -42,7 +43,7 @@ func TestBeforeShutdown(t *testing.T) { return &mockRegistryProtocol{} }) - protocolConfigs := make(map[interface{}]interface{}) + protocolConfigs := make(map[interface{}]interface{}, 16) protocolConfigs[constant.DUBBO] = "aaa" // without configuration @@ -56,7 +57,7 @@ func TestBeforeShutdown(t *testing.T) { }, } - providerProtocols := make(map[interface{}]interface{}) + providerProtocols := make(map[interface{}]interface{}, 16) providerProtocols[constant.DUBBO] = "aaa" providerProtocols["mock"] = "aaa" diff --git a/filter/impl/graceful_shutdown_filter.go b/filter/impl/graceful_shutdown_filter.go index 285fe11da84abd55303413386e210ce01b9738e1..b912ea88e4ba4741b7d7fe36b8bbd3ba158abe63 100644 --- a/filter/impl/graceful_shutdown_filter.go +++ b/filter/impl/graceful_shutdown_filter.go @@ -33,10 +33,9 @@ import ( func init() { var consumerFiler = &gracefulShutdownFilter{ - activeCount: 0, shutdownConfig: config.GetConsumerConfig().ShutdownConfig, } - var providerFilter = &gracefulShutdownFilter{activeCount: 0, + var providerFilter = &gracefulShutdownFilter{ shutdownConfig: config.GetProviderConfig().ShutdownConfig, } diff --git a/filter/impl/graceful_shutdown_filter_test.go b/filter/impl/graceful_shutdown_filter_test.go index f541219b5a097cbe1e025ec9374e337a83e8e4d4..21da167ea0f201ea357c51cab0ecb4f8ebec0957 100644 --- a/filter/impl/graceful_shutdown_filter_test.go +++ b/filter/impl/graceful_shutdown_filter_test.go @@ -21,9 +21,11 @@ import ( "net/url" "testing" ) + import ( "github.com/stretchr/testify/assert" ) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant"