From 7aa4218ac2d59a80954225163d9bf565e4c69ad8 Mon Sep 17 00:00:00 2001
From: Ming Deng <mindeng@ebay.com>
Date: Fri, 8 Nov 2019 23:02:14 +0800
Subject: [PATCH] Final version

---
 common/extension/graceful_shutdown.go |  55 ++++++++
 config/config_loader.go               |   1 +
 config/consumer_config.go             |   1 +
 config/graceful_shutdown.go           | 184 ++++++++++++++++++++++++++
 config/graceful_shutdown_config.go    |  26 ++++
 config/provider_config.go             |  12 +-
 go.sum                                |   4 -
 protocol/rpc_status.go                |  14 ++
 8 files changed, 288 insertions(+), 9 deletions(-)
 create mode 100644 common/extension/graceful_shutdown.go
 create mode 100644 config/graceful_shutdown.go
 create mode 100644 config/graceful_shutdown_config.go

diff --git a/common/extension/graceful_shutdown.go b/common/extension/graceful_shutdown.go
new file mode 100644
index 000000000..ff34f5073
--- /dev/null
+++ b/common/extension/graceful_shutdown.go
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+	"container/list"
+)
+
+var (
+	// SystemShutdownCallbackNames = []string{"registry"}
+	// systemShutdownCallbacks     = make(map[string]func())
+	customShutdownCallbacks     = list.New()
+)
+
+/**
+ * you should not make any assumption about the order.
+ * For example, if you have more than one callbacks, and you wish the order is:
+ * callback1()
+ * callback2()
+ * ...
+ * callbackN()
+ * Then you should put then together:
+ * func callback() {
+ *     callback1()
+ *     callback2()
+ *     ...
+ *     callbackN()
+ * }
+ * I think the order of custom callbacks should be decided by the users.
+ * Even though I can design a mechanism to support the ordered custom callbacks,
+ * the benefit of that mechanism is low.
+ * And it may introduce much complication for another users.
+ */
+func AddCustomShutdownCallback(callback func())  {
+	customShutdownCallbacks.PushBack(callback)
+}
+
+func GetAllCustomShutdownCallbacks() *list.List {
+	return customShutdownCallbacks
+}
diff --git a/config/config_loader.go b/config/config_loader.go
index b737d3f23..2747a8365 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -149,6 +149,7 @@ func Load() {
 			}
 		}
 	}
+	GracefulShutdownInit()
 }
 
 // get rpc service for consumer
diff --git a/config/consumer_config.go b/config/consumer_config.go
index b1ebdd5d8..54b87e22f 100644
--- a/config/consumer_config.go
+++ b/config/consumer_config.go
@@ -57,6 +57,7 @@ type ConsumerConfig struct {
 	References   map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"`
 	ProtocolConf interface{}                 `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
 	FilterConf   interface{}                 `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" `
+	ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" `
 }
 
 func (c *ConsumerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
new file mode 100644
index 000000000..cb198c125
--- /dev/null
+++ b/config/graceful_shutdown.go
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+	"os"
+	"os/signal"
+	"strconv"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+/*
+ * The key point is that find out the signals to handle.
+ * The most important documentation is https://golang.org/pkg/os/signal/
+ * From this documentation, we can know that:
+ * 1. The signals SIGKILL and SIGSTOP may not be caught by signal package;
+ * 2. SIGHUP, SIGINT, or SIGTERM signal causes the program to exit
+ * 3. SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGSTKFLT, SIGEMT, or SIGSYS signal causes the program to exit with a stack dump
+ * 4. The invocation of Notify(signal...) will disable the default behavior of those signals.
+ *
+ * So the signals SIGKILL, SIGSTOP, SIGHUP, SIGINT, SIGTERM, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGSTKFLT, SIGEMT, SIGSYS
+ * should be processed.
+ * It's seems that the Unix/Linux does not have the signal SIGSTKFLT. https://github.com/golang/go/issues/33381
+ * So this signal will be ignored.
+ *
+ */
+var gracefulShutdownOnce = sync.Once{}
+
+func GracefulShutdownInit() {
+
+	signals := make(chan os.Signal, 1)
+
+	signal.Notify(signals, os.Interrupt, os.Kill, syscall.SIGKILL, syscall.SIGSTOP,
+		syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP,
+		syscall.SIGABRT, syscall.SIGEMT, syscall.SIGSYS,
+	)
+	for {
+		sig := <-signals
+
+		gracefulShutdownOnce.Do(func() {
+			logger.Infof("get signal %s, application is shutdown", sig.String())
+
+			destroyAllRegistries()
+			// waiting for a short time so that the clients have enough time to get the notification that server shutdowns
+			// The value of configuration depends on how long the clients will get notification.
+			waitAndAcceptNewRequests()
+
+			// after this step, the new request will be rejected because the server is shutdown.
+			destroyProviderProtocols()
+
+			//
+
+
+			logger.Infof("Execute the custom callbacks.")
+			customCallbacks := extension.GetAllCustomShutdownCallbacks()
+			for callback := customCallbacks.Front(); callback != nil; callback = callback.Next() {
+				callback.Value.(func())()
+			}
+		})
+	}
+}
+
+func destroyAllRegistries() {
+	registryProtocol := extension.GetProtocol(constant.REGISTRY_KEY)
+	registryProtocol.Destroy()
+}
+
+func destroyConsumerProtocols() {
+	if consumerConfig == nil || consumerConfig.ProtocolConf == nil {
+		return
+	}
+	destroyProtocols(consumerConfig.ProtocolConf)
+}
+func destroyProviderProtocols() {
+	if providerConfig == nil || providerConfig.ProtocolConf == nil {
+		return
+	}
+	destroyProtocols(providerConfig.ProtocolConf)
+}
+
+func destroyProtocols(protocolConf interface{}) {
+	protocols := protocolConf.(map[interface{}]interface{})
+	for name, _ := range protocols {
+		protocol := extension.GetProtocol(name.(string))
+		protocol.Destroy()
+	}
+}
+
+func waitAndAcceptNewRequests() {
+
+	if providerConfig == nil || providerConfig.ShutdownConfig == nil {
+		return
+	}
+	shutdownConfig := providerConfig.ShutdownConfig
+
+	timeout, err := strconv.ParseInt(shutdownConfig.AcceptNewRequestsTimeout, 0, 0)
+	if err != nil {
+		logger.Errorf("The timeout configuration of keeping accept new requests is invalid. Go next step!", err)
+		return
+	}
+
+	// ignore this phase
+	if timeout < 0 {
+		return
+	}
+
+	var duration = time.Duration(timeout) * time.Millisecond
+
+	time.Sleep(duration)
+}
+
+/**
+ * this method will wait a short time until timeout or all requests have been processed.
+ * this implementation use the active filter, so you need to add the filter into your application configuration
+ * for example:
+ * server.yml or client.yml
+ *
+ * filter: "active",
+ *
+ * see the ActiveFilter for more detail.
+ * We use the bigger value between consumer's config and provider's config
+ * if the application is both consumer and provider.
+ * This method's behavior is a little bit complicated.
+ */
+func waitForProcessingRequest()  {
+	var timeout int64 = 0
+	if providerConfig != nil && providerConfig.ShutdownConfig != nil {
+		timeout = waitingProcessedTimeout(providerConfig.ShutdownConfig)
+	}
+
+	if consumerConfig != nil && consumerConfig.ShutdownConfig != nil {
+		consumerTimeout := waitingProcessedTimeout(consumerConfig.ShutdownConfig)
+		if consumerTimeout > timeout {
+			timeout = consumerTimeout
+		}
+	}
+	if timeout <= 0{
+		return
+	}
+
+	timeout = timeout * time.Millisecond.Nanoseconds()
+
+	start := time.Now().UnixNano()
+
+	for time.Now().UnixNano() - start < timeout && protocol.GetTotalActive() > 0  {
+		// sleep 10 ms and then we check it again
+		time.Sleep(10 * time.Millisecond)
+	}
+}
+
+func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) int64 {
+	if len(shutdownConfig.WaitingProcessRequestsTimeout) <=0 {
+		return 0
+	}
+	config, err := strconv.ParseInt(shutdownConfig.WaitingProcessRequestsTimeout, 0, 0)
+	if err != nil {
+		logger.Errorf("The configuration of shutdownConfig.WaitingProcessRequestsTimeout is invalid: %s",
+			shutdownConfig.WaitingProcessRequestsTimeout)
+		return 0
+	}
+	return config
+}
diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go
new file mode 100644
index 000000000..9c52044e3
--- /dev/null
+++ b/config/graceful_shutdown_config.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+
+type ShutdownConfig struct {
+	// it's the total timeout configuration. After that, the application will exit.
+	Timeout                       string `yaml:"timeout" json:"timeout,omitempty" property:"timout"`
+	AcceptNewRequestsTimeout      string `yaml:"requests.accept.timeout" json:"requests.accept.timeout,omitempty" property:"requests.accept.timeout"`
+	WaitingProcessRequestsTimeout string `yaml:"requests.reject.timeout" json:"requests.reject.timeout,omitempty" property:"requests.reject.timeout"`
+}
diff --git a/config/provider_config.go b/config/provider_config.go
index 00faa1d0a..d1de691be 100644
--- a/config/provider_config.go
+++ b/config/provider_config.go
@@ -44,11 +44,12 @@ type ProviderConfig struct {
 
 	ApplicationConfig *ApplicationConfig         `yaml:"application" json:"application,omitempty" property:"application"`
 	Registry          *RegistryConfig            `yaml:"registry" json:"registry,omitempty" property:"registry"`
-	Registries        map[string]*RegistryConfig `yaml:"registries" json:"registries,omitempty" property:"registries"`
-	Services          map[string]*ServiceConfig  `yaml:"services" json:"services,omitempty" property:"services"`
-	Protocols         map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"`
-	ProtocolConf      interface{}                `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" `
-	FilterConf        interface{}                `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" `
+	Registries     map[string]*RegistryConfig `yaml:"registries" json:"registries,omitempty" property:"registries"`
+	Services       map[string]*ServiceConfig  `yaml:"services" json:"services,omitempty" property:"services"`
+	Protocols      map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"`
+	ProtocolConf   interface{}                `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" `
+	FilterConf     interface{}                `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" `
+	ShutdownConfig *ShutdownConfig            `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" `
 }
 
 func (c *ProviderConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
@@ -106,6 +107,7 @@ func ProviderInit(confProFile string) error {
 	}
 
 	logger.Debugf("provider config{%#v}\n", providerConfig)
+
 	return nil
 }
 
diff --git a/go.sum b/go.sum
index bcde5b1f8..b730b7868 100644
--- a/go.sum
+++ b/go.sum
@@ -37,8 +37,6 @@ github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod
 github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
 github.com/apache/dubbo-go-hessian2 v1.2.5-0.20191029001541-894e45c9aaaa h1:11TO1wiM5bvGAVrmfN5atD8gZqUSPE1TBoIs8sI6Abk=
 github.com/apache/dubbo-go-hessian2 v1.2.5-0.20191029001541-894e45c9aaaa/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo=
-github.com/apache/dubbo-go-hessian2 v1.3.0 h1:ZhQYDm8GHqIp6i53T4ZJHQBN11nAYAjxlwoVznfyvD8=
-github.com/apache/dubbo-go-hessian2 v1.3.0/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
@@ -104,8 +102,6 @@ github.com/docker/go-connections v0.3.0 h1:3lOnM9cSzgGwx8VfK/NGOW5fLQ0GjIlCkaktF
 github.com/docker/go-connections v0.3.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
 github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk=
 github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
-github.com/dubbogo/getty v1.3.0 h1:GImOCANdts7dlRqi9GMVsZJnfst9EPyjTVTR1AesOD8=
-github.com/dubbogo/getty v1.3.0/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU=
 github.com/dubbogo/getty v1.3.1 h1:9fehwTo/D6+z6/+kADMbhbKeMkP80o/3g+XwV5lFLTY=
 github.com/dubbogo/getty v1.3.1/go.mod h1:dtLOEb1v6EMHsQNYRWEACiRLmTWB2kJGUAj1aXayPOg=
 github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI=
diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go
index 3a8bfbc87..4f7e63f9f 100644
--- a/protocol/rpc_status.go
+++ b/protocol/rpc_status.go
@@ -73,3 +73,17 @@ func beginCount0(rpcStatus *RpcStatus) {
 func endCount0(rpcStatus *RpcStatus) {
 	atomic.AddInt32(&rpcStatus.active, -1)
 }
+
+func GetTotalActive() int32 {
+	var result int32 = 0
+	methodStatistics.Range(func(_, value interface{}) bool {
+		statics := value.(*sync.Map)
+		statics.Range(func(_, value interface{}) bool {
+			rpcStatus := value.(*RpcStatus)
+			result = result + rpcStatus.active
+			return true
+		})
+		return true
+	})
+	return result
+}
-- 
GitLab