diff --git a/cluster/cluster.go b/cluster/cluster.go index d89df5a21954d63552b857d988fa9a9e7d1fcfb5..617ce5ebf0fa7b5dc7f6047caacec9865aa6960f 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -21,6 +21,7 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// Cluster ... type Cluster interface { Join(Directory) protocol.Invoker } diff --git a/cluster/cluster_impl/available_cluster.go b/cluster/cluster_impl/available_cluster.go index 7e748cd938319ff437bb3fb6c7945b857d316069..2ad140b93e15b97d1517119b07b1080a68a0503f 100644 --- a/cluster/cluster_impl/available_cluster.go +++ b/cluster/cluster_impl/available_cluster.go @@ -31,6 +31,7 @@ func init() { extension.SetCluster(available, NewAvailableCluster) } +// NewAvailableCluster ... func NewAvailableCluster() cluster.Cluster { return &availableCluster{} } diff --git a/cluster/cluster_impl/available_cluster_invoker.go b/cluster/cluster_impl/available_cluster_invoker.go index bc6705c8156aaeb6a0a52e08b1aa539e179013ca..6f6d2dffbbbf2f6c758097b11713ae0c1b6bd387 100644 --- a/cluster/cluster_impl/available_cluster_invoker.go +++ b/cluster/cluster_impl/available_cluster_invoker.go @@ -35,6 +35,7 @@ type availableClusterInvoker struct { baseClusterInvoker } +// NewAvailableClusterInvoker ... func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker { return &availableClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), diff --git a/cluster/cluster_impl/broadcast_cluster.go b/cluster/cluster_impl/broadcast_cluster.go index 50aae3cfab8d67570b50dcab4e53bbfad29d6d30..9b27a4ce37bc73e42b55e4e20deb9593fd837444 100644 --- a/cluster/cluster_impl/broadcast_cluster.go +++ b/cluster/cluster_impl/broadcast_cluster.go @@ -31,6 +31,7 @@ func init() { extension.SetCluster(broadcast, NewBroadcastCluster) } +// NewBroadcastCluster ... func NewBroadcastCluster() cluster.Cluster { return &broadcastCluster{} } diff --git a/cluster/cluster_impl/failback_cluster.go b/cluster/cluster_impl/failback_cluster.go index de22c78e947d0b8124add721ab7ff42efebcdbe4..76573571684c07f63609009f59ab0ac881ae1b50 100644 --- a/cluster/cluster_impl/failback_cluster.go +++ b/cluster/cluster_impl/failback_cluster.go @@ -31,6 +31,7 @@ func init() { extension.SetCluster(failback, NewFailbackCluster) } +// NewFailbackCluster ... func NewFailbackCluster() cluster.Cluster { return &failbackCluster{} } diff --git a/cluster/cluster_impl/failfast_cluster.go b/cluster/cluster_impl/failfast_cluster.go index 6301d945626103226132b433dd21e8647f53a38b..e0b80ded041cd30b379857ff00d307811e53765d 100644 --- a/cluster/cluster_impl/failfast_cluster.go +++ b/cluster/cluster_impl/failfast_cluster.go @@ -31,6 +31,7 @@ func init() { extension.SetCluster(failfast, NewFailFastCluster) } +// NewFailFastCluster ... func NewFailFastCluster() cluster.Cluster { return &failfastCluster{} } diff --git a/cluster/cluster_impl/failover_cluster.go b/cluster/cluster_impl/failover_cluster.go index 0f1aa0371f57df5414a04a59e2a6772a4cd382b3..b16be3bafd43c7de8e2fadd109a73a3ea710e225 100644 --- a/cluster/cluster_impl/failover_cluster.go +++ b/cluster/cluster_impl/failover_cluster.go @@ -31,6 +31,7 @@ func init() { extension.SetCluster(name, NewFailoverCluster) } +// NewFailoverCluster ... func NewFailoverCluster() cluster.Cluster { return &failoverCluster{} } diff --git a/cluster/cluster_impl/failsafe_cluster.go b/cluster/cluster_impl/failsafe_cluster.go index 3ff97d25eae80980a90a03e71865bb8f9a63defe..177d24a585b5f72fb0667215beb8d11147cc2922 100644 --- a/cluster/cluster_impl/failsafe_cluster.go +++ b/cluster/cluster_impl/failsafe_cluster.go @@ -31,6 +31,7 @@ func init() { extension.SetCluster(failsafe, NewFailsafeCluster) } +// NewFailsafeCluster ... func NewFailsafeCluster() cluster.Cluster { return &failsafeCluster{} } diff --git a/cluster/cluster_impl/forking_cluster.go b/cluster/cluster_impl/forking_cluster.go index 0a3c2b313ff3c4e89e592af9256fc42713419914..6b0572b15088e86870b3d9fd911a1d0b022378be 100644 --- a/cluster/cluster_impl/forking_cluster.go +++ b/cluster/cluster_impl/forking_cluster.go @@ -31,6 +31,7 @@ func init() { extension.SetCluster(forking, NewForkingCluster) } +// NewForkingCluster ... func NewForkingCluster() cluster.Cluster { return &forkingCluster{} } diff --git a/cluster/cluster_impl/mock_cluster.go b/cluster/cluster_impl/mock_cluster.go index 50b2735554b61600fb090f382f3d2920b3d445e3..943c2add68281d01e320252d07b7d58e27b51283 100644 --- a/cluster/cluster_impl/mock_cluster.go +++ b/cluster/cluster_impl/mock_cluster.go @@ -24,6 +24,7 @@ import ( type mockCluster struct{} +// NewMockCluster ... func NewMockCluster() cluster.Cluster { return &mockCluster{} } diff --git a/cluster/cluster_impl/registry_aware_cluster.go b/cluster/cluster_impl/registry_aware_cluster.go index f4a28d6dcd5fbab8c62ee1f79bdd8576d8774a4c..079b688da65b3e6f6595212ad6e93c3b6ecc6504 100644 --- a/cluster/cluster_impl/registry_aware_cluster.go +++ b/cluster/cluster_impl/registry_aware_cluster.go @@ -29,6 +29,7 @@ func init() { extension.SetCluster("registryAware", NewRegistryAwareCluster) } +// NewRegistryAwareCluster ... func NewRegistryAwareCluster() cluster.Cluster { return ®istryAwareCluster{} } diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index 1d59b51cc36858b80fb43c1d76e368e89e26ae36..e1a38c4c82cbac61fdfd96cd284c4eea44c97ccc 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -27,25 +27,32 @@ import ( "github.com/apache/dubbo-go/common" ) +// BaseDirectory ... type BaseDirectory struct { url *common.URL destroyed *atomic.Bool mutex sync.Mutex } +// NewBaseDirectory ... func NewBaseDirectory(url *common.URL) BaseDirectory { return BaseDirectory{ url: url, destroyed: atomic.NewBool(false), } } + +// GetUrl ... func (dir *BaseDirectory) GetUrl() common.URL { return *dir.url } + +// GetDirectoryUrl ... func (dir *BaseDirectory) GetDirectoryUrl() *common.URL { return dir.url } +// Destroy ... func (dir *BaseDirectory) Destroy(doDestroy func()) { if dir.destroyed.CAS(false, true) { dir.mutex.Lock() @@ -54,6 +61,7 @@ func (dir *BaseDirectory) Destroy(doDestroy func()) { } } +// IsAvailable ... func (dir *BaseDirectory) IsAvailable() bool { return !dir.destroyed.Load() } diff --git a/cluster/directory/static_directory.go b/cluster/directory/static_directory.go index e7a0e6e569db620ee83521505c9568199d45fe1e..7d2d5490b02d22b12d55385458715fa8b31f2cac 100644 --- a/cluster/directory/static_directory.go +++ b/cluster/directory/static_directory.go @@ -27,6 +27,7 @@ type staticDirectory struct { invokers []protocol.Invoker } +// NewStaticDirectory ... func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory { var url common.URL diff --git a/cluster/loadbalance/consistent_hash.go b/cluster/loadbalance/consistent_hash.go index 365e6a66242e4a4618ab922f80b4b4247076484d..8c5f8a5001347d10da4347827c1935ddda1f8a86 100644 --- a/cluster/loadbalance/consistent_hash.go +++ b/cluster/loadbalance/consistent_hash.go @@ -50,13 +50,16 @@ func init() { extension.SetLoadbalance(ConsistentHash, NewConsistentHashLoadBalance) } +// ConsistentHashLoadBalance ... type ConsistentHashLoadBalance struct { } +// NewConsistentHashLoadBalance ... func NewConsistentHashLoadBalance() cluster.LoadBalance { return &ConsistentHashLoadBalance{} } +// Select ... func (lb *ConsistentHashLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { methodName := invocation.MethodName() key := invokers[0].GetUrl().ServiceKey() + "." + methodName @@ -79,6 +82,7 @@ func (lb *ConsistentHashLoadBalance) Select(invokers []protocol.Invoker, invocat return selector.Select(invocation) } +// Uint32Slice ... type Uint32Slice []uint32 func (s Uint32Slice) Len() int { @@ -93,6 +97,7 @@ func (s Uint32Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +// ConsistentHashSelector ... type ConsistentHashSelector struct { hashCode uint32 replicaNum int @@ -133,6 +138,7 @@ func newConsistentHashSelector(invokers []protocol.Invoker, methodName string, return selector } +// Select ... func (c *ConsistentHashSelector) Select(invocation protocol.Invocation) protocol.Invoker { key := c.toKey(invocation.Arguments()) digest := md5.Sum([]byte(key)) diff --git a/cluster/loadbalance/least_active.go b/cluster/loadbalance/least_active.go index a1e8516698d23118fdb42e855dabd1cb485ac41c..773bb9323f02349a221a754f256b6c50ac2911a2 100644 --- a/cluster/loadbalance/least_active.go +++ b/cluster/loadbalance/least_active.go @@ -38,6 +38,7 @@ func init() { type leastActiveLoadBalance struct { } +// NewLeastActiveLoadBalance ... func NewLeastActiveLoadBalance() cluster.LoadBalance { return &leastActiveLoadBalance{} } diff --git a/cluster/loadbalance/random.go b/cluster/loadbalance/random.go index 919792162dc527fa8c1e5cf2911f2933fa8232ef..56f13631b653ed070dae7def5bea97d924141209 100644 --- a/cluster/loadbalance/random.go +++ b/cluster/loadbalance/random.go @@ -38,6 +38,7 @@ func init() { type randomLoadBalance struct { } +// NewRandomLoadBalance ... func NewRandomLoadBalance() cluster.LoadBalance { return &randomLoadBalance{} } diff --git a/cluster/loadbalance/round_robin.go b/cluster/loadbalance/round_robin.go index 075acac7cdc60086ececb7b655dee86ec5198369..653e42c3b5d08cbefb25db98278fb6afa6f02c96 100644 --- a/cluster/loadbalance/round_robin.go +++ b/cluster/loadbalance/round_robin.go @@ -49,6 +49,7 @@ func init() { type roundRobinLoadBalance struct{} +// NewRoundRobinLoadBalance ... func NewRoundRobinLoadBalance() cluster.LoadBalance { return &roundRobinLoadBalance{} } diff --git a/cluster/loadbalance/util.go b/cluster/loadbalance/util.go index 7e0c2e265073c0a96032a6dd3294a6d73c1a4001..9f36ad9379a3a09a4a058f6179e3e537b9e105bc 100644 --- a/cluster/loadbalance/util.go +++ b/cluster/loadbalance/util.go @@ -26,6 +26,7 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// GetWeight ... func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 { url := invoker.GetUrl() weight := url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) diff --git a/cluster/router.go b/cluster/router.go index 54a19695574f245fcac236e9308a2469f306a4f8..589eb9a2696e5772070a94e8c764c78c8e0ca8a2 100644 --- a/cluster/router.go +++ b/cluster/router.go @@ -24,18 +24,22 @@ import ( // Extension - Router +// RouterFactory ... type RouterFactory interface { Router(*common.URL) (Router, error) } +// Router ... type Router interface { Route([]protocol.Invoker, common.URL, protocol.Invocation) []protocol.Invoker } +// RouterChain ... type RouterChain struct { routers []Router } +// NewRouterChain ... func NewRouterChain(url common.URL) { } diff --git a/cluster/router/condition_router.go b/cluster/router/condition_router.go index ff1537fc71ec0aa5257cd14b49d514f15d86556a..28966e4eac8f289f34f2958a9509f01bdb54d23a 100644 --- a/cluster/router/condition_router.go +++ b/cluster/router/condition_router.go @@ -259,6 +259,7 @@ func MatchCondition(pairs map[string]MatchPair, url *common.URL, param *common.U return result, nil } +// MatchPair ... type MatchPair struct { Matches *gxset.HashSet Mismatches *gxset.HashSet diff --git a/cluster/router/router_factory.go b/cluster/router/router_factory.go index a9794cb885badae98445ef4d7c0bbc2230d25d5f..723050939e5080f1fefd230986dc679dfbdc06ed 100644 --- a/cluster/router/router_factory.go +++ b/cluster/router/router_factory.go @@ -27,11 +27,15 @@ func init() { extension.SetRouterFactory("condition", NewConditionRouterFactory) } +// ConditionRouterFactory ... type ConditionRouterFactory struct{} +// NewConditionRouterFactory ... func NewConditionRouterFactory() cluster.RouterFactory { return ConditionRouterFactory{} } + +// Router ... func (c ConditionRouterFactory) Router(url *common.URL) (cluster.Router, error) { return newConditionRouter(url) } diff --git a/common/config/environment.go b/common/config/environment.go index 931f0460917d68d3bd7adcd6a6bc1b49c7d3bba8..256741b99968b292330b26cd6c46f6ee421a55a2 100644 --- a/common/config/environment.go +++ b/common/config/environment.go @@ -45,12 +45,15 @@ var ( once sync.Once ) +// GetEnvInstance ... func GetEnvInstance() *Environment { once.Do(func() { instance = &Environment{configCenterFirst: true} }) return instance } + +// NewEnvInstance ... func NewEnvInstance() { instance = &Environment{configCenterFirst: true} } @@ -63,18 +66,21 @@ func NewEnvInstance() { // return env.configCenterFirst //} +// UpdateExternalConfigMap ... func (env *Environment) UpdateExternalConfigMap(externalMap map[string]string) { for k, v := range externalMap { env.externalConfigMap.Store(k, v) } } +// UpdateAppExternalConfigMap ... func (env *Environment) UpdateAppExternalConfigMap(externalMap map[string]string) { for k, v := range externalMap { env.appExternalConfigMap.Store(k, v) } } +// Configuration ... func (env *Environment) Configuration() *list.List { list := list.New() // The sequence would be: SystemConfiguration -> ExternalConfiguration -> AppExternalConfiguration -> AbstractConfig -> PropertiesConfiguration @@ -83,14 +89,17 @@ func (env *Environment) Configuration() *list.List { return list } +// SetDynamicConfiguration ... func (env *Environment) SetDynamicConfiguration(dc config_center.DynamicConfiguration) { env.dynamicConfiguration = dc } +// GetDynamicConfiguration ... func (env *Environment) GetDynamicConfiguration() config_center.DynamicConfiguration { return env.dynamicConfiguration } +// InmemoryConfiguration ... type InmemoryConfiguration struct { store *sync.Map } @@ -99,6 +108,7 @@ func newInmemoryConfiguration(p *sync.Map) *InmemoryConfiguration { return &InmemoryConfiguration{store: p} } +// GetProperty ... func (conf *InmemoryConfiguration) GetProperty(key string) (bool, string) { if conf.store == nil { return false, "" @@ -112,6 +122,7 @@ func (conf *InmemoryConfiguration) GetProperty(key string) (bool, string) { return false, "" } +// GetSubProperty ... func (conf *InmemoryConfiguration) GetSubProperty(subKey string) map[string]struct{} { if conf.store == nil { return nil diff --git a/common/extension/cluster.go b/common/extension/cluster.go index 91e9f953b505e31c1a4f448e1504a6ae50a9663f..b2d81f6b1e56bb487b1d408b878308f6dfe042e4 100644 --- a/common/extension/cluster.go +++ b/common/extension/cluster.go @@ -25,10 +25,12 @@ var ( clusters = make(map[string]func() cluster.Cluster) ) +// SetCluster ... func SetCluster(name string, fcn func() cluster.Cluster) { clusters[name] = fcn } +// GetCluster ... func GetCluster(name string) cluster.Cluster { if clusters[name] == nil { panic("cluster for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/config_center.go b/common/extension/config_center.go index be4b62ccdd9c36500c306c7f16abd054f91ae86b..03d27db46c94b0ea0e212646077d97f948a8e328 100644 --- a/common/extension/config_center.go +++ b/common/extension/config_center.go @@ -26,10 +26,12 @@ var ( configCenters = make(map[string]func(config *common.URL) (config_center.DynamicConfiguration, error)) ) +// SetConfigCenter ... func SetConfigCenter(name string, v func(config *common.URL) (config_center.DynamicConfiguration, error)) { configCenters[name] = v } +// GetConfigCenter ... func GetConfigCenter(name string, config *common.URL) (config_center.DynamicConfiguration, error) { if configCenters[name] == nil { panic("config center for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/config_center_factory.go b/common/extension/config_center_factory.go index 82e0ef6ebcf632ccff32aec5c69c2082a28c51af..85913fdce1ed3472c2bd9eb4aadbb0f631481dbd 100644 --- a/common/extension/config_center_factory.go +++ b/common/extension/config_center_factory.go @@ -25,10 +25,12 @@ var ( configCenterFactories = make(map[string]func() config_center.DynamicConfigurationFactory) ) +// SetConfigCenterFactory ... func SetConfigCenterFactory(name string, v func() config_center.DynamicConfigurationFactory) { configCenterFactories[name] = v } +// GetConfigCenterFactory ... func GetConfigCenterFactory(name string) config_center.DynamicConfigurationFactory { if configCenterFactories[name] == nil { panic("config center for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/configurator.go b/common/extension/configurator.go index 40d134f474ae792afb76f1d8e3f56d172bfd07e2..63bcc8c55dc48ce1feb43ea0dc82172f6ea48526 100644 --- a/common/extension/configurator.go +++ b/common/extension/configurator.go @@ -30,10 +30,12 @@ var ( configurator = make(map[string]getConfiguratorFunc) ) +// SetConfigurator ... func SetConfigurator(name string, v getConfiguratorFunc) { configurator[name] = v } +// GetConfigurator ... func GetConfigurator(name string, url *common.URL) config_center.Configurator { if configurator[name] == nil { panic("configurator for " + name + " is not existing, make sure you have import the package.") @@ -41,10 +43,13 @@ func GetConfigurator(name string, url *common.URL) config_center.Configurator { return configurator[name](url) } + +// SetDefaultConfigurator ... func SetDefaultConfigurator(v getConfiguratorFunc) { configurator[DefaultKey] = v } +// GetDefaultConfigurator ... func GetDefaultConfigurator(url *common.URL) config_center.Configurator { if configurator[DefaultKey] == nil { panic("configurator for default is not existing, make sure you have import the package.") @@ -52,6 +57,8 @@ func GetDefaultConfigurator(url *common.URL) config_center.Configurator { return configurator[DefaultKey](url) } + +// GetDefaultConfiguratorFunc ... func GetDefaultConfiguratorFunc() getConfiguratorFunc { if configurator[DefaultKey] == nil { panic("configurator for default is not existing, make sure you have import the package.") diff --git a/common/extension/filter.go b/common/extension/filter.go index 0b5c4b40aa78f7ea489a306f4a52efbb07243b41..deea2d908bc2741e0f15ecc36e9d4fc5975e531e 100644 --- a/common/extension/filter.go +++ b/common/extension/filter.go @@ -26,10 +26,12 @@ var ( rejectedExecutionHandler = make(map[string]func() filter.RejectedExecutionHandler) ) +// SetFilter ... func SetFilter(name string, v func() filter.Filter) { filters[name] = v } +// GetFilter ... func GetFilter(name string) filter.Filter { if filters[name] == nil { panic("filter for " + name + " is not existing, make sure you have imported the package.") @@ -37,10 +39,12 @@ func GetFilter(name string) filter.Filter { return filters[name]() } +// SetRejectedExecutionHandler ... func SetRejectedExecutionHandler(name string, creator func() filter.RejectedExecutionHandler) { rejectedExecutionHandler[name] = creator } +// GetRejectedExecutionHandler ... func GetRejectedExecutionHandler(name string) filter.RejectedExecutionHandler { creator, ok := rejectedExecutionHandler[name] if !ok { diff --git a/common/extension/graceful_shutdown.go b/common/extension/graceful_shutdown.go index c8807fcc28c18c1a6fddb4e97708e9b0d5cda243..bc03a2ff4a440aabfef4233374308fc486f5618e 100644 --- a/common/extension/graceful_shutdown.go +++ b/common/extension/graceful_shutdown.go @@ -48,6 +48,7 @@ func AddCustomShutdownCallback(callback func()) { customShutdownCallbacks.PushBack(callback) } +// GetAllCustomShutdownCallbacks ... func GetAllCustomShutdownCallbacks() *list.List { return customShutdownCallbacks } diff --git a/common/extension/loadbalance.go b/common/extension/loadbalance.go index f1f97b9399a2b33a3e06213fc0b2f84e73b002b7..0d557a4640ed892a18ad59a3247763ab5807a593 100644 --- a/common/extension/loadbalance.go +++ b/common/extension/loadbalance.go @@ -25,10 +25,12 @@ var ( loadbalances = make(map[string]func() cluster.LoadBalance) ) +// SetLoadbalance ... func SetLoadbalance(name string, fcn func() cluster.LoadBalance) { loadbalances[name] = fcn } +// GetLoadbalance ... func GetLoadbalance(name string) cluster.LoadBalance { if loadbalances[name] == nil { panic("loadbalance for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/protocol.go b/common/extension/protocol.go index 50d339476d024c04b182c38632689a99bc5c1680..009687a17ace8cea567248af655e04604d09d9b8 100644 --- a/common/extension/protocol.go +++ b/common/extension/protocol.go @@ -25,10 +25,12 @@ var ( protocols = make(map[string]func() protocol.Protocol) ) +// SetProtocol ... func SetProtocol(name string, v func() protocol.Protocol) { protocols[name] = v } +// GetProtocol ... func GetProtocol(name string) protocol.Protocol { if protocols[name] == nil { panic("protocol for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/proxy_factory.go b/common/extension/proxy_factory.go index 53cbbee54054bf8ad87964393b01ca6601106066..7b9a5b860ba1413f69d46e0657b1145e3c285304 100644 --- a/common/extension/proxy_factory.go +++ b/common/extension/proxy_factory.go @@ -25,9 +25,12 @@ var ( proxy_factories = make(map[string]func(...proxy.Option) proxy.ProxyFactory) ) +// SetProxyFactory ... func SetProxyFactory(name string, f func(...proxy.Option) proxy.ProxyFactory) { proxy_factories[name] = f } + +// GetProxyFactory ... func GetProxyFactory(name string) proxy.ProxyFactory { if name == "" { name = "default" diff --git a/common/extension/registry.go b/common/extension/registry.go index 776c2b5df542e56f8c120c850f20093a971d8602..6ba746dc47382927d12ce39b7936212c5d75153d 100644 --- a/common/extension/registry.go +++ b/common/extension/registry.go @@ -26,10 +26,12 @@ var ( registrys = make(map[string]func(config *common.URL) (registry.Registry, error)) ) +// SetRegistry ... func SetRegistry(name string, v func(config *common.URL) (registry.Registry, error)) { registrys[name] = v } +// GetRegistry ... func GetRegistry(name string, config *common.URL) (registry.Registry, error) { if registrys[name] == nil { panic("registry for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/router_factory.go b/common/extension/router_factory.go index 6f27aafaebf87147116e74272cc229657f436201..c77cc291369ab02c5f58dfc6c283902ac0df4b95 100644 --- a/common/extension/router_factory.go +++ b/common/extension/router_factory.go @@ -25,10 +25,12 @@ var ( routers = make(map[string]func() cluster.RouterFactory) ) +// SetRouterFactory ... func SetRouterFactory(name string, fun func() cluster.RouterFactory) { routers[name] = fun } +// GetRouterFactory ... func GetRouterFactory(name string) cluster.RouterFactory { if routers[name] == nil { panic("router_factory for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/tps_limit.go b/common/extension/tps_limit.go index 8c131fafa3159047d25b43ae0a57d674418a2170..c72c2b030fc0f391362189bfe18a65582543693a 100644 --- a/common/extension/tps_limit.go +++ b/common/extension/tps_limit.go @@ -26,10 +26,12 @@ var ( tpsLimiter = make(map[string]func() filter.TpsLimiter) ) +// SetTpsLimiter ... func SetTpsLimiter(name string, creator func() filter.TpsLimiter) { tpsLimiter[name] = creator } +// GetTpsLimiter ... func GetTpsLimiter(name string) filter.TpsLimiter { creator, ok := tpsLimiter[name] if !ok { @@ -39,10 +41,12 @@ func GetTpsLimiter(name string) filter.TpsLimiter { return creator() } +// SetTpsLimitStrategy ... func SetTpsLimitStrategy(name string, creator filter.TpsLimitStrategyCreator) { tpsLimitStrategy[name] = creator } +// GetTpsLimitStrategyCreator ... func GetTpsLimitStrategyCreator(name string) filter.TpsLimitStrategyCreator { creator, ok := tpsLimitStrategy[name] if !ok { diff --git a/common/logger/logger.go b/common/logger/logger.go index db91d2e7c1e5f7a647eefbfa5aec14073c2b14a7..016afe69808f2007541c617f406db64beb511f1c 100644 --- a/common/logger/logger.go +++ b/common/logger/logger.go @@ -40,11 +40,13 @@ var ( logger Logger ) +// DubboLogger ... type DubboLogger struct { Logger dynamicLevel zap.AtomicLevel } +// Logger ... type Logger interface { Info(args ...interface{}) Warn(args ...interface{}) @@ -65,6 +67,7 @@ func init() { } } +// InitLog ... func InitLog(logConfFile string) error { if logConfFile == "" { InitLogger(nil) @@ -93,6 +96,7 @@ func InitLog(logConfFile string) error { return nil } +// InitLogger ... func InitLogger(conf *zap.Config) { var zapLoggerConfig zap.Config if conf == nil { @@ -121,15 +125,18 @@ func InitLogger(conf *zap.Config) { getty.SetLogger(logger) } +// SetLogger ... func SetLogger(log Logger) { logger = log getty.SetLogger(logger) } +// GetLogger ... func GetLogger() Logger { return logger } +// SetLoggerLevel ... func SetLoggerLevel(level string) bool { if l, ok := logger.(OpsLogger); ok { l.SetLoggerLevel(level) @@ -138,11 +145,13 @@ func SetLoggerLevel(level string) bool { return false } +// OpsLogger ... type OpsLogger interface { Logger SetLoggerLevel(level string) } +// SetLoggerLevel ... func (dl *DubboLogger) SetLoggerLevel(level string) { l := new(zapcore.Level) l.Set(level) diff --git a/common/logger/logging.go b/common/logger/logging.go index 4638c9a41dfe986d256e1ff4d52b690c1747fc94..36d48ee61e8a4a986abfbaa79f3d361cd81494f4 100644 --- a/common/logger/logging.go +++ b/common/logger/logging.go @@ -17,27 +17,42 @@ package logger +// Info ... func Info(args ...interface{}) { logger.Info(args...) } + +// Warn ... func Warn(args ...interface{}) { logger.Warn(args...) } + +// Error ... func Error(args ...interface{}) { logger.Error(args...) } + +// Debug ... func Debug(args ...interface{}) { logger.Debug(args...) } + +// Infof ... func Infof(fmt string, args ...interface{}) { logger.Infof(fmt, args...) } + +// Warnf ... func Warnf(fmt string, args ...interface{}) { logger.Warnf(fmt, args...) } + +// Errorf ... func Errorf(fmt string, args ...interface{}) { logger.Errorf(fmt, args...) } + +// Debugf ... func Debugf(fmt string, args ...interface{}) { logger.Debugf(fmt, args...) } diff --git a/common/node.go b/common/node.go index 992ead38d8acf85bbb14f02eebca4c4fe5a0a1e2..979eee31ef3a63eb21af6c9045aee7f6d784f2ba 100644 --- a/common/node.go +++ b/common/node.go @@ -17,6 +17,7 @@ package common +// Node ... type Node interface { GetUrl() URL IsAvailable() bool diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index 9610b5970584ebf7111510c97d4866bebbdf2c43..d0be491d406170ea4c52e65f70f0dfbe7b1b3cb6 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -42,6 +42,7 @@ type Proxy struct { var typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type() +// NewProxy ... func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[string]string) *Proxy { return &Proxy{ invoke: invoke, @@ -189,10 +190,12 @@ func (p *Proxy) Implement(v common.RPCService) { } +// Get ... func (p *Proxy) Get() common.RPCService { return p.rpc } +// GetCallback ... func (p *Proxy) GetCallback() interface{} { return p.callBack } diff --git a/common/proxy/proxy_factory.go b/common/proxy/proxy_factory.go index 116cfe06693b6923ca10e0df6964317dabd91d0e..7b249a3e9754b097130a80bf3819d282dad6b6e8 100644 --- a/common/proxy/proxy_factory.go +++ b/common/proxy/proxy_factory.go @@ -22,10 +22,12 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// ProxyFactory ... type ProxyFactory interface { GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy GetInvoker(url common.URL) protocol.Invoker } +// Option ... type Option func(ProxyFactory) diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index 1c299c9cef73e17bf87d4c19768606ceed8b0309..114cfee2363022da5f7957a825a16fc42b8c928f 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -40,6 +40,7 @@ func init() { extension.SetProxyFactory("default", NewDefaultProxyFactory) } +// DefaultProxyFactory ... type DefaultProxyFactory struct { //delegate ProxyFactory } @@ -52,13 +53,17 @@ type DefaultProxyFactory struct { // } //} +// NewDefaultProxyFactory ... func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory { return &DefaultProxyFactory{} } + +// GetProxy ... func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { return factory.GetAsyncProxy(invoker, nil, url) } +// GetAsyncProxy ... func (factory *DefaultProxyFactory) GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy { //create proxy attachments := map[string]string{} @@ -66,16 +71,19 @@ func (factory *DefaultProxyFactory) GetAsyncProxy(invoker protocol.Invoker, call return proxy.NewProxy(invoker, callBack, attachments) } +// GetInvoker ... func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { return &ProxyInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), } } +// ProxyInvoker ... type ProxyInvoker struct { protocol.BaseInvoker } +// Invoke ... func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { result := &protocol.RPCResult{} result.SetAttachments(invocation.Attachments()) diff --git a/common/rpc_service.go b/common/rpc_service.go index 4c9f083dd0850c3f110491ef820c7b677c8009aa..b819cf28f5c2499d39ac3bcd977c0d1b442daff5 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -72,6 +72,7 @@ var ( // info of method ////////////////////////// +// MethodType ... type MethodType struct { method reflect.Method ctxType reflect.Type // request context @@ -79,18 +80,27 @@ type MethodType struct { replyType reflect.Type // return value, otherwise it is nil } +// Method ... func (m *MethodType) Method() reflect.Method { return m.method } + +// CtxType ... func (m *MethodType) CtxType() reflect.Type { return m.ctxType } + +// ArgsType ... func (m *MethodType) ArgsType() []reflect.Type { return m.argsType } + +// ReplyType ... func (m *MethodType) ReplyType() reflect.Type { return m.replyType } + +// SuiteContext ... func (m *MethodType) SuiteContext(ctx context.Context) reflect.Value { if contextv := reflect.ValueOf(ctx); contextv.IsValid() { return contextv @@ -102,6 +112,7 @@ func (m *MethodType) SuiteContext(ctx context.Context) reflect.Value { // info of service interface ////////////////////////// +// Service ... type Service struct { name string rcvr reflect.Value @@ -109,12 +120,17 @@ type Service struct { methods map[string]*MethodType } +// Method ... func (s *Service) Method() map[string]*MethodType { return s.methods } + +// RcvrType ... func (s *Service) RcvrType() reflect.Type { return s.rcvrType } + +// Rcvr ... func (s *Service) Rcvr() reflect.Value { return s.rcvr } diff --git a/common/url.go b/common/url.go index 7a854293142bb237df7faa0a081104bfabdd0bb4..6283bf454135dc63e089a303f9131c65f9cded9e 100644 --- a/common/url.go +++ b/common/url.go @@ -53,16 +53,19 @@ const ( ) var ( + // DubboNodes ... DubboNodes = [...]string{"consumers", "configurators", "routers", "providers"} DubboRole = [...]string{"consumer", "", "", "provider"} ) +// RoleType ... type RoleType int func (t RoleType) String() string { return DubboNodes[t] } +// Role ... func (t RoleType) Role() string { return DubboRole[t] } @@ -79,6 +82,7 @@ type baseUrl struct { ctx context.Context } +// URL ... type URL struct { baseUrl Path string // like /com.ikurento.dubbo.UserProvider3 @@ -91,66 +95,77 @@ type URL struct { type option func(*URL) +// WithUsername ... func WithUsername(username string) option { return func(url *URL) { url.Username = username } } +// WithPassword ... func WithPassword(pwd string) option { return func(url *URL) { url.Password = pwd } } +// WithMethods ... func WithMethods(methods []string) option { return func(url *URL) { url.Methods = methods } } +// WithParams ... func WithParams(params url.Values) option { return func(url *URL) { url.params = params } } +// WithParamsValue ... func WithParamsValue(key, val string) option { return func(url *URL) { url.SetParam(key, val) } } +// WithProtocol ... func WithProtocol(proto string) option { return func(url *URL) { url.Protocol = proto } } +// WithIp ... func WithIp(ip string) option { return func(url *URL) { url.Ip = ip } } +// WithPort ... func WithPort(port string) option { return func(url *URL) { url.Port = port } } +// WithPath ... func WithPath(path string) option { return func(url *URL) { url.Path = "/" + strings.TrimPrefix(path, "/") } } +// WithLocation ... func WithLocation(location string) option { return func(url *URL) { url.Location = location } } +// WithToken ... func WithToken(token string) option { return func(url *URL) { if len(token) > 0 { @@ -163,6 +178,7 @@ func WithToken(token string) option { } } +// NewURLWithOptions ... func NewURLWithOptions(opts ...option) *URL { url := &URL{} for _, opt := range opts { @@ -172,6 +188,7 @@ func NewURLWithOptions(opts ...option) *URL { return url } +// NewURL ... func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error) { var ( @@ -227,6 +244,7 @@ func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error) return s, nil } +// URLEqual ... func (c URL) URLEqual(url URL) bool { c.Ip = "" c.Port = "" @@ -282,6 +300,7 @@ func (c URL) String() string { return buildString } +// Key ... func (c URL) Key() string { buildString := fmt.Sprintf( "%s://%s:%s@%s:%s/?interface=%s&group=%s&version=%s", @@ -290,6 +309,7 @@ func (c URL) Key() string { //return c.ServiceKey() } +// ServiceKey ... func (c URL) ServiceKey() string { intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) if intf == "" { @@ -313,15 +333,18 @@ func (c URL) ServiceKey() string { return buf.String() } +// EncodedServiceKey ... func (c *URL) EncodedServiceKey() string { serviceKey := c.ServiceKey() return strings.Replace(serviceKey, "/", "*", 1) } +// Context ... func (c URL) Context() context.Context { return c.ctx } +// Service ... func (c URL) Service() string { service := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) if service != "" { @@ -335,18 +358,21 @@ func (c URL) Service() string { return "" } +// AddParam ... func (c *URL) AddParam(key string, value string) { c.paramsLock.Lock() c.params.Add(key, value) c.paramsLock.Unlock() } +// SetParam ... func (c *URL) SetParam(key string, value string) { c.paramsLock.Lock() c.params.Set(key, value) c.paramsLock.Unlock() } +// RangeParams ... func (c *URL) RangeParams(f func(key, value string) bool) { c.paramsLock.RLock() defer c.paramsLock.RUnlock() @@ -357,6 +383,7 @@ func (c *URL) RangeParams(f func(key, value string) bool) { } } +// GetParam ... func (c URL) GetParam(s string, d string) string { var r string c.paramsLock.RLock() @@ -367,10 +394,12 @@ func (c URL) GetParam(s string, d string) string { return r } +// GetParams ... func (c URL) GetParams() url.Values { return c.params } +// GetParamAndDecoded ... func (c URL) GetParamAndDecoded(key string) (string, error) { c.paramsLock.RLock() defer c.paramsLock.RUnlock() @@ -379,6 +408,7 @@ func (c URL) GetParamAndDecoded(key string) (string, error) { return value, err } +// GetRawParam ... func (c URL) GetRawParam(key string) string { switch key { case "protocol": @@ -409,6 +439,7 @@ func (c URL) GetParamBool(s string, d bool) bool { return r } +// GetParamInt ... func (c URL) GetParamInt(s string, d int64) int64 { var r int var err error @@ -419,6 +450,7 @@ func (c URL) GetParamInt(s string, d int64) int64 { return int64(r) } +// GetMethodParamInt ... func (c URL) GetMethodParamInt(method string, key string, d int64) int64 { var r int var err error @@ -430,6 +462,7 @@ func (c URL) GetMethodParamInt(method string, key string, d int64) int64 { return int64(r) } +// GetMethodParamInt64 ... func (c URL) GetMethodParamInt64(method string, key string, d int64) int64 { r := c.GetMethodParamInt(method, key, math.MinInt64) if r == math.MinInt64 { @@ -439,6 +472,7 @@ func (c URL) GetMethodParamInt64(method string, key string, d int64) int64 { return r } +// GetMethodParam ... func (c URL) GetMethodParam(method string, key string, d string) string { var r string if r = c.GetParam("methods."+method+"."+key, ""); r == "" { @@ -447,11 +481,13 @@ func (c URL) GetMethodParam(method string, key string, d string) string { return r } +// GetMethodParamBool ... func (c URL) GetMethodParamBool(method string, key string, d bool) bool { r := c.GetParamBool("methods."+method+"."+key, d) return r } +// RemoveParams ... func (c *URL) RemoveParams(set *gxset.HashSet) { c.paramsLock.Lock() defer c.paramsLock.Unlock() @@ -461,6 +497,7 @@ func (c *URL) RemoveParams(set *gxset.HashSet) { } } +// SetParams ... func (c *URL) SetParams(m url.Values) { for k := range m { c.SetParam(k, m.Get(k)) @@ -512,6 +549,7 @@ func (c URL) ToMap() map[string]string { // in this function we should merge the reference local url config into the service url from registry. //TODO configuration merge, in the future , the configuration center's config should merge too. +// MergeUrl ... func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { mergedUrl := serviceUrl.Clone() @@ -540,6 +578,8 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { return mergedUrl } + +// Clone ... func (c *URL) Clone() *URL { newUrl := &URL{} copier.Copy(newUrl, c) diff --git a/config/application_config.go b/config/application_config.go index fcd4d38c9b55963c32d58fdd1b80375083a76d8c..23ab7d34aceaba02d7f592906d6f4e3d6cf36dae 100644 --- a/config/application_config.go +++ b/config/application_config.go @@ -25,6 +25,7 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +// ApplicationConfig ... type ApplicationConfig struct { Organization string `yaml:"organization" json:"organization,omitempty" property:"organization"` Name string `yaml:"name" json:"name,omitempty" property:"name"` @@ -34,15 +35,22 @@ type ApplicationConfig struct { Environment string `yaml:"environment" json:"environment,omitempty" property:"environment"` } +// Prefix ... func (*ApplicationConfig) Prefix() string { return constant.DUBBO + ".application." } + +// Id ... func (c *ApplicationConfig) Id() string { return "" } + +// SetId ... func (c *ApplicationConfig) SetId(id string) { } + +// UnmarshalYAML ... func (c *ApplicationConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := defaults.Set(c); err != nil { return err diff --git a/config/base_config.go b/config/base_config.go index 64418f0a6d4c09270d48e6e9e6366a02783508d3..4e4773fa48d70315c7049404407bca344bd00ecd 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -39,6 +39,7 @@ type multiConfiger interface { Prefix() string } +// BaseConfig ... type BaseConfig struct { ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` configCenterUrl *common.URL @@ -308,6 +309,7 @@ func (c *BaseConfig) freshInternalConfig(config *config.InmemoryConfiguration) { setFieldValue(val, reflect.Value{}, config) } +// SetFatherConfig ... func (c *BaseConfig) SetFatherConfig(fatherConfig interface{}) { c.fatherConfig = fatherConfig } diff --git a/config/config_center_config.go b/config/config_center_config.go index aee9ae8c1251c2ce45e30afd3fe3edcf47328ebe..40b9b6517186a8a4f7956db3d23f0a1cdfbdc8cb 100644 --- a/config/config_center_config.go +++ b/config/config_center_config.go @@ -31,6 +31,7 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +// ConfigCenterConfig ... type ConfigCenterConfig struct { context context.Context Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` @@ -47,6 +48,7 @@ type ConfigCenterConfig struct { timeout time.Duration } +// UnmarshalYAML ... func (c *ConfigCenterConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := defaults.Set(c); err != nil { return err @@ -58,6 +60,7 @@ func (c *ConfigCenterConfig) UnmarshalYAML(unmarshal func(interface{}) error) er return nil } +// GetUrlMap ... func (c *ConfigCenterConfig) GetUrlMap() url.Values { urlMap := url.Values{} urlMap.Set(constant.CONFIG_NAMESPACE_KEY, c.Namespace) diff --git a/config/consumer_config.go b/config/consumer_config.go index 97d6124b79c62bcc0e330562f8d12710571a7388..7a35d8ef7541a005c972ac261e9e95cf7b703ea5 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -39,6 +39,7 @@ import ( // consumerConfig ///////////////////////// +// ConsumerConfig ... type ConsumerConfig struct { BaseConfig `yaml:",inline"` Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` @@ -61,6 +62,7 @@ type ConsumerConfig struct { ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` } +// UnmarshalYAML ... func (c *ConsumerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := defaults.Set(c); err != nil { return err @@ -72,14 +74,17 @@ func (c *ConsumerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error return nil } +// Prefix ... func (*ConsumerConfig) Prefix() string { return constant.ConsumerConfigPrefix } +// SetConsumerConfig ... func SetConsumerConfig(c ConsumerConfig) { consumerConfig = &c } +// GetConsumerConfig ... func GetConsumerConfig() ConsumerConfig { if consumerConfig == nil { logger.Warnf("consumerConfig is nil!") @@ -88,6 +93,7 @@ func GetConsumerConfig() ConsumerConfig { return *consumerConfig } +// ConsumerInit ... func ConsumerInit(confConFile string) error { if confConFile == "" { return perrors.Errorf("application configure(consumer) file name is nil") diff --git a/config/generic_service.go b/config/generic_service.go index 8a4e88df9788554bc4a5ee33884166e4ccede37f..e0171418ceaa72ecb9ad64055781baa2e5afdc30 100644 --- a/config/generic_service.go +++ b/config/generic_service.go @@ -16,15 +16,18 @@ */ package config +// GenericService ... type GenericService struct { Invoke func(req []interface{}) (interface{}, error) `dubbo:"$invoke"` referenceStr string } +// NewGenericService ... func NewGenericService(referenceStr string) *GenericService { return &GenericService{referenceStr: referenceStr} } +// Reference ... func (u *GenericService) Reference() string { return u.referenceStr } diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go index 83e2589c7b2869a5822c5e90de1699b3bd27df92..382f05c8d57c4363108873433fd03565d03b9a50 100644 --- a/config/graceful_shutdown.go +++ b/config/graceful_shutdown.go @@ -52,6 +52,7 @@ import ( * We define them by using 'package build' feature https://golang.org/pkg/go/build/ */ +// GracefulShutdownInit ... func GracefulShutdownInit() { signals := make(chan os.Signal, 1) @@ -82,6 +83,7 @@ func GracefulShutdownInit() { }() } +// BeforeShutdown ... func BeforeShutdown() { destroyAllRegistries() diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go index df55728565f6cf14ce4357f8c9c7927c30d80e40..6bbabebf2538effcbbe4bddc50857acf5f962a61 100644 --- a/config/graceful_shutdown_config.go +++ b/config/graceful_shutdown_config.go @@ -31,6 +31,7 @@ const ( defaultStepTimeout = 10 * time.Second ) +// ShutdownConfig ... type ShutdownConfig struct { /* * Total timeout. Even though we don't release all resources, @@ -57,10 +58,12 @@ type ShutdownConfig struct { RequestsFinished bool } +// Prefix ... func (config *ShutdownConfig) Prefix() string { return constant.ShutdownConfigPrefix } +// GetTimeout ... func (config *ShutdownConfig) GetTimeout() time.Duration { result, err := time.ParseDuration(config.Timeout) if err != nil { @@ -71,6 +74,7 @@ func (config *ShutdownConfig) GetTimeout() time.Duration { return result } +// GetStepTimeout ... func (config *ShutdownConfig) GetStepTimeout() time.Duration { result, err := time.ParseDuration(config.StepTimeout) if err != nil { diff --git a/config/graceful_shutdown_signal_darwin.go b/config/graceful_shutdown_signal_darwin.go index 59c1a5d149c2e9db8e9ac981adec107cafc863ad..c6932bf981d8857615f19c8ead3ef0f93dd74358 100644 --- a/config/graceful_shutdown_signal_darwin.go +++ b/config/graceful_shutdown_signal_darwin.go @@ -22,9 +22,11 @@ import ( "syscall" ) +// ShutdownSignals ... var ShutdownSignals = []os.Signal{os.Interrupt, os.Kill, syscall.SIGKILL, syscall.SIGSTOP, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSYS} +// DumpHeapShutdownSignals ... var DumpHeapShutdownSignals = []os.Signal{syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSYS} diff --git a/config/method_config.go b/config/method_config.go index e10548e667e6a16d33690f011ebc9958af1eea71..6dd8099a6310a861d6645d478b0c1688fcdebf77 100644 --- a/config/method_config.go +++ b/config/method_config.go @@ -24,6 +24,7 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +// MethodConfig ... type MethodConfig struct { InterfaceId string InterfaceName string @@ -40,6 +41,7 @@ type MethodConfig struct { RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` } +// Prefix ... func (c *MethodConfig) Prefix() string { if c.InterfaceId != "" { return constant.DUBBO + "." + c.InterfaceName + "." + c.InterfaceId + "." + c.Name + "." @@ -48,6 +50,7 @@ func (c *MethodConfig) Prefix() string { } } +// UnmarshalYAML ... func (c *MethodConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := defaults.Set(c); err != nil { return err diff --git a/config/mock_rpcservice.go b/config/mock_rpcservice.go index 64d431ffb6dfbc7e25a988c6093cf0ab5cbd2db5..6c43699128247bf0ec483eb83f879bf4c3b67a37 100644 --- a/config/mock_rpcservice.go +++ b/config/mock_rpcservice.go @@ -21,16 +21,20 @@ import ( "context" ) +// MockService ... type MockService struct{} +// Reference ... func (*MockService) Reference() string { return "MockService" } +// GetUser ... func (*MockService) GetUser(ctx context.Context, itf []interface{}, str *struct{}) error { return nil } +// GetUser1 ... func (*MockService) GetUser1(ctx context.Context, itf []interface{}, str *struct{}) error { return nil } diff --git a/config/protocol_config.go b/config/protocol_config.go index 2803456dbcd44211fb6deef883beb7f5dbbf54ad..9495a7fd892354f2b7611a73760b0a2885794534 100644 --- a/config/protocol_config.go +++ b/config/protocol_config.go @@ -24,12 +24,14 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +// ProtocolConfig ... type ProtocolConfig struct { Name string `required:"true" yaml:"name" json:"name,omitempty" property:"name"` Ip string `required:"true" yaml:"ip" json:"ip,omitempty" property:"ip"` Port string `required:"true" yaml:"port" json:"port,omitempty" property:"port"` } +// Prefix ... func (c *ProtocolConfig) Prefix() string { return constant.ProtocolConfigPrefix } diff --git a/config/provider_config.go b/config/provider_config.go index 0fed44c81b124cd40825695981a5394c273203fa..537608d4b51e5a24b269f9baa295764f7c6330ed 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -37,6 +37,7 @@ import ( // providerConfig ///////////////////////// +// ProviderConfig ... type ProviderConfig struct { BaseConfig `yaml:",inline"` Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` @@ -52,6 +53,7 @@ type ProviderConfig struct { ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` } +// UnmarshalYAML ... func (c *ProviderConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := defaults.Set(c); err != nil { return err @@ -63,13 +65,17 @@ func (c *ProviderConfig) UnmarshalYAML(unmarshal func(interface{}) error) error return nil } +// Prefix ... func (*ProviderConfig) Prefix() string { return constant.ProviderConfigPrefix } +// SetProviderConfig ... func SetProviderConfig(p ProviderConfig) { providerConfig = &p } + +// GetProviderConfig ... func GetProviderConfig() ProviderConfig { if providerConfig == nil { logger.Warnf("providerConfig is nil!") @@ -78,6 +84,7 @@ func GetProviderConfig() ProviderConfig { return *providerConfig } +// ProviderInit ... func ProviderInit(confProFile string) error { if len(confProFile) == 0 { return perrors.Errorf("application configure(provider) file name is nil") diff --git a/config/reference_config.go b/config/reference_config.go index 0009dc87c9f2a6deb16b5557e5ecb177016b1b1e..e3fe856b1228d445c0f53c56991f5eb5a6fb2d34 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -39,6 +39,7 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// ReferenceConfig ... type ReferenceConfig struct { context context.Context pxy *proxy.Proxy @@ -64,6 +65,7 @@ type ReferenceConfig struct { RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` } +// Prefix ... func (c *ReferenceConfig) Prefix() string { return constant.ReferenceConfigPrefix + c.InterfaceName + "." } @@ -73,6 +75,7 @@ func NewReferenceConfig(id string, ctx context.Context) *ReferenceConfig { return &ReferenceConfig{id: id, context: ctx} } +// UnmarshalYAML ... func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { type rf ReferenceConfig @@ -89,6 +92,7 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro return nil } +// Refer ... func (refconfig *ReferenceConfig) Refer(impl interface{}) { url := common.NewURLWithOptions(common.WithPath(refconfig.id), common.WithProtocol(refconfig.Protocol), @@ -160,6 +164,7 @@ func (refconfig *ReferenceConfig) Implement(v common.RPCService) { refconfig.pxy.Implement(v) } +// GetRPCService ... func (refconfig *ReferenceConfig) GetRPCService() common.RPCService { return refconfig.pxy.Get() } @@ -214,6 +219,8 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { return urlMap } + +// GenericLoad ... func (refconfig *ReferenceConfig) GenericLoad(id string) { genericService := NewGenericService(refconfig.id) SetConsumerService(genericService) diff --git a/config/registry_config.go b/config/registry_config.go index 9ffa41eb5b5b3b5ae4dc9f77812c0aef5ce9835f..b387f6fdb8755536b20e6e1130d0007938102a08 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -34,6 +34,7 @@ import ( "github.com/apache/dubbo-go/common/logger" ) +// RegistryConfig ... type RegistryConfig struct { Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig @@ -46,6 +47,7 @@ type RegistryConfig struct { Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` } +// UnmarshalYAML ... func (c *RegistryConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := defaults.Set(c); err != nil { return err @@ -57,6 +59,7 @@ func (c *RegistryConfig) UnmarshalYAML(unmarshal func(interface{}) error) error return nil } +// Prefix ... func (*RegistryConfig) Prefix() string { return constant.RegistryConfigPrefix + "|" + constant.SingleRegistryConfigPrefix } diff --git a/config/service.go b/config/service.go index f1b51790ca13df0534882837397181e45e56ffa3..b2ff15c7895f357b501cb2d066de0a729e4f73a0 100644 --- a/config/service.go +++ b/config/service.go @@ -36,14 +36,17 @@ func SetProviderService(service common.RPCService) { proServices[service.Reference()] = service } +// GetConsumerService ... func GetConsumerService(name string) common.RPCService { return conServices[name] } +// GetProviderService ... func GetProviderService(name string) common.RPCService { return proServices[name] } +// GetCallback ... func GetCallback(name string) func(response common.CallbackResponse) { service := GetConsumerService(name) if sv, ok := service.(common.AsyncCallbackService); ok { diff --git a/config/service_config.go b/config/service_config.go index a2ed13966072f319693671137ba991978a0dcdb9..2e947bb6c322e52c643569c5bc06ca10b2851ec5 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -42,6 +42,7 @@ import ( "github.com/apache/dubbo-go/protocol/protocolwrapper" ) +// ServiceConfig ... type ServiceConfig struct { context context.Context id string @@ -74,10 +75,12 @@ type ServiceConfig struct { cacheMutex sync.Mutex } +// Prefix ... func (c *ServiceConfig) Prefix() string { return constant.ServiceConfigPrefix + c.InterfaceName + "." } +// UnmarshalYAML ... func (c *ServiceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := defaults.Set(c); err != nil { return err @@ -101,6 +104,7 @@ func NewServiceConfig(id string, context context.Context) *ServiceConfig { } +// Export ... func (srvconfig *ServiceConfig) Export() error { // TODO: config center start here @@ -170,6 +174,7 @@ func (srvconfig *ServiceConfig) Export() error { } +// Implement ... func (srvconfig *ServiceConfig) Implement(s common.RPCService) { srvconfig.rpcService = s } diff --git a/config_center/apollo/listener.go b/config_center/apollo/listener.go index d81e1cbf34e405f7d2974e29558414029308b36b..1355be0b76b5c886e81861ef3f846cede5f9e0e7 100644 --- a/config_center/apollo/listener.go +++ b/config_center/apollo/listener.go @@ -41,6 +41,7 @@ func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) { } } +// NewApolloListener ... func NewApolloListener() *apolloListener { return &apolloListener{ listeners: make(map[config_center.ConfigurationListener]struct{}, 0), diff --git a/config_center/configuration_listener.go b/config_center/configuration_listener.go index 1419bcdd0ce10ec15d0c24c2439bb02747ce5391..e70e4f68075c51c33f1110ef44a7b703e36fb78d 100644 --- a/config_center/configuration_listener.go +++ b/config_center/configuration_listener.go @@ -25,10 +25,12 @@ import ( "github.com/apache/dubbo-go/remoting" ) +// ConfigurationListener ... type ConfigurationListener interface { Process(*ConfigChangeEvent) } +// ConfigChangeEvent ... type ConfigChangeEvent struct { Key string Value interface{} diff --git a/config_center/configurator.go b/config_center/configurator.go index 3ba293ec60302b76becce357f49b2baa543f69cd..ffa9034e05c4c3d4cc254886e2ed19576f155dec 100644 --- a/config_center/configurator.go +++ b/config_center/configurator.go @@ -21,6 +21,7 @@ import ( "github.com/apache/dubbo-go/common" ) +// Configurator ... type Configurator interface { GetUrl() *common.URL Configure(url *common.URL) diff --git a/config_center/configurator/mock.go b/config_center/configurator/mock.go index 1f03d107c8f588cfd4c23c9086bb0fbe42e05fff..cf418924893bdf6407d3bc082cb18b2bd3f78022 100644 --- a/config_center/configurator/mock.go +++ b/config_center/configurator/mock.go @@ -22,6 +22,7 @@ import ( "github.com/apache/dubbo-go/config_center" ) +// NewMockConfigurator ... func NewMockConfigurator(url *common.URL) config_center.Configurator { return &mockConfigurator{configuratorUrl: url} } diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index 0546d39732deaa83ace948275a0d4448b1b24cf8..4e815234bb76c89860cb9bbbb213b79bd23e9ee3 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -31,6 +31,7 @@ import ( const DEFAULT_GROUP = "dubbo" const DEFAULT_CONFIG_TIMEOUT = "10s" +// DynamicConfiguration ... type DynamicConfiguration interface { Parser() parser.ConfigurationParser SetParser(parser.ConfigurationParser) @@ -46,19 +47,23 @@ type DynamicConfiguration interface { GetInternalProperty(string, ...Option) (string, error) } +// Options ... type Options struct { Group string Timeout time.Duration } +// Option ... type Option func(*Options) +// WithGroup ... func WithGroup(group string) Option { return func(opt *Options) { opt.Group = group } } +// WithTimeout ... func WithTimeout(time time.Duration) Option { return func(opt *Options) { opt.Timeout = time diff --git a/config_center/dynamic_configuration_factory.go b/config_center/dynamic_configuration_factory.go index 0720896fb615f8639c20a46d2078c3dfcd112c32..9f9b13227f6623a02b0261c46d8d1e43624005f8 100644 --- a/config_center/dynamic_configuration_factory.go +++ b/config_center/dynamic_configuration_factory.go @@ -21,6 +21,7 @@ import ( "github.com/apache/dubbo-go/common" ) +// DynamicConfigurationFactory ... type DynamicConfigurationFactory interface { GetDynamicConfiguration(*common.URL) (DynamicConfiguration, error) } diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index 79c7c171945400a52563e0b66ef29c2896db0b99..f0a5dfec1ab6af64b2227acf62c2fed788b5f5bf 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -32,6 +32,7 @@ import ( "github.com/apache/dubbo-go/remoting" ) +// MockDynamicConfigurationFactory ... type MockDynamicConfigurationFactory struct { Content string } @@ -41,6 +42,7 @@ var ( dynamicConfiguration *MockDynamicConfiguration ) +// GetDynamicConfiguration ... func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (DynamicConfiguration, error) { var err error once.Do(func() { @@ -79,19 +81,23 @@ func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(url *common.UR } +// MockDynamicConfiguration ... type MockDynamicConfiguration struct { parser parser.ConfigurationParser content string listener map[string]ConfigurationListener } +// AddListener ... func (c *MockDynamicConfiguration) AddListener(key string, listener ConfigurationListener, opions ...Option) { c.listener[key] = listener } +// RemoveListener ... func (c *MockDynamicConfiguration) RemoveListener(key string, listener ConfigurationListener, opions ...Option) { } +// GetConfig ... func (c *MockDynamicConfiguration) GetConfig(key string, opts ...Option) (string, error) { return c.content, nil @@ -102,12 +108,17 @@ func (c *MockDynamicConfiguration) GetConfigs(key string, opts ...Option) (strin return c.GetConfig(key, opts...) } +// Parser ... func (c *MockDynamicConfiguration) Parser() parser.ConfigurationParser { return c.parser } + +// SetParser ... func (c *MockDynamicConfiguration) SetParser(p parser.ConfigurationParser) { c.parser = p } + +// GetProperties ... func (c *MockDynamicConfiguration) GetProperties(key string, opts ...Option) (string, error) { return c.content, nil } @@ -117,10 +128,12 @@ func (c *MockDynamicConfiguration) GetInternalProperty(key string, opts ...Optio return c.GetProperties(key, opts...) } +// GetRule ... func (c *MockDynamicConfiguration) GetRule(key string, opts ...Option) (string, error) { return c.GetProperties(key, opts...) } +// MockServiceConfigEvent ... func (c *MockDynamicConfiguration) MockServiceConfigEvent() { config := &parser.ConfiguratorConfig{ ConfigVersion: "2.7.1", @@ -142,6 +155,7 @@ func (c *MockDynamicConfiguration) MockServiceConfigEvent() { c.listener[key].Process(&ConfigChangeEvent{Key: key, Value: string(value), ConfigType: remoting.EventTypeAdd}) } +// MockApplicationConfigEvent ... func (c *MockDynamicConfiguration) MockApplicationConfigEvent() { config := &parser.ConfiguratorConfig{ ConfigVersion: "2.7.1", diff --git a/config_center/parser/configuration_parser.go b/config_center/parser/configuration_parser.go index 85033ce97f3e9038d57b6156e6dc68139363e8c3..b0c0db34bc8a8c4dfb68c3493e7ed772bb6f54d1 100644 --- a/config_center/parser/configuration_parser.go +++ b/config_center/parser/configuration_parser.go @@ -40,6 +40,7 @@ const ( GeneralType = "general" ) +// ConfigurationParser ... type ConfigurationParser interface { Parse(string) (map[string]string, error) ParseToUrls(content string) ([]*common.URL, error) @@ -48,6 +49,7 @@ type ConfigurationParser interface { //for support properties file in config center type DefaultConfigurationParser struct{} +// ConfiguratorConfig ... type ConfiguratorConfig struct { ConfigVersion string `yaml:"configVersion"` Scope string `yaml:"scope"` @@ -56,6 +58,7 @@ type ConfiguratorConfig struct { Configs []ConfigItem `yaml:"configs"` } +// ConfigItem ... type ConfigItem struct { Type string `yaml:"type"` Enabled bool `yaml:"enabled"` @@ -67,6 +70,7 @@ type ConfigItem struct { Side string `yaml:"side"` } +// Parse ... func (parser *DefaultConfigurationParser) Parse(content string) (map[string]string, error) { properties, err := properties.LoadString(content) if err != nil { @@ -76,6 +80,7 @@ func (parser *DefaultConfigurationParser) Parse(content string) (map[string]stri return properties.Map(), nil } +// ParseToUrls ... func (parser *DefaultConfigurationParser) ParseToUrls(content string) ([]*common.URL, error) { config := ConfiguratorConfig{} if err := yaml.Unmarshal([]byte(content), &config); err != nil { diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go index 7128b6f5a39e243840a1076f9fc506d94c7ed2ed..122dfaf4f268a706151de6acdaa78bb46e59f8fb 100644 --- a/config_center/zookeeper/listener.go +++ b/config_center/zookeeper/listener.go @@ -27,25 +27,30 @@ import ( "github.com/apache/dubbo-go/remoting" ) +// CacheListener ... type CacheListener struct { keyListeners sync.Map rootPath string } +// NewCacheListener ... func NewCacheListener(rootPath string) *CacheListener { return &CacheListener{rootPath: rootPath} } + +// AddListener ... func (l *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) { // reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure // make a map[your type]struct{} like set in java - listeners, loaded := l.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: struct{}{}}) + listeners, loaded := l.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: {}}) if loaded { listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{} l.keyListeners.Store(key, listeners) } } +// RemoveListener ... func (l *CacheListener) RemoveListener(key string, listener config_center.ConfigurationListener) { listeners, loaded := l.keyListeners.Load(key) if loaded { @@ -53,6 +58,7 @@ func (l *CacheListener) RemoveListener(key string, listener config_center.Config } } +// DataChange ... func (l *CacheListener) DataChange(event remoting.Event) bool { if event.Content == "" { //meanings new node diff --git a/filter/filter_impl/access_log_filter.go b/filter/filter_impl/access_log_filter.go index 468393ba5be0c0991b9ab218ebc440d699382c20..a07f479742a578038f1beeeb12c4950fe850ba32 100644 --- a/filter/filter_impl/access_log_filter.go +++ b/filter/filter_impl/access_log_filter.go @@ -67,6 +67,7 @@ type AccessLogFilter struct { logChan chan AccessLogData } +// Invoke ... func (ef *AccessLogFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { accessLog := invoker.GetUrl().GetParam(constant.ACCESS_LOG_KEY, "") if len(accessLog) > 0 { @@ -120,6 +121,7 @@ func (ef *AccessLogFilter) buildAccessLogData(invoker protocol.Invoker, invocati return dataMap } +// OnResponse ... func (ef *AccessLogFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } @@ -173,6 +175,7 @@ func isDefault(accessLog string) bool { return strings.EqualFold("true", accessLog) || strings.EqualFold("default", accessLog) } +// GetAccessLogFilter ... func GetAccessLogFilter() filter.Filter { accessLogFilter := &AccessLogFilter{logChan: make(chan AccessLogData, LogMaxBuffer)} go func() { @@ -183,6 +186,7 @@ func GetAccessLogFilter() filter.Filter { return accessLogFilter } +// AccessLogData ... type AccessLogData struct { accessLog string data map[string]string diff --git a/filter/filter_impl/active_filter.go b/filter/filter_impl/active_filter.go index cc46fc9d8624f6e756ccfe5c491c3177450e10b5..23f2c8e25609dff89392107251715fe6f5175f09 100644 --- a/filter/filter_impl/active_filter.go +++ b/filter/filter_impl/active_filter.go @@ -39,19 +39,20 @@ func init() { extension.SetFilter(active, GetActiveFilter) } +// ActiveFilter ... type ActiveFilter struct { } +// Invoke ... func (ef *ActiveFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments())) - invocation.(*invocation2.RPCInvocation).SetAttachments(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10)) protocol.BeginCount(invoker.GetUrl(), invocation.MethodName()) return invoker.Invoke(ctx, invocation) } +// OnResponse ... func (ef *ActiveFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubboInvokeStartTime, "0"), 10, 64) if err != nil { result.SetError(err) @@ -63,6 +64,7 @@ func (ef *ActiveFilter) OnResponse(ctx context.Context, result protocol.Result, return result } +// GetActiveFilter ... func GetActiveFilter() filter.Filter { return &ActiveFilter{} } diff --git a/filter/filter_impl/echo_filter.go b/filter/filter_impl/echo_filter.go index f6bdd4a4e8398c65303d426a48f104e12314ded3..4ccecc2dbc68383071b789692babadac2c80c7fd 100644 --- a/filter/filter_impl/echo_filter.go +++ b/filter/filter_impl/echo_filter.go @@ -41,6 +41,7 @@ func init() { // Echo func(ctx context.Context, arg interface{}, rsp *Xxx) error type EchoFilter struct{} +// Invoke ... func (ef *EchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { logger.Infof("invoking echo filter.") logger.Debugf("%v,%v", invocation.MethodName(), len(invocation.Arguments())) @@ -54,10 +55,12 @@ func (ef *EchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invo return invoker.Invoke(ctx, invocation) } +// OnResponse ... func (ef *EchoFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } +// GetFilter ... func GetFilter() filter.Filter { return &EchoFilter{} } diff --git a/filter/filter_impl/execute_limit_filter.go b/filter/filter_impl/execute_limit_filter.go index f9ff87751b21979f9d794db88deb9f4d8527f0d1..f9dab06ebe7d7e02be5b6ae23587495d2df7d95b 100644 --- a/filter/filter_impl/execute_limit_filter.go +++ b/filter/filter_impl/execute_limit_filter.go @@ -72,10 +72,12 @@ type ExecuteLimitFilter struct { executeState *concurrent.Map } +// ExecuteState ... type ExecuteState struct { concurrentCount int64 } +// Invoke ... func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { methodConfigPrefix := "methods." + invocation.MethodName() + "." url := invoker.GetUrl() @@ -117,6 +119,7 @@ func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invok return invoker.Invoke(ctx, invocation) } +// OnResponse ... func (ef *ExecuteLimitFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } @@ -132,6 +135,7 @@ func (state *ExecuteState) decrease() { var executeLimitOnce sync.Once var executeLimitFilter *ExecuteLimitFilter +// GetExecuteLimitFilter ... func GetExecuteLimitFilter() filter.Filter { executeLimitOnce.Do(func() { executeLimitFilter = &ExecuteLimitFilter{ diff --git a/filter/filter_impl/generic_filter.go b/filter/filter_impl/generic_filter.go index 9d3804d9434ce2ab108dfa8be4607a6425f2d29c..fec1c3aa51451d5cb18e037b14ec778393072a93 100644 --- a/filter/filter_impl/generic_filter.go +++ b/filter/filter_impl/generic_filter.go @@ -43,8 +43,10 @@ func init() { // when do a generic invoke, struct need to be map +// GenericFilter ... type GenericFilter struct{} +// Invoke ... func (ef *GenericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 { oldArguments := invocation.Arguments() @@ -67,10 +69,12 @@ func (ef *GenericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i return invoker.Invoke(ctx, invocation) } +// OnResponse ... func (ef *GenericFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } +// GetGenericFilter ... func GetGenericFilter() filter.Filter { return &GenericFilter{} } diff --git a/filter/filter_impl/generic_service_filter.go b/filter/filter_impl/generic_service_filter.go index 6beebf4566b657d4d4ea0d2c737cdf3344bdcbe4..c577ae2077fbd042def0a7209459ec59c62b684f 100644 --- a/filter/filter_impl/generic_service_filter.go +++ b/filter/filter_impl/generic_service_filter.go @@ -48,8 +48,10 @@ func init() { extension.SetFilter(GENERIC_SERVICE, GetGenericServiceFilter) } +// GenericServiceFilter ... type GenericServiceFilter struct{} +// Invoke ... func (ef *GenericServiceFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { logger.Infof("invoking generic service filter.") logger.Debugf("generic service filter methodName:%v,args:%v", invocation.MethodName(), len(invocation.Arguments())) @@ -111,6 +113,7 @@ func (ef *GenericServiceFilter) Invoke(ctx context.Context, invoker protocol.Inv return invoker.Invoke(ctx, newInvocation) } +// OnResponse ... func (ef *GenericServiceFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 && result.Result() != nil { v := reflect.ValueOf(result.Result()) @@ -122,6 +125,7 @@ func (ef *GenericServiceFilter) OnResponse(ctx context.Context, result protocol. return result } +// GetGenericServiceFilter ... func GetGenericServiceFilter() filter.Filter { return &GenericServiceFilter{} } diff --git a/filter/filter_impl/hystrix_filter.go b/filter/filter_impl/hystrix_filter.go index 0f40d815ffbd4c199ad30cad44eb1a94e93cf916..c2834480e72b81d1c8d5d8973db06e9487692118 100644 --- a/filter/filter_impl/hystrix_filter.go +++ b/filter/filter_impl/hystrix_filter.go @@ -58,6 +58,7 @@ func init() { extension.SetFilter(HYSTRIX_PROVIDER, GetHystrixFilterProvider) } +// HystrixFilterError ... type HystrixFilterError struct { err error failByHystrix bool @@ -67,9 +68,12 @@ func (hfError *HystrixFilterError) Error() string { return hfError.err.Error() } +// FailByHystrix ... func (hfError *HystrixFilterError) FailByHystrix() bool { return hfError.failByHystrix } + +// NewHystrixFilterError ... func NewHystrixFilterError(err error, failByHystrix bool) error { return &HystrixFilterError{ err: err, @@ -77,14 +81,15 @@ func NewHystrixFilterError(err error, failByHystrix bool) error { } } +// HystrixFilter ... type HystrixFilter struct { COrP bool //true for consumer res map[string][]*regexp.Regexp ifNewMap sync.Map } +// Invoke ... func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - cmdName := fmt.Sprintf("%s&method=%s", invoker.GetUrl().Key(), invocation.MethodName()) // Do the configuration if the circuit breaker is created for the first time @@ -145,9 +150,12 @@ func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i return result } +// OnResponse ... func (hf *HystrixFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } + +// GetHystrixFilterConsumer ... func GetHystrixFilterConsumer() filter.Filter { //When first called, load the config in consumerConfigOnce.Do(func() { @@ -158,6 +166,7 @@ func GetHystrixFilterConsumer() filter.Filter { return &HystrixFilter{COrP: true} } +// GetHystrixFilterProvider ... func GetHystrixFilterProvider() filter.Filter { providerConfigOnce.Do(func() { if err := initHystrixConfigProvider(); err != nil { @@ -216,6 +225,7 @@ func initHystrixConfigConsumer() error { } return nil } + func initHystrixConfigProvider() error { if config.GetProviderConfig().FilterConf == nil { return perrors.Errorf("no config for hystrix") @@ -242,6 +252,7 @@ func initHystrixConfigProvider() error { // return initHystrixConfig() //} +// CommandConfigWithError ... type CommandConfigWithError struct { Timeout int `yaml:"timeout"` MaxConcurrentRequests int `yaml:"max_concurrent_requests"` @@ -259,11 +270,14 @@ type CommandConfigWithError struct { //- ErrorPercentThreshold: it causes circuits to open once the rolling measure of errors exceeds this percent of requests //See hystrix doc +// HystrixFilterConfig ... type HystrixFilterConfig struct { Configs map[string]*CommandConfigWithError Default string Services map[string]ServiceHystrixConfig } + +// ServiceHystrixConfig ... type ServiceHystrixConfig struct { ServiceConfig string `yaml:"service_config"` Methods map[string]string diff --git a/filter/filter_impl/token_filter.go b/filter/filter_impl/token_filter.go index 702ee33d4d2e9756ab3b4dbb4bfc9b7c42907080..2340e7271fb6ae0dac582341c3537931b7fbc4aa 100644 --- a/filter/filter_impl/token_filter.go +++ b/filter/filter_impl/token_filter.go @@ -41,8 +41,10 @@ func init() { extension.SetFilter(TOKEN, GetTokenFilter) } +// TokenFilter ... type TokenFilter struct{} +// Invoke ... func (tf *TokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { invokerTkn := invoker.GetUrl().GetParam(constant.TOKEN_KEY, "") if len(invokerTkn) > 0 { @@ -58,10 +60,12 @@ func (tf *TokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, inv return invoker.Invoke(ctx, invocation) } +// OnResponse ... func (tf *TokenFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } +// GetTokenFilter ... func GetTokenFilter() filter.Filter { return &TokenFilter{} } diff --git a/filter/filter_impl/tps/tps_limit_fix_window_strategy.go b/filter/filter_impl/tps/tps_limit_fix_window_strategy.go index 6ea5dc10333739848a96881b6dcf7e4bb54ccbe9..a985724028835f236e8db6c7dd9220b1c5952fbe 100644 --- a/filter/filter_impl/tps/tps_limit_fix_window_strategy.go +++ b/filter/filter_impl/tps/tps_limit_fix_window_strategy.go @@ -63,6 +63,7 @@ type FixedWindowTpsLimitStrategyImpl struct { timestamp int64 } +// IsAllowable ... func (impl *FixedWindowTpsLimitStrategyImpl) IsAllowable() bool { current := time.Now().UnixNano() diff --git a/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go b/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go index 40ea2d14be91a948752455ad8e1a7e611354017a..c647380641676b8992df332a81406c331d1d5bce 100644 --- a/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go +++ b/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go @@ -53,6 +53,7 @@ type SlidingWindowTpsLimitStrategyImpl struct { queue *list.List } +// IsAllowable ... func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool { impl.mutex.Lock() defer impl.mutex.Unlock() diff --git a/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go b/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go index faec9b6ec1466e0c2c7d5df9ca0dd82a965494ec..ee0558dd29c641ec4f4dccd843e82c9486d5ecd8 100644 --- a/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go +++ b/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go @@ -52,6 +52,7 @@ type ThreadSafeFixedWindowTpsLimitStrategyImpl struct { fixedWindow *FixedWindowTpsLimitStrategyImpl } +// IsAllowable ... func (impl *ThreadSafeFixedWindowTpsLimitStrategyImpl) IsAllowable() bool { impl.mutex.Lock() defer impl.mutex.Unlock() diff --git a/filter/filter_impl/tps/tps_limiter_method_service.go b/filter/filter_impl/tps/tps_limiter_method_service.go index ac4498a33d195128ad89828f9696b90cbd2db082..49f785f354031b1d130f7316d540a9b4f41f1a05 100644 --- a/filter/filter_impl/tps/tps_limiter_method_service.go +++ b/filter/filter_impl/tps/tps_limiter_method_service.go @@ -111,6 +111,7 @@ type MethodServiceTpsLimiterImpl struct { tpsState *concurrent.Map } +// IsAllowable ... func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocation protocol.Invocation) bool { methodConfigPrefix := "methods." + invocation.MethodName() + "." @@ -178,6 +179,7 @@ func getLimitConfig(methodLevelConfig string, var methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl var methodServiceTpsLimiterOnce sync.Once +// GetMethodServiceTpsLimiter ... func GetMethodServiceTpsLimiter() filter.TpsLimiter { methodServiceTpsLimiterOnce.Do(func() { methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{ diff --git a/filter/filter_impl/tps_limit_filter.go b/filter/filter_impl/tps_limit_filter.go index 8852260e9e7b4b833728da97dc8f273d3e52dec7..52ac5d147904011bd5286c90bc565584f1869f33 100644 --- a/filter/filter_impl/tps_limit_filter.go +++ b/filter/filter_impl/tps_limit_filter.go @@ -54,6 +54,7 @@ func init() { type TpsLimitFilter struct { } +// Invoke ... func (t TpsLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { url := invoker.GetUrl() tpsLimiter := url.GetParam(constant.TPS_LIMITER_KEY, "") @@ -69,10 +70,12 @@ func (t TpsLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, in return invoker.Invoke(ctx, invocation) } +// OnResponse ... func (t TpsLimitFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result } +// GetTpsLimitFilter ... func GetTpsLimitFilter() filter.Filter { return &TpsLimitFilter{} } diff --git a/filter/handler/rejected_execution_handler_only_log.go b/filter/handler/rejected_execution_handler_only_log.go index 83d85fd1e1a80462e454ef5a8bcd375f5a2b0bcb..18a22d3d0d6d94edb49d0166553463b6569a0e48 100644 --- a/filter/handler/rejected_execution_handler_only_log.go +++ b/filter/handler/rejected_execution_handler_only_log.go @@ -56,11 +56,13 @@ var onlyLogHandlerOnce sync.Once type OnlyLogRejectedExecutionHandler struct { } +// RejectedExecution ... func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result { logger.Errorf("The invocation was rejected. url: %s", url.String()) return &protocol.RPCResult{} } +// GetOnlyLogRejectedExecutionHandler ... func GetOnlyLogRejectedExecutionHandler() filter.RejectedExecutionHandler { onlyLogHandlerOnce.Do(func() { onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{} diff --git a/filter/tps_limit_strategy.go b/filter/tps_limit_strategy.go index 1051c3d96d37619e0e507cc845f144a45a9bb421..ad7133ca69468b348e76858c493b434114ce8c11 100644 --- a/filter/tps_limit_strategy.go +++ b/filter/tps_limit_strategy.go @@ -35,6 +35,7 @@ type TpsLimitStrategy interface { IsAllowable() bool } +// TpsLimitStrategyCreator ... type TpsLimitStrategyCreator interface { Create(rate int, interval int) TpsLimitStrategy } diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 3e2a243103b888d8b94c2e50fe00daabb3d5a032..d723f68387d55c2219e2b69d78ab147d8c51fd3c 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -85,6 +85,7 @@ func init() { setClientGrpool() } +// SetClientConf ... func SetClientConf(c ClientConfig) { clientConf = &c err := clientConf.CheckValidity() @@ -95,6 +96,7 @@ func SetClientConf(c ClientConfig) { setClientGrpool() } +// GetClientConf ... func GetClientConf() ClientConfig { return *clientConf } @@ -106,6 +108,7 @@ func setClientGrpool() { } } +// Options ... type Options struct { // connect timeout ConnectTimeout time.Duration @@ -123,6 +126,7 @@ type AsyncCallbackResponse struct { Reply interface{} } +// Client ... type Client struct { opts Options conf ClientConfig @@ -132,6 +136,7 @@ type Client struct { pendingResponses *sync.Map } +// NewClient ... func NewClient(opt Options) *Client { switch { @@ -152,6 +157,7 @@ func NewClient(opt Options) *Client { return c } +// Request ... type Request struct { addr string svcUrl common.URL @@ -160,6 +166,7 @@ type Request struct { atta map[string]string } +// NewRequest ... func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request { return &Request{ addr: addr, @@ -170,11 +177,13 @@ func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, } } +// Response ... type Response struct { reply interface{} atta map[string]string } +// NewResponse ... func NewResponse(reply interface{}, atta map[string]string) *Response { return &Response{ reply: reply, @@ -199,6 +208,7 @@ func (c *Client) Call(request *Request, response *Response) error { return perrors.WithStack(c.call(ct, request, response, nil)) } +// AsyncCall ... func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error { return perrors.WithStack(c.call(CT_TwoWay, request, response, callback)) @@ -267,6 +277,7 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac return perrors.WithStack(err) } +// Close ... func (c *Client) Close() { if c.pool != nil { c.pool.close() diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 6b41d5e7d76d31ea23f08b77c841d0f87986bef7..64d9477ce543d4151812f3c40411b44cce0d1203 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -50,8 +50,10 @@ const ( // dubbo package //////////////////////////////////////////// +// SequenceType ... type SequenceType int64 +// DubboPackage ... type DubboPackage struct { Header hessian.DubboHeader Service hessian.Service @@ -63,6 +65,7 @@ func (p DubboPackage) String() string { return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body) } +// Marshal ... func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { codec := hessian.NewHessianCodec(nil) @@ -74,6 +77,7 @@ func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { return bytes.NewBuffer(pkg), nil } +// Unmarshal ... func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, buf.Len())) @@ -111,6 +115,7 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { // PendingResponse //////////////////////////////////////////// +// PendingResponse ... type PendingResponse struct { seq uint64 err error @@ -121,6 +126,7 @@ type PendingResponse struct { done chan struct{} } +// NewPendingResponse ... func NewPendingResponse() *PendingResponse { return &PendingResponse{ start: time.Now(), @@ -129,6 +135,7 @@ func NewPendingResponse() *PendingResponse { } } +// GetCallResponse ... func (r PendingResponse) GetCallResponse() common.CallbackResponse { return AsyncCallbackResponse{ Cause: r.err, diff --git a/protocol/dubbo/config.go b/protocol/dubbo/config.go index 2c70f9ffbd67918eef09d5439165f30aeab0ab56..fde3904079d5708dfe735dedc1c589776227a825 100644 --- a/protocol/dubbo/config.go +++ b/protocol/dubbo/config.go @@ -27,6 +27,7 @@ import ( ) type ( + // GettySessionParam ... GettySessionParam struct { CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"` TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"` @@ -91,6 +92,7 @@ type ( } ) +// GetDefaultClientConfig ... func GetDefaultClientConfig() ClientConfig { return ClientConfig{ ReconnectInterval: 0, @@ -118,6 +120,7 @@ func GetDefaultClientConfig() ClientConfig { }} } +// GetDefaultServerConfig ... func GetDefaultServerConfig() ServerConfig { return ServerConfig{ SessionTimeout: "180s", @@ -142,6 +145,7 @@ func GetDefaultServerConfig() ServerConfig { } } +// CheckValidity ... func (c *GettySessionParam) CheckValidity() error { var err error @@ -164,6 +168,7 @@ func (c *GettySessionParam) CheckValidity() error { return nil } +// CheckValidity ... func (c *ClientConfig) CheckValidity() error { var err error @@ -185,6 +190,7 @@ func (c *ClientConfig) CheckValidity() error { return perrors.WithStack(c.GettySessionParam.CheckValidity()) } +// CheckValidity ... func (c *ServerConfig) CheckValidity() error { var err error diff --git a/protocol/dubbo/dubbo_exporter.go b/protocol/dubbo/dubbo_exporter.go index cb06b6b69c9d0873342af5ea49fae054f029608c..f4cd0cc1234f71bdcf6ce746f01ff3618d820fc5 100644 --- a/protocol/dubbo/dubbo_exporter.go +++ b/protocol/dubbo/dubbo_exporter.go @@ -28,16 +28,19 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// DubboExporter ... type DubboExporter struct { protocol.BaseExporter } +// NewDubboExporter ... func NewDubboExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *DubboExporter { return &DubboExporter{ BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap), } } +// Unexport ... func (de *DubboExporter) Unexport() { serviceId := de.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "") de.BaseExporter.Unexport() diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 8100fbe3a6760e456a9eecedfd39e5230dd2c797..4131c4533742858db1827f0e6256d3080f38f118 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -35,18 +35,21 @@ import ( invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) +// Err_No_Reply ... var Err_No_Reply = perrors.New("request need @response") var ( attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY} ) +// DubboInvoker ... type DubboInvoker struct { protocol.BaseInvoker client *Client quitOnce sync.Once } +// NewDubboInvoker ... func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { return &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), @@ -54,8 +57,8 @@ func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { } } +// Invoke ... func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - var ( err error result protocol.RPCResult @@ -97,6 +100,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati return &result } +// Destroy ... func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { di.BaseInvoker.Destroy() diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index eed22a29cde59b520f1b6ebf9b5baafabcac931f..9d47cae2f5c310c3245d522f796ee5014eb5298f 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -43,12 +43,14 @@ var ( dubboProtocol *DubboProtocol ) +// DubboProtocol ... type DubboProtocol struct { protocol.BaseProtocol serverMap map[string]*Server serverLock sync.Mutex } +// NewDubboProtocol ... func NewDubboProtocol() *DubboProtocol { return &DubboProtocol{ BaseProtocol: protocol.NewBaseProtocol(), @@ -56,6 +58,7 @@ func NewDubboProtocol() *DubboProtocol { } } +// Export ... func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { url := invoker.GetUrl() serviceKey := url.ServiceKey() @@ -68,6 +71,7 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { return exporter } +// Refer ... func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { //default requestTimeout var requestTimeout = config.GetConsumerConfig().RequestTimeout @@ -86,6 +90,7 @@ func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { return invoker } +// Destroy ... func (dp *DubboProtocol) Destroy() { logger.Infof("DubboProtocol destroy.") @@ -117,6 +122,7 @@ func (dp *DubboProtocol) openServer(url common.URL) { } } +// GetProtocol ... func GetProtocol() protocol.Protocol { if dubboProtocol == nil { dubboProtocol = NewDubboProtocol() diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 1ed6e9cf57f3399ce2a7a8134bad9924d6799460..926afa9a103201b04639cf283f5c87c53eb12541 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -55,29 +55,35 @@ type rpcSession struct { // RpcClientHandler //////////////////////////////////////////// +// RpcClientHandler ... type RpcClientHandler struct { conn *gettyRPCClient } +// NewRpcClientHandler ... func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler { return &RpcClientHandler{conn: client} } +// OnOpen ... func (h *RpcClientHandler) OnOpen(session getty.Session) error { h.conn.addSession(session) return nil } +// OnError ... func (h *RpcClientHandler) OnError(session getty.Session, err error) { logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err) h.conn.removeSession(session) } +// OnClose ... func (h *RpcClientHandler) OnClose(session getty.Session) { logger.Infof("session{%s} is closing......", session.Stat()) h.conn.removeSession(session) } +// OnMessage ... func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { p, ok := pkg.(*DubboPackage) if !ok { @@ -121,6 +127,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { } } +// OnCron ... func (h *RpcClientHandler) OnCron(session getty.Session) { rpcSession, err := h.conn.getClientRpcSession(session) if err != nil { @@ -142,6 +149,7 @@ func (h *RpcClientHandler) OnCron(session getty.Session) { // RpcServerHandler //////////////////////////////////////////// +// RpcServerHandler ... type RpcServerHandler struct { maxSessionNum int sessionTimeout time.Duration @@ -149,6 +157,7 @@ type RpcServerHandler struct { rwlock sync.RWMutex } +// NewRpcServerHandler ... func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler { return &RpcServerHandler{ maxSessionNum: maxSessionNum, @@ -157,6 +166,7 @@ func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcSe } } +// OnOpen ... func (h *RpcServerHandler) OnOpen(session getty.Session) error { var err error h.rwlock.RLock() @@ -175,6 +185,7 @@ func (h *RpcServerHandler) OnOpen(session getty.Session) error { return nil } +// OnError ... func (h *RpcServerHandler) OnError(session getty.Session, err error) { logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err) h.rwlock.Lock() @@ -182,6 +193,7 @@ func (h *RpcServerHandler) OnError(session getty.Session, err error) { h.rwlock.Unlock() } +// OnClose ... func (h *RpcServerHandler) OnClose(session getty.Session) { logger.Infof("session{%s} is closing......", session.Stat()) h.rwlock.Lock() @@ -189,6 +201,7 @@ func (h *RpcServerHandler) OnClose(session getty.Session) { h.rwlock.Unlock() } +// OnMessage ... func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { h.rwlock.Lock() if _, ok := h.sessionMap[session]; ok { @@ -276,6 +289,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { reply(session, p, hessian.PackageResponse) } +// OnCron ... func (h *RpcServerHandler) OnCron(session getty.Session) { var ( flag bool diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index e9dff1cfc77fb34ba75e604334d9c7ab5cfa36d7..b5c4f509190dbdc85825ad424656240b234786df 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -38,10 +38,12 @@ import ( // RpcClientPackageHandler //////////////////////////////////////////// +// RpcClientPackageHandler ... type RpcClientPackageHandler struct { client *Client } +// NewRpcClientPackageHandler ... func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { return &RpcClientPackageHandler{client: client} } @@ -94,6 +96,7 @@ var ( rpcServerPkgHandler = &RpcServerPackageHandler{} ) +// RpcServerPackageHandler ... type RpcServerPackageHandler struct{} func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go index 648c9f8aa8d24a321bfda85279a6470c745dbfa1..bd2b37b7a9f055745e183524d19a442af03360f4 100644 --- a/protocol/dubbo/server.go +++ b/protocol/dubbo/server.go @@ -74,6 +74,7 @@ func init() { SetServerGrpool() } +// SetServerConfig ... func SetServerConfig(s ServerConfig) { srvConf = &s err := srvConf.CheckValidity() @@ -84,10 +85,12 @@ func SetServerConfig(s ServerConfig) { SetServerGrpool() } +// GetServerConfig ... func GetServerConfig() ServerConfig { return *srvConf } +// SetServerGrpool ... func SetServerGrpool() { if srvConf.GrPoolSize > 1 { srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen), @@ -95,12 +98,14 @@ func SetServerGrpool() { } } +// Server ... type Server struct { conf ServerConfig tcpServer getty.Server rpcHandler *RpcServerHandler } +// NewServer ... func NewServer() *Server { s := &Server{ @@ -151,6 +156,7 @@ func (s *Server) newSession(session getty.Session) error { return nil } +// Start ... func (s *Server) Start(url common.URL) { var ( addr string @@ -167,6 +173,7 @@ func (s *Server) Start(url common.URL) { } +// Stop ... func (s *Server) Stop() { s.tcpServer.Close() } diff --git a/protocol/grpc/client.go b/protocol/grpc/client.go index 126f3774e65c97049b4239e0e31ff25e3fe00d4e..d35a2c770cd8b9bda805715889791ccf53c562db 100644 --- a/protocol/grpc/client.go +++ b/protocol/grpc/client.go @@ -31,11 +31,13 @@ import ( "github.com/apache/dubbo-go/config" ) +// Client ... type Client struct { *grpc.ClientConn invoker reflect.Value } +// NewClient ... func NewClient(url common.URL) *Client { conn, err := grpc.Dial(url.Location, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { diff --git a/protocol/grpc/grpc_exporter.go b/protocol/grpc/grpc_exporter.go index 8446d319f1caf6463b29b27d270dfb9a94d437f2..3c38ef974ca22a582ce83102718d01a8edd4258f 100644 --- a/protocol/grpc/grpc_exporter.go +++ b/protocol/grpc/grpc_exporter.go @@ -28,16 +28,19 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// GrpcExporter ... type GrpcExporter struct { *protocol.BaseExporter } +// NewGrpcExporter ... func NewGrpcExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *GrpcExporter { return &GrpcExporter{ BaseExporter: protocol.NewBaseExporter(key, invoker, exporterMap), } } +// Unexport ... func (gg *GrpcExporter) Unexport() { serviceId := gg.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "") gg.BaseExporter.Unexport() diff --git a/protocol/grpc/grpc_invoker.go b/protocol/grpc/grpc_invoker.go index 88149397e79aa435a6a9d41911ae0e603754534e..26bc86f3aa46c8048b16284bd61cb5d9fb4664f9 100644 --- a/protocol/grpc/grpc_invoker.go +++ b/protocol/grpc/grpc_invoker.go @@ -35,14 +35,17 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// ErrNoReply ... var ErrNoReply = errors.New("request need @response") +// GrpcInvoker ... type GrpcInvoker struct { protocol.BaseInvoker quitOnce sync.Once client *Client } +// NewGrpcInvoker ... func NewGrpcInvoker(url common.URL, client *Client) *GrpcInvoker { return &GrpcInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), @@ -50,6 +53,7 @@ func NewGrpcInvoker(url common.URL, client *Client) *GrpcInvoker { } } +// Invoke ... func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( result protocol.RPCResult @@ -78,14 +82,17 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio return &result } +// IsAvailable ... func (gi *GrpcInvoker) IsAvailable() bool { return gi.BaseInvoker.IsAvailable() && gi.client.GetState() != connectivity.Shutdown } +// IsDestroyed ... func (gi *GrpcInvoker) IsDestroyed() bool { return gi.BaseInvoker.IsDestroyed() && gi.client.GetState() == connectivity.Shutdown } +// Destroy ... func (gi *GrpcInvoker) Destroy() { gi.quitOnce.Do(func() { gi.BaseInvoker.Destroy() diff --git a/protocol/grpc/grpc_protocol.go b/protocol/grpc/grpc_protocol.go index cad75752ad5bbd66084379d37280cc04dbc72e2f..ae6fdf13016a32ab37024ccdbd4db8f2ebebc1c5 100644 --- a/protocol/grpc/grpc_protocol.go +++ b/protocol/grpc/grpc_protocol.go @@ -36,12 +36,14 @@ func init() { var grpcProtocol *GrpcProtocol +// GrpcProtocol ... type GrpcProtocol struct { protocol.BaseProtocol serverMap map[string]*Server serverLock sync.Mutex } +// NewGRPCProtocol ... func NewGRPCProtocol() *GrpcProtocol { return &GrpcProtocol{ BaseProtocol: protocol.NewBaseProtocol(), @@ -49,6 +51,7 @@ func NewGRPCProtocol() *GrpcProtocol { } } +// Export ... func (gp *GrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { url := invoker.GetUrl() serviceKey := url.ServiceKey() @@ -78,6 +81,7 @@ func (gp *GrpcProtocol) openServer(url common.URL) { } } +// Refer ... func (gp *GrpcProtocol) Refer(url common.URL) protocol.Invoker { invoker := NewGrpcInvoker(url, NewClient(url)) gp.SetInvokers(invoker) @@ -85,6 +89,7 @@ func (gp *GrpcProtocol) Refer(url common.URL) protocol.Invoker { return invoker } +// Destroy ... func (gp *GrpcProtocol) Destroy() { logger.Infof("GrpcProtocol destroy.") @@ -96,6 +101,7 @@ func (gp *GrpcProtocol) Destroy() { } } +// GetProtocol ... func GetProtocol() protocol.Protocol { if grpcProtocol == nil { grpcProtocol = NewGRPCProtocol() diff --git a/protocol/grpc/internal/client.go b/protocol/grpc/internal/client.go index bac3ce94885394140215b5466fbcb480da05812b..eb7dc1a456396afccb69293a410a650c200fc943 100644 --- a/protocol/grpc/internal/client.go +++ b/protocol/grpc/internal/client.go @@ -38,10 +38,12 @@ type GrpcGreeterImpl struct { SayHello func(ctx context.Context, in *HelloRequest, out *HelloReply) error } +// Reference ... func (u *GrpcGreeterImpl) Reference() string { return "GrpcGreeterImpl" } +// GetDubboStub ... func (u *GrpcGreeterImpl) GetDubboStub(cc *grpc.ClientConn) GreeterClient { return NewGreeterClient(cc) } diff --git a/protocol/grpc/internal/server.go b/protocol/grpc/internal/server.go index 6491a5c2182b4276c4bd5b0b4a1377ef1684fd55..a0759f757dc44153e7f09b726db5e66176796c96 100644 --- a/protocol/grpc/internal/server.go +++ b/protocol/grpc/internal/server.go @@ -42,6 +42,7 @@ func (s *server) SayHello(ctx context.Context, in *HelloRequest) (*HelloReply, e return &HelloReply{Message: "Hello " + in.GetName()}, nil } +// InitGrpcServer ... func InitGrpcServer() { port := ":30000" @@ -56,6 +57,7 @@ func InitGrpcServer() { } } +// ShutdownGrpcServer ... func ShutdownGrpcServer() { if s == nil { return diff --git a/protocol/grpc/server.go b/protocol/grpc/server.go index 0777c09bb46ed370553187048adfde1b219b1eb9..19b9db4ac743ceefcf035d399c0bbcdd99f1fa80 100644 --- a/protocol/grpc/server.go +++ b/protocol/grpc/server.go @@ -35,20 +35,24 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// Server ... type Server struct { grpcServer *grpc.Server } +// NewServer ... func NewServer() *Server { return &Server{} } +// DubboGrpcService ... type DubboGrpcService interface { SetProxyImpl(impl protocol.Invoker) GetProxyImpl() protocol.Invoker ServiceDesc() *grpc.ServiceDesc } +// Start ... func (s *Server) Start(url common.URL) { var ( addr string @@ -96,6 +100,7 @@ func (s *Server) Start(url common.URL) { }() } +// Stop ... func (s *Server) Stop() { s.grpcServer.Stop() } diff --git a/protocol/invocation.go b/protocol/invocation.go index b0ccab39e89c600dc8694cba989a905d9de5e48c..f32f2c3449ac063ecb89952bd4653312a07a3df4 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -21,6 +21,7 @@ import ( "reflect" ) +// Invocation ... type Invocation interface { MethodName() string ParameterTypes() []reflect.Type diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 960e51d61747d0ecf26bba45f9c8549997d759ff..0f42e96d5453229591a47fbc0a3c8f794312fa4a 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -42,6 +42,7 @@ type RPCInvocation struct { lock sync.RWMutex } +// NewRPCInvocation ... func NewRPCInvocation(methodName string, arguments []interface{}, attachments map[string]string) *RPCInvocation { return &RPCInvocation{ methodName: methodName, @@ -50,6 +51,7 @@ func NewRPCInvocation(methodName string, arguments []interface{}, attachments ma } } +// NewRPCInvocationWithOptions ... func NewRPCInvocationWithOptions(opts ...option) *RPCInvocation { invo := &RPCInvocation{} for _, opt := range opts { @@ -58,34 +60,42 @@ func NewRPCInvocationWithOptions(opts ...option) *RPCInvocation { return invo } +// MethodName ... func (r *RPCInvocation) MethodName() string { return r.methodName } +// ParameterTypes ... func (r *RPCInvocation) ParameterTypes() []reflect.Type { return r.parameterTypes } +// ParameterValues ... func (r *RPCInvocation) ParameterValues() []reflect.Value { return r.parameterValues } +// Arguments ... func (r *RPCInvocation) Arguments() []interface{} { return r.arguments } +// Reply ... func (r *RPCInvocation) Reply() interface{} { return r.reply } +// SetReply ... func (r *RPCInvocation) SetReply(reply interface{}) { r.reply = reply } +// Attachments ... func (r *RPCInvocation) Attachments() map[string]string { return r.attachments } +// AttachmentsByKey ... func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string { r.lock.RLock() defer r.lock.RUnlock() @@ -99,6 +109,7 @@ func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string return defaultValue } +// SetAttachments ... func (r *RPCInvocation) SetAttachments(key string, value string) { r.lock.Lock() defer r.lock.Unlock() @@ -108,18 +119,22 @@ func (r *RPCInvocation) SetAttachments(key string, value string) { r.attachments[key] = value } +// Invoker ... func (r *RPCInvocation) Invoker() protocol.Invoker { return r.invoker } +// SetInvoker ... func (r *RPCInvocation) SetInvoker() protocol.Invoker { return r.invoker } +// CallBack ... func (r *RPCInvocation) CallBack() interface{} { return r.callBack } +// SetCallBack ... func (r *RPCInvocation) SetCallBack(c interface{}) { r.callBack = c } @@ -130,48 +145,56 @@ func (r *RPCInvocation) SetCallBack(c interface{}) { type option func(invo *RPCInvocation) +// WithMethodName ... func WithMethodName(methodName string) option { return func(invo *RPCInvocation) { invo.methodName = methodName } } +// WithParameterTypes ... func WithParameterTypes(parameterTypes []reflect.Type) option { return func(invo *RPCInvocation) { invo.parameterTypes = parameterTypes } } +// WithParameterValues ... func WithParameterValues(parameterValues []reflect.Value) option { return func(invo *RPCInvocation) { invo.parameterValues = parameterValues } } +// WithArguments ... func WithArguments(arguments []interface{}) option { return func(invo *RPCInvocation) { invo.arguments = arguments } } +// WithReply ... func WithReply(reply interface{}) option { return func(invo *RPCInvocation) { invo.reply = reply } } +// WithCallBack ... func WithCallBack(callBack interface{}) option { return func(invo *RPCInvocation) { invo.callBack = callBack } } +// WithAttachments ... func WithAttachments(attachments map[string]string) option { return func(invo *RPCInvocation) { invo.attachments = attachments } } +// WithInvoker ... func WithInvoker(invoker protocol.Invoker) option { return func(invo *RPCInvocation) { invo.invoker = invoker diff --git a/protocol/invoker.go b/protocol/invoker.go index a1cf6264ae2b9f631b1bb12f88e8378ad5857919..6805f3fd034ac553e701b7dcbc4e23d93adb1c63 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -36,12 +36,14 @@ type Invoker interface { // base invoker ///////////////////////////// +// BaseInvoker ... type BaseInvoker struct { url common.URL available bool destroyed bool } +// NewBaseInvoker ... func NewBaseInvoker(url common.URL) *BaseInvoker { return &BaseInvoker{ url: url, @@ -50,22 +52,27 @@ func NewBaseInvoker(url common.URL) *BaseInvoker { } } +// GetUrl ... func (bi *BaseInvoker) GetUrl() common.URL { return bi.url } +// IsAvailable ... func (bi *BaseInvoker) IsAvailable() bool { return bi.available } +// IsDestroyed ... func (bi *BaseInvoker) IsDestroyed() bool { return bi.destroyed } +// Invoke ... func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Result { return &RPCResult{} } +// Destroy ... func (bi *BaseInvoker) Destroy() { logger.Infof("Destroy invoker: %s", bi.GetUrl().String()) bi.destroyed = true diff --git a/protocol/jsonrpc/http.go b/protocol/jsonrpc/http.go index 6dc5255d63a52b2fd2e05d1777b7c27f10ac9ce5..7ae825e1eb0f4846982bad3237bc0197024b073d 100644 --- a/protocol/jsonrpc/http.go +++ b/protocol/jsonrpc/http.go @@ -47,6 +47,7 @@ import ( // Request // //////////////////////////////////////////// +// Request ... type Request struct { ID int64 group string @@ -62,6 +63,7 @@ type Request struct { // HTTP Client // //////////////////////////////////////////// +// HTTPOptions ... type HTTPOptions struct { HandshakeTimeout time.Duration HTTPTimeout time.Duration @@ -72,11 +74,13 @@ var defaultHTTPOptions = HTTPOptions{ HTTPTimeout: 3 * time.Second, } +// HTTPClient ... type HTTPClient struct { ID int64 options HTTPOptions } +// NewHTTPClient ... func NewHTTPClient(opt *HTTPOptions) *HTTPClient { if opt == nil { opt = &defaultHTTPOptions @@ -96,6 +100,7 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient { } } +// NewRequest ... func (c *HTTPClient) NewRequest(service common.URL, method string, args interface{}) *Request { return &Request{ @@ -109,6 +114,7 @@ func (c *HTTPClient) NewRequest(service common.URL, method string, args interfac } } +// Call ... func (c *HTTPClient) Call(ctx context.Context, service common.URL, req *Request, rsp interface{}) error { // header httpHeader := http.Header{} diff --git a/protocol/jsonrpc/json.go b/protocol/jsonrpc/json.go index 7ee454e8ad16d2ee96ed08e7e5f55b2209a81054..9f63e5000bce779cb2d1aa146905954b4d95bc83 100644 --- a/protocol/jsonrpc/json.go +++ b/protocol/jsonrpc/json.go @@ -35,6 +35,7 @@ const ( VERSION = "2.0" ) +// CodecData ... type CodecData struct { ID int64 Method string @@ -278,12 +279,14 @@ type serverResponse struct { Error interface{} `json:"error,omitempty"` } +// ServerCodec ... type ServerCodec struct { req serverRequest } var ( - null = json.RawMessage([]byte("null")) + null = json.RawMessage([]byte("null")) + // Version ... Version = "2.0" ) @@ -291,6 +294,7 @@ func newServerCodec() *ServerCodec { return &ServerCodec{} } +// ReadHeader ... func (c *ServerCodec) ReadHeader(header map[string]string, body []byte) error { if header["HttpMethod"] != "POST" { return &Error{Code: -32601, Message: "Method not found"} @@ -322,6 +326,7 @@ func (c *ServerCodec) ReadHeader(header map[string]string, body []byte) error { return nil } +// ReadBody ... func (c *ServerCodec) ReadBody(x interface{}) error { // If x!=nil and return error e: // - Write() will be called with e.Error() in r.Error @@ -355,6 +360,7 @@ func (c *ServerCodec) ReadBody(x interface{}) error { return nil } +// NewError ... func NewError(code int, message string) *Error { return &Error{Code: code, Message: message} } diff --git a/protocol/jsonrpc/jsonrpc_exporter.go b/protocol/jsonrpc/jsonrpc_exporter.go index 6720330494a3b833d4a67d8b2408377ce62b1ddf..7f8fd491854f1ab25e63410a22ef5664db92f614 100644 --- a/protocol/jsonrpc/jsonrpc_exporter.go +++ b/protocol/jsonrpc/jsonrpc_exporter.go @@ -28,16 +28,19 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// JsonrpcExporter ... type JsonrpcExporter struct { protocol.BaseExporter } +// NewJsonrpcExporter ... func NewJsonrpcExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *JsonrpcExporter { return &JsonrpcExporter{ BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap), } } +// Unexport ... func (je *JsonrpcExporter) Unexport() { serviceId := je.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "") je.BaseExporter.Unexport() diff --git a/protocol/jsonrpc/jsonrpc_invoker.go b/protocol/jsonrpc/jsonrpc_invoker.go index 566bbac84fbcf8432cef1d441973ad6cc7108eb0..b6e194ce0e93e84c164eccf8574e5eb20430f6e8 100644 --- a/protocol/jsonrpc/jsonrpc_invoker.go +++ b/protocol/jsonrpc/jsonrpc_invoker.go @@ -29,11 +29,13 @@ import ( invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) +// JsonrpcInvoker ... type JsonrpcInvoker struct { protocol.BaseInvoker client *HTTPClient } +// NewJsonrpcInvoker ... func NewJsonrpcInvoker(url common.URL, client *HTTPClient) *JsonrpcInvoker { return &JsonrpcInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), @@ -41,6 +43,7 @@ func NewJsonrpcInvoker(url common.URL, client *HTTPClient) *JsonrpcInvoker { } } +// Invoke ... func (ji *JsonrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go index a2e7afe69a138e0cd6dbbe05b3f386647895ee15..a1669df7d3178518842d9df34f00c2b18e2bebb5 100644 --- a/protocol/jsonrpc/jsonrpc_protocol.go +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -40,12 +40,14 @@ func init() { var jsonrpcProtocol *JsonrpcProtocol +// JsonrpcProtocol ... type JsonrpcProtocol struct { protocol.BaseProtocol serverMap map[string]*Server serverLock sync.Mutex } +// NewJsonrpcProtocol ... func NewJsonrpcProtocol() *JsonrpcProtocol { return &JsonrpcProtocol{ BaseProtocol: protocol.NewBaseProtocol(), @@ -53,6 +55,7 @@ func NewJsonrpcProtocol() *JsonrpcProtocol { } } +// Export ... func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { url := invoker.GetUrl() serviceKey := strings.TrimPrefix(url.Path, "/") @@ -67,6 +70,7 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { return exporter } +// Refer ... func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker { //default requestTimeout var requestTimeout = config.GetConsumerConfig().RequestTimeout @@ -85,6 +89,7 @@ func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker { return invoker } +// Destroy ... func (jp *JsonrpcProtocol) Destroy() { logger.Infof("jsonrpcProtocol destroy.") @@ -116,6 +121,7 @@ func (jp *JsonrpcProtocol) openServer(url common.URL) { } } +// GetProtocol ... func GetProtocol() protocol.Protocol { if jsonrpcProtocol == nil { jsonrpcProtocol = NewJsonrpcProtocol() diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index 10a29e843efd8e013b58bcaa3ff4fc45a9fccc7a..2e1bc16986abe1933088dd76ddf31c39e87a9f06 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -56,6 +56,7 @@ const ( PathPrefix = byte('/') ) +// Server ... type Server struct { done chan struct{} once sync.Once @@ -65,6 +66,7 @@ type Server struct { timeout time.Duration } +// NewServer ... func NewServer() *Server { return &Server{ done: make(chan struct{}), @@ -223,6 +225,7 @@ func accept(listener net.Listener, fn func(net.Conn)) error { } } +// Start ... func (s *Server) Start(url common.URL) { listener, err := net.Listen("tcp", url.Location) if err != nil { @@ -249,6 +252,7 @@ func (s *Server) Start(url common.URL) { }() } +// Stop ... func (s *Server) Stop() { s.once.Do(func() { close(s.done) diff --git a/protocol/protocol.go b/protocol/protocol.go index 814a85163a99aa3b161b5eafbfed5f13ac4e3eb4..948da995edf6fe1834b749c00e76e61ff6fdd226 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -43,37 +43,45 @@ type Exporter interface { // base protocol ///////////////////////////// +// BaseProtocol ... type BaseProtocol struct { exporterMap *sync.Map invokers []Invoker } +// NewBaseProtocol ... func NewBaseProtocol() BaseProtocol { return BaseProtocol{ exporterMap: new(sync.Map), } } +// SetExporterMap ... func (bp *BaseProtocol) SetExporterMap(key string, exporter Exporter) { bp.exporterMap.Store(key, exporter) } +// ExporterMap ... func (bp *BaseProtocol) ExporterMap() *sync.Map { return bp.exporterMap } +// SetInvokers ... func (bp *BaseProtocol) SetInvokers(invoker Invoker) { bp.invokers = append(bp.invokers, invoker) } +// Invokers ... func (bp *BaseProtocol) Invokers() []Invoker { return bp.invokers } +// Export ... func (bp *BaseProtocol) Export(invoker Invoker) Exporter { return NewBaseExporter("base", invoker, bp.exporterMap) } +// Refer ... func (bp *BaseProtocol) Refer(url common.URL) Invoker { return NewBaseInvoker(url) } @@ -103,12 +111,14 @@ func (bp *BaseProtocol) Destroy() { // base exporter ///////////////////////////// +// BaseExporter ... type BaseExporter struct { key string invoker Invoker exporterMap *sync.Map } +// NewBaseExporter ... func NewBaseExporter(key string, invoker Invoker, exporterMap *sync.Map) *BaseExporter { return &BaseExporter{ key: key, @@ -117,11 +127,13 @@ func NewBaseExporter(key string, invoker Invoker, exporterMap *sync.Map) *BaseEx } } +// GetInvoker ... func (de *BaseExporter) GetInvoker() Invoker { return de.invoker } +// Unexport ... func (de *BaseExporter) Unexport() { logger.Infof("Exporter unexport.") de.invoker.Destroy() diff --git a/protocol/protocolwrapper/mock_protocol_filter.go b/protocol/protocolwrapper/mock_protocol_filter.go index 2efc34da4469cf369d4bbeb871ccfbdb73123f6a..dedf8aa64b6ae1b7b4782350e2625b02171aac44 100644 --- a/protocol/protocolwrapper/mock_protocol_filter.go +++ b/protocol/protocolwrapper/mock_protocol_filter.go @@ -28,6 +28,7 @@ import ( type mockProtocolFilter struct{} +// NewMockProtocolFilter ... func NewMockProtocolFilter() protocol.Protocol { return &mockProtocolFilter{} } diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 33ea38201251df3abc6639b416200611cc993e56..08479fe9d139e31821a0a41a851ded52687f87b3 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -43,6 +43,7 @@ type ProtocolFilterWrapper struct { protocol protocol.Protocol } +// Export ... func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Exporter { if pfw.protocol == nil { pfw.protocol = extension.GetProtocol(invoker.GetUrl().Protocol) @@ -51,6 +52,7 @@ func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Expo return pfw.protocol.Export(invoker) } +// Refer ... func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker { if pfw.protocol == nil { pfw.protocol = extension.GetProtocol(url.Protocol) @@ -58,6 +60,7 @@ func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker { return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY) } +// Destroy ... func (pfw *ProtocolFilterWrapper) Destroy() { pfw.protocol.Destroy() } @@ -81,6 +84,7 @@ func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { return next } +// GetProtocol ... func GetProtocol() protocol.Protocol { return &ProtocolFilterWrapper{} } @@ -89,25 +93,30 @@ func GetProtocol() protocol.Protocol { // filter invoker /////////////////////////// +// FilterInvoker ... type FilterInvoker struct { next protocol.Invoker invoker protocol.Invoker filter filter.Filter } +// GetUrl ... func (fi *FilterInvoker) GetUrl() common.URL { return fi.invoker.GetUrl() } +// IsAvailable ... func (fi *FilterInvoker) IsAvailable() bool { return fi.invoker.IsAvailable() } +// Invoke ... func (fi *FilterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { result := fi.filter.Invoke(ctx, fi.next, invocation) return fi.filter.OnResponse(ctx, result, fi.invoker, invocation) } +// Destroy ... func (fi *FilterInvoker) Destroy() { fi.invoker.Destroy() } diff --git a/protocol/result.go b/protocol/result.go index dcdb62310d359d441067395ea92f8460df97eb22..34e76d2dddbaed33b2e2c015631443565cfaea87 100644 --- a/protocol/result.go +++ b/protocol/result.go @@ -17,6 +17,7 @@ package protocol +// Result ... type Result interface { SetError(error) Error() error @@ -32,12 +33,14 @@ type Result interface { // Result Impletment of RPC ///////////////////////////// +// RPCResult ... type RPCResult struct { Attrs map[string]string Err error Rest interface{} } +// SetError ... func (r *RPCResult) SetError(err error) { r.Err = err } @@ -46,26 +49,32 @@ func (r *RPCResult) Error() error { return r.Err } +// SetResult ... func (r *RPCResult) SetResult(rest interface{}) { r.Rest = rest } +// Result ... func (r *RPCResult) Result() interface{} { return r.Rest } +// SetAttachments ... func (r *RPCResult) SetAttachments(attr map[string]string) { r.Attrs = attr } +// Attachments ... func (r *RPCResult) Attachments() map[string]string { return r.Attrs } +// AddAttachment ... func (r *RPCResult) AddAttachment(key, value string) { r.Attrs[key] = value } +// Attachment ... func (r *RPCResult) Attachment(key, defaultValue string) string { v, ok := r.Attrs[key] if !ok { diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go index 67f05e98020298b04096d2ba05874143324a7c7e..639fd559aa16689a249d035895fc037dc3bc3f8b 100644 --- a/protocol/rpc_status.go +++ b/protocol/rpc_status.go @@ -32,6 +32,7 @@ var ( serviceStatistic sync.Map // url -> RPCStatus ) +// RPCStatus ... type RPCStatus struct { active int32 failed int32 @@ -45,51 +46,63 @@ type RPCStatus struct { lastRequestFailedTimestamp int64 } +// GetActive ... func (rpc *RPCStatus) GetActive() int32 { return atomic.LoadInt32(&rpc.active) } +// GetFailed ... func (rpc *RPCStatus) GetFailed() int32 { return atomic.LoadInt32(&rpc.failed) } +// GetTotal ... func (rpc *RPCStatus) GetTotal() int32 { return atomic.LoadInt32(&rpc.total) } +// GetTotalElapsed ... func (rpc *RPCStatus) GetTotalElapsed() int64 { return atomic.LoadInt64(&rpc.totalElapsed) } +// GetFailedElapsed ... func (rpc *RPCStatus) GetFailedElapsed() int64 { return atomic.LoadInt64(&rpc.failedElapsed) } +// GetMaxElapsed ... func (rpc *RPCStatus) GetMaxElapsed() int64 { return atomic.LoadInt64(&rpc.maxElapsed) } +// GetFailedMaxElapsed ... func (rpc *RPCStatus) GetFailedMaxElapsed() int64 { return atomic.LoadInt64(&rpc.failedMaxElapsed) } +// GetSucceededMaxElapsed ... func (rpc *RPCStatus) GetSucceededMaxElapsed() int64 { return atomic.LoadInt64(&rpc.succeededMaxElapsed) } +// GetLastRequestFailedTimestamp ... func (rpc *RPCStatus) GetLastRequestFailedTimestamp() int64 { return atomic.LoadInt64(&rpc.lastRequestFailedTimestamp) } +// GetSuccessiveRequestFailureCount ... func (rpc *RPCStatus) GetSuccessiveRequestFailureCount() int32 { return atomic.LoadInt32(&rpc.successiveRequestFailureCount) } +// GetURLStatus ... func GetURLStatus(url common.URL) *RPCStatus { rpcStatus, _ := serviceStatistic.LoadOrStore(url.Key(), &RPCStatus{}) return rpcStatus.(*RPCStatus) } +// GetMethodStatus ... func GetMethodStatus(url common.URL, methodName string) *RPCStatus { identifier := url.Key() methodMap, found := methodStatistics.Load(identifier) @@ -109,11 +122,13 @@ func GetMethodStatus(url common.URL, methodName string) *RPCStatus { return status } +// BeginCount ... func BeginCount(url common.URL, methodName string) { beginCount0(GetURLStatus(url)) beginCount0(GetMethodStatus(url, methodName)) } +// EndCount ... func EndCount(url common.URL, methodName string, elapsed int64, succeeded bool) { endCount0(GetURLStatus(url), elapsed, succeeded) endCount0(GetMethodStatus(url, methodName), elapsed, succeeded) @@ -148,6 +163,7 @@ func endCount0(rpcStatus *RPCStatus, elapsed int64, succeeded bool) { } } +// CurrentTimeMillis ... func CurrentTimeMillis() int64 { return time.Now().UnixNano() / int64(time.Millisecond) } diff --git a/registry/base_configuration_listener.go b/registry/base_configuration_listener.go index 056a93aaff1ec657db89f21b4a6b28efc354b49b..55418318dfc52ed9f17f1ec6a18ad9ef9d8163bf 100644 --- a/registry/base_configuration_listener.go +++ b/registry/base_configuration_listener.go @@ -29,15 +29,19 @@ import ( "github.com/apache/dubbo-go/remoting" ) +// BaseConfigurationListener ... type BaseConfigurationListener struct { configurators []config_center.Configurator dynamicConfiguration config_center.DynamicConfiguration defaultConfiguratorFunc func(url *common.URL) config_center.Configurator } +// Configurators ... func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator { return bcl.configurators } + +// InitWith ... func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) { bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration() if bcl.dynamicConfiguration == nil { @@ -56,6 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente } } +// Process ... func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) { logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value) if event.ConfigType == remoting.EventTypeDel { @@ -76,12 +81,15 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin bcl.configurators = ToConfigurators(urls, bcl.defaultConfiguratorFunc) return nil } + +// OverrideUrl ... func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) { for _, v := range bcl.configurators { v.Configure(url) } } +// ToConfigurators ... func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator { if len(urls) == 0 { return nil diff --git a/registry/directory/directory.go b/registry/directory/directory.go index e88c611f6f20bc182c3630e328caab848affc08b..4e221087f9f9c7214d81f9fc35da5f32bee678f5 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -41,10 +41,12 @@ import ( "github.com/apache/dubbo-go/remoting" ) +// Options ... type Options struct { serviceTTL time.Duration } +// Option ... type Option func(*Options) type registryDirectory struct { @@ -61,6 +63,7 @@ type registryDirectory struct { Options } +// NewRegistryDirectory ... func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) { options := Options{ //default 300s diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index 31d62fa916e5659cf424839cedf8f063fabedaa0..30e0cec67e2dca7242a6d04bab1a74cf92a7aabd 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -39,6 +39,7 @@ type dataListener struct { listener config_center.ConfigurationListener } +// NewRegistryDataListener ... func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { return &dataListener{listener: listener, interestedURL: []*common.URL{}} } @@ -77,6 +78,7 @@ type configurationListener struct { events chan *config_center.ConfigChangeEvent } +// NewConfigurationListener ... func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { // add a new waiter reg.wg.Add(1) diff --git a/registry/event.go b/registry/event.go index 24f5b72e8b27d4dc727e72d641d8bae3e00ff165..37d863d2162cb3b9d6a9f7eba8823286eb99441c 100644 --- a/registry/event.go +++ b/registry/event.go @@ -36,6 +36,7 @@ func init() { // service event ////////////////////////////////////////// +// ServiceEvent ... type ServiceEvent struct { Action remoting.EventType Service common.URL diff --git a/registry/mock_registry.go b/registry/mock_registry.go index 512c452e39082d619ffceae7f82d28127fbe2975..0b5cbf0658e6a9af162a35250bfb25156e72dc24 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -30,11 +30,13 @@ import ( "github.com/apache/dubbo-go/common/logger" ) +// MockRegistry ... type MockRegistry struct { listener *listener destroyed *atomic.Bool } +// NewMockRegistry ... func NewMockRegistry(url *common.URL) (Registry, error) { registry := &MockRegistry{ destroyed: atomic.NewBool(false), @@ -43,17 +45,24 @@ func NewMockRegistry(url *common.URL) (Registry, error) { registry.listener = listener return registry, nil } + +// Register ... func (*MockRegistry) Register(url common.URL) error { return nil } +// Destroy ... func (r *MockRegistry) Destroy() { if r.destroyed.CAS(false, true) { } } + +// IsAvailable ... func (r *MockRegistry) IsAvailable() bool { return !r.destroyed.Load() } + +// GetUrl ... func (r *MockRegistry) GetUrl() common.URL { return common.URL{} } @@ -61,6 +70,8 @@ func (r *MockRegistry) GetUrl() common.URL { func (r *MockRegistry) subscribe(*common.URL) (Listener, error) { return r.listener, nil } + +// Subscribe ... func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { go func() { for { @@ -113,6 +124,7 @@ func (*listener) Close() { } +// MockEvent ... func (r *MockRegistry) MockEvent(event *ServiceEvent) { r.listener.listenChan <- event } diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go index 25cd3d09b5711e4e7db56cd8e40f3283f3252e10..a2237dca265f25b07b19a8e1f4fe5a5f6ea9183e 100644 --- a/registry/nacos/listener.go +++ b/registry/nacos/listener.go @@ -51,6 +51,7 @@ type nacosListener struct { subscribeParam *vo.SubscribeParam } +// NewNacosListener ... func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) { listener := &nacosListener{ namingClient: namingClient, diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 9e6b9999b976d5cfcc76560731f383a52c2642f4..748b8204d97e60c9803821290184fc5717c41025 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -338,6 +338,7 @@ func setProviderUrl(regURL *common.URL, providerURL *common.URL) { regURL.SubURL = providerURL } +// GetProtocol ... func GetProtocol() protocol.Protocol { if regProtocol != nil { return regProtocol diff --git a/registry/registry.go b/registry/registry.go index c7279a29e1f423ca200aa2bf9390c127efcf10cb..e8cf5ecbb9e781d8c15e3e3c1bc8c2070e1526cc 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -38,6 +38,8 @@ type Registry interface { //mode2 : callback mode, subscribe with notify(notify listener). Subscribe(*common.URL, NotifyListener) } + +// NotifyListener ... type NotifyListener interface { Notify(*ServiceEvent) } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 53a592609153003d7d6c24881bccde0dfe6cdde6..0105087751f5e819fce245a4617e005a3bfaafd6 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -36,18 +36,23 @@ import ( zk "github.com/apache/dubbo-go/remoting/zookeeper" ) +// RegistryDataListener ... type RegistryDataListener struct { interestedURL []*common.URL listener config_center.ConfigurationListener } +// NewRegistryDataListener ... func NewRegistryDataListener(listener config_center.ConfigurationListener) *RegistryDataListener { return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}} } + +// AddInterestedURL ... func (l *RegistryDataListener) AddInterestedURL(url *common.URL) { l.interestedURL = append(l.interestedURL, url) } +// DataChange ... func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { // Intercept the last bit index := strings.Index(eventType.Path, "/providers/") @@ -71,6 +76,7 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { return false } +// RegistryConfigurationListener ... type RegistryConfigurationListener struct { client *zk.ZookeeperClient registry *zkRegistry @@ -79,14 +85,18 @@ type RegistryConfigurationListener struct { closeOnce sync.Once } +// NewRegistryConfigurationListener ... func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { reg.wg.Add(1) return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false} } + +// Process ... func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType } +// Next ... func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { for { select { @@ -111,6 +121,8 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { } } } + +// Close ... func (l *RegistryConfigurationListener) Close() { // ensure that the listener will be closed at most once. l.closeOnce.Do(func() { diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 24c4158e8cbe977e428ded523382288b4a93a0e1..e08266f71304aa8d2244ec459f979a7255e85839 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -113,10 +113,12 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { return r, nil } +// Options ... type Options struct { client *zookeeper.ZookeeperClient } +// Option ... type Option func(*Options) func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) { diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 050968565387fd31871b0aa8e9969496d39f6534..0e4b09bcf8552362f58bf3a3e3fbd80cf55affac 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -42,10 +42,12 @@ const ( ) var ( + // ErrNilETCDV3Client ... ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR ErrKVPairNotFound = perrors.New("k/v pair not found") ) +// Options ... type Options struct { name string endpoints []string @@ -54,30 +56,38 @@ type Options struct { heartbeat int // heartbeat second } +// Option ... type Option func(*Options) +// WithEndpoints ... func WithEndpoints(endpoints ...string) Option { return func(opt *Options) { opt.endpoints = endpoints } } + +// WithName ... func WithName(name string) Option { return func(opt *Options) { opt.name = name } } + +// WithTimeout ... func WithTimeout(timeout time.Duration) Option { return func(opt *Options) { opt.timeout = timeout } } +// WithHeartbeat ... func WithHeartbeat(heartbeat int) Option { return func(opt *Options) { opt.heartbeat = heartbeat } } +// ValidateClient ... func ValidateClient(container clientFacade, opts ...Option) error { options := &Options{ @@ -117,6 +127,7 @@ func ValidateClient(container clientFacade, opts ...Option) error { return nil } +// Client ... type Client struct { lock sync.RWMutex @@ -191,6 +202,7 @@ func (c *Client) stop() bool { return false } +// Close ... func (c *Client) Close() { if c == nil { @@ -309,6 +321,7 @@ func (c *Client) get(k string) (string, error) { return string(resp.Kvs[0].Value), nil } +// CleanKV ... func (c *Client) CleanKV() error { c.lock.RLock() @@ -408,10 +421,12 @@ func (c *Client) keepAliveKV(k string, v string) error { return nil } +// Done ... func (c *Client) Done() <-chan struct{} { return c.exit } +// Valid ... func (c *Client) Valid() bool { select { case <-c.exit: @@ -428,6 +443,7 @@ func (c *Client) Valid() bool { return true } +// Create ... func (c *Client) Create(k string, v string) error { err := c.put(k, v) @@ -437,6 +453,7 @@ func (c *Client) Create(k string, v string) error { return nil } +// Delete ... func (c *Client) Delete(k string) error { err := c.delete(k) @@ -447,6 +464,7 @@ func (c *Client) Delete(k string) error { return nil } +// RegisterTemp ... func (c *Client) RegisterTemp(basePath string, node string) (string, error) { completeKey := path.Join(basePath, node) @@ -459,6 +477,7 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) { return completeKey, nil } +// GetChildrenKVList ... func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) { kList, vList, err := c.getChildren(k) @@ -468,6 +487,7 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) { return kList, vList, nil } +// Get ... func (c *Client) Get(k string) (string, error) { v, err := c.get(k) @@ -478,6 +498,7 @@ func (c *Client) Get(k string) (string, error) { return v, nil } +// Watch ... func (c *Client) Watch(k string) (clientv3.WatchChan, error) { wc, err := c.watch(k) @@ -487,6 +508,7 @@ func (c *Client) Watch(k string) (clientv3.WatchChan, error) { return wc, nil } +// WatchWithPrefix ... func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) { wc, err := c.watchWithPrefix(prefix) diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go index 499044b8d77d3dcd8d32b0cb70cb78f84fae8ec4..d00620661d0faf909907d3dc7c08e999de134fee 100644 --- a/remoting/etcdv3/facade.go +++ b/remoting/etcdv3/facade.go @@ -43,6 +43,7 @@ type clientFacade interface { common.Node } +// HandleClientRestart ... func HandleClientRestart(r clientFacade) { var ( diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index a4d5805a6dbf3c76f43cb6085653c791b33ab119..4d2970fe3375e7f0286e5b4038b7062ed0a730a1 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -33,6 +33,7 @@ import ( "github.com/apache/dubbo-go/remoting" ) +// EventListener ... type EventListener struct { client *Client keyMapLock sync.Mutex @@ -40,6 +41,7 @@ type EventListener struct { wg sync.WaitGroup } +// NewEventListener ... func NewEventListener(client *Client) *EventListener { return &EventListener{ client: client, @@ -229,6 +231,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis }(key) } +// Close ... func (l *EventListener) Close() { l.wg.Wait() } diff --git a/remoting/listener.go b/remoting/listener.go index 8d1e357d37ff92e7bf60121133998dc1745c9af8..12e2d50e5537dd0e0559ebd97e581fe0277cb245 100644 --- a/remoting/listener.go +++ b/remoting/listener.go @@ -21,6 +21,7 @@ import ( "fmt" ) +// DataListener ... type DataListener interface { DataChange(eventType Event) bool //bool is return for interface implement is interesting } @@ -29,6 +30,7 @@ type DataListener interface { // event type ////////////////////////////////////////// +// EventType ... type EventType int const ( @@ -51,6 +53,7 @@ func (t EventType) String() string { // service event ////////////////////////////////////////// +// Event ... type Event struct { Path string Action EventType diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index 19d65291e94835c60ba412414090999b34bc4d48..095d04ed0f0e49211a6c2a8ccdaa64ed31edb186 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -43,6 +43,7 @@ var ( errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil") ) +// ZookeeperClient ... type ZookeeperClient struct { name string ZkAddrs []string @@ -54,6 +55,7 @@ type ZookeeperClient struct { eventRegistry map[string][]*chan struct{} } +// StateToString ... func StateToString(state zk.State) string { switch state { case zk.StateDisconnected: @@ -85,6 +87,7 @@ func StateToString(state zk.State) string { return "zookeeper unknown state" } +// Options ... type Options struct { zkName string client *ZookeeperClient @@ -92,14 +95,17 @@ type Options struct { ts *zk.TestCluster } +// Option ... type Option func(*Options) +// WithZkName ... func WithZkName(name string) Option { return func(opt *Options) { opt.zkName = name } } +// ValidateZookeeperClient ... func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { var ( err error @@ -173,12 +179,14 @@ func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (* return z, nil } +// WithTestCluster ... func WithTestCluster(ts *zk.TestCluster) Option { return func(opt *Options) { opt.ts = ts } } +// NewMockZookeeperClient ... func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) { var ( err error @@ -224,6 +232,7 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) return ts, z, event, nil } +// HandleZkEvent ... func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) { var ( state int @@ -282,6 +291,7 @@ LOOP: } } +// RegisterEvent ... func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { if zkPath == "" || event == nil { return @@ -296,6 +306,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { z.Unlock() } +// UnregisterEvent ... func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) { if zkPath == "" { return @@ -322,6 +333,7 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) { } } +// Done ... func (z *ZookeeperClient) Done() <-chan struct{} { return z.exit } @@ -337,6 +349,7 @@ func (z *ZookeeperClient) stop() bool { return false } +// ZkConnValid ... func (z *ZookeeperClient) ZkConnValid() bool { select { case <-z.exit: @@ -354,6 +367,7 @@ func (z *ZookeeperClient) ZkConnValid() bool { return valid } +// Close ... func (z *ZookeeperClient) Close() { if z == nil { return @@ -370,6 +384,7 @@ func (z *ZookeeperClient) Close() { logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs) } +// Create ... func (z *ZookeeperClient) Create(basePath string) error { var ( err error @@ -398,6 +413,7 @@ func (z *ZookeeperClient) Create(basePath string) error { return nil } +// Delete ... func (z *ZookeeperClient) Delete(basePath string) error { var ( err error @@ -413,6 +429,7 @@ func (z *ZookeeperClient) Delete(basePath string) error { return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath) } +// RegisterTemp ... func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) { var ( err error @@ -439,6 +456,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er return tmpPath, nil } +// RegisterTempSeq ... func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) { var ( err error @@ -467,6 +485,7 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, return tmpPath, nil } +// GetChildrenW ... func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) { var ( err error @@ -498,6 +517,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, return children, event, nil } +// GetChildren ... func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { var ( err error @@ -528,6 +548,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { return children, nil } +// ExistW ... func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { var ( exist bool @@ -553,6 +574,7 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { return event, nil } +// GetContent ... func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) { return z.Conn.Get(zkPath) } diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go index cdc7ead61226906a629fdb99b6b966ada5ee5253..18f1a049883bbd5bd7d698dc05432e93eea6ce83 100644 --- a/remoting/zookeeper/facade.go +++ b/remoting/zookeeper/facade.go @@ -40,6 +40,7 @@ type zkClientFacade interface { common.Node } +// HandleClientRestart ... func HandleClientRestart(r zkClientFacade) { var ( err error diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 0b9db5e09d9e2a81d4545bb03b979b3623cd217d..25805e8deb33685652abe0e4687e830ffac839f6 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -35,6 +35,7 @@ import ( "github.com/apache/dubbo-go/remoting" ) +// ZkEventListener ... type ZkEventListener struct { client *ZookeeperClient pathMapLock sync.Mutex @@ -42,6 +43,7 @@ type ZkEventListener struct { wg sync.WaitGroup } +// NewZkEventListener ... func NewZkEventListener(client *ZookeeperClient) *ZkEventListener { return &ZkEventListener{ client: client, @@ -49,10 +51,12 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener { } } +// SetClient ... func (l *ZkEventListener) SetClient(client *ZookeeperClient) { l.client = client } +// ListenServiceNodeEvent ... func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { l.wg.Add(1) defer l.wg.Done() @@ -324,6 +328,7 @@ func (l *ZkEventListener) valid() bool { return l.client.ZkConnValid() } +// Close ... func (l *ZkEventListener) Close() { l.wg.Wait() }