diff --git a/CHANGE.md b/CHANGE.md index cdfca4fb6d5d7106ed0c56e40cf40647607c3015..947a695ca854fe8c3d91d8ea989b52dcddbe1523 100644 --- a/CHANGE.md +++ b/CHANGE.md @@ -1,5 +1,43 @@ # Release Notes +## 1.2.0 + +### New Features + +- Add etcdv3 registry support<https://github.com/apache/dubbo-go/pull/148> +- Add nacos registry support<https://github.com/apache/dubbo-go/pull/151> +- Add fail fast cluster support<https://github.com/apache/dubbo-go/pull/140> +- Add available cluster support<https://github.com/apache/dubbo-go/pull/155> +- Add broadcast cluster support<https://github.com/apache/dubbo-go/pull/158> +- Add forking cluster support<https://github.com/apache/dubbo-go/pull/161> +- Add service token authorization support<https://github.com/apache/dubbo-go/pull/202> +- Add accessLog filter support<https://github.com/apache/dubbo-go/pull/214> +- Add tps limit support<https://github.com/apache/dubbo-go/pull/237> +- Add execute limit support<https://github.com/apache/dubbo-go/pull/246> +- Move callService to invoker & support attachments<https://github.com/apache/dubbo-go/pull/193> +- Move example in dubbo-go project away<https://github.com/apache/dubbo-go/pull/228> +- Support dynamic config center which compatible with dubbo 2.6.x & 2.7.x and commit the zookeeper impl<https://github.com/apache/dubbo-go/pull/194> + +### Enhancement + +- Split gettyRPCClient.close and gettyRPCClientPool.remove in protocol/dubbo/pool.go<https://github.com/apache/dubbo-go/pull/186> +- Remove client from pool before closing it<https://github.com/apache/dubbo-go/pull/190> +- Enhance the logic for fetching the local address<https://github.com/apache/dubbo-go/pull/209> +- Add protocol_conf default values<https://github.com/apache/dubbo-go/pull/221> +- Add task pool for getty<https://github.com/apache/dubbo-go/pull/141> +- Update getty: remove read queue<https://github.com/apache/dubbo-go/pull/137> +- Clean heartbeat from PendingResponse<https://github.com/apache/dubbo-go/pull/166> + +### Bugfixes + +- GettyRPCClientPool remove deadlock<https://github.com/apache/dubbo-go/pull/183/files> +- Fix failover cluster bug and url parameter retries change int to string type<https://github.com/apache/dubbo-go/pull/195> +- Fix url params unsafe map<https://github.com/apache/dubbo-go/pull/201> +- Read protocol config by map key in config yaml instead of protocol name<https://github.com/apache/dubbo-go/pull/218> +- Fix dubbo group issues #238<https://github.com/apache/dubbo-go/pull/243>/<https://github.com/apache/dubbo-go/pull/244> +- Fix bug in reference_config<https://github.com/apache/dubbo-go/pull/157> +- Fix high memory bug in zookeeper listener<https://github.com/apache/dubbo-go/pull/168> + ## 1.1.0 ### New Features diff --git a/cluster/cluster_impl/failover_cluster_test.go b/cluster/cluster_impl/failover_cluster_test.go index 7bde83ea66a49f9317732ec46da0f11800f846eb..46b7b28e0299b669f5ec48ed024e7aa80c39e3d8 100644 --- a/cluster/cluster_impl/failover_cluster_test.go +++ b/cluster/cluster_impl/failover_cluster_test.go @@ -79,8 +79,10 @@ type rest struct { func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation) protocol.Result { count++ - var success bool - var err error = nil + var ( + success bool + err error + ) if count >= bi.successCount { success = true } else { diff --git a/cluster/cluster_impl/failsafe_cluster_test.go b/cluster/cluster_impl/failsafe_cluster_test.go index 930b4bb16628e2b363659a65fc174543b7f2cf6e..234995b8e522124fe9beff0937ca23a63aa63844 100644 --- a/cluster/cluster_impl/failsafe_cluster_test.go +++ b/cluster/cluster_impl/failsafe_cluster_test.go @@ -42,8 +42,8 @@ var ( failsafeUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") ) -// register_failsafe register failsafeCluster to cluster extension. -func register_failsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { +// registerFailsafe register failsafeCluster to cluster extension. +func registerFailsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) failsafeCluster := NewFailsafeCluster() @@ -62,7 +62,7 @@ func Test_FailSafeInvokeSuccess(t *testing.T) { defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := register_failsafe(t, invoker) + clusterInvoker := registerFailsafe(t, invoker) invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() @@ -81,7 +81,7 @@ func Test_FailSafeInvokeFail(t *testing.T) { defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := register_failsafe(t, invoker) + clusterInvoker := registerFailsafe(t, invoker) invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() diff --git a/cluster/cluster_impl/forking_cluster_invoker.go b/cluster/cluster_impl/forking_cluster_invoker.go index c830079ff6d3c29c3385eda289782f5e52877be2..058d7fefd6edf6c43e5eda4b8f2f6a9c161189e2 100644 --- a/cluster/cluster_impl/forking_cluster_invoker.go +++ b/cluster/cluster_impl/forking_cluster_invoker.go @@ -19,7 +19,6 @@ package cluster_impl import ( "context" - "errors" "fmt" "time" ) @@ -45,6 +44,7 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker { } } +// Invoke ... func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { err := invoker.checkWhetherDestroyed() if err != nil { @@ -87,14 +87,18 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts)) if err != nil { return &protocol.RPCResult{ - Err: errors.New(fmt.Sprintf("failed to forking invoke provider %v, but no luck to perform the invocation. Last error is: %s", selected, err.Error()))} + Err: fmt.Errorf("failed to forking invoke provider %v, "+ + "but no luck to perform the invocation. Last error is: %v", selected, err), + } } if len(rsps) == 0 { - return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("failed to forking invoke provider %v, but no resp", selected))} + return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but no resp", selected)} } + result, ok := rsps[0].(protocol.Result) if !ok { - return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("failed to forking invoke provider %v, but not legal resp", selected))} + return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)} } + return result } diff --git a/cluster/directory.go b/cluster/directory.go index 045296ce549b13c327b6f479ca0bd75d0b6ce131..5a03b3a4490ce0b3aadece8a9ef43395f845dd12 100644 --- a/cluster/directory.go +++ b/cluster/directory.go @@ -22,7 +22,8 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// Extension - Directory +// Directory +//Extension - Directory type Directory interface { common.Node List(invocation protocol.Invocation) []protocol.Invoker diff --git a/cluster/loadbalance.go b/cluster/loadbalance.go index 9ae4e4eb808b28581d12b72829c921c4f0cc9ac8..fb3641a77377eabbd692729a32e2c0c096282f18 100644 --- a/cluster/loadbalance.go +++ b/cluster/loadbalance.go @@ -21,7 +21,8 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// Extension - LoadBalance +// LoadBalance +//Extension - LoadBalance type LoadBalance interface { Select([]protocol.Invoker, protocol.Invocation) protocol.Invoker } diff --git a/cluster/loadbalance/consistent_hash.go b/cluster/loadbalance/consistent_hash.go index 8c5f8a5001347d10da4347827c1935ddda1f8a86..957c110663d6c56ada15543d372e210fa83bf74b 100644 --- a/cluster/loadbalance/consistent_hash.go +++ b/cluster/loadbalance/consistent_hash.go @@ -36,9 +36,12 @@ import ( ) const ( + // ConsistentHash ... ConsistentHash = "consistenthash" - HashNodes = "hash.nodes" - HashArguments = "hash.arguments" + // HashNodes ... + HashNodes = "hash.nodes" + // HashArguments ... + HashArguments = "hash.arguments" ) var ( diff --git a/cluster/loadbalance/least_active.go b/cluster/loadbalance/least_active.go index 773bb9323f02349a221a754f256b6c50ac2911a2..e7c41aac93e8d3dfcef5d49fa486483bd045f569 100644 --- a/cluster/loadbalance/least_active.go +++ b/cluster/loadbalance/least_active.go @@ -28,6 +28,7 @@ import ( ) const ( + // LeastActive ... LeastActive = "leastactive" ) @@ -53,12 +54,12 @@ func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation } var ( - leastActive int32 = -1 // The least active value of all invokers - totalWeight int64 = 0 // The number of invokers having the same least active value (LEAST_ACTIVE) - firstWeight int64 = 0 // Initial value, used for comparison - leastIndexes = make([]int, count) // The index of invokers having the same least active value (LEAST_ACTIVE) - leastCount = 0 // The number of invokers having the same least active value (LEAST_ACTIVE) - sameWeight = true // Every invoker has the same weight value? + leastActive int32 = -1 // The least active value of all invokers + totalWeight int64 // The number of invokers having the same least active value (LEAST_ACTIVE) + firstWeight int64 // Initial value, used for comparison + leastCount int // The number of invokers having the same least active value (LEAST_ACTIVE) + leastIndexes = make([]int, count) // The index of invokers having the same least active value (LEAST_ACTIVE) + sameWeight = true // Every invoker has the same weight value? ) for i := 0; i < count; i++ { diff --git a/cluster/loadbalance/round_robin.go b/cluster/loadbalance/round_robin.go index 653e42c3b5d08cbefb25db98278fb6afa6f02c96..4d039999677aefb1093071666a845279dc357ce9 100644 --- a/cluster/loadbalance/round_robin.go +++ b/cluster/loadbalance/round_robin.go @@ -31,16 +31,19 @@ import ( ) const ( + // RoundRobin ... RoundRobin = "roundrobin" + // COMPLETE ... COMPLETE = 0 + // UPDATING ... UPDATING = 1 ) var ( - methodWeightMap sync.Map // [string]invokers - state int32 = COMPLETE // update lock acquired ? - recyclePeriod int64 = 60 * time.Second.Nanoseconds() + methodWeightMap sync.Map // [string]invokers + state = int32(COMPLETE) // update lock acquired ? + recyclePeriod = 60 * time.Second.Nanoseconds() ) func init() { diff --git a/cluster/router/condition/factory_test.go b/cluster/router/condition/factory_test.go index 054bd0b6890a210ca20805ed4b1977699cf3152e..072da585fe9ae90d8e0333f03d189d3cc410219d 100644 --- a/cluster/router/condition/factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -93,15 +93,19 @@ type rest struct { var count int -func (bi *MockInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { +func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result { count++ - var success bool - var err error = nil + + var ( + success bool + err error + ) if count >= bi.successCount { success = true } else { err = perrors.New("error") } + result := &protocol.RPCResult{Err: err, Rest: rest{tried: count, success: success}} return result } diff --git a/cluster/router/condition/router.go b/cluster/router/condition/router.go index da4215123cd444857c8c48085b2a37bef752c3b6..bc6861f29249bd7c02d774fed9471d4effebc262 100644 --- a/cluster/router/condition/router.go +++ b/cluster/router/condition/router.go @@ -38,9 +38,12 @@ import ( ) const ( + //ROUTE_PATTERN route pattern regex ROUTE_PATTERN = `([&!=,]*)\\s*([^&!=,\\s]+)` + // FORCE ... FORCE = "force" ENABLED = "enabled" + // PRIORITY ... PRIORITY = "priority" ) @@ -289,15 +292,15 @@ func matchCondition(pairs map[string]MatchPair, url *common.URL, param *common.U if len(sampleValue) > 0 { if !matchPair.isMatch(sampleValue, param) { return false, nil - } else { - result = true } + + result = true } else { if !(matchPair.Matches.Empty()) { return false, nil - } else { - result = true } + + result = true } } return result, nil diff --git a/common/config/environment.go b/common/config/environment.go index 256741b99968b292330b26cd6c46f6ee421a55a2..071af31152ba4ce3c579f70aa23df59d718ce506 100644 --- a/common/config/environment.go +++ b/common/config/environment.go @@ -27,6 +27,7 @@ import ( "github.com/apache/dubbo-go/config_center" ) +// Environment // There is dubbo.properties file and application level config center configuration which higner than normal config center in java. So in java the // configuration sequence will be config center > application level config center > dubbo.properties > spring bean configuration. // But in go, neither the dubbo.properties file or application level config center configuration will not support for the time being. @@ -82,11 +83,11 @@ func (env *Environment) UpdateAppExternalConfigMap(externalMap map[string]string // Configuration ... func (env *Environment) Configuration() *list.List { - list := list.New() + cfgList := list.New() // The sequence would be: SystemConfiguration -> ExternalConfiguration -> AppExternalConfiguration -> AbstractConfig -> PropertiesConfiguration - list.PushFront(newInmemoryConfiguration(&(env.externalConfigMap))) - list.PushFront(newInmemoryConfiguration(&(env.appExternalConfigMap))) - return list + cfgList.PushFront(newInmemoryConfiguration(&(env.externalConfigMap))) + cfgList.PushFront(newInmemoryConfiguration(&(env.appExternalConfigMap))) + return cfgList } // SetDynamicConfiguration ... diff --git a/common/constant/env.go b/common/constant/env.go index 77680e377e33d2d575843728cedd536da798782d..c899634409b1a941f18642046490a322786bdd2b 100644 --- a/common/constant/env.go +++ b/common/constant/env.go @@ -18,8 +18,11 @@ package constant const ( + // CONF_CONSUMER_FILE_PATH ... CONF_CONSUMER_FILE_PATH = "CONF_CONSUMER_FILE_PATH" + // CONF_PROVIDER_FILE_PATH ... CONF_PROVIDER_FILE_PATH = "CONF_PROVIDER_FILE_PATH" - APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE" + // APP_LOG_CONF_FILE ... + APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE" CONF_ROUTER_FILE_PATH = "CONF_ROUTER_FILE_PATH" ) diff --git a/common/constant/version.go b/common/constant/version.go index 8ef9fae2c6088e78007abf8f7ddd81cc363c4ec3..730224376054a36b0c7cfeda7d5ea5e7ce058618 100644 --- a/common/constant/version.go +++ b/common/constant/version.go @@ -18,7 +18,10 @@ package constant const ( + // Version apache/dubbo-go version Version = "1.3.0" - Name = "dubbogo" - DATE = "2020/01/12" + // Name module name + Name = "dubbogo" + // Date release date + DATE = "2020/01/12" ) diff --git a/common/extension/configurator.go b/common/extension/configurator.go index 63bcc8c55dc48ce1feb43ea0dc82172f6ea48526..de98f8a260ea1f3a2e2a1f32c82dc869585e2789 100644 --- a/common/extension/configurator.go +++ b/common/extension/configurator.go @@ -22,7 +22,10 @@ import ( "github.com/apache/dubbo-go/config_center" ) -const DefaultKey = "default" +const ( + // DefaultKey ... + DefaultKey = "default" +) type getConfiguratorFunc func(url *common.URL) config_center.Configurator diff --git a/common/extension/graceful_shutdown.go b/common/extension/graceful_shutdown.go index bc03a2ff4a440aabfef4233374308fc486f5618e..3abd75c0aa328f3553c3d83340ae440b8dfe3356 100644 --- a/common/extension/graceful_shutdown.go +++ b/common/extension/graceful_shutdown.go @@ -26,6 +26,7 @@ var ( ) /** + * AddCustomShutdownCallback * you should not make any assumption about the order. * For example, if you have more than one callbacks, and you wish the order is: * callback1() diff --git a/common/extension/proxy_factory.go b/common/extension/proxy_factory.go index 7b9a5b860ba1413f69d46e0657b1145e3c285304..19826bb0560ea0d3fa471c04873b20a6878f57d8 100644 --- a/common/extension/proxy_factory.go +++ b/common/extension/proxy_factory.go @@ -22,12 +22,12 @@ import ( ) var ( - proxy_factories = make(map[string]func(...proxy.Option) proxy.ProxyFactory) + proxyFactories = make(map[string]func(...proxy.Option) proxy.ProxyFactory) ) // SetProxyFactory ... func SetProxyFactory(name string, f func(...proxy.Option) proxy.ProxyFactory) { - proxy_factories[name] = f + proxyFactories[name] = f } // GetProxyFactory ... @@ -35,8 +35,8 @@ func GetProxyFactory(name string) proxy.ProxyFactory { if name == "" { name = "default" } - if proxy_factories[name] == nil { + if proxyFactories[name] == nil { panic("proxy factory for " + name + " is not existing, make sure you have import the package.") } - return proxy_factories[name]() + return proxyFactories[name]() } diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index d0be491d406170ea4c52e65f70f0dfbe7b1b3cb6..43ca720d0e71577a446829f702c1d2fe23a32905 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -40,7 +40,9 @@ type Proxy struct { once sync.Once } -var typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type() +var ( + typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type() +) // NewProxy ... func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[string]string) *Proxy { @@ -51,6 +53,7 @@ func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[str } } +// Implement // proxy implement // In consumer, RPCService like: // type XxxProvider struct { diff --git a/common/rpc_service.go b/common/rpc_service.go index b819cf28f5c2499d39ac3bcd977c0d1b442daff5..b235c32abc9a971d7144605c8b4b82953ac8f3c4 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -34,19 +34,22 @@ import ( "github.com/apache/dubbo-go/common/logger" ) -// rpc service interface +// RPCService +//rpc service interface type RPCService interface { - Reference() string // rpc service id or reference id + // Reference: + // rpc service id or reference id + Reference() string } //AsyncCallbackService callback interface for async type AsyncCallbackService interface { - CallBack(response CallbackResponse) // callback + // Callback: callback + CallBack(response CallbackResponse) } //CallbackResponse for different protocol -type CallbackResponse interface { -} +type CallbackResponse interface{} //AsyncCallback async callback method type AsyncCallback func(response CallbackResponse) @@ -55,13 +58,17 @@ type AsyncCallback func(response CallbackResponse) // func MethodMapper() map[string][string] { // return map[string][string]{} // } -const METHOD_MAPPER = "MethodMapper" +const ( + // METHOD_MAPPER ... + METHOD_MAPPER = "MethodMapper" +) var ( // Precompute the reflect type for error. Can't use error directly // because Typeof takes an empty interface value. This is annoying. typeOfError = reflect.TypeOf((*error)(nil)).Elem() + // ServiceMap ... // todo: lowerecas? ServiceMap = &serviceMap{ serviceMap: make(map[string]map[string]*Service), @@ -226,8 +233,8 @@ func (sm *serviceMap) UnRegister(protocol, serviceId string) error { // Is this an exported - upper case - name func isExported(name string) bool { - rune, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(rune) + s, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(s) } // Is this type exported or a builtin? diff --git a/common/rpc_service_test.go b/common/rpc_service_test.go index 7df039b905d3cc064c5d6d9404fc874cf693dac9..8c9b9d15cdd4061dbe2f445b5fff7a868e5ae67e 100644 --- a/common/rpc_service_test.go +++ b/common/rpc_service_test.go @@ -122,9 +122,8 @@ func TestServiceMap_UnRegister(t *testing.T) { func TestMethodType_SuiteContext(t *testing.T) { mt := &MethodType{ctxType: reflect.TypeOf(context.TODO())} - c := context.TODO() - c = context.WithValue(c, "key", "value") - assert.Equal(t, reflect.ValueOf(c), mt.SuiteContext(c)) + ctx := context.WithValue(context.Background(), "key", "value") + assert.Equal(t, reflect.ValueOf(ctx), mt.SuiteContext(ctx)) assert.Equal(t, reflect.Zero(mt.ctxType), mt.SuiteContext(nil)) } diff --git a/common/url.go b/common/url.go index ff3ca4952e1cfdb35d33d4faf6ca9e75adfefde1..78e766b87902313a6ed2a60eda9919d88fd2af30 100644 --- a/common/url.go +++ b/common/url.go @@ -45,16 +45,22 @@ import ( // dubbo role type ///////////////////////////////// +// role constant const ( + // CONSUMER ... CONSUMER = iota + // CONFIGURATOR ... CONFIGURATOR + // ROUTER ... ROUTER + // PROVIDER ... PROVIDER ) var ( // DubboNodes ... DubboNodes = [...]string{"consumers", "configurators", "routers", "providers"} + // DubboRole ... DubboRole = [...]string{"consumer", "", "routers", "provider"} ) @@ -478,7 +484,7 @@ func (c URL) GetRawParam(key string) string { } } -// GetParamBool +// GetParamBool ... func (c URL) GetParamBool(s string, d bool) bool { var r bool diff --git a/config/base_config.go b/config/base_config.go index bfc4b02dcaa88a66af7740d432806a22bcdd3b93..c4f9947eb69ff28c64bb2b6ace738f8ed9251a5d 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package config import ( @@ -93,6 +94,9 @@ func (c *BaseConfig) prepareEnvironment() error { configFile = c.ConfigCenterConfig.ConfigFile } appContent, err = dynamicConfig.GetProperties(configFile, config_center.WithGroup(appGroup)) + if err != nil { + return perrors.WithStack(err) + } } //global config file mapContent, err := dynamicConfig.Parser().Parse(content) @@ -298,8 +302,8 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC func (c *BaseConfig) fresh() { configList := config.GetEnvInstance().Configuration() for element := configList.Front(); element != nil; element = element.Next() { - config := element.Value.(*config.InmemoryConfiguration) - c.freshInternalConfig(config) + cfg := element.Value.(*config.InmemoryConfiguration) + c.freshInternalConfig(cfg) } } diff --git a/config/config_loader.go b/config/config_loader.go index 36dd78ae235d4d849e00be421ea0556039500620..7f72ebc857602cef1af46f0ab497818d1a04992c 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -80,7 +80,7 @@ func checkApplicationName(config *ApplicationConfig) { } } -// Dubbo Init +// Load Dubbo Init func Load() { // reference config if consumerConfig == nil { @@ -164,12 +164,12 @@ func Load() { GracefulShutdownInit() } -// get rpc service for consumer +// GetRPCService get rpc service for consumer func GetRPCService(name string) common.RPCService { return consumerConfig.References[name].GetRPCService() } -// create rpc service for consumer +// RPCService create rpc service for consumer func RPCService(service common.RPCService) { consumerConfig.References[service.Reference()].Implement(service) } diff --git a/config/consumer_config.go b/config/consumer_config.go index 7a35d8ef7541a005c972ac261e9e95cf7b703ea5..7756f3b51c0f46a19687affb4dc6eadf9ef711c7 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package config import ( diff --git a/config/generic_service.go b/config/generic_service.go index e0171418ceaa72ecb9ad64055781baa2e5afdc30..9895486e977a9848e576597f31b724d51d144d4e 100644 --- a/config/generic_service.go +++ b/config/generic_service.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package config // GenericService ... diff --git a/config/graceful_shutdown_signal_darwin.go b/config/graceful_shutdown_signal_darwin.go index c6932bf981d8857615f19c8ead3ef0f93dd74358..8ad79ffa62ceed4096c60bfb9139b7ff1586808e 100644 --- a/config/graceful_shutdown_signal_darwin.go +++ b/config/graceful_shutdown_signal_darwin.go @@ -22,11 +22,13 @@ import ( "syscall" ) -// ShutdownSignals ... -var ShutdownSignals = []os.Signal{os.Interrupt, os.Kill, syscall.SIGKILL, syscall.SIGSTOP, - syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, - syscall.SIGABRT, syscall.SIGSYS} +var ( + // ShutdownSignals ... + ShutdownSignals = []os.Signal{os.Interrupt, os.Kill, syscall.SIGKILL, syscall.SIGSTOP, + syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, + syscall.SIGABRT, syscall.SIGSYS} -// DumpHeapShutdownSignals ... -var DumpHeapShutdownSignals = []os.Signal{syscall.SIGQUIT, syscall.SIGILL, - syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSYS} + // DumpHeapShutdownSignals ... + DumpHeapShutdownSignals = []os.Signal{syscall.SIGQUIT, syscall.SIGILL, + syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSYS} +) diff --git a/config/graceful_shutdown_signal_linux.go b/config/graceful_shutdown_signal_linux.go index 59c1a5d149c2e9db8e9ac981adec107cafc863ad..8ad79ffa62ceed4096c60bfb9139b7ff1586808e 100644 --- a/config/graceful_shutdown_signal_linux.go +++ b/config/graceful_shutdown_signal_linux.go @@ -22,9 +22,13 @@ import ( "syscall" ) -var ShutdownSignals = []os.Signal{os.Interrupt, os.Kill, syscall.SIGKILL, syscall.SIGSTOP, - syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, - syscall.SIGABRT, syscall.SIGSYS} +var ( + // ShutdownSignals ... + ShutdownSignals = []os.Signal{os.Interrupt, os.Kill, syscall.SIGKILL, syscall.SIGSTOP, + syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, + syscall.SIGABRT, syscall.SIGSYS} -var DumpHeapShutdownSignals = []os.Signal{syscall.SIGQUIT, syscall.SIGILL, - syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSYS} + // DumpHeapShutdownSignals ... + DumpHeapShutdownSignals = []os.Signal{syscall.SIGQUIT, syscall.SIGILL, + syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSYS} +) diff --git a/config/graceful_shutdown_signal_windows.go b/config/graceful_shutdown_signal_windows.go index 91b2bce7c2311ecbe9a1255be3e7b7b357a9b403..815a05ecb20a8fc202debaf6f39d699845cd689e 100644 --- a/config/graceful_shutdown_signal_windows.go +++ b/config/graceful_shutdown_signal_windows.go @@ -22,8 +22,12 @@ import ( "syscall" ) -var ShutdownSignals = []os.Signal{os.Interrupt, os.Kill, syscall.SIGKILL, - syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, - syscall.SIGABRT} +var ( + // ShutdownSignals ... + ShutdownSignals = []os.Signal{os.Interrupt, os.Kill, syscall.SIGKILL, + syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, + syscall.SIGABRT} -var DumpHeapShutdownSignals = []os.Signal{syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT} + // DumpHeapShutdownSignals ... + DumpHeapShutdownSignals = []os.Signal{syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT} +) diff --git a/config/method_config.go b/config/method_config.go index 6dd8099a6310a861d6645d478b0c1688fcdebf77..8f196d9e2c03071a663db03cb185fb9106d6484a 100644 --- a/config/method_config.go +++ b/config/method_config.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package config import ( @@ -43,11 +44,11 @@ type MethodConfig struct { // Prefix ... func (c *MethodConfig) Prefix() string { - if c.InterfaceId != "" { + if len(c.InterfaceId) != 0 { return constant.DUBBO + "." + c.InterfaceName + "." + c.InterfaceId + "." + c.Name + "." - } else { - return constant.DUBBO + "." + c.InterfaceName + "." + c.Name + "." } + + return constant.DUBBO + "." + c.InterfaceName + "." + c.Name + "." } // UnmarshalYAML ... diff --git a/config/protocol_config.go b/config/protocol_config.go index 9495a7fd892354f2b7611a73760b0a2885794534..4828d6e5bd28de19d896340f39c5633d0acd4874 100644 --- a/config/protocol_config.go +++ b/config/protocol_config.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package config import ( diff --git a/config/provider_config.go b/config/provider_config.go index 537608d4b51e5a24b269f9baa295764f7c6330ed..0bfa78647b58d9b6eb961adc5485207faffe1e1e 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package config import ( diff --git a/config/reference_config.go b/config/reference_config.go index e3fe856b1228d445c0f53c56991f5eb5a6fb2d34..07b7e8f125c4a31b668d06bfcea8cb835385fd9a 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -70,13 +70,13 @@ func (c *ReferenceConfig) Prefix() string { return constant.ReferenceConfigPrefix + c.InterfaceName + "." } -// The only way to get a new ReferenceConfig +// NewReferenceConfig The only way to get a new ReferenceConfig func NewReferenceConfig(id string, ctx context.Context) *ReferenceConfig { return &ReferenceConfig{id: id, context: ctx} } // UnmarshalYAML ... -func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { +func (c *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { type rf ReferenceConfig raw := rf{} // Put your defaults here @@ -84,8 +84,8 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro return err } - *refconfig = ReferenceConfig(raw) - if err := defaults.Set(refconfig); err != nil { + *c = ReferenceConfig(raw) + if err := defaults.Set(c); err != nil { return err } @@ -93,49 +93,50 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro } // Refer ... -func (refconfig *ReferenceConfig) Refer(impl interface{}) { - url := common.NewURLWithOptions(common.WithPath(refconfig.id), - common.WithProtocol(refconfig.Protocol), - common.WithParams(refconfig.getUrlMap()), - common.WithParamsValue(constant.BEAN_NAME_KEY, refconfig.id), +func (c *ReferenceConfig) Refer(_ interface{}) { + cfgURL := common.NewURLWithOptions( + common.WithPath(c.id), + common.WithProtocol(c.Protocol), + common.WithParams(c.getUrlMap()), + common.WithParamsValue(constant.BEAN_NAME_KEY, c.id), ) //1. user specified URL, could be peer-to-peer address, or register center's address. - if refconfig.Url != "" { - urlStrings := gxstrings.RegSplit(refconfig.Url, "\\s*[;]+\\s*") + if c.Url != "" { + urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*") for _, urlStr := range urlStrings { serviceUrl, err := common.NewURL(context.Background(), urlStr) if err != nil { panic(fmt.Sprintf("user specified URL %v refer error, error message is %v ", urlStr, err.Error())) } if serviceUrl.Protocol == constant.REGISTRY_PROTOCOL { - serviceUrl.SubURL = url - refconfig.urls = append(refconfig.urls, &serviceUrl) + serviceUrl.SubURL = cfgURL + c.urls = append(c.urls, &serviceUrl) } else { if serviceUrl.Path == "" { - serviceUrl.Path = "/" + refconfig.id + serviceUrl.Path = "/" + c.id } // merge url need to do - newUrl := common.MergeUrl(&serviceUrl, url) - refconfig.urls = append(refconfig.urls, newUrl) + newUrl := common.MergeUrl(&serviceUrl, cfgURL) + c.urls = append(c.urls, newUrl) } } } else { //2. assemble SubURL from register center's configuration妯″紡 - refconfig.urls = loadRegistries(refconfig.Registry, consumerConfig.Registries, common.CONSUMER) + c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER) //set url to regUrls - for _, regUrl := range refconfig.urls { - regUrl.SubURL = url + for _, regUrl := range c.urls { + regUrl.SubURL = cfgURL } } - if len(refconfig.urls) == 1 { - refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0]) + if len(c.urls) == 1 { + c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0]) } else { invokers := []protocol.Invoker{} var regUrl *common.URL - for _, u := range refconfig.urls { + for _, u := range c.urls { invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u)) if u.Protocol == constant.REGISTRY_PROTOCOL { regUrl = u @@ -143,53 +144,54 @@ func (refconfig *ReferenceConfig) Refer(impl interface{}) { } if regUrl != nil { cluster := extension.GetCluster("registryAware") - refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) + c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } else { - cluster := extension.GetCluster(refconfig.Cluster) - refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) + cluster := extension.GetCluster(c.Cluster) + c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } } //create proxy - if refconfig.Async { - callback := GetCallback(refconfig.id) - refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(refconfig.invoker, callback, url) + if c.Async { + callback := GetCallback(c.id) + c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL) } else { - refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url) + c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL) } } +// Implement // @v is service provider implemented RPCService -func (refconfig *ReferenceConfig) Implement(v common.RPCService) { - refconfig.pxy.Implement(v) +func (c *ReferenceConfig) Implement(v common.RPCService) { + c.pxy.Implement(v) } // GetRPCService ... -func (refconfig *ReferenceConfig) GetRPCService() common.RPCService { - return refconfig.pxy.Get() +func (c *ReferenceConfig) GetRPCService() common.RPCService { + return c.pxy.Get() } -func (refconfig *ReferenceConfig) getUrlMap() url.Values { +func (c *ReferenceConfig) getUrlMap() url.Values { urlMap := url.Values{} //first set user params - for k, v := range refconfig.Params { + for k, v := range c.Params { urlMap.Set(k, v) } - urlMap.Set(constant.INTERFACE_KEY, refconfig.InterfaceName) + urlMap.Set(constant.INTERFACE_KEY, c.InterfaceName) urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10)) - urlMap.Set(constant.CLUSTER_KEY, refconfig.Cluster) - urlMap.Set(constant.LOADBALANCE_KEY, refconfig.Loadbalance) - urlMap.Set(constant.RETRIES_KEY, refconfig.Retries) - urlMap.Set(constant.GROUP_KEY, refconfig.Group) - urlMap.Set(constant.VERSION_KEY, refconfig.Version) - urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) + urlMap.Set(constant.CLUSTER_KEY, c.Cluster) + urlMap.Set(constant.LOADBALANCE_KEY, c.Loadbalance) + urlMap.Set(constant.RETRIES_KEY, c.Retries) + urlMap.Set(constant.GROUP_KEY, c.Group) + urlMap.Set(constant.VERSION_KEY, c.Version) + urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(c.Generic)) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) - if len(refconfig.RequestTimeout) != 0 { - urlMap.Set(constant.TIMEOUT_KEY, refconfig.RequestTimeout) + if len(c.RequestTimeout) != 0 { + urlMap.Set(constant.TIMEOUT_KEY, c.RequestTimeout) } //getty invoke async or sync - urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async)) - urlMap.Set(constant.STICKY_KEY, strconv.FormatBool(refconfig.Sticky)) + urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(c.Async)) + urlMap.Set(constant.STICKY_KEY, strconv.FormatBool(c.Sticky)) //application info urlMap.Set(constant.APPLICATION_KEY, consumerConfig.ApplicationConfig.Name) @@ -202,12 +204,12 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { //filter var defaultReferenceFilter = constant.DEFAULT_REFERENCE_FILTERS - if refconfig.Generic { + if c.Generic { defaultReferenceFilter = constant.GENERIC_REFERENCE_FILTERS + "," + defaultReferenceFilter } - urlMap.Set(constant.REFERENCE_FILTER_KEY, mergeValue(consumerConfig.Filter, refconfig.Filter, defaultReferenceFilter)) + urlMap.Set(constant.REFERENCE_FILTER_KEY, mergeValue(consumerConfig.Filter, c.Filter, defaultReferenceFilter)) - for _, v := range refconfig.Methods { + for _, v := range c.Methods { urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance) urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries) urlMap.Set("methods."+v.Name+"."+constant.STICKY_KEY, strconv.FormatBool(v.Sticky)) @@ -221,11 +223,11 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { } // GenericLoad ... -func (refconfig *ReferenceConfig) GenericLoad(id string) { - genericService := NewGenericService(refconfig.id) +func (c *ReferenceConfig) GenericLoad(id string) { + genericService := NewGenericService(c.id) SetConsumerService(genericService) - refconfig.id = id - refconfig.Refer(genericService) - refconfig.Implement(genericService) + c.id = id + c.Refer(genericService) + c.Implement(genericService) return } diff --git a/config/registry_config.go b/config/registry_config.go index b387f6fdb8755536b20e6e1130d0007938102a08..c347c2c2348018a66114c56a1c982d57d4f2783f 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -117,15 +117,16 @@ func loadRegistries(targetRegistries string, registries map[string]*RegistryConf return urls } -func (regconfig *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { +func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { urlMap := url.Values{} - urlMap.Set(constant.GROUP_KEY, regconfig.Group) + urlMap.Set(constant.GROUP_KEY, c.Group) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType))) - urlMap.Set(constant.REGISTRY_KEY, regconfig.Protocol) - urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, regconfig.TimeoutStr) - for k, v := range regconfig.Params { + urlMap.Set(constant.REGISTRY_KEY, c.Protocol) + urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr) + for k, v := range c.Params { urlMap.Set(k, v) } + return urlMap } diff --git a/config/service.go b/config/service.go index b2ff15c7895f357b501cb2d066de0a729e4f73a0..b7e7dc2a425b42363d570fc37a70e2e5094e7d9d 100644 --- a/config/service.go +++ b/config/service.go @@ -26,12 +26,12 @@ var ( proServices = map[string]common.RPCService{} // service name -> service ) -// SetConService is called by init() of implement of RPCService +// SetConsumerService is called by init() of implement of RPCService func SetConsumerService(service common.RPCService) { conServices[service.Reference()] = service } -// SetProService is called by init() of implement of RPCService +// SetProviderService is called by init() of implement of RPCService func SetProviderService(service common.RPCService) { proServices[service.Reference()] = service } diff --git a/config/service_config.go b/config/service_config.go index 2e947bb6c322e52c643569c5bc06ca10b2851ec5..37ec3a3ae611d60d71f5679c1d501bb699351849 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -92,7 +92,7 @@ func (c *ServiceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } -// The only way to get a new ServiceConfig +// NewServiceConfig The only way to get a new ServiceConfig func NewServiceConfig(id string, context context.Context) *ServiceConfig { return &ServiceConfig{ @@ -105,67 +105,68 @@ func NewServiceConfig(id string, context context.Context) *ServiceConfig { } // Export ... -func (srvconfig *ServiceConfig) Export() error { +func (c *ServiceConfig) Export() error { // TODO: config center start here // TODO:delay export - if srvconfig.unexported != nil && srvconfig.unexported.Load() { - err := perrors.Errorf("The service %v has already unexported! ", srvconfig.InterfaceName) + if c.unexported != nil && c.unexported.Load() { + err := perrors.Errorf("The service %v has already unexported! ", c.InterfaceName) logger.Errorf(err.Error()) return err } - if srvconfig.unexported != nil && srvconfig.exported.Load() { - logger.Warnf("The service %v has already exported! ", srvconfig.InterfaceName) + if c.unexported != nil && c.exported.Load() { + logger.Warnf("The service %v has already exported! ", c.InterfaceName) return nil } - regUrls := loadRegistries(srvconfig.Registry, providerConfig.Registries, common.PROVIDER) - urlMap := srvconfig.getUrlMap() - protocolConfigs := loadProtocol(srvconfig.Protocol, providerConfig.Protocols) + regUrls := loadRegistries(c.Registry, providerConfig.Registries, common.PROVIDER) + urlMap := c.getUrlMap() + protocolConfigs := loadProtocol(c.Protocol, providerConfig.Protocols) if len(protocolConfigs) == 0 { - logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs ", srvconfig.InterfaceName, srvconfig.Protocol) + logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs ", c.InterfaceName, c.Protocol) return nil } for _, proto := range protocolConfigs { // registry the service reflect - methods, err := common.ServiceMap.Register(proto.Name, srvconfig.rpcService) + methods, err := common.ServiceMap.Register(proto.Name, c.rpcService) if err != nil { - err := perrors.Errorf("The service %v export the protocol %v error! Error message is %v .", srvconfig.InterfaceName, proto.Name, err.Error()) + err := perrors.Errorf("The service %v export the protocol %v error! Error message is %v .", c.InterfaceName, proto.Name, err.Error()) logger.Errorf(err.Error()) return err } - url := common.NewURLWithOptions(common.WithPath(srvconfig.id), + ivkURL := common.NewURLWithOptions( + common.WithPath(c.id), common.WithProtocol(proto.Name), common.WithIp(proto.Ip), common.WithPort(proto.Port), common.WithParams(urlMap), - common.WithParamsValue(constant.BEAN_NAME_KEY, srvconfig.id), + common.WithParamsValue(constant.BEAN_NAME_KEY, c.id), common.WithMethods(strings.Split(methods, ",")), - common.WithToken(srvconfig.Token), + common.WithToken(c.Token), ) if len(regUrls) > 0 { for _, regUrl := range regUrls { - regUrl.SubURL = url + regUrl.SubURL = ivkURL - srvconfig.cacheMutex.Lock() - if srvconfig.cacheProtocol == nil { - logger.Infof(fmt.Sprintf("First load the registry protocol , url is {%v}!", url)) - srvconfig.cacheProtocol = extension.GetProtocol("registry") + c.cacheMutex.Lock() + if c.cacheProtocol == nil { + logger.Infof(fmt.Sprintf("First load the registry protocol , url is {%v}!", ivkURL)) + c.cacheProtocol = extension.GetProtocol("registry") } - srvconfig.cacheMutex.Unlock() + c.cacheMutex.Unlock() invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl) - exporter := srvconfig.cacheProtocol.Export(invoker) + exporter := c.cacheProtocol.Export(invoker) if exporter == nil { - panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, url))) + panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, ivkURL))) } } } else { - invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*url) + invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*ivkURL) exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker) if exporter == nil { - panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", url))) + panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", ivkURL))) } } @@ -175,24 +176,24 @@ func (srvconfig *ServiceConfig) Export() error { } // Implement ... -func (srvconfig *ServiceConfig) Implement(s common.RPCService) { - srvconfig.rpcService = s +func (c *ServiceConfig) Implement(s common.RPCService) { + c.rpcService = s } -func (srvconfig *ServiceConfig) getUrlMap() url.Values { +func (c *ServiceConfig) getUrlMap() url.Values { urlMap := url.Values{} // first set user params - for k, v := range srvconfig.Params { + for k, v := range c.Params { urlMap.Set(k, v) } - urlMap.Set(constant.INTERFACE_KEY, srvconfig.InterfaceName) + urlMap.Set(constant.INTERFACE_KEY, c.InterfaceName) urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10)) - urlMap.Set(constant.CLUSTER_KEY, srvconfig.Cluster) - urlMap.Set(constant.LOADBALANCE_KEY, srvconfig.Loadbalance) - urlMap.Set(constant.WARMUP_KEY, srvconfig.Warmup) - urlMap.Set(constant.RETRIES_KEY, srvconfig.Retries) - urlMap.Set(constant.GROUP_KEY, srvconfig.Group) - urlMap.Set(constant.VERSION_KEY, srvconfig.Version) + urlMap.Set(constant.CLUSTER_KEY, c.Cluster) + urlMap.Set(constant.LOADBALANCE_KEY, c.Loadbalance) + urlMap.Set(constant.WARMUP_KEY, c.Warmup) + urlMap.Set(constant.RETRIES_KEY, c.Retries) + urlMap.Set(constant.GROUP_KEY, c.Group) + urlMap.Set(constant.VERSION_KEY, c.Version) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) // application info urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name) @@ -204,22 +205,22 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { urlMap.Set(constant.ENVIRONMENT_KEY, providerConfig.ApplicationConfig.Environment) // filter - urlMap.Set(constant.SERVICE_FILTER_KEY, mergeValue(providerConfig.Filter, srvconfig.Filter, constant.DEFAULT_SERVICE_FILTERS)) + urlMap.Set(constant.SERVICE_FILTER_KEY, mergeValue(providerConfig.Filter, c.Filter, constant.DEFAULT_SERVICE_FILTERS)) // filter special config - urlMap.Set(constant.ACCESS_LOG_KEY, srvconfig.AccessLog) + urlMap.Set(constant.ACCESS_LOG_KEY, c.AccessLog) // tps limiter - urlMap.Set(constant.TPS_LIMIT_STRATEGY_KEY, srvconfig.TpsLimitStrategy) - urlMap.Set(constant.TPS_LIMIT_INTERVAL_KEY, srvconfig.TpsLimitInterval) - urlMap.Set(constant.TPS_LIMIT_RATE_KEY, srvconfig.TpsLimitRate) - urlMap.Set(constant.TPS_LIMITER_KEY, srvconfig.TpsLimiter) - urlMap.Set(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.TpsLimitRejectedHandler) + urlMap.Set(constant.TPS_LIMIT_STRATEGY_KEY, c.TpsLimitStrategy) + urlMap.Set(constant.TPS_LIMIT_INTERVAL_KEY, c.TpsLimitInterval) + urlMap.Set(constant.TPS_LIMIT_RATE_KEY, c.TpsLimitRate) + urlMap.Set(constant.TPS_LIMITER_KEY, c.TpsLimiter) + urlMap.Set(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, c.TpsLimitRejectedHandler) // execute limit filter - urlMap.Set(constant.EXECUTE_LIMIT_KEY, srvconfig.ExecuteLimit) - urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.ExecuteLimitRejectedHandler) + urlMap.Set(constant.EXECUTE_LIMIT_KEY, c.ExecuteLimit) + urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, c.ExecuteLimitRejectedHandler) - for _, v := range srvconfig.Methods { + for _, v := range c.Methods { prefix := "methods." + v.Name + "." urlMap.Set(prefix+constant.LOADBALANCE_KEY, v.Loadbalance) urlMap.Set(prefix+constant.RETRIES_KEY, v.Retries) diff --git a/config_center/apollo/factory.go b/config_center/apollo/factory.go index 47011be4a3e0e421ca7a314620a3547d665111c8..a5a69e121598bea4194398423775a99f04b61ced 100644 --- a/config_center/apollo/factory.go +++ b/config_center/apollo/factory.go @@ -20,7 +20,7 @@ package apollo import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" - . "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/config_center/parser" ) @@ -28,13 +28,13 @@ func init() { extension.SetConfigCenterFactory("apollo", createDynamicConfigurationFactory) } -func createDynamicConfigurationFactory() DynamicConfigurationFactory { +func createDynamicConfigurationFactory() config_center.DynamicConfigurationFactory { return &apolloConfigurationFactory{} } type apolloConfigurationFactory struct{} -func (f *apolloConfigurationFactory) GetDynamicConfiguration(url *common.URL) (DynamicConfiguration, error) { +func (f *apolloConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) { dynamicConfiguration, err := newApolloConfiguration(url) if err != nil { return nil, err diff --git a/config_center/apollo/impl.go b/config_center/apollo/impl.go index f72f988e6c1426c9f0e481bff99e0ce78263330e..85dff14a1ec9ba3905890bf37dc1e1827d59d80f 100644 --- a/config_center/apollo/impl.go +++ b/config_center/apollo/impl.go @@ -32,7 +32,7 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" - . "github.com/apache/dubbo-go/config_center" + cc "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/config_center/parser" "github.com/apache/dubbo-go/remoting" ) @@ -58,7 +58,7 @@ func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) { configCluster := url.GetParam(constant.CONFIG_CLUSTER_KEY, "") appId := url.GetParam(constant.CONFIG_APP_ID_KEY, "") - namespaces := getProperties(url.GetParam(constant.CONFIG_NAMESPACE_KEY, DEFAULT_GROUP)) + namespaces := getProperties(url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DEFAULT_GROUP)) c.appConf = &agollo.AppConfig{ AppId: appId, Cluster: configCluster, @@ -84,8 +84,8 @@ func getChangeType(change agollo.ConfigChangeType) remoting.EventType { } } -func (c *apolloConfiguration) AddListener(key string, listener ConfigurationListener, opts ...Option) { - k := &Options{} +func (c *apolloConfiguration) AddListener(key string, listener cc.ConfigurationListener, opts ...cc.Option) { + k := &cc.Options{} for _, opt := range opts { opt(k) } @@ -95,8 +95,8 @@ func (c *apolloConfiguration) AddListener(key string, listener ConfigurationList l.(*apolloListener).AddListener(listener) } -func (c *apolloConfiguration) RemoveListener(key string, listener ConfigurationListener, opts ...Option) { - k := &Options{} +func (c *apolloConfiguration) RemoveListener(key string, listener cc.ConfigurationListener, opts ...cc.Option) { + k := &cc.Options{} for _, opt := range opts { opt(k) } @@ -116,7 +116,7 @@ func getNamespaceName(namespace string, configFileFormat agollo.ConfigFileFormat return fmt.Sprintf(apolloConfigFormat, namespace, configFileFormat) } -func (c *apolloConfiguration) GetInternalProperty(key string, opts ...Option) (string, error) { +func (c *apolloConfiguration) GetInternalProperty(key string, opts ...cc.Option) (string, error) { config := agollo.GetConfig(c.appConf.NamespaceName) if config == nil { return "", errors.New(fmt.Sprintf("nothing in namespace:%s ", key)) @@ -124,11 +124,11 @@ func (c *apolloConfiguration) GetInternalProperty(key string, opts ...Option) (s return config.GetStringValue(key, ""), nil } -func (c *apolloConfiguration) GetRule(key string, opts ...Option) (string, error) { +func (c *apolloConfiguration) GetRule(key string, opts ...cc.Option) (string, error) { return c.GetInternalProperty(key, opts...) } -func (c *apolloConfiguration) GetProperties(key string, opts ...Option) (string, error) { +func (c *apolloConfiguration) GetProperties(key string, opts ...cc.Option) (string, error) { /** * when group is not null, we are getting startup configs(config file) from Config Center, for example: * key=dubbo.propertie diff --git a/config_center/apollo/listener.go b/config_center/apollo/listener.go index 1355be0b76b5c886e81861ef3f846cede5f9e0e7..820d02fb48e2204c3f1eb74fd5624132a63d367e 100644 --- a/config_center/apollo/listener.go +++ b/config_center/apollo/listener.go @@ -29,6 +29,14 @@ type apolloListener struct { listeners map[config_center.ConfigurationListener]struct{} } +// NewApolloListener ... +func NewApolloListener() *apolloListener { + return &apolloListener{ + listeners: make(map[config_center.ConfigurationListener]struct{}, 0), + } +} + +// OnChange ... func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) { for key, change := range changeEvent.Changes { for listener := range a.listeners { @@ -41,20 +49,15 @@ func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) { } } -// NewApolloListener ... -func NewApolloListener() *apolloListener { - return &apolloListener{ - listeners: make(map[config_center.ConfigurationListener]struct{}, 0), - } -} - -func (al *apolloListener) AddListener(l config_center.ConfigurationListener) { - if _, ok := al.listeners[l]; !ok { - al.listeners[l] = struct{}{} - agollo.AddChangeListener(al) +// AddListener ... +func (a *apolloListener) AddListener(l config_center.ConfigurationListener) { + if _, ok := a.listeners[l]; !ok { + a.listeners[l] = struct{}{} + agollo.AddChangeListener(a) } } -func (al *apolloListener) RemoveListener(l config_center.ConfigurationListener) { - delete(al.listeners, l) +// RemoveListener ... +func (a *apolloListener) RemoveListener(l config_center.ConfigurationListener) { + delete(a.listeners, l) } diff --git a/config_center/configurator/mock.go b/config_center/configurator/mock.go index cf418924893bdf6407d3bc082cb18b2bd3f78022..d294b9195db9cfe60056bc29ec26816f740ea396 100644 --- a/config_center/configurator/mock.go +++ b/config_center/configurator/mock.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package configurator import ( @@ -31,10 +32,12 @@ type mockConfigurator struct { configuratorUrl *common.URL } +// GetUrl ... func (c *mockConfigurator) GetUrl() *common.URL { return c.configuratorUrl } +// Configure ... func (c *mockConfigurator) Configure(url *common.URL) { if cluster := c.GetUrl().GetParam(constant.CLUSTER_KEY, ""); cluster != "" { url.SetParam(constant.CLUSTER_KEY, cluster) diff --git a/config_center/configurator/override.go b/config_center/configurator/override.go index 8e8fe5cc1ab91eb779a73f85e3a71984f0ba6798..d0b23ef2f20d065135547536c2cebcec3eec0ce1 100644 --- a/config_center/configurator/override.go +++ b/config_center/configurator/override.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package configurator import ( diff --git a/config_center/configurator/override_test.go b/config_center/configurator/override_test.go index a585f4217f81a5d600ec9a48c12b3b47ff2d5322..b8f417b4602e135d114be99637061851088d4e44 100644 --- a/config_center/configurator/override_test.go +++ b/config_center/configurator/override_test.go @@ -38,6 +38,7 @@ func Test_configureVerison2p6(t *testing.T) { assert.Equal(t, "override", configurator.GetUrl().Protocol) providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") + assert.NoError(t, err) configurator.Configure(&providerUrl) assert.Equal(t, "failfast", providerUrl.GetParam(constant.CLUSTER_KEY, "")) @@ -49,6 +50,7 @@ func Test_configureVerisonOverrideAddr(t *testing.T) { assert.Equal(t, "override", configurator.GetUrl().Protocol) providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") + assert.NoError(t, err) configurator.Configure(&providerUrl) assert.Equal(t, "failover", providerUrl.GetParam(constant.CLUSTER_KEY, "")) @@ -60,6 +62,7 @@ func Test_configureVerison2p6WithIp(t *testing.T) { assert.Equal(t, "override", configurator.GetUrl().Protocol) providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") + assert.NoError(t, err) configurator.Configure(&providerUrl) assert.Equal(t, "failfast", providerUrl.GetParam(constant.CLUSTER_KEY, "")) @@ -71,6 +74,7 @@ func Test_configureVerison2p7(t *testing.T) { configurator := extension.GetConfigurator("default", &url) providerUrl, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&app.version=0.0.1&application=BDTService&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&group=&interface=com.ikurento.user.UserProvider&ip=10.32.20.124&loadbalance=random&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name=BDTService&organization=ikurento.com&owner=ZX&pid=64225&retries=0&service.filter=echo&side=provider×tamp=1562076628&version=&warmup=100") + assert.NoError(t, err) configurator.Configure(&providerUrl) assert.Equal(t, "failfast", providerUrl.GetParam(constant.CLUSTER_KEY, "")) diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index f9c2a8de6c6d079e0613626ae86ecf4451439b75..d6c3b06b327f16c709b09121e589db6694d3663e 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -29,8 +29,12 @@ import ( ////////////////////////////////////////// // DynamicConfiguration ////////////////////////////////////////// -const DEFAULT_GROUP = "dubbo" -const DEFAULT_CONFIG_TIMEOUT = "10s" +const ( + // DEFAULT_GROUP: default group + DEFAULT_GROUP = "dubbo" + // DEFAULT_CONFIG_TIMEOUT: default config timeout + DEFAULT_CONFIG_TIMEOUT = "10s" +) // DynamicConfiguration ... type DynamicConfiguration interface { diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index f0a5dfec1ab6af64b2227acf62c2fed788b5f5bf..4d972b629abb7abd7cc0d0018026e4ccc04a1e4f 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -43,7 +43,7 @@ var ( ) // GetDynamicConfiguration ... -func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (DynamicConfiguration, error) { +func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(_ *common.URL) (DynamicConfiguration, error) { var err error once.Do(func() { dynamicConfiguration = &MockDynamicConfiguration{listener: map[string]ConfigurationListener{}} @@ -89,21 +89,21 @@ type MockDynamicConfiguration struct { } // AddListener ... -func (c *MockDynamicConfiguration) AddListener(key string, listener ConfigurationListener, opions ...Option) { +func (c *MockDynamicConfiguration) AddListener(key string, listener ConfigurationListener, _ ...Option) { c.listener[key] = listener } // RemoveListener ... -func (c *MockDynamicConfiguration) RemoveListener(key string, listener ConfigurationListener, opions ...Option) { +func (c *MockDynamicConfiguration) RemoveListener(_ string, _ ConfigurationListener, _ ...Option) { } // GetConfig ... -func (c *MockDynamicConfiguration) GetConfig(key string, opts ...Option) (string, error) { +func (c *MockDynamicConfiguration) GetConfig(_ string, _ ...Option) (string, error) { return c.content, nil } -//For zookeeper, getConfig and getConfigs have the same meaning. +// GetConfigs For zookeeper, getConfig and getConfigs have the same meaning. func (c *MockDynamicConfiguration) GetConfigs(key string, opts ...Option) (string, error) { return c.GetConfig(key, opts...) } @@ -119,11 +119,11 @@ func (c *MockDynamicConfiguration) SetParser(p parser.ConfigurationParser) { } // GetProperties ... -func (c *MockDynamicConfiguration) GetProperties(key string, opts ...Option) (string, error) { +func (c *MockDynamicConfiguration) GetProperties(_ string, _ ...Option) (string, error) { return c.content, nil } -//For zookeeper, getConfig and getConfigs have the same meaning. +// GetInternalProperty For zookeeper, getConfig and getConfigs have the same meaning. func (c *MockDynamicConfiguration) GetInternalProperty(key string, opts ...Option) (string, error) { return c.GetProperties(key, opts...) } diff --git a/config_center/parser/configuration_parser.go b/config_center/parser/configuration_parser.go index b0c0db34bc8a8c4dfb68c3493e7ed772bb6f54d1..9aaa1f700f7eb581e952485681d90c051ea516f4 100644 --- a/config_center/parser/configuration_parser.go +++ b/config_center/parser/configuration_parser.go @@ -36,8 +36,10 @@ import ( ) const ( + // ScopeApplication ... ScopeApplication = "application" - GeneralType = "general" + // GeneralType ... + GeneralType = "general" ) // ConfigurationParser ... @@ -46,7 +48,7 @@ type ConfigurationParser interface { ParseToUrls(content string) ([]*common.URL, error) } -//for support properties file in config center +// DefaultConfigurationParser for support properties file in config center type DefaultConfigurationParser struct{} // ConfiguratorConfig ... @@ -72,12 +74,12 @@ type ConfigItem struct { // Parse ... func (parser *DefaultConfigurationParser) Parse(content string) (map[string]string, error) { - properties, err := properties.LoadString(content) + pps, err := properties.LoadString(content) if err != nil { logger.Errorf("Parse the content {%v} in DefaultConfigurationParser error ,error message is {%v}", content, err) return nil, err } - return properties.Map(), nil + return pps.Map(), nil } // ParseToUrls ... diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index 504d4910581aff52afa74b13fdfce61c9170ca48..6842d9e37711e954a93c7982bc959aa0798a9c93 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -37,7 +37,11 @@ import ( "github.com/apache/dubbo-go/remoting/zookeeper" ) -const ZkClient = "zk config_center" +const ( + // ZkClient + //zookeeper client name + ZkClient = "zk config_center" +) type zookeeperDynamicConfiguration struct { url *common.URL @@ -134,10 +138,9 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config content, _, err := c.client.GetContent(c.rootPath + "/" + key) if err != nil { return "", perrors.WithStack(err) - } else { - return string(content), nil } + return string(content), nil } //For zookeeper, getConfig and getConfigs have the same meaning. @@ -156,57 +159,57 @@ func (c *zookeeperDynamicConfiguration) SetParser(p parser.ConfigurationParser) c.parser = p } -func (r *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient { - return r.client +func (c *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient { + return c.client } -func (r *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) { - r.client = client +func (c *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) { + c.client = client } -func (r *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex { - return &r.cltLock +func (c *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex { + return &c.cltLock } -func (r *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup { - return &r.wg +func (c *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup { + return &c.wg } -func (r *zookeeperDynamicConfiguration) GetDone() chan struct{} { - return r.done +func (c *zookeeperDynamicConfiguration) GetDone() chan struct{} { + return c.done } -func (r *zookeeperDynamicConfiguration) GetUrl() common.URL { - return *r.url +func (c *zookeeperDynamicConfiguration) GetUrl() common.URL { + return *c.url } -func (r *zookeeperDynamicConfiguration) Destroy() { - if r.listener != nil { - r.listener.Close() +func (c *zookeeperDynamicConfiguration) Destroy() { + if c.listener != nil { + c.listener.Close() } - close(r.done) - r.wg.Wait() - r.closeConfigs() + close(c.done) + c.wg.Wait() + c.closeConfigs() } -func (r *zookeeperDynamicConfiguration) IsAvailable() bool { +func (c *zookeeperDynamicConfiguration) IsAvailable() bool { select { - case <-r.done: + case <-c.done: return false default: return true } } -func (r *zookeeperDynamicConfiguration) closeConfigs() { - r.cltLock.Lock() - defer r.cltLock.Unlock() +func (c *zookeeperDynamicConfiguration) closeConfigs() { + c.cltLock.Lock() + defer c.cltLock.Unlock() logger.Infof("begin to close provider zk client") // Close the old client first to close the tmp node - r.client.Close() - r.client = nil + c.client.Close() + c.client = nil } -func (r *zookeeperDynamicConfiguration) RestartCallBack() bool { +func (c *zookeeperDynamicConfiguration) RestartCallBack() bool { return true } diff --git a/filter/filter.go b/filter/filter.go index 6c9e4455476b42d97718b5364d9687ac9671f687..c069510498c7ac68b2bb2169dfe7132a4ef63229 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -24,6 +24,7 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// Filter // Extension - Filter type Filter interface { Invoke(context.Context, protocol.Invoker, protocol.Invocation) protocol.Result diff --git a/filter/filter_impl/access_log_filter.go b/filter/filter_impl/access_log_filter.go index a07f479742a578038f1beeeb12c4950fe850ba32..fbfe7565170c7df468f755a4bd1aadde166a79c1 100644 --- a/filter/filter_impl/access_log_filter.go +++ b/filter/filter_impl/access_log_filter.go @@ -35,13 +35,21 @@ import ( const ( //used in URL. - FileDateFormat = "2006-01-02" + + // FileDateFormat ... + FileDateFormat = "2006-01-02" + // MessageDateLayout ... MessageDateLayout = "2006-01-02 15:04:05" - LogMaxBuffer = 5000 - LogFileMode = 0600 + // LogMaxBuffer ... + LogMaxBuffer = 5000 + // LogFileMode ... + LogFileMode = 0600 // those fields are the data collected by this filter - Types = "types" + + // Types ... + Types = "types" + // Arguments ... Arguments = "arguments" ) @@ -50,6 +58,7 @@ func init() { } /* + * AccessLogFilter * Although the access log filter is a default filter, * you should config "accesslog" in service's config to tell the filter where store the access log. * for example: @@ -88,7 +97,7 @@ func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) { } } -func (ef *AccessLogFilter) buildAccessLogData(invoker protocol.Invoker, invocation protocol.Invocation) map[string]string { +func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation protocol.Invocation) map[string]string { dataMap := make(map[string]string, 16) attachments := invocation.Attachments() dataMap[constant.INTERFACE_KEY] = attachments[constant.INTERFACE_KEY] @@ -122,7 +131,7 @@ func (ef *AccessLogFilter) buildAccessLogData(invoker protocol.Invoker, invocati } // OnResponse ... -func (ef *AccessLogFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *AccessLogFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/echo_filter.go b/filter/filter_impl/echo_filter.go index 4ccecc2dbc68383071b789692babadac2c80c7fd..a12800a21a8ebe4545b4a8b5bd0f8a30c1462105 100644 --- a/filter/filter_impl/echo_filter.go +++ b/filter/filter_impl/echo_filter.go @@ -20,6 +20,7 @@ package filter_impl import ( "context" ) + import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -29,6 +30,7 @@ import ( ) const ( + // ECHO echo module name ECHO = "echo" ) @@ -36,6 +38,7 @@ func init() { extension.SetFilter(ECHO, GetFilter) } +// EchoFilter // RPCService need a Echo method in consumer, if you want to use EchoFilter // eg: // Echo func(ctx context.Context, arg interface{}, rsp *Xxx) error @@ -56,7 +59,9 @@ func (ef *EchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invo } // OnResponse ... -func (ef *EchoFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *EchoFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, + _ protocol.Invocation) protocol.Result { + return result } diff --git a/filter/filter_impl/execute_limit_filter.go b/filter/filter_impl/execute_limit_filter.go index f9dab06ebe7d7e02be5b6ae23587495d2df7d95b..434c378045456eb13317e0a48630ebd33f244c05 100644 --- a/filter/filter_impl/execute_limit_filter.go +++ b/filter/filter_impl/execute_limit_filter.go @@ -37,13 +37,16 @@ import ( "github.com/apache/dubbo-go/protocol" ) -const name = "execute" +const ( + name = "execute" +) func init() { extension.SetFilter(name, GetExecuteLimitFilter) } /** + * ExecuteLimitFilter * The filter will limit the number of in-progress request and it's thread-safe. * example: * "UserProvider": @@ -80,17 +83,17 @@ type ExecuteState struct { // Invoke ... func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { methodConfigPrefix := "methods." + invocation.MethodName() + "." - url := invoker.GetUrl() - limitTarget := url.ServiceKey() + ivkURL := invoker.GetUrl() + limitTarget := ivkURL.ServiceKey() limitRateConfig := constant.DEFAULT_EXECUTE_LIMIT - methodLevelConfig := url.GetParam(methodConfigPrefix+constant.EXECUTE_LIMIT_KEY, "") + methodLevelConfig := ivkURL.GetParam(methodConfigPrefix+constant.EXECUTE_LIMIT_KEY, "") if len(methodLevelConfig) > 0 { // we have the method-level configuration limitTarget = limitTarget + "#" + invocation.MethodName() limitRateConfig = methodLevelConfig } else { - limitRateConfig = url.GetParam(constant.EXECUTE_LIMIT_KEY, constant.DEFAULT_EXECUTE_LIMIT) + limitRateConfig = ivkURL.GetParam(constant.EXECUTE_LIMIT_KEY, constant.DEFAULT_EXECUTE_LIMIT) } limitRate, err := strconv.ParseInt(limitRateConfig, 0, 0) @@ -110,17 +113,17 @@ func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invok concurrentCount := state.(*ExecuteState).increase() defer state.(*ExecuteState).decrease() if concurrentCount > limitRate { - logger.Errorf("The invocation was rejected due to over the execute limitation, url: %s ", url.String()) - rejectedHandlerConfig := url.GetParam(methodConfigPrefix+constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, - url.GetParam(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, constant.DEFAULT_KEY)) - return extension.GetRejectedExecutionHandler(rejectedHandlerConfig).RejectedExecution(url, invocation) + logger.Errorf("The invocation was rejected due to over the execute limitation, url: %s ", ivkURL.String()) + rejectedHandlerConfig := ivkURL.GetParam(methodConfigPrefix+constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, + ivkURL.GetParam(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, constant.DEFAULT_KEY)) + return extension.GetRejectedExecutionHandler(rejectedHandlerConfig).RejectedExecution(ivkURL, invocation) } return invoker.Invoke(ctx, invocation) } // OnResponse ... -func (ef *ExecuteLimitFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *ExecuteLimitFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/generic_filter.go b/filter/filter_impl/generic_filter.go index fec1c3aa51451d5cb18e037b14ec778393072a93..e8ff2679b0294d6519aecd1cc1fe37bdeab89e46 100644 --- a/filter/filter_impl/generic_filter.go +++ b/filter/filter_impl/generic_filter.go @@ -22,9 +22,11 @@ import ( "reflect" "strings" ) + import ( hessian "github.com/apache/dubbo-go-hessian2" ) + import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -34,6 +36,8 @@ import ( ) const ( + // GENERIC + //generic module name GENERIC = "generic" ) @@ -70,7 +74,8 @@ func (ef *GenericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i } // OnResponse ... -func (ef *GenericFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (ef *GenericFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, + _ protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/generic_filter_test.go b/filter/filter_impl/generic_filter_test.go index c4dc19270e8a81d65e8b56664d4ab0af204f29c5..22948353fc16a99696a85489ce5df7dc9b18a7ba 100644 --- a/filter/filter_impl/generic_filter_test.go +++ b/filter/filter_impl/generic_filter_test.go @@ -21,6 +21,7 @@ import ( "reflect" "testing" ) + import ( "github.com/stretchr/testify/assert" ) diff --git a/filter/filter_impl/generic_service_filter.go b/filter/filter_impl/generic_service_filter.go index c577ae2077fbd042def0a7209459ec59c62b684f..6272df6b39b0c18a77721f3a8c9e92618133aa6c 100644 --- a/filter/filter_impl/generic_service_filter.go +++ b/filter/filter_impl/generic_service_filter.go @@ -40,7 +40,9 @@ import ( ) const ( - GENERIC_SERVICE = "generic_service" + // GENERIC_SERVICE ... + GENERIC_SERVICE = "generic_service" + // GENERIC_SERIALIZATION_DEFAULT ... GENERIC_SERIALIZATION_DEFAULT = "true" ) diff --git a/filter/filter_impl/generic_service_filter_test.go b/filter/filter_impl/generic_service_filter_test.go index 8211e717564465bba3009772715a3ab1cd3322dd..24ed3b95fcab6111f5c432a12c41dd0b60b4a5a2 100644 --- a/filter/filter_impl/generic_service_filter_test.go +++ b/filter/filter_impl/generic_service_filter_test.go @@ -49,10 +49,10 @@ func (c *TestStruct) JavaClassName() string { return "com.test.testStruct" } -type TestService struct { -} +type TestService struct{} -func (ts *TestService) MethodOne(ctx context.Context, test1 *TestStruct, test2 []TestStruct, +// MethodOne ... +func (ts *TestService) MethodOne(_ context.Context, test1 *TestStruct, test2 []TestStruct, test3 interface{}, test4 []interface{}, test5 *string) (*TestStruct, error) { if test1 == nil { return nil, errors.New("param test1 is nil") @@ -72,7 +72,8 @@ func (ts *TestService) MethodOne(ctx context.Context, test1 *TestStruct, test2 [ return &TestStruct{}, nil } -func (s *TestService) Reference() string { +// Reference ... +func (*TestService) Reference() string { return "com.test.Path" } diff --git a/filter/filter_impl/hystrix_filter.go b/filter/filter_impl/hystrix_filter.go index c2834480e72b81d1c8d5d8973db06e9487692118..9fd97b57b677c9aa8ec492151df9aace6dc78b62 100644 --- a/filter/filter_impl/hystrix_filter.go +++ b/filter/filter_impl/hystrix_filter.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package filter_impl import ( @@ -36,9 +37,12 @@ import ( ) const ( + // HYSTRIX_CONSUMER ... HYSTRIX_CONSUMER = "hystrix_consumer" + // HYSTRIX_PROVIDER ... HYSTRIX_PROVIDER = "hystrix_provider" - HYSTRIX = "hystrix" + // HYSTRIX ... + HYSTRIX = "hystrix" ) var ( diff --git a/filter/filter_impl/hystrix_filter_test.go b/filter/filter_impl/hystrix_filter_test.go index 894573036ae6dd9edca88e8e4cdd92e7643abcb5..66c17d920c14e23f1562773c152e99955a48bfb9 100644 --- a/filter/filter_impl/hystrix_filter_test.go +++ b/filter/filter_impl/hystrix_filter_test.go @@ -21,11 +21,13 @@ import ( "regexp" "testing" ) + import ( "github.com/afex/hystrix-go/hystrix" "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) + import ( "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" @@ -126,9 +128,9 @@ type testMockSuccessInvoker struct { protocol.BaseInvoker } -func (iv *testMockSuccessInvoker) Invoke(context context.Context, invocation protocol.Invocation) protocol.Result { +func (iv *testMockSuccessInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result { return &protocol.RPCResult{ - Rest: "Sucess", + Rest: "Success", Err: nil, } } @@ -137,7 +139,7 @@ type testMockFailInvoker struct { protocol.BaseInvoker } -func (iv *testMockFailInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { +func (iv *testMockFailInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result { return &protocol.RPCResult{ Err: errors.Errorf("exception"), } diff --git a/filter/filter_impl/token_filter.go b/filter/filter_impl/token_filter.go index 2340e7271fb6ae0dac582341c3537931b7fbc4aa..4605416c40a616361868c313881ae784257e6742 100644 --- a/filter/filter_impl/token_filter.go +++ b/filter/filter_impl/token_filter.go @@ -34,6 +34,7 @@ import ( ) const ( + // TOKEN ... TOKEN = "token" ) diff --git a/filter/filter_impl/tps/tps_limit_fix_window_strategy.go b/filter/filter_impl/tps/tps_limit_fix_window_strategy.go index a985724028835f236e8db6c7dd9220b1c5952fbe..a9c2ac15a417ffa6ff8f5b8d78d5c6a94877db30 100644 --- a/filter/filter_impl/tps/tps_limit_fix_window_strategy.go +++ b/filter/filter_impl/tps/tps_limit_fix_window_strategy.go @@ -29,6 +29,7 @@ import ( ) const ( + // FixedWindowKey ... FixedWindowKey = "fixedWindow" ) @@ -39,6 +40,7 @@ func init() { } /** + * FixedWindowTpsLimitStrategyImpl * It's the same as default implementation in Java * It's not a thread-safe implementation. * It you want to use the thread-safe implementation, please use ThreadSafeFixedWindowTpsLimitStrategyImpl diff --git a/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go b/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go index c647380641676b8992df332a81406c331d1d5bce..a781cc7bfbf297d0b9cf84ca0aa9dcfbbef7e14b 100644 --- a/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go +++ b/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go @@ -33,6 +33,7 @@ func init() { } /** + * SlidingWindowTpsLimitStrategyImpl * it's thread-safe. * "UserProvider": * registry: "hangzhouzk" diff --git a/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go b/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go index ee0558dd29c641ec4f4dccd843e82c9486d5ecd8..16624836e6397df5adda3f2aa5a80966721a97fb 100644 --- a/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go +++ b/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go @@ -33,6 +33,7 @@ func init() { } /** + * ThreadSafeFixedWindowTpsLimitStrategyImpl * it's the thread-safe implementation. * Also, it's a thread-safe decorator of FixedWindowTpsLimitStrategyImpl * "UserProvider": diff --git a/filter/filter_impl/tps/tps_limiter_method_service.go b/filter/filter_impl/tps/tps_limiter_method_service.go index 49f785f354031b1d130f7316d540a9b4f41f1a05..7fe8de9237b82415a09083c2be59df5e232ecaf0 100644 --- a/filter/filter_impl/tps/tps_limiter_method_service.go +++ b/filter/filter_impl/tps/tps_limiter_method_service.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package tps import ( @@ -34,7 +35,9 @@ import ( "github.com/apache/dubbo-go/protocol" ) -const name = "method-service" +const ( + name = "method-service" +) func init() { extension.SetTpsLimiter(constant.DEFAULT_KEY, GetMethodServiceTpsLimiter) @@ -42,6 +45,7 @@ func init() { } /** + * MethodServiceTpsLimiterImpl * This implementation allows developer to config both method-level and service-level tps limiter. * for example: * "UserProvider": diff --git a/filter/filter_impl/tps_limit_filter.go b/filter/filter_impl/tps_limit_filter.go index 52ac5d147904011bd5286c90bc565584f1869f33..fa78288f9678d67d0eb0d025a83b75493f7fda80 100644 --- a/filter/filter_impl/tps_limit_filter.go +++ b/filter/filter_impl/tps_limit_filter.go @@ -31,6 +31,7 @@ import ( ) const ( + // TpsLimitFilterKey key TpsLimitFilterKey = "tps" ) @@ -39,6 +40,7 @@ func init() { } /** + * TpsLimitFilter * if you wish to use the TpsLimiter, please add the configuration into your service provider configuration: * for example: * "UserProvider": @@ -71,7 +73,8 @@ func (t TpsLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, in } // OnResponse ... -func (t TpsLimitFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (t TpsLimitFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, + _ protocol.Invocation) protocol.Result { return result } diff --git a/filter/filter_impl/tracing_filter_test.go b/filter/filter_impl/tracing_filter_test.go index 30d9d2b7dde318e2ca86127ba480c9a53d7e6f66..c6d6673f3a45e7d73ffd71373b5a7a2860d36a52 100644 --- a/filter/filter_impl/tracing_filter_test.go +++ b/filter/filter_impl/tracing_filter_test.go @@ -59,7 +59,6 @@ func TestTracingFilter_Invoke(t *testing.T) { // has previous span tf.Invoke(ctx, invoker, inv) - ctx = context.Background() // has remote ctx ctx = context.WithValue(context.Background(), constant.TRACING_REMOTE_SPAN_CTX, span.Context()) tf.Invoke(ctx, invoker, inv) diff --git a/filter/handler/rejected_execution_handler_only_log.go b/filter/handler/rejected_execution_handler_only_log.go index 18a22d3d0d6d94edb49d0166553463b6569a0e48..0f9003c7df2165a2f3a364a5afc47f578db1d243 100644 --- a/filter/handler/rejected_execution_handler_only_log.go +++ b/filter/handler/rejected_execution_handler_only_log.go @@ -30,7 +30,10 @@ import ( "github.com/apache/dubbo-go/protocol" ) -const HandlerName = "log" +const ( + // HandlerName handler name + HandlerName = "log" +) func init() { extension.SetRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler) @@ -41,6 +44,7 @@ var onlyLogHandlerInstance *OnlyLogRejectedExecutionHandler var onlyLogHandlerOnce sync.Once /** + * OnlyLogRejectedExecutionHandler * This implementation only logs the invocation info. * it always return en error inside the result. * "UserProvider": @@ -57,7 +61,9 @@ type OnlyLogRejectedExecutionHandler struct { } // RejectedExecution ... -func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result { +func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, + _ protocol.Invocation) protocol.Result { + logger.Errorf("The invocation was rejected. url: %s", url.String()) return &protocol.RPCResult{} } diff --git a/filter/rejected_execution_handler.go b/filter/rejected_execution_handler.go index ce95b54b14d01e0aec6f6089799df8378b5bcca5..caeea1db6631d0968fd58f59f9577ee9272f3ca0 100644 --- a/filter/rejected_execution_handler.go +++ b/filter/rejected_execution_handler.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package filter import ( @@ -22,6 +23,7 @@ import ( ) /** + * RejectedExecutionHandler * If the invocation cannot pass any validation in filter, like ExecuteLimitFilter and TpsLimitFilter, * the implementation will be used. * The common case is that sometimes you want to return the default value when the request was rejected. diff --git a/filter/tps_limit_strategy.go b/filter/tps_limit_strategy.go index ad7133ca69468b348e76858c493b434114ce8c11..5edf32ce1912642c7ad0ea0b3f6144b45c267eb4 100644 --- a/filter/tps_limit_strategy.go +++ b/filter/tps_limit_strategy.go @@ -18,6 +18,7 @@ package filter /* + * TpsLimitStrategy * please register your implementation by invoking SetTpsLimitStrategy * "UserProvider": * registry: "hangzhouzk" diff --git a/filter/tps_limiter.go b/filter/tps_limiter.go index 1d2b2341ac7d9b12f75d373909b0baa58bc7295f..dbc9f76838a4406b4788e7757453098613253d58 100644 --- a/filter/tps_limiter.go +++ b/filter/tps_limiter.go @@ -23,6 +23,7 @@ import ( ) /* + * TpsLimiter * please register your implementation by invoking SetTpsLimiter * The usage, for example: * "UserProvider": diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index d723f68387d55c2219e2b69d78ab147d8c51fd3c..3923b7e4e7e543f4c60a89aaebf67f6238916722 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -191,13 +191,13 @@ func NewResponse(reply interface{}, atta map[string]string) *Response { } } -// call one way +// CallOneway call one way func (c *Client) CallOneway(request *Request) error { return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil)) } -// if @response is nil, the transport layer will get the response without notify the invoker. +// Call if @response is nil, the transport layer will get the response without notify the invoker. func (c *Client) Call(request *Request, response *Response) error { ct := CT_TwoWay @@ -256,7 +256,13 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac if session == nil { return errSessionNotExist } - defer c.pool.release(conn, err) + defer func() { + if err == nil { + c.pool.put(conn) + return + } + conn.close() + }() if err = c.transfer(session, p, rsp); err != nil { return perrors.WithStack(err) diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 64d9477ce543d4151812f3c40411b44cce0d1203..3e50eb901dbe1e549aea4ea7414d9617851b5363 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -30,20 +30,24 @@ import ( perrors "github.com/pkg/errors" ) -// serial ID +//SerialID serial ID type SerialID byte const ( + // S_Dubbo dubbo serial id S_Dubbo SerialID = 2 ) -// call type +//CallType call type type CallType int32 const ( + // CT_UNKNOWN unknown call type CT_UNKNOWN CallType = 0 - CT_OneWay CallType = 1 - CT_TwoWay CallType = 2 + // CT_OneWay call one way + CT_OneWay CallType = 1 + // CT_TwoWay call in request/response + CT_TwoWay CallType = 2 ) //////////////////////////////////////////// diff --git a/protocol/dubbo/config.go b/protocol/dubbo/config.go index fde3904079d5708dfe735dedc1c589776227a825..dbc6989c54780afacef717f1d110833d92967f9f 100644 --- a/protocol/dubbo/config.go +++ b/protocol/dubbo/config.go @@ -47,7 +47,8 @@ type ( SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"` } - // Config holds supported types by the multiconfig package + // ServerConfig + //Config holds supported types by the multiconfig package ServerConfig struct { // session SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` @@ -63,7 +64,8 @@ type ( GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` } - // Config holds supported types by the multiconfig package + // ClientConfig + //Config holds supported types by the multiconfig package ClientConfig struct { ReconnectInterval int `default:"0" yaml:"reconnect_interval" json:"reconnect_interval,omitempty"` diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 4131c4533742858db1827f0e6256d3080f38f118..607ef007b5302fe07ff6090a463eab96d6ae4fa9 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -35,8 +35,10 @@ import ( invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) -// Err_No_Reply ... -var Err_No_Reply = perrors.New("request need @response") +var ( + // ErrNoReply ... + ErrNoReply = perrors.New("request need @response") +) var ( attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY} @@ -86,7 +88,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati } } else { if inv.Reply() == nil { - result.Err = Err_No_Reply + result.Err = ErrNoReply } else { result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response) } diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 9d47cae2f5c310c3245d522f796ee5014eb5298f..355dbc802488338ef4dbdd7290166038b312f183 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -31,7 +31,9 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// dubbo protocol constant const ( + // DUBBO ... DUBBO = "dubbo" ) diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 926afa9a103201b04639cf283f5c87c53eb12541..204e8a1c5d2607d3158ff4f68334a39fd1fb7861 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -22,6 +22,7 @@ import ( "fmt" "net/url" "sync" + "sync/atomic" "time" ) @@ -40,7 +41,10 @@ import ( ) // todo: WritePkg_Timeout will entry *.yml -const WritePkg_Timeout = 5 * time.Second +const ( + // WritePkg_Timeout ... + WritePkg_Timeout = 5 * time.Second +) var ( errTooManySessions = perrors.New("too many sessions") @@ -51,6 +55,14 @@ type rpcSession struct { reqNum int32 } +func (s *rpcSession) AddReqNum(num int32) { + atomic.AddInt32(&s.reqNum, num) +} + +func (s *rpcSession) GetReqNum() int32 { + return atomic.LoadInt32(&s.reqNum) +} + //////////////////////////////////////////// // RpcClientHandler //////////////////////////////////////////// diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index b5bf040c67c2e0071222466e59db4de67d9e1ca2..2df1c6935305e0d70635613f509021e5b9203833 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -154,11 +154,11 @@ func (c *gettyRPCClient) addSession(session getty.Session) { } c.lock.Lock() + defer c.lock.Unlock() if c.sessions == nil { c.sessions = make([]*rpcSession, 0, 16) } c.sessions = append(c.sessions, &rpcSession{session: session}) - c.lock.Unlock() } func (c *gettyRPCClient) removeSession(session getty.Session) { @@ -166,21 +166,27 @@ func (c *gettyRPCClient) removeSession(session getty.Session) { return } - c.lock.Lock() - defer c.lock.Unlock() - if c.sessions == nil { - return - } + var removeFlag bool + func() { + c.lock.Lock() + defer c.lock.Unlock() + if c.sessions == nil { + return + } - for i, s := range c.sessions { - if s.session == session { - c.sessions = append(c.sessions[:i], c.sessions[i+1:]...) - logger.Debugf("delete session{%s}, its index{%d}", session.Stat(), i) - break + for i, s := range c.sessions { + if s.session == session { + c.sessions = append(c.sessions[:i], c.sessions[i+1:]...) + logger.Debugf("delete session{%s}, its index{%d}", session.Stat(), i) + break + } } - } - logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) - if len(c.sessions) == 0 { + logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) + if len(c.sessions) == 0 { + removeFlag = true + } + }() + if removeFlag { c.pool.safeRemove(c) c.close() } @@ -190,17 +196,24 @@ func (c *gettyRPCClient) updateSession(session getty.Session) { if session == nil { return } - c.lock.Lock() - defer c.lock.Unlock() - if c.sessions == nil { - return - } - for i, s := range c.sessions { - if s.session == session { - c.sessions[i].reqNum++ - break + var rs *rpcSession + func() { + c.lock.RLock() + defer c.lock.RUnlock() + if c.sessions == nil { + return + } + + for i, s := range c.sessions { + if s.session == session { + rs = c.sessions[i] + break + } } + }() + if rs != nil { + rs.AddReqNum(1) } } @@ -238,28 +251,42 @@ func (c *gettyRPCClient) isAvailable() bool { func (c *gettyRPCClient) close() error { closeErr := perrors.Errorf("close gettyRPCClient{%#v} again", c) c.once.Do(func() { - c.gettyClient.Close() - c.gettyClient = nil - for _, s := range c.sessions { - logger.Infof("close client session{%s, last active:%s, request number:%d}", - s.session.Stat(), s.session.GetActive().String(), s.reqNum) - s.session.Close() - } - c.sessions = c.sessions[:0] + var ( + gettyClient getty.Client + sessions []*rpcSession + ) + func() { + c.lock.Lock() + defer c.lock.Unlock() + + gettyClient = c.gettyClient + c.gettyClient = nil + + sessions = make([]*rpcSession, 0, len(c.sessions)) + for _, s := range c.sessions { + sessions = append(sessions, s) + } + c.sessions = c.sessions[:0] + }() c.updateActive(0) + + go func() { + if gettyClient != nil { + gettyClient.Close() + } + for _, s := range sessions { + logger.Infof("close client session{%s, last active:%s, request number:%d}", + s.session.Stat(), s.session.GetActive().String(), s.GetReqNum()) + s.session.Close() + } + }() + closeErr = nil }) return closeErr } -func (c *gettyRPCClient) safeClose() error { - c.lock.Lock() - defer c.lock.Unlock() - - return c.close() -} - type gettyRPCClientPool struct { rpcClient *Client size int // size of []*gettyRPCClient @@ -284,37 +311,35 @@ func (p *gettyRPCClientPool) close() { p.conns = nil p.Unlock() for _, conn := range conns { - conn.safeClose() + conn.close() } } func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) { - var ( - conn *gettyRPCClient - err error - ) - if conn, err = p.selectGettyRpcClient(protocol, addr); err == nil && conn == nil { + conn, err := p.get() + if err == nil && conn == nil { // create new conn return newGettyRPCClientConn(p, protocol, addr) } return conn, err } -func (p *gettyRPCClientPool) selectGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) { + +func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) { + now := time.Now().Unix() + p.Lock() defer p.Unlock() if p.conns == nil { return nil, errClientPoolClosed } - now := time.Now().Unix() - for len(p.conns) > 0 { conn := p.conns[len(p.conns)-1] p.conns = p.conns[:len(p.conns)-1] if d := now - conn.getActive(); d > p.ttl { p.remove(conn) - conn.safeClose() + go conn.close() continue } conn.updateActive(now) //update active time @@ -322,13 +347,9 @@ func (p *gettyRPCClientPool) selectGettyRpcClient(protocol, addr string) (*getty } return nil, nil } -func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { - if conn == nil || conn.getActive() == 0 { - return - } - if err != nil { - conn.safeClose() +func (p *gettyRPCClientPool) put(conn *gettyRPCClient) { + if conn == nil || conn.getActive() == 0 { return } @@ -341,8 +362,8 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { if len(p.conns) >= p.size { // delete @conn from client pool - p.remove(conn) - conn.safeClose() + // p.remove(conn) + conn.close() return } p.conns = append(p.conns, conn) diff --git a/protocol/grpc/common_test.go b/protocol/grpc/common_test.go index 165b82fabc5703a720766b04659b158d2b3fdbdf..3d0823b1061a61cfa391358e270c8b9081e9031c 100644 --- a/protocol/grpc/common_test.go +++ b/protocol/grpc/common_test.go @@ -77,7 +77,7 @@ func (g *greeterProviderBase) ServiceDesc() *native_grpc.ServiceDesc { Methods: []native_grpc.MethodDesc{ { MethodName: "SayHello", - Handler: _DUBBO_Greeter_SayHello_Handler, + Handler: dubboGreeterSayHelloHandler, }, }, Streams: []native_grpc.StreamDesc{}, @@ -85,7 +85,9 @@ func (g *greeterProviderBase) ServiceDesc() *native_grpc.ServiceDesc { } } -func _DUBBO_Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor native_grpc.UnaryServerInterceptor) (interface{}, error) { +func dubboGreeterSayHelloHandler(srv interface{}, ctx context.Context, + dec func(interface{}) error, interceptor native_grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(internal.HelloRequest) if err := dec(in); err != nil { return nil, err diff --git a/protocol/grpc/grpc_protocol.go b/protocol/grpc/grpc_protocol.go index ae6fdf13016a32ab37024ccdbd4db8f2ebebc1c5..0f5625c152cc366289143b8a29d11cafb513b2f2 100644 --- a/protocol/grpc/grpc_protocol.go +++ b/protocol/grpc/grpc_protocol.go @@ -28,7 +28,10 @@ import ( "github.com/apache/dubbo-go/protocol" ) -const GRPC = "grpc" +const ( + // GRPC module name + GRPC = "grpc" +) func init() { extension.SetProtocol(GRPC, GetProtocol) diff --git a/protocol/grpc/internal/client.go b/protocol/grpc/internal/client.go index eb7dc1a456396afccb69293a410a650c200fc943..d236e3046a90e9179fba07a0be5edb07f8c2a3e8 100644 --- a/protocol/grpc/internal/client.go +++ b/protocol/grpc/internal/client.go @@ -33,7 +33,8 @@ func init() { config.SetConsumerService(&GrpcGreeterImpl{}) } -// used for dubbo-grpc biz client +// GrpcGreeterImpl +//used for dubbo-grpc biz client type GrpcGreeterImpl struct { SayHello func(ctx context.Context, in *HelloRequest, out *HelloReply) error } diff --git a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/doc.go b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/doc.go index 90799f3b4a0899fae607d803c63233c67d624152..064c738a53d2200223b0ca81aca77358afad032b 100644 --- a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/doc.go +++ b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/doc.go @@ -15,5 +15,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// dubbo plugin for protobuf. +// Package dubbo plugin for protobuf. package dubbo diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 0f42e96d5453229591a47fbc0a3c8f794312fa4a..b207fd0b0cc4eb7de8409a8c46c6fc9ef0baa5c7 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -29,7 +29,9 @@ import ( // /////////////////////////// // Invocation Impletment of RPC // /////////////////////////// + // todo: is it necessary to separate fields of consumer(provider) from RPCInvocation +// RPCInvocation ... type RPCInvocation struct { methodName string parameterTypes []reflect.Type diff --git a/protocol/invoker.go b/protocol/invoker.go index 6805f3fd034ac553e701b7dcbc4e23d93adb1c63..bb71bab1cfa2ede7fb035912ae996f9adb7411e0 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -20,11 +20,13 @@ package protocol import ( "context" ) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" ) +// Invoker ... //go:generate mockgen -source invoker.go -destination mock/mock_invoker.go -self_package github.com/apache/dubbo-go/protocol/mock --package mock Invoker // Extension - Invoker type Invoker interface { diff --git a/protocol/jsonrpc/http.go b/protocol/jsonrpc/http.go index 7ae825e1eb0f4846982bad3237bc0197024b073d..ba7197dbc857c2ed7acda1a9f246a5b826e86915 100644 --- a/protocol/jsonrpc/http.go +++ b/protocol/jsonrpc/http.go @@ -159,6 +159,7 @@ func (c *HTTPClient) Call(ctx context.Context, service common.URL, req *Request, return perrors.WithStack(codec.Read(rspBody, rsp)) } +// Do // !!The high level of complexity and the likelihood that the fasthttp client has not been extensively used // in production means that you would need to expect a very large benefit to justify the adoption of fasthttp today. func (c *HTTPClient) Do(addr, path string, httpHeader http.Header, body []byte) ([]byte, error) { diff --git a/protocol/jsonrpc/json.go b/protocol/jsonrpc/json.go index 9f63e5000bce779cb2d1aa146905954b4d95bc83..d1c2a858b4e4223ac32fc1160b56f6ee1862c8ce 100644 --- a/protocol/jsonrpc/json.go +++ b/protocol/jsonrpc/json.go @@ -31,8 +31,10 @@ import ( ) const ( + // MAX_JSONRPC_ID max jsonrpc request/response id MAX_JSONRPC_ID = 0x7FFFFFFF - VERSION = "2.0" + // VERSION jsonrpc version + VERSION = "2.0" ) // CodecData ... @@ -55,7 +57,7 @@ const ( codeServerErrorEnd = -32000 ) -// rsponse Error +// Error response Error type Error struct { Code int `json:"code"` Message string `json:"message"` diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go index a1669df7d3178518842d9df34f00c2b18e2bebb5..bed7099ab60a6c05c3799f993c0bb348a4b00f02 100644 --- a/protocol/jsonrpc/jsonrpc_protocol.go +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -32,7 +32,11 @@ import ( "github.com/apache/dubbo-go/protocol" ) -const JSONRPC = "jsonrpc" +const ( + // JSONRPC + //module name + JSONRPC = "jsonrpc" +) func init() { extension.SetProtocol(JSONRPC, GetProtocol) diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index 2e1bc16986abe1933088dd76ddf31c39e87a9f06..8600f02dad3d32d797613823de0bbe40261d2e71 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -51,9 +51,12 @@ var ( ) const ( - DefaultMaxSleepTime = 1 * time.Second // accept涓棿鏈€澶leep interval + // DefaultMaxSleepTime max sleep interval in accept + DefaultMaxSleepTime = 1 * time.Second + // DefaultHTTPRspBufferSize ... DefaultHTTPRspBufferSize = 1024 - PathPrefix = byte('/') + // PathPrefix ... + PathPrefix = byte('/') ) // Server ... diff --git a/protocol/protocol.go b/protocol/protocol.go index 948da995edf6fe1834b749c00e76e61ff6fdd226..a873469a8ba361c9dfc922b071ffbf256c6a8b98 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -26,6 +26,7 @@ import ( "github.com/apache/dubbo-go/common/logger" ) +// Protocol // Extension - protocol type Protocol interface { Export(invoker Invoker) Exporter @@ -33,6 +34,7 @@ type Protocol interface { Destroy() } +// Exporter // wrapping invoker type Exporter interface { GetInvoker() Invoker diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 08479fe9d139e31821a0a41a851ded52687f87b3..70d2da0faed3bc9797eb23cec653bea05d445d91 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -31,6 +31,7 @@ import ( ) const ( + // FILTER ... FILTER = "filter" ) @@ -38,6 +39,7 @@ func init() { extension.SetProtocol(FILTER, GetProtocol) } +// ProtocolFilterWrapper // protocol in url decide who ProtocolFilterWrapper.protocol is type ProtocolFilterWrapper struct { protocol protocol.Protocol @@ -76,8 +78,8 @@ func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { // The order of filters is from left to right, so loading from right to left for i := len(filtNames) - 1; i >= 0; i-- { - filter := extension.GetFilter(filtNames[i]) - fi := &FilterInvoker{next: next, invoker: invoker, filter: filter} + flt := extension.GetFilter(filtNames[i]) + fi := &FilterInvoker{next: next, invoker: invoker, filter: flt} next = fi } diff --git a/registry/consul/registry.go b/registry/consul/registry.go index 73bf3975bc7c73f4a7748f46280ffb1aa5525ca8..c5b8510a6c87068a5b4f1ce52203d401a896a6c2 100644 --- a/registry/consul/registry.go +++ b/registry/consul/registry.go @@ -36,6 +36,7 @@ import ( ) const ( + // RegistryConnDelay ... RegistryConnDelay = 3 ) @@ -137,14 +138,15 @@ func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.Noti } for { - if serviceEvent, err := listener.Next(); err != nil { + serviceEvent, err := listener.Next() + if err != nil { logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) listener.Close() return - } else { - logger.Infof("update begin, service event: %v", serviceEvent.String()) - notifyListener.Notify(serviceEvent) } + + logger.Infof("update begin, service event: %v", serviceEvent.String()) + notifyListener.Notify(serviceEvent) } } } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 2001cb163be575df41cb54babb05c41a723369ba..6a43319b2c0ee333c771feedbe9882dfbdf3dd2f 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -301,13 +301,14 @@ func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.In func (dir *registryDirectory) IsAvailable() bool { if !dir.BaseDirectory.IsAvailable() { return dir.BaseDirectory.IsAvailable() - } else { - for _, ivk := range dir.cacheInvokers { - if ivk.IsAvailable() { - return true - } + } + + for _, ivk := range dir.cacheInvokers { + if ivk.IsAvailable() { + return true } } + return false } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index b058113c69b8007803a8a18c1b5e0c3af8c184f4..0320579286a9bdb4cecadb50430c850d6ae3e61f 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -48,7 +48,9 @@ var ( ) const ( - Name = "etcdv3" + // Name module name + Name = "etcdv3" + // RegistryConnDelay ... RegistryConnDelay = 3 ) @@ -75,21 +77,32 @@ type etcdV3Registry struct { done chan struct{} } +// Client get the etcdv3 client func (r *etcdV3Registry) Client() *etcdv3.Client { return r.client } + +//SetClient set the etcdv3 client func (r *etcdV3Registry) SetClient(client *etcdv3.Client) { r.client = client } + +// func (r *etcdV3Registry) ClientLock() *sync.Mutex { return &r.cltLock } + +//WaitGroup return the wait group handle func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup { return &r.wg } + +// GetDone return the done channel func (r *etcdV3Registry) GetDone() chan struct{} { return r.done } + +//RestartCallBack restart callback func (r *etcdV3Registry) RestartCallBack() bool { services := []common.URL{} @@ -148,10 +161,12 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { return r, nil } +// GetUrl get registry url func (r *etcdV3Registry) GetUrl() common.URL { return *r.URL } +// IsAvailable check the register client is available func (r *etcdV3Registry) IsAvailable() bool { select { @@ -162,6 +177,7 @@ func (r *etcdV3Registry) IsAvailable() bool { } } +// Destroy destroy client func (r *etcdV3Registry) Destroy() { if r.configListener != nil { @@ -183,6 +199,7 @@ func (r *etcdV3Registry) stop() { r.cltLock.Unlock() } +// Register ... func (r *etcdV3Registry) Register(svc common.URL) error { role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) @@ -347,7 +364,7 @@ func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) { return configListener, nil } -//subscribe from registry +//Subscribe from registry func (r *etcdV3Registry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { for { if !r.IsAvailable() { @@ -367,16 +384,14 @@ func (r *etcdV3Registry) Subscribe(url *common.URL, notifyListener registry.Noti } for { - if serviceEvent, err := listener.Next(); err != nil { + serviceEvent, err := listener.Next() + if err != nil { logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) listener.Close() return - } else { - logger.Infof("update begin, service event: %v", serviceEvent.String()) - notifyListener.Notify(serviceEvent) } - + logger.Infof("update begin, service event: %v", serviceEvent.String()) + notifyListener.Notify(serviceEvent) } - } } diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go index 6c05a8a83fce50053272181902aeaecdaee9597c..ab997b2916b09e5a6807030707b1872f955a2c4c 100644 --- a/registry/etcdv3/registry_test.go +++ b/registry/etcdv3/registry_test.go @@ -46,7 +46,8 @@ func initRegistry(t *testing.T) *etcdV3Registry { } out := reg.(*etcdV3Registry) - out.client.CleanKV() + err = out.client.CleanKV() + assert.NoError(t, err) return out } @@ -58,6 +59,7 @@ func (suite *RegistryTestSuite) TestRegister() { reg := initRegistry(t) err := reg.Register(url) + assert.NoError(t, err) children, _, err := reg.client.GetChildrenKVList("/dubbo/com.ikurento.user.UserProvider/providers") if err != nil { t.Fatal(err) @@ -83,7 +85,8 @@ func (suite *RegistryTestSuite) TestSubscribe() { regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) reg2 := initRegistry(t) - reg2.Register(url) + err = reg2.Register(url) + assert.NoError(t, err) listener, err := reg2.subscribe(&url) if err != nil { t.Fatal(err) @@ -120,7 +123,8 @@ func (suite *RegistryTestSuite) TestProviderDestory() { t := suite.T() reg := initRegistry(t) url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) - reg.Register(url) + err := reg.Register(url) + assert.NoError(t, err) //listener.Close() time.Sleep(1e9) diff --git a/registry/mock_registry.go b/registry/mock_registry.go index 0b5cbf0658e6a9af162a35250bfb25156e72dc24..9591928eebd22bf2a99ec9dcfeb285c4519a3b90 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -92,17 +92,16 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) } for { - if serviceEvent, err := listener.Next(); err != nil { + serviceEvent, err := listener.Next() + if err != nil { listener.Close() time.Sleep(time.Duration(3) * time.Second) return - } else { - logger.Infof("update begin, service event: %v", serviceEvent.String()) - notifyListener.Notify(serviceEvent) } + logger.Infof("update begin, service event: %v", serviceEvent.String()) + notifyListener.Notify(serviceEvent) } - } }() } diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index a8b9fa83fa73858064e570722341c14f974f5c9e..965e91e894ac61562bfd25c8f564f789afd6c8a1 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -47,6 +47,7 @@ var ( ) const ( + //RegistryConnDelay registry connection delay RegistryConnDelay = 3 ) @@ -209,15 +210,15 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti } for { - if serviceEvent, err := listener.Next(); err != nil { + serviceEvent, err := listener.Next() + if err != nil { logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) listener.Close() return - } else { - logger.Infof("update begin, service event: %v", serviceEvent.String()) - notifyListener.Notify(serviceEvent) } + logger.Infof("update begin, service event: %v", serviceEvent.String()) + notifyListener.Notify(serviceEvent) } } diff --git a/registry/registry.go b/registry/registry.go index e8cf5ecbb9e781d8c15e3e3c1bc8c2070e1526cc..863e781ac7bfa1c09561a6a4ab59c7f7a14e89d2 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -21,7 +21,7 @@ import ( "github.com/apache/dubbo-go/common" ) -// Extension - Registry +// Registry Extension - Registry type Registry interface { common.Node //used for service provider calling , register services to registry @@ -44,7 +44,7 @@ type NotifyListener interface { Notify(*ServiceEvent) } -//Deprecated! +// Listener Deprecated! type Listener interface { Next() (*ServiceEvent, error) Close() diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 1c3a3b67e02217ea48ff4eb38afd3a15f33cb71a..b21d7915f7797d662910f4261e9d5d328c0c2d9e 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -44,9 +44,12 @@ import ( ) const ( - RegistryZkClient = "zk registry" + // RegistryZkClient zk client name + RegistryZkClient = "zk registry" + // RegistryConnDelay connection delay RegistryConnDelay = 3 - MaxWaitInterval = time.Duration(3e9) + // MaxWaitInterval max wait interval + MaxWaitInterval = 3 * time.Second ) var ( diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 0e4b09bcf8552362f58bf3a3e3fbd80cf55affac..ba3ea6e864923b1e70cc4a0d31ee98415807699c 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -36,15 +36,19 @@ import ( ) const ( - ConnDelay = 3 - MaxFailTimes = 15 + // ConnDelay connection dalay + ConnDelay = 3 + // MaxFailTimes max failure times + MaxFailTimes = 15 + // RegistryETCDV3Client client name RegistryETCDV3Client = "etcd registry" ) var ( // ErrNilETCDV3Client ... ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR - ErrKVPairNotFound = perrors.New("k/v pair not found") + // ErrKVPairNotFound ... + ErrKVPairNotFound = perrors.New("k/v pair not found") ) // Options ... diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index 4d2970fe3375e7f0286e5b4038b7062ed0a730a1..a51a68bce78f4f24658f96dac5dc8778a07a6d9a 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -49,7 +49,7 @@ func NewEventListener(client *Client) *EventListener { } } -// Listen on a spec key +// ListenServiceNodeEvent Listen on a spec key // this method will return true when spec key deleted, // this method will return false when deep layer connection lose func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { @@ -136,7 +136,7 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin panic("unreachable") } -// Listen on a set of key with spec prefix +// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { l.wg.Add(1) @@ -182,7 +182,7 @@ func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second } -// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener +// ListenServiceEvent is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent // | // --------> ListenServiceNodeEvent diff --git a/remoting/listener.go b/remoting/listener.go index 12e2d50e5537dd0e0559ebd97e581fe0277cb245..3713ba0ccf9d98d4470741785a9490e657cf051c 100644 --- a/remoting/listener.go +++ b/remoting/listener.go @@ -34,8 +34,11 @@ type DataListener interface { type EventType int const ( + // EventTypeAdd ... EventTypeAdd = iota + // EventTypeDel ... EventTypeDel + // EventTypeUpdate ... EventTypeUpdate ) diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index 095d04ed0f0e49211a6c2a8ccdaa64ed31edb186..b5f02281e9dfc5b5b6f1a289164e85d2b457d1a8 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -35,7 +35,9 @@ import ( ) const ( - ConnDelay = 3 + // ConnDelay connection delay interval + ConnDelay = 3 + // MaxFailTimes max fail times MaxFailTimes = 15 ) diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 25805e8deb33685652abe0e4687e830ffac839f6..407cd8a230d730724dc6a40c2a885723f5087d60 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -267,7 +267,7 @@ func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second } -// this func is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener +// ListenServiceEvent is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent // | // --------> ListenServiceNodeEvent