From deec46e47e57ead84b4df9ff40ce9df12ef4a283 Mon Sep 17 00:00:00 2001
From: Ming Deng <mindeng@ebay.com>
Date: Sun, 10 Nov 2019 21:39:45 +0800
Subject: [PATCH] Fix BUG & DEBUG

---
 common/constant/key.go                |   2 +
 config/config_loader.go               |   3 +-
 config/graceful_shutdown.go           | 168 +++++++++++++++-----------
 config/graceful_shutdown_config.go    |  57 ++++++++-
 config/graceful_shutdown_test.go      |  56 +++++++++
 filter/impl/GracefulShutdownFilter.go |  84 +++++++++++++
 registry/zookeeper/listener.go        |  10 +-
 registry/zookeeper/registry.go        |  11 +-
 8 files changed, 302 insertions(+), 89 deletions(-)
 create mode 100644 config/graceful_shutdown_test.go
 create mode 100644 filter/impl/GracefulShutdownFilter.go

diff --git a/common/constant/key.go b/common/constant/key.go
index ff371d08c..2bfebf8f0 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -71,6 +71,8 @@ const (
 	EXECUTE_LIMIT_KEY                      = "execute.limit"
 	DEFAULT_EXECUTE_LIMIT                  = "-1"
 	EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
+	PROVIDER_SHUTDOWN_FILTER = "pshutdown"
+	CONSUMER_SHUTDOWN_FILTER = "cshutdown"
 )
 
 const (
diff --git a/config/config_loader.go b/config/config_loader.go
index 2747a8365..106d10ede 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -149,7 +149,8 @@ func Load() {
 			}
 		}
 	}
-	GracefulShutdownInit()
+	// init the shutdown callback
+	// GracefulShutdownInit()
 }
 
 // get rpc service for consumer
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index cb198c125..8787cb5ac 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -20,7 +20,7 @@ package config
 import (
 	"os"
 	"os/signal"
-	"strconv"
+	"runtime/debug"
 	"sync"
 	"syscall"
 	"time"
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/dubbo-go/common/constant"
 	"github.com/apache/dubbo-go/common/extension"
 	"github.com/apache/dubbo-go/common/logger"
-	"github.com/apache/dubbo-go/protocol"
 )
 
 /*
@@ -56,44 +55,96 @@ func GracefulShutdownInit() {
 		syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP,
 		syscall.SIGABRT, syscall.SIGEMT, syscall.SIGSYS,
 	)
-	for {
-		sig := <-signals
 
-		gracefulShutdownOnce.Do(func() {
-			logger.Infof("get signal %s, application is shutdown", sig.String())
+	go func() {
+		select {
+		case sig := <-signals:
+			logger.Infof("get signal %s, application will shutdown.", sig.String())
+			// gracefulShutdownOnce.Do(func() {
+			// arm the watchdog before the cleanup starts, otherwise a stuck BeforeShutdown() blocks the exit forever.
+			if timeout := totalTimeout(); timeout > 0 {
+				time.AfterFunc(timeout, func() {
+					logger.Warn("Shutdown gracefully timeout, application will shutdown immediately. ")
+					os.Exit(0)
+				})
+			}
+			BeforeShutdown()
+			switch sig {
+			// those signals' original behavior is exit with a dump, so we write a heap dump to stdout before exiting
+			case syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGEMT, syscall.SIGSYS:
+				debug.WriteHeapDump(os.Stdout.Fd())
+			}
+			os.Exit(0)
+		}
+	}()
+}
 
-			destroyAllRegistries()
-			// waiting for a short time so that the clients have enough time to get the notification that server shutdowns
-			// The value of configuration depends on how long the clients will get notification.
-			waitAndAcceptNewRequests()
+func totalTimeout() time.Duration {
+	var providerShutdown time.Duration = 0
+	if providerConfig != nil && providerConfig.ShutdownConfig != nil {
+		providerShutdown = providerConfig.ShutdownConfig.GetTimeout()
+	}
 
-			// after this step, the new request will be rejected because the server is shutdown.
-			destroyProviderProtocols()
+	var consumerShutdown time.Duration = 0
+	if consumerConfig != nil && consumerConfig.ShutdownConfig != nil {
+		consumerShutdown = consumerConfig.ShutdownConfig.GetTimeout()
+	}
 
-			//
+	var timeout = providerShutdown
+	if consumerShutdown > providerShutdown {
+		timeout = consumerShutdown
+	}
+	return timeout
+}
 
+func BeforeShutdown() {
 
-			logger.Infof("Execute the custom callbacks.")
-			customCallbacks := extension.GetAllCustomShutdownCallbacks()
-			for callback := customCallbacks.Front(); callback != nil; callback = callback.Next() {
-				callback.Value.(func())()
-			}
-		})
+	destroyAllRegistries()
+	// waiting for a short time so that the clients have enough time to get the notification that server shutdowns
+	// The value of configuration depends on how long the clients will get notification.
+	waitAndAcceptNewRequests()
+
+	// reject new requests, but keep waiting for the accepted requests to be processed
+	waitForReceivingRequests()
+
+	// If this application is not the provider, it will do nothing
+	destroyProviderProtocols()
+
+	// waiting for accepted requests to be processed.
+
+	// after this step, the response from other providers will be rejected.
+	// If this application is not the consumer, it will do nothing
+	destroyConsumerProtocols()
+
+	logger.Infof("Execute the custom callbacks.")
+	customCallbacks := extension.GetAllCustomShutdownCallbacks()
+	for callback := customCallbacks.Front(); callback != nil; callback = callback.Next() {
+		callback.Value.(func())()
 	}
 }
 
 func destroyAllRegistries() {
+	logger.Infof("Graceful shutdown --- Destroy all registries. ")
 	registryProtocol := extension.GetProtocol(constant.REGISTRY_KEY)
 	registryProtocol.Destroy()
 }
 
 func destroyConsumerProtocols() {
+	logger.Info("Graceful shutdown --- Destroy consumer's protocols. ")
 	if consumerConfig == nil || consumerConfig.ProtocolConf == nil {
 		return
 	}
 	destroyProtocols(consumerConfig.ProtocolConf)
 }
+
+/**
+ * destroy the provider's protocol.
+ * if the protocol is consumer's protocol too, we will keep it.
+ */
 func destroyProviderProtocols() {
+
+	logger.Info("Graceful shutdown --- Destroy provider's protocols. ")
+
 	if providerConfig == nil || providerConfig.ProtocolConf == nil {
 		return
 	}
@@ -103,82 +154,53 @@ func destroyProviderProtocols() {
 func destroyProtocols(protocolConf interface{}) {
 	protocols := protocolConf.(map[interface{}]interface{})
 	for name, _ := range protocols {
-		protocol := extension.GetProtocol(name.(string))
-		protocol.Destroy()
+		extension.GetProtocol(name.(string)).Destroy()
 	}
 }
 
 func waitAndAcceptNewRequests() {
 
+	logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ")
 	if providerConfig == nil || providerConfig.ShutdownConfig == nil {
 		return
 	}
-	shutdownConfig := providerConfig.ShutdownConfig
 
-	timeout, err := strconv.ParseInt(shutdownConfig.AcceptNewRequestsTimeout, 0, 0)
-	if err != nil {
-		logger.Errorf("The timeout configuration of keeping accept new requests is invalid. Go next step!", err)
-		return
-	}
+	timeout := providerConfig.ShutdownConfig.GetStepTimeout()
 
-	// ignore this phase
+	// ignore this step
 	if timeout < 0 {
 		return
 	}
-
-	var duration = time.Duration(timeout) * time.Millisecond
-
-	time.Sleep(duration)
+	time.Sleep(timeout)
 }
 
-/**
- * this method will wait a short time until timeout or all requests have been processed.
- * this implementation use the active filter, so you need to add the filter into your application configuration
- * for example:
- * server.yml or client.yml
- *
- * filter: "active",
- *
- * see the ActiveFilter for more detail.
- * We use the bigger value between consumer's config and provider's config
- * if the application is both consumer and provider.
- * This method's behavior is a little bit complicated.
- */
-func waitForProcessingRequest()  {
-	var timeout int64 = 0
-	if providerConfig != nil && providerConfig.ShutdownConfig != nil {
-		timeout = waitingProcessedTimeout(providerConfig.ShutdownConfig)
+// for provider. It will wait until the received requests are processed or the step timeout elapses
+func waitForReceivingRequests() {
+	logger.Info("Graceful shutdown --- Keep waiting until accepting requests finish or timeout. ")
+	if providerConfig == nil || providerConfig.ShutdownConfig == nil {
+		// ignore this step
+		return
 	}
+	waitingProcessedTimeout(providerConfig.ShutdownConfig)
+}
 
-	if consumerConfig != nil && consumerConfig.ShutdownConfig != nil {
-		consumerTimeout := waitingProcessedTimeout(consumerConfig.ShutdownConfig)
-		if consumerTimeout > timeout {
-			timeout = consumerTimeout
-		}
-	}
-	if timeout <= 0{
+// for consumer. It will wait for the response of sending requests
+func waitForSendingRequests()  {
+	logger.Info("Graceful shutdown --- Keep waiting until sending requests getting response or timeout ")
+	if consumerConfig == nil || consumerConfig.ShutdownConfig == nil {
+		// ignore this step
 		return
 	}
+}
 
-	timeout = timeout * time.Millisecond.Nanoseconds()
-
+func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
+	timeout := shutdownConfig.GetStepTimeout()
+	if timeout <= 0 {
+		return
+	}
 	start := time.Now().UnixNano()
-
-	for time.Now().UnixNano() - start < timeout && protocol.GetTotalActive() > 0  {
+	for time.Now().UnixNano()-start < timeout.Nanoseconds() && !shutdownConfig.RequestsFinished {
 		// sleep 10 ms and then we check it again
 		time.Sleep(10 * time.Millisecond)
 	}
 }
-
-func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) int64 {
-	if len(shutdownConfig.WaitingProcessRequestsTimeout) <=0 {
-		return 0
-	}
-	config, err := strconv.ParseInt(shutdownConfig.WaitingProcessRequestsTimeout, 0, 0)
-	if err != nil {
-		logger.Errorf("The configuration of shutdownConfig.WaitingProcessRequestsTimeout is invalid: %s",
-			shutdownConfig.WaitingProcessRequestsTimeout)
-		return 0
-	}
-	return config
-}
diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go
index 9c52044e3..c5c16add0 100644
--- a/config/graceful_shutdown_config.go
+++ b/config/graceful_shutdown_config.go
@@ -17,10 +17,59 @@
 
 package config
 
+import (
+	"strconv"
+	"time"
 
+	"github.com/apache/dubbo-go/common/logger"
+)
+
+const (
+	defaultTimeout  = 60 * time.Second
+	defaultStepTimeout = 10 * time.Second
+)
 type ShutdownConfig struct {
-	// it's the total timeout configuration. After that, the application will exit.
-	Timeout                       string `yaml:"timeout" json:"timeout,omitempty" property:"timout"`
-	AcceptNewRequestsTimeout      string `yaml:"requests.accept.timeout" json:"requests.accept.timeout,omitempty" property:"requests.accept.timeout"`
-	WaitingProcessRequestsTimeout string `yaml:"requests.reject.timeout" json:"requests.reject.timeout,omitempty" property:"requests.reject.timeout"`
+	/*
+	 * Total timeout. Even though we don't release all resources,
+	 * the application will shutdown if the costing time is over this configuration. The unit is ms.
+	 * default value is 60 * 1000 ms = 1 minute
+	 * In general, it should be bigger than 3 * StepTimeout.
+	 */
+	Timeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
+	/*
+	 * the timeout on each step. You should evaluate the response time of request
+	 * and the time that client noticed that server shutdown.
+	 * For example, if your client will receive the notification within 10s when you start to close the server,
+	 * and the 99.9% requests will return response in 2s, so the StepTimeout will be bigger than(10+2) * 1000ms,
+	 * maybe (10 + 2*3) * 1000ms is a good choice.
+	 */
+	StepTimeout string `yaml:"step_timeout" json:"step.timeout,omitempty" property:"step.timeout"`
+	// when we try to shutdown the application, we will reject the new requests. In most cases, you don't need to configure this.
+	RejectRequestHandler string `yaml:"reject_handler" json:"reject_handler,omitempty" property:"reject_handler"`
+	// true -> new request will be rejected.
+	RejectRequest bool
+
+	// true -> all requests had been processed. In provider side it means that all requests are returned response to clients
+	// In consumer side, it means that all requests getting response from servers
+	RequestsFinished bool
+}
+
+// GetTimeout parses Timeout as milliseconds; an unparsable value is logged and defaultTimeout is returned.
+func (config *ShutdownConfig) GetTimeout() time.Duration {
+	result, err := strconv.ParseInt(config.Timeout, 0, 0)
+	if err != nil {
+		logger.Errorf("The Timeout configuration is invalid: %s, and we will use the default value: %s, err: %v", config.Timeout, defaultTimeout.String(), err)
+		return defaultTimeout
+	}
+	return time.Millisecond * time.Duration(result)
+}
+
+// GetStepTimeout parses StepTimeout as milliseconds; an unparsable value is logged and defaultStepTimeout is returned.
+func (config *ShutdownConfig) GetStepTimeout() time.Duration {
+	result, err := strconv.ParseInt(config.StepTimeout, 0, 0)
+	if err != nil {
+		logger.Errorf("The StepTimeout configuration is invalid: %s, and we will use the default value: %s, err: %v", config.StepTimeout, defaultStepTimeout.String(), err)
+		return defaultStepTimeout
+	}
+	return time.Millisecond * time.Duration(result)
 }
diff --git a/config/graceful_shutdown_test.go b/config/graceful_shutdown_test.go
new file mode 100644
index 000000000..595856208
--- /dev/null
+++ b/config/graceful_shutdown_test.go
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+	"testing"
+
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+
+// TestBeforeShutdown runs BeforeShutdown() against mock protocols for an application that is both provider and consumer.
+func TestBeforeShutdown(t *testing.T) {
+	extension.SetProtocol("registry", func() protocol.Protocol {
+		return &mockRegistryProtocol{}
+	})
+	extension.SetProtocol(constant.DUBBO, func() protocol.Protocol {
+		return &mockRegistryProtocol{}
+	})
+	protocolConfigs := make(map[interface{}]interface{})
+	protocolConfigs[constant.DUBBO] = "aaa"
+	providerConfig = &ProviderConfig{
+		ShutdownConfig: &ShutdownConfig{
+			// the values are in ms; keep them tiny so that the waiting steps return quickly.
+			Timeout:     "1",
+			StepTimeout: "1",
+		},
+		ProtocolConf: protocolConfigs,
+	}
+	consumerConfig = &ConsumerConfig{
+		ProtocolConf: protocolConfigs,
+		ShutdownConfig: &ShutdownConfig{
+			// the values are in ms; keep them tiny so that the waiting steps return quickly.
+			Timeout:     "1",
+			StepTimeout: "1",
+		},
+	}
+	BeforeShutdown()
+}
\ No newline at end of file
diff --git a/filter/impl/GracefulShutdownFilter.go b/filter/impl/GracefulShutdownFilter.go
new file mode 100644
index 000000000..7337b20d0
--- /dev/null
+++ b/filter/impl/GracefulShutdownFilter.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+import (
+	"sync/atomic"
+
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/filter"
+	"github.com/apache/dubbo-go/filter/common"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+// init registers the two shutdown filters under CONSUMER_SHUTDOWN_FILTER and PROVIDER_SHUTDOWN_FILTER.
+// Each key always resolves to the same shared instance.
+func init() {
+	// NOTE(review): the consumer ShutdownConfig is read once, at package init — confirm the config is loaded by then.
+	consumerFilter := &gracefulShutdownFilter{shutdownConfig: config.GetConsumerConfig().ShutdownConfig}
+	extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
+		return consumerFilter
+	})
+
+	// the provider filter is created without a shutdownConfig.
+	providerFilter := &gracefulShutdownFilter{}
+	extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter {
+		return providerFilter
+	})
+}
+
+type gracefulShutdownFilter struct {
+	activeCount    int32
+	shutdownConfig *config.ShutdownConfig
+}
+
+func (gf *gracefulShutdownFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+	if !gf.rejectNewRequest() {
+		atomic.AddInt32(&gf.activeCount, 1)
+		return invoker.Invoke(invocation)
+	}
+	logger.Info("The application is closing, new request will be rejected.")
+	return gf.getRejectHandler().RejectedExecution(invoker.GetUrl(), invocation)
+}
+
+// OnResponse decrements the active counter and flags RequestsFinished once no request is in flight.
+func (gf *gracefulShutdownFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+	// use the value returned by AddInt32 (an atomic read); the provider filter has no shutdownConfig, so guard the nil.
+	if atomic.AddInt32(&gf.activeCount, -1) <= 0 && gf.shutdownConfig != nil {
+		gf.shutdownConfig.RequestsFinished = true
+	}
+	return result
+}
+
+// rejectNewRequest reports whether new requests must be refused.
+// It is always false while no shutdownConfig is attached to the filter.
+func (gf *gracefulShutdownFilter) rejectNewRequest() bool {
+	cfg := gf.shutdownConfig
+	return cfg != nil && cfg.RejectRequest
+}
+
+// getRejectHandler looks up the configured RejectRequestHandler, or the constant.DEFAULT_KEY one when none is set.
+func (gf *gracefulShutdownFilter) getRejectHandler() common.RejectedExecutionHandler {
+	if cfg := gf.shutdownConfig; cfg != nil && cfg.RejectRequestHandler != "" {
+		return extension.GetRejectedExecutionHandler(cfg.RejectRequestHandler)
+	}
+	return extension.GetRejectedExecutionHandler(constant.DEFAULT_KEY)
+}
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index c25028d58..67479c443 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -92,7 +92,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
 			return nil, perrors.New("listener stopped")
 
 		case <-l.registry.done:
-			logger.Warnf("zk consumer register has quit, so zk event listener exit asap now.")
+			logger.Warnf("zk consumer register has quit, so zk event listener exit now.")
 			return nil, perrors.New("listener stopped")
 
 		case e := <-l.events:
@@ -109,13 +109,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
 	}
 }
 func (l *RegistryConfigurationListener) Close() {
-	if l.registry.IsAvailable() {
-		/**
-		 * if the registry is not available, it means that the registry has been destroy
-		 * so we don't need to call Done(), or it will cause the negative count panic for registry.wg
-		 */
-		l.registry.wg.Done()
-	}
+	l.registry.wg.Done()
 }
 
 func (l *RegistryConfigurationListener) valid() bool {
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index 29ae51d44..682b9fe0c 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -173,9 +173,14 @@ func (r *zkRegistry) GetUrl() common.URL {
 }
 
 func (r *zkRegistry) Destroy() {
-	if r.configListener != nil {
-		r.configListener.Close()
-	}
+	/**
+	 * Don't r.listener.Close()
+	 * here we don't close the listener because
+	 * the listener will be close in Subscribe().
+	 * We can not close it here. If we do that,
+	 * a negative count error of r.wg will occur because
+	 * we close the listener twice.
+	 */
 	close(r.done)
 	r.wg.Wait()
 	r.closeRegisters()
-- 
GitLab