diff --git a/cluster/router/condition_router.go b/cluster/router/condition_router.go index a196ceb5771422f06a820986a02499f9fe3523dc..efae65ccb34eb8a78e281cfaf7b1fcec79b3d163 100644 --- a/cluster/router/condition_router.go +++ b/cluster/router/condition_router.go @@ -156,10 +156,13 @@ func parseRule(rule string) (map[string]MatchPair, error) { if len(rule) == 0 { return condition, nil } - var pair MatchPair + + var ( + pair MatchPair + startIndex int + ) values := gxset.NewSet() reg := regexp.MustCompile(`([&!=,]*)\s*([^&!=,\s]+)`) - var startIndex = 0 if indexTuple := reg.FindIndex([]byte(rule)); len(indexTuple) > 0 { startIndex = indexTuple[0] } @@ -227,7 +230,7 @@ func MatchCondition(pairs map[string]MatchPair, url *common.URL, param *common.U if sample == nil { return true, perrors.Errorf("url is not allowed be nil") } - result := false + var result bool for key, matchPair := range pairs { var sampleValue string diff --git a/common/constant/key.go b/common/constant/key.go index 6d6e322f15900336a13813547a99021a356e9bcd..17368b45ae49d06310ecff4b9cf05e7b8b4d26f7 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -103,6 +103,7 @@ const ( const ( CONFIG_NAMESPACE_KEY = "config.namespace" CONFIG_GROUP_KEY = "config.group" + CONFIG_APP_ID_KEY = "config.appId" CONFIG_CLUSTER_KEY = "config.cluster" CONFIG_CHECK_KEY = "config.check" CONFIG_TIMEOUT_KET = "config.timeout" diff --git a/common/extension/filter.go b/common/extension/filter.go index e2a66c7449448a2229c53eabb478f2c96a429bc7..93f7f8cf7ccc4108fe1120b685fad36a2f9f83df 100644 --- a/common/extension/filter.go +++ b/common/extension/filter.go @@ -33,7 +33,7 @@ func SetFilter(name string, v func() filter.Filter) { func GetFilter(name string) filter.Filter { if filters[name] == nil { - panic("filter for " + name + " is not existing, make sure you have import the package.") + panic("filter for " + name + " is not existing, make sure you have imported the package.") } return filters[name]() } diff --git a/common/extension/tps_limit.go b/common/extension/tps_limit.go index 65891c79336224f59b66f8312693c6b5151a7e28..151c33ad5e64ffa4059489e2cbcfae6f2e823328 100644 --- a/common/extension/tps_limit.go +++ b/common/extension/tps_limit.go @@ -22,7 +22,7 @@ import ( ) var ( - tpsLimitStrategy = make(map[string]func(rate int, interval int) tps.TpsLimitStrategy) + tpsLimitStrategy = make(map[string]tps.TpsLimitStrategyCreator) tpsLimiter = make(map[string]func() tps.TpsLimiter) ) @@ -39,11 +39,11 @@ func GetTpsLimiter(name string) tps.TpsLimiter { return creator() } -func SetTpsLimitStrategy(name string, creator func(rate int, interval int) tps.TpsLimitStrategy) { +func SetTpsLimitStrategy(name string, creator tps.TpsLimitStrategyCreator) { tpsLimitStrategy[name] = creator } -func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) tps.TpsLimitStrategy { +func GetTpsLimitStrategyCreator(name string) tps.TpsLimitStrategyCreator { creator, ok := tpsLimitStrategy[name] if !ok { panic("TpsLimitStrategy for " + name + " is not existing, make sure you have import the package " + diff --git a/common/logger/logger.go b/common/logger/logger.go index f41e95744f954da69b0e3695c97ba3389c69160a..db91d2e7c1e5f7a647eefbfa5aec14073c2b14a7 100644 --- a/common/logger/logger.go +++ b/common/logger/logger.go @@ -40,6 +40,11 @@ var ( logger Logger ) +type DubboLogger struct { + Logger + dynamicLevel zap.AtomicLevel +} + type Logger interface { Info(args ...interface{}) Warn(args ...interface{}) @@ -109,7 +114,8 @@ func InitLogger(conf *zap.Config) { zapLoggerConfig = *conf } zapLogger, _ := zapLoggerConfig.Build(zap.AddCallerSkip(1)) - logger = zapLogger.Sugar() + //logger = zapLogger.Sugar() + logger = &DubboLogger{Logger: zapLogger.Sugar(), dynamicLevel: zapLoggerConfig.Level} // set getty log getty.SetLogger(logger) @@ -123,3 +129,22 @@ func SetLogger(log Logger) { func GetLogger() Logger { return logger } + +func SetLoggerLevel(level string) bool { + if l, ok := logger.(OpsLogger); ok { + l.SetLoggerLevel(level) + return true + } + return false +} + +type OpsLogger interface { + Logger + SetLoggerLevel(level string) +} + +func (dl *DubboLogger) SetLoggerLevel(level string) { + l := new(zapcore.Level) + l.Set(level) + dl.dynamicLevel.SetLevel(*l) +} diff --git a/common/logger/logger_test.go b/common/logger/logger_test.go index e29b7cbc8e9bbd67df41df5ac687a079621c3360..6081f71aecccbfab5fd574335effe7788b6ce799 100644 --- a/common/logger/logger_test.go +++ b/common/logger/logger_test.go @@ -65,3 +65,19 @@ func TestInitLog(t *testing.T) { Warnf("%s", "warn") Errorf("%s", "error") } + +func TestSetLevel(t *testing.T) { + err := InitLog("./log.yml") + assert.NoError(t, err) + Debug("debug") + Info("info") + + assert.True(t, SetLoggerLevel("info")) + Debug("debug") + Info("info") + + SetLogger(GetLogger().(*DubboLogger).Logger) + assert.False(t, SetLoggerLevel("debug")) + Debug("debug") + Info("info") +} diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index 1c079f6bca52bf8f6e8c5ebb168da82ab8ccb5f2..d13646dba86eea04adb3726d33ee9d20457276b6 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -181,3 +181,7 @@ func (p *Proxy) Implement(v common.RPCService) { func (p *Proxy) Get() common.RPCService { return p.rpc } + +func (p *Proxy) GetCallback() interface{} { + return p.callBack +} diff --git a/common/proxy/proxy_factory.go b/common/proxy/proxy_factory.go index 2567e0ee09cf7fa5aef7fde46872eb88205d8e45..116cfe06693b6923ca10e0df6964317dabd91d0e 100644 --- a/common/proxy/proxy_factory.go +++ b/common/proxy/proxy_factory.go @@ -24,6 +24,7 @@ import ( type ProxyFactory interface { GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy + GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy GetInvoker(url common.URL) protocol.Invoker } diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index bafba60b400ec59d99e2d68ecf4d067c906ba6fb..06824fdc1e27cde5e1905be3277451dd4395049c 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -55,11 +55,16 @@ func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory { return &DefaultProxyFactory{} } func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { + return factory.GetAsyncProxy(invoker, nil, url) +} + +func (factory *DefaultProxyFactory) GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy { //create proxy attachments := map[string]string{} attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") - return proxy.NewProxy(invoker, nil, attachments) + return proxy.NewProxy(invoker, callBack, attachments) } + func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { return &ProxyInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), diff --git a/common/proxy/proxy_factory/default_test.go b/common/proxy/proxy_factory/default_test.go index b6a6b675baf992b2d64ffd19291ee2dc009bd1e3..7159b4b00eb2fcddb0f20f701f56b3179e57c4a0 100644 --- a/common/proxy/proxy_factory/default_test.go +++ b/common/proxy/proxy_factory/default_test.go @@ -18,6 +18,7 @@ package proxy_factory import ( + "fmt" "testing" ) @@ -37,6 +38,21 @@ func Test_GetProxy(t *testing.T) { assert.NotNil(t, proxy) } +type TestAsync struct { +} + +func (u *TestAsync) CallBack(res common.CallbackResponse) { + fmt.Println("CallBack res:", res) +} + +func Test_GetAsyncProxy(t *testing.T) { + proxyFactory := NewDefaultProxyFactory() + url := common.NewURLWithOptions() + async := &TestAsync{} + proxy := proxyFactory.GetAsyncProxy(protocol.NewBaseInvoker(*url), async.CallBack, url) + assert.NotNil(t, proxy) +} + func Test_GetInvoker(t *testing.T) { proxyFactory := NewDefaultProxyFactory() url := common.NewURLWithOptions() diff --git a/common/rpc_service.go b/common/rpc_service.go index 4741a6fa3c0daef97f044f639a5e64a38fe4a187..4c9f083dd0850c3f110491ef820c7b677c8009aa 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -39,6 +39,18 @@ type RPCService interface { Reference() string // rpc service id or reference id } +//AsyncCallbackService callback interface for async +type AsyncCallbackService interface { + CallBack(response CallbackResponse) // callback +} + +//CallbackResponse for different protocol +type CallbackResponse interface { +} + +//AsyncCallback async callback method +type AsyncCallback func(response CallbackResponse) + // for lowercase func // func MethodMapper() map[string][string] { // return map[string][string]{} diff --git a/config/base_config.go b/config/base_config.go index 6678e7c6813230d9ce2563621ae684dbd155eedf..64418f0a6d4c09270d48e6e9e6366a02783508d3 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -68,7 +68,7 @@ func (c *BaseConfig) prepareEnvironment() error { logger.Errorf("Get dynamic configuration error , error message is %v", err) return perrors.WithStack(err) } - content, err := dynamicConfig.GetConfig(c.ConfigCenterConfig.ConfigFile, config_center.WithGroup(c.ConfigCenterConfig.Group)) + content, err := dynamicConfig.GetProperties(c.ConfigCenterConfig.ConfigFile, config_center.WithGroup(c.ConfigCenterConfig.Group)) if err != nil { logger.Errorf("Get config content in dynamic configuration error , error message is %v", err) return perrors.WithStack(err) @@ -88,7 +88,7 @@ func (c *BaseConfig) prepareEnvironment() error { if len(configFile) == 0 { configFile = c.ConfigCenterConfig.ConfigFile } - appContent, err = dynamicConfig.GetConfig(configFile, config_center.WithGroup(appGroup)) + appContent, err = dynamicConfig.GetProperties(configFile, config_center.WithGroup(appGroup)) } //global config file mapContent, err := dynamicConfig.Parser().Parse(content) diff --git a/config/config_center_config.go b/config/config_center_config.go index 9c100b34974af604a9613eb8e1559360c36b7a6b..013d23946a042906021d3b1d37b38f326f67f50a 100644 --- a/config/config_center_config.go +++ b/config/config_center_config.go @@ -42,6 +42,7 @@ type ConfigCenterConfig struct { ConfigFile string `default:"dubbo.properties" yaml:"config_file" json:"config_file,omitempty"` Namespace string `default:"dubbo.properties" yaml:"namespace" json:"namespace,omitempty"` AppConfigFile string `default:"dubbo.properties" yaml:"app_config_file" json:"app_config_file,omitempty"` + AppId string `default:"dubbo" yaml:"app_id" json:"app_id,omitempty"` TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"` timeout time.Duration } @@ -62,5 +63,6 @@ func (c *ConfigCenterConfig) GetUrlMap() url.Values { urlMap.Set(constant.CONFIG_NAMESPACE_KEY, c.Namespace) urlMap.Set(constant.CONFIG_GROUP_KEY, c.Group) urlMap.Set(constant.CONFIG_CLUSTER_KEY, c.Cluster) + urlMap.Set(constant.CONFIG_APP_ID_KEY, c.AppId) return urlMap } diff --git a/config/reference_config.go b/config/reference_config.go index 8703c459bab306f98beb1668a1f9438126586f24..6b34f5535964a98516fbb215312575c9d3cfeb86 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -55,7 +55,7 @@ type ReferenceConfig struct { Group string `yaml:"group" json:"group,omitempty" property:"group"` Version string `yaml:"version" json:"version,omitempty" property:"version"` Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - async bool `yaml:"async" json:"async,omitempty" property:"async"` + Async bool `yaml:"async" json:"async,omitempty" property:"async"` Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` invoker protocol.Invoker urls []*common.URL @@ -141,7 +141,12 @@ func (refconfig *ReferenceConfig) Refer() { } //create proxy - refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url) + if refconfig.Async { + callback := GetCallback(refconfig.id) + refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(refconfig.invoker, callback, url) + } else { + refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url) + } } // @v is service provider implemented RPCService @@ -169,7 +174,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) //getty invoke async or sync - urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async)) + urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async)) //application info urlMap.Set(constant.APPLICATION_KEY, consumerConfig.ApplicationConfig.Name) diff --git a/config/reference_config_test.go b/config/reference_config_test.go index a81dbf06cef7d275cf6af4a7f651ff8d1600a3c9..a7af925cabcf6b4e7db9213f2bb6953bea965699 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -81,6 +81,7 @@ func doInitConsumer() { }, References: map[string]*ReferenceConfig{ "MockService": { + id: "MockProvider", Params: map[string]string{ "serviceid": "soa.mock", "forks": "5", @@ -110,6 +111,26 @@ func doInitConsumer() { } } +var mockProvider = new(MockProvider) + +type MockProvider struct { +} + +func (m *MockProvider) Reference() string { + return "MockProvider" +} + +func (m *MockProvider) CallBack(res common.CallbackResponse) { +} + +func doInitConsumerAsync() { + doInitConsumer() + SetConsumerService(mockProvider) + for _, v := range consumerConfig.References { + v.Async = true + } +} + func doInitConsumerWithSingleRegistry() { consumerConfig = &ConsumerConfig{ ApplicationConfig: &ApplicationConfig{ @@ -181,6 +202,22 @@ func Test_Refer(t *testing.T) { } consumerConfig = nil } + +func Test_ReferAsync(t *testing.T) { + doInitConsumerAsync() + extension.SetProtocol("registry", GetProtocol) + extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + + for _, reference := range consumerConfig.References { + reference.Refer() + assert.Equal(t, "soa.mock", reference.Params["serviceid"]) + assert.NotNil(t, reference.invoker) + assert.NotNil(t, reference.pxy) + assert.NotNil(t, reference.pxy.GetCallback()) + } + consumerConfig = nil +} + func Test_ReferP2P(t *testing.T) { doInitConsumer() extension.SetProtocol("dubbo", GetProtocol) diff --git a/config/service.go b/config/service.go index 2bceac4a8c20bb598dc2607c90c8206e4a448808..f1b51790ca13df0534882837397181e45e56ffa3 100644 --- a/config/service.go +++ b/config/service.go @@ -43,3 +43,11 @@ func GetConsumerService(name string) common.RPCService { func GetProviderService(name string) common.RPCService { return proServices[name] } + +func GetCallback(name string) func(response common.CallbackResponse) { + service := GetConsumerService(name) + if sv, ok := service.(common.AsyncCallbackService); ok { + return sv.CallBack + } + return nil +} diff --git a/config_center/apollo/impl.go b/config_center/apollo/impl.go index 4eff318e54c795428732c3e045c7a77321d3e777..ed46d4f9635d4d480a21d09fce0ec4ec84d47a66 100644 --- a/config_center/apollo/impl.go +++ b/config_center/apollo/impl.go @@ -57,7 +57,7 @@ func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) { configAddr := c.getAddressWithProtocolPrefix(url) configCluster := url.GetParam(constant.CONFIG_CLUSTER_KEY, "") - appId := url.GetParam(constant.CONFIG_GROUP_KEY, DEFAULT_GROUP) + appId := url.GetParam(constant.CONFIG_APP_ID_KEY, "") namespaces := url.GetParam(constant.CONFIG_NAMESPACE_KEY, getProperties(DEFAULT_GROUP)) c.appConf = &agollo.AppConfig{ AppId: appId, @@ -116,32 +116,28 @@ func getNamespaceName(namespace string, configFileFormat agollo.ConfigFileFormat return fmt.Sprintf(apolloConfigFormat, namespace, configFileFormat) } -func (c *apolloConfiguration) GetConfig(key string, opts ...Option) (string, error) { - k := &Options{} - for _, opt := range opts { - opt(k) +func (c *apolloConfiguration) GetInternalProperty(key string, opts ...Option) (string, error) { + config := agollo.GetConfig(c.appConf.NamespaceName) + if config == nil { + return "", errors.New(fmt.Sprintf("nothing in namespace:%s ", key)) } + return config.GetStringValue(key, ""), nil +} + +func (c *apolloConfiguration) GetRule(key string, opts ...Option) (string, error) { + return c.GetInternalProperty(key, opts...) +} + +func (c *apolloConfiguration) GetProperties(key string, opts ...Option) (string, error) { /** * when group is not null, we are getting startup configs(config file) from Config Center, for example: * key=dubbo.propertie */ - if len(k.Group) != 0 { - config := agollo.GetConfig(key) - if config == nil { - return "", errors.New(fmt.Sprintf("nothiing in namespace:%s ", key)) - } - return config.GetContent(agollo.Properties), nil - } - - /** - * when group is null, we are fetching governance rules(config item) from Config Center, for example: - * namespace=use default, key =application.organization - */ - config := agollo.GetConfig(c.appConf.NamespaceName) + config := agollo.GetConfig(key) if config == nil { - return "", errors.New(fmt.Sprintf("nothiing in namespace:%s ", key)) + return "", errors.New(fmt.Sprintf("nothing in namespace:%s ", key)) } - return config.GetStringValue(key, ""), nil + return config.GetContent(agollo.Properties), nil } func (c *apolloConfiguration) getAddressWithProtocolPrefix(url *common.URL) string { @@ -170,7 +166,3 @@ func (c *apolloConfiguration) Parser() parser.ConfigurationParser { func (c *apolloConfiguration) SetParser(p parser.ConfigurationParser) { c.parser = p } - -func (c *apolloConfiguration) GetConfigs(key string, opts ...Option) (string, error) { - return c.GetConfig(key, opts...) -} diff --git a/config_center/apollo/impl_test.go b/config_center/apollo/impl_test.go index 2bb8b0ad69e3e506ae4df5faf07936d4daf00312..e898be91ee356180f5967f9dd5a02df0dbcfb311 100644 --- a/config_center/apollo/impl_test.go +++ b/config_center/apollo/impl_test.go @@ -165,7 +165,7 @@ func runMockConfigServer(handlerMap map[string]func(http.ResponseWriter, *http.R func Test_GetConfig(t *testing.T) { configuration := initMockApollo(t) - configs, err := configuration.GetConfig(mockNamespace, config_center.WithGroup("dubbo")) + configs, err := configuration.GetProperties(mockNamespace, config_center.WithGroup("dubbo")) assert.NoError(t, err) configuration.SetParser(&parser.DefaultConfigurationParser{}) mapContent, err := configuration.Parser().Parse(configs) @@ -175,7 +175,7 @@ func Test_GetConfig(t *testing.T) { func Test_GetConfigItem(t *testing.T) { configuration := initMockApollo(t) - configs, err := configuration.GetConfig("application.organization") + configs, err := configuration.GetInternalProperty("application.organization") assert.NoError(t, err) configuration.SetParser(&parser.DefaultConfigurationParser{}) assert.NoError(t, err) @@ -186,7 +186,7 @@ func initMockApollo(t *testing.T) *apolloConfiguration { c := &config.BaseConfig{ConfigCenterConfig: &config.ConfigCenterConfig{ Protocol: "apollo", Address: "106.12.25.204:8080", - Group: "testApplication_yang", + AppId: "testApplication_yang", Cluster: "dev", Namespace: "mockDubbog.properties", }} diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index 1028b26d963cfcb02636113abc3e482bb22192a0..0546d39732deaa83ace948275a0d4448b1b24cf8 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -36,8 +36,14 @@ type DynamicConfiguration interface { SetParser(parser.ConfigurationParser) AddListener(string, ConfigurationListener, ...Option) RemoveListener(string, ConfigurationListener, ...Option) - GetConfig(string, ...Option) (string, error) - GetConfigs(string, ...Option) (string, error) + //GetProperties get properties file + GetProperties(string, ...Option) (string, error) + + //GetRule get Router rule properties file + GetRule(string, ...Option) (string, error) + + //GetInternalProperty get value by key in Default properties file(dubbo.properties) + GetInternalProperty(string, ...Option) (string, error) } type Options struct { diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index 47b509231d225491e6791e295a707756256f61d5..79c7c171945400a52563e0b66ef29c2896db0b99 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -108,6 +108,18 @@ func (c *MockDynamicConfiguration) Parser() parser.ConfigurationParser { func (c *MockDynamicConfiguration) SetParser(p parser.ConfigurationParser) { c.parser = p } +func (c *MockDynamicConfiguration) GetProperties(key string, opts ...Option) (string, error) { + return c.content, nil +} + +//For zookeeper, getConfig and getConfigs have the same meaning. +func (c *MockDynamicConfiguration) GetInternalProperty(key string, opts ...Option) (string, error) { + return c.GetProperties(key, opts...) +} + +func (c *MockDynamicConfiguration) GetRule(key string, opts ...Option) (string, error) { + return c.GetProperties(key, opts...) +} func (c *MockDynamicConfiguration) MockServiceConfigEvent() { config := &parser.ConfiguratorConfig{ diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index 84e4b54e237fabb5775bfd0dfeb7043f1794a7ae..504d4910581aff52afa74b13fdfce61c9170ca48 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -109,7 +109,7 @@ func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener conf c.cacheListener.RemoveListener(key, listener) } -func (c *zookeeperDynamicConfiguration) GetConfig(key string, opts ...config_center.Option) (string, error) { +func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) { tmpOpts := &config_center.Options{} for _, opt := range opts { @@ -141,8 +141,12 @@ func (c *zookeeperDynamicConfiguration) GetConfig(key string, opts ...config_cen } //For zookeeper, getConfig and getConfigs have the same meaning. -func (c *zookeeperDynamicConfiguration) GetConfigs(key string, opts ...config_center.Option) (string, error) { - return c.GetConfig(key, opts...) +func (c *zookeeperDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) { + return c.GetProperties(key, opts...) +} + +func (c *zookeeperDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { + return c.GetProperties(key, opts...) } func (c *zookeeperDynamicConfiguration) Parser() parser.ConfigurationParser { diff --git a/config_center/zookeeper/impl_test.go b/config_center/zookeeper/impl_test.go index 2f620457f75b7e35f713423e3841d0272cbd0730..e614009faa5b32873c6245dea5c85cc2747e19ea 100644 --- a/config_center/zookeeper/impl_test.go +++ b/config_center/zookeeper/impl_test.go @@ -81,7 +81,7 @@ func initZkData(group string, t *testing.T) (*zk.TestCluster, *zookeeperDynamicC func Test_GetConfig(t *testing.T) { ts, reg := initZkData("dubbo", t) defer ts.Stop() - configs, err := reg.GetConfig("dubbo.properties", config_center.WithGroup("dubbo")) + configs, err := reg.GetProperties("dubbo.properties", config_center.WithGroup("dubbo")) assert.NoError(t, err) m, err := reg.Parser().Parse(configs) assert.NoError(t, err) diff --git a/filter/common/impl/rejected_execution_handler_mock.go b/filter/common/impl/rejected_execution_handler_mock.go index 2f7869d61ea2cf4cf8e490dd004ab086b9492132..dace1894668d3a4a154a87bfbdbcc860a97a11ec 100644 --- a/filter/common/impl/rejected_execution_handler_mock.go +++ b/filter/common/impl/rejected_execution_handler_mock.go @@ -20,11 +20,17 @@ // Package filter is a generated GoMock package. package impl +import ( + reflect "reflect" +) + +import ( + gomock "github.com/golang/mock/gomock" +) + import ( common "github.com/apache/dubbo-go/common" protocol "github.com/apache/dubbo-go/protocol" - gomock "github.com/golang/mock/gomock" - reflect "reflect" ) // MockRejectedExecutionHandler is a mock of RejectedExecutionHandler interface diff --git a/filter/impl/tps/impl/tps_limit_fix_window_strategy.go b/filter/impl/tps/impl/tps_limit_fix_window_strategy.go index 7290619d5551ebe29209e7cfa717bd66442ca2df..285ecfa658cf838cc1140ba716bd72e1976b86fe 100644 --- a/filter/impl/tps/impl/tps_limit_fix_window_strategy.go +++ b/filter/impl/tps/impl/tps_limit_fix_window_strategy.go @@ -21,6 +21,7 @@ import ( "sync/atomic" "time" ) + import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -32,8 +33,9 @@ const ( ) func init() { - extension.SetTpsLimitStrategy(FixedWindowKey, NewFixedWindowTpsLimitStrategyImpl) - extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, NewFixedWindowTpsLimitStrategyImpl) + creator := &fixedWindowStrategyCreator{} + extension.SetTpsLimitStrategy(FixedWindowKey, creator) + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, creator) } /** @@ -75,10 +77,12 @@ func (impl *FixedWindowTpsLimitStrategyImpl) IsAllowable() bool { return atomic.AddInt32(&impl.count, 1) <= impl.rate } -func NewFixedWindowTpsLimitStrategyImpl(rate int, interval int) tps.TpsLimitStrategy { +type fixedWindowStrategyCreator struct{} + +func (creator *fixedWindowStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { return &FixedWindowTpsLimitStrategyImpl{ rate: int32(rate), - interval: int64(interval * 1000), // convert to ns + interval: int64(interval) * int64(time.Millisecond), // convert to ns count: 0, timestamp: time.Now().UnixNano(), } diff --git a/filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go b/filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go index 55d0b14b75e69b44cf9ebe3a615e1a05c60d4b41..7ef539ed3b2b93da5c56a05f606e75282226d1ef 100644 --- a/filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go +++ b/filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go @@ -27,16 +27,17 @@ import ( ) func TestFixedWindowTpsLimitStrategyImpl_IsAllowable(t *testing.T) { - strategy := NewFixedWindowTpsLimitStrategyImpl(2, 60000) + creator := &fixedWindowStrategyCreator{} + strategy := creator.Create(2, 60000) assert.True(t, strategy.IsAllowable()) assert.True(t, strategy.IsAllowable()) assert.False(t, strategy.IsAllowable()) - strategy = NewFixedWindowTpsLimitStrategyImpl(2, 2000) + strategy = creator.Create(2, 2000) assert.True(t, strategy.IsAllowable()) assert.True(t, strategy.IsAllowable()) assert.False(t, strategy.IsAllowable()) - time.Sleep(time.Duration(2100 * 1000)) + time.Sleep(2100 * time.Millisecond) assert.True(t, strategy.IsAllowable()) assert.True(t, strategy.IsAllowable()) assert.False(t, strategy.IsAllowable()) diff --git a/filter/impl/tps/impl/tps_limit_sliding_window_strategy.go b/filter/impl/tps/impl/tps_limit_sliding_window_strategy.go index de98eb7528f541ed57b04309e2c9c74b8310cc64..d1a5db6e259ffa63282065f881f6cc8360c8d25b 100644 --- a/filter/impl/tps/impl/tps_limit_sliding_window_strategy.go +++ b/filter/impl/tps/impl/tps_limit_sliding_window_strategy.go @@ -29,7 +29,7 @@ import ( ) func init() { - extension.SetTpsLimitStrategy("slidingWindow", NewSlidingWindowTpsLimitStrategyImpl) + extension.SetTpsLimitStrategy("slidingWindow", &slidingWindowStrategyCreator{}) } /** @@ -80,10 +80,12 @@ func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool { return false } -func NewSlidingWindowTpsLimitStrategyImpl(rate int, interval int) tps.TpsLimitStrategy { +type slidingWindowStrategyCreator struct{} + +func (creator *slidingWindowStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { return &SlidingWindowTpsLimitStrategyImpl{ rate: rate, - interval: int64(interval * 1000), + interval: int64(interval) * int64(time.Millisecond), mutex: &sync.Mutex{}, queue: list.New(), } diff --git a/filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go b/filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go index 1d0187fa201741a32f109abe51ce63b5568e4cc4..075f1d9d2be2d18edfee7dc8691b71da65f5da45 100644 --- a/filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go +++ b/filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go @@ -27,18 +27,19 @@ import ( ) func TestSlidingWindowTpsLimitStrategyImpl_IsAllowable(t *testing.T) { - strategy := NewSlidingWindowTpsLimitStrategyImpl(2, 60000) + creator := &slidingWindowStrategyCreator{} + strategy := creator.Create(2, 60000) assert.True(t, strategy.IsAllowable()) assert.True(t, strategy.IsAllowable()) assert.False(t, strategy.IsAllowable()) - time.Sleep(time.Duration(2100 * 1000)) + time.Sleep(2100 * time.Millisecond) assert.False(t, strategy.IsAllowable()) - strategy = NewSlidingWindowTpsLimitStrategyImpl(2, 2000) + strategy = creator.Create(2, 2000) assert.True(t, strategy.IsAllowable()) assert.True(t, strategy.IsAllowable()) assert.False(t, strategy.IsAllowable()) - time.Sleep(time.Duration(2100 * 1000)) + time.Sleep(2100 * time.Millisecond) assert.True(t, strategy.IsAllowable()) assert.True(t, strategy.IsAllowable()) assert.False(t, strategy.IsAllowable()) diff --git a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go b/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go index 5f43e8c3bf6c1db268282a0fb5d9ecc55fb357df..9a1b21a3349845e32cb0fe38b07a7f932ec4f454 100644 --- a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go +++ b/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go @@ -27,7 +27,9 @@ import ( ) func init() { - extension.SetTpsLimitStrategy("threadSafeFixedWindow", NewThreadSafeFixedWindowTpsLimitStrategyImpl) + extension.SetTpsLimitStrategy("threadSafeFixedWindow", &threadSafeFixedWindowStrategyCreator{ + fixedWindowStrategyCreator: &fixedWindowStrategyCreator{}, + }) } /** @@ -56,8 +58,12 @@ func (impl *ThreadSafeFixedWindowTpsLimitStrategyImpl) IsAllowable() bool { return impl.fixedWindow.IsAllowable() } -func NewThreadSafeFixedWindowTpsLimitStrategyImpl(rate int, interval int) tps.TpsLimitStrategy { - fixedWindowStrategy := NewFixedWindowTpsLimitStrategyImpl(rate, interval).(*FixedWindowTpsLimitStrategyImpl) +type threadSafeFixedWindowStrategyCreator struct { + fixedWindowStrategyCreator *fixedWindowStrategyCreator +} + +func (creator *threadSafeFixedWindowStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { + fixedWindowStrategy := creator.fixedWindowStrategyCreator.Create(rate, interval).(*FixedWindowTpsLimitStrategyImpl) return &ThreadSafeFixedWindowTpsLimitStrategyImpl{ fixedWindow: fixedWindowStrategy, mutex: &sync.Mutex{}, diff --git a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go b/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go index fea93dfa3b8ef49034e952619f617bf87e4be879..129493962403e0028b09f9646054fda236c99ff7 100644 --- a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go +++ b/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go @@ -27,16 +27,17 @@ import ( ) func TestThreadSafeFixedWindowTpsLimitStrategyImpl_IsAllowable(t *testing.T) { - strategy := NewThreadSafeFixedWindowTpsLimitStrategyImpl(2, 60000) + creator := &threadSafeFixedWindowStrategyCreator{} + strategy := creator.Create(2, 60000) assert.True(t, strategy.IsAllowable()) assert.True(t, strategy.IsAllowable()) assert.False(t, strategy.IsAllowable()) - strategy = NewThreadSafeFixedWindowTpsLimitStrategyImpl(2, 2000) + strategy = creator.Create(2, 2000) assert.True(t, strategy.IsAllowable()) assert.True(t, strategy.IsAllowable()) assert.False(t, strategy.IsAllowable()) - time.Sleep(time.Duration(2100 * 1000)) + time.Sleep(2100 * time.Millisecond) assert.True(t, strategy.IsAllowable()) assert.True(t, strategy.IsAllowable()) assert.False(t, strategy.IsAllowable()) diff --git a/filter/impl/tps/impl/tps_limiter_method_service.go b/filter/impl/tps/impl/tps_limiter_method_service.go index 3faf0d6e672315bb83e951be45a7f93c4ac508ef..426ae5994867c5a09653641870ebcef531c0d43c 100644 --- a/filter/impl/tps/impl/tps_limiter_method_service.go +++ b/filter/impl/tps/impl/tps_limiter_method_service.go @@ -148,7 +148,7 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio limitStrategyConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_STRATEGY_KEY, url.GetParam(constant.TPS_LIMIT_STRATEGY_KEY, constant.DEFAULT_KEY)) limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig) - limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator(int(limitRate), int(limitInterval))) + limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator.Create(int(limitRate), int(limitInterval))) return limitState.(tps.TpsLimitStrategy).IsAllowable() } diff --git a/filter/impl/tps/impl/tps_limiter_method_service_test.go b/filter/impl/tps/impl/tps_limiter_method_service_test.go index 006e9463871061488f696366d251c54fb8cefef5..e747d4682d0a8bdee03da6f012fb76b7bd1e02af 100644 --- a/filter/impl/tps/impl/tps_limiter_method_service_test.go +++ b/filter/impl/tps/impl/tps_limiter_method_service_test.go @@ -48,10 +48,12 @@ func TestMethodServiceTpsLimiterImpl_IsAllowable_Only_Service_Level(t *testing.T mockStrategyImpl := NewMockTpsLimitStrategy(ctrl) mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1) - extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) tps.TpsLimitStrategy { - assert.Equal(t, 20, rate) - assert.Equal(t, 60000, interval) - return mockStrategyImpl + + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, &mockStrategyCreator{ + rate: 20, + interval: 60000, + t: t, + strategy: mockStrategyImpl, }) limiter := GetMethodServiceTpsLimiter() @@ -95,10 +97,12 @@ func TestMethodServiceTpsLimiterImpl_IsAllowable_Method_Level_Override(t *testin mockStrategyImpl := NewMockTpsLimitStrategy(ctrl) mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1) - extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) tps.TpsLimitStrategy { - assert.Equal(t, 40, rate) - assert.Equal(t, 7000, interval) - return mockStrategyImpl + + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, &mockStrategyCreator{ + rate: 40, + interval: 7000, + t: t, + strategy: mockStrategyImpl, }) limiter := GetMethodServiceTpsLimiter() @@ -123,13 +127,28 @@ func TestMethodServiceTpsLimiterImpl_IsAllowable_Both_Method_And_Service(t *test mockStrategyImpl := NewMockTpsLimitStrategy(ctrl) mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1) - extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, func(rate int, interval int) tps.TpsLimitStrategy { - assert.Equal(t, 40, rate) - assert.Equal(t, 3000, interval) - return mockStrategyImpl + + extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, &mockStrategyCreator{ + rate: 40, + interval: 3000, + t: t, + strategy: mockStrategyImpl, }) limiter := GetMethodServiceTpsLimiter() result := limiter.IsAllowable(*invokeUrl, invoc) assert.True(t, result) } + +type mockStrategyCreator struct { + rate int + interval int + t *testing.T + strategy tps.TpsLimitStrategy +} + +func (creator *mockStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { + assert.Equal(creator.t, creator.rate, rate) + assert.Equal(creator.t, creator.interval, interval) + return creator.strategy +} diff --git a/filter/impl/tps/impl/tps_limiter_mock.go b/filter/impl/tps/impl/tps_limiter_mock.go index ff2f984e13a8617aefdbef0137ed8feca1bfd4ba..acd3a15d18baf10838faf57e141afe1711f0aebb 100644 --- a/filter/impl/tps/impl/tps_limiter_mock.go +++ b/filter/impl/tps/impl/tps_limiter_mock.go @@ -20,11 +20,17 @@ // Package filter is a generated GoMock package. package impl +import ( + reflect "reflect" +) + +import ( + gomock "github.com/golang/mock/gomock" +) + import ( common "github.com/apache/dubbo-go/common" protocol "github.com/apache/dubbo-go/protocol" - gomock "github.com/golang/mock/gomock" - reflect "reflect" ) // MockTpsLimiter is a mock of TpsLimiter interface diff --git a/filter/impl/tps/tps_limit_strategy.go b/filter/impl/tps/tps_limit_strategy.go index d1af85b464ca3cbb500100b895cdc0badff24898..c55f008a09b3743f728ab0506c6b0095cbfd181c 100644 --- a/filter/impl/tps/tps_limit_strategy.go +++ b/filter/impl/tps/tps_limit_strategy.go @@ -34,3 +34,7 @@ package tps type TpsLimitStrategy interface { IsAllowable() bool } + +type TpsLimitStrategyCreator interface { + Create(rate int, interval int) TpsLimitStrategy +} diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index ba74d86c0c38ba02ec5e87423e0fe8990dae486b..81f392565f701d990dc1783d5d467814a0fba5bf 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -113,7 +113,9 @@ type Options struct { RequestTimeout time.Duration } -type CallResponse struct { +//AsyncCallbackResponse async response for dubbo +type AsyncCallbackResponse struct { + common.CallbackResponse Opts Options Cause error Start time.Time // invoke(call) start time == write start time @@ -121,8 +123,6 @@ type CallResponse struct { Reply interface{} } -type AsyncCallback func(response CallResponse) - type Client struct { opts Options conf ClientConfig @@ -136,10 +136,10 @@ func NewClient(opt Options) *Client { switch { case opt.ConnectTimeout == 0: - opt.ConnectTimeout = 3e9 + opt.ConnectTimeout = 3 * time.Second fallthrough case opt.RequestTimeout == 0: - opt.RequestTimeout = 3e9 + opt.RequestTimeout = 3 * time.Second } c := &Client{ @@ -199,12 +199,12 @@ func (c *Client) Call(request *Request, response *Response) error { return perrors.WithStack(c.call(ct, request, response, nil)) } -func (c *Client) AsyncCall(request *Request, callback AsyncCallback, response *Response) error { +func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error { return perrors.WithStack(c.call(CT_TwoWay, request, response, callback)) } -func (c *Client) call(ct CallType, request *Request, response *Response, callback AsyncCallback) error { +func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error { p := &DubboPackage{} p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/") diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index eb1f15c862a910120e118c06bf9b572e93f58832..3f8a8ee98c3b2d8b87e2d5469a18d1792578d1d6 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -144,8 +144,9 @@ func TestClient_AsyncCall(t *testing.T) { user := &User{} lock := sync.Mutex{} lock.Lock() - err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response CallResponse) { - assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) + err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallbackResponse) { + r := response.(AsyncCallbackResponse) + assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) lock.Unlock() }, NewResponse(user, nil)) assert.NoError(t, err) diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index a878ffd91e29d6949870ec25fed9481f301b435a..758363117f1720a7fe89eb9745b415e506315db8 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -26,6 +26,7 @@ import ( import ( "github.com/apache/dubbo-go-hessian2" + "github.com/apache/dubbo-go/common" perrors "github.com/pkg/errors" ) @@ -109,7 +110,7 @@ type PendingResponse struct { err error start time.Time readStart time.Time - callback AsyncCallback + callback common.AsyncCallback response *Response done chan struct{} } @@ -122,8 +123,8 @@ func NewPendingResponse() *PendingResponse { } } -func (r PendingResponse) GetCallResponse() CallResponse { - return CallResponse{ +func (r PendingResponse) GetCallResponse() common.CallbackResponse { + return AsyncCallbackResponse{ Cause: r.err, Start: r.start, ReadStart: r.readStart, diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index bc321a97a4271c147d9317145d9f1aa76ca27902..6dcf2568fa8c88a864c567486a501c2ad7feb3f7 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -42,8 +42,8 @@ var ( type DubboInvoker struct { protocol.BaseInvoker - client *Client - destroyLock sync.Mutex + client *Client + quitOnce sync.Once } func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { @@ -75,7 +75,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } response := NewResponse(inv.Reply(), nil) if async { - if callBack, ok := inv.CallBack().(func(response CallResponse)); ok { + if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) } else { result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())) @@ -97,19 +97,11 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } func (di *DubboInvoker) Destroy() { - if di.IsDestroyed() { - return - } - di.destroyLock.Lock() - defer di.destroyLock.Unlock() - - if di.IsDestroyed() { - return - } + di.quitOnce.Do(func() { + di.BaseInvoker.Destroy() - di.BaseInvoker.Destroy() - - if di.client != nil { - di.client.Close() // close client - } + if di.client != nil { + di.client.Close() + } + }) } diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 0a765356f7353829c8486fddba986e3a444441a1..7d60090e2d81bcb750d1e6d79a08059687c7937d 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -28,6 +28,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -65,8 +66,9 @@ func TestDubboInvoker_Invoke(t *testing.T) { // AsyncCall lock := sync.Mutex{} lock.Lock() - inv.SetCallBack(func(response CallResponse) { - assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) + inv.SetCallBack(func(response common.CallbackResponse) { + r := response.(AsyncCallbackResponse) + assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) lock.Unlock() }) res = invoker.Invoke(inv) diff --git a/registry/base_configuration_listener.go b/registry/base_configuration_listener.go index 925baa2198d9917824c1be78b7cd0c2f93bfb894..056a93aaff1ec657db89f21b4a6b28efc354b49b 100644 --- a/registry/base_configuration_listener.go +++ b/registry/base_configuration_listener.go @@ -47,7 +47,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente } bcl.defaultConfiguratorFunc = f bcl.dynamicConfiguration.AddListener(key, listener) - if rawConfig, err := bcl.dynamicConfiguration.GetConfig(key, config_center.WithGroup(constant.DUBBO)); err != nil { + if rawConfig, err := bcl.dynamicConfiguration.GetInternalProperty(key, config_center.WithGroup(constant.DUBBO)); err != nil { //set configurators to empty bcl.configurators = []config_center.Configurator{} return diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 5a4cc2c66e506360c02b9289f0606692ac168a23..53a592609153003d7d6c24881bccde0dfe6cdde6 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -20,6 +20,7 @@ package zookeeper import ( "context" "strings" + "sync" ) import ( @@ -71,14 +72,16 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { } type RegistryConfigurationListener struct { - client *zk.ZookeeperClient - registry *zkRegistry - events chan *config_center.ConfigChangeEvent + client *zk.ZookeeperClient + registry *zkRegistry + events chan *config_center.ConfigChangeEvent + isClosed bool + closeOnce sync.Once } func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { reg.wg.Add(1) - return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} + return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false} } func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType @@ -109,13 +112,11 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { } } func (l *RegistryConfigurationListener) Close() { - if l.registry.IsAvailable() { - /** - * if the registry is not available, it means that the registry has been destroy - * so we don't need to call Done(), or it will cause the negative count panic for registry.wg - */ + // ensure that the listener will be closed at most once. + l.closeOnce.Do(func() { + l.isClosed = true l.registry.wg.Done() - } + }) } func (l *RegistryConfigurationListener) valid() bool { diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 29ae51d44f3691807cbc74912290ba141d1f5d47..1defedc28a2d42183be8c2e5d77441d8831c1d30 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -46,6 +46,7 @@ import ( const ( RegistryZkClient = "zk registry" RegistryConnDelay = 3 + MaxWaitInterval = time.Duration(3e9) ) var ( @@ -200,6 +201,10 @@ func (r *zkRegistry) RestartCallBack() bool { } logger.Infof("success to re-register service :%v", confIf.Key()) } + r.listener = zookeeper.NewZkEventListener(r.client) + r.configListener = NewRegistryConfigurationListener(r.client, r) + r.dataListener = NewRegistryDataListener(r.configListener) + return flag } @@ -256,6 +261,10 @@ func (r *zkRegistry) Register(conf common.URL) error { return nil } +func (r *zkRegistry) service(c common.URL) string { + return url.QueryEscape(c.Service()) +} + func (r *zkRegistry) register(c common.URL) error { var ( err error @@ -291,7 +300,7 @@ func (r *zkRegistry) register(c common.URL) error { return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) } // 鍏堝垱寤烘湇鍔′笅闈㈢殑provider node - dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.PROVIDER]) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) r.cltLock.Lock() err = r.client.Create(dubboPath) r.cltLock.Unlock() @@ -325,11 +334,11 @@ func (r *zkRegistry) register(c common.URL) error { encodedURL = url.QueryEscape(rawURL) // Print your own registration service providers. - dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), (common.RoleType(common.PROVIDER)).String()) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String()) logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL) case common.CONSUMER: - dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.CONSUMER]) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER]) r.cltLock.Lock() err = r.client.Create(dubboPath) r.cltLock.Unlock() @@ -337,7 +346,7 @@ func (r *zkRegistry) register(c common.URL) error { logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) return perrors.WithStack(err) } - dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), common.DubboNodes[common.PROVIDER]) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) r.cltLock.Lock() err = r.client.Create(dubboPath) r.cltLock.Unlock() @@ -354,7 +363,7 @@ func (r *zkRegistry) register(c common.URL) error { rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode()) encodedURL = url.QueryEscape(rawURL) - dubboPath = fmt.Sprintf("/dubbo/%s/%s", c.Service(), (common.RoleType(common.CONSUMER)).String()) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.CONSUMER)).String()) logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL) default: @@ -399,10 +408,19 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) { return r.getListener(conf) } +func sleepWait(n int) { + wait := time.Duration((n + 1) * 2e8) + if wait > MaxWaitInterval { + wait = MaxWaitInterval + } + time.Sleep(wait) +} //subscribe from registry func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { + n := 0 for { + n++ if !r.IsAvailable() { logger.Warnf("event listener game over.") return @@ -423,14 +441,14 @@ func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyLi if serviceEvent, err := listener.Next(); err != nil { logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) listener.Close() - return + break } else { logger.Infof("update begin, service event: %v", serviceEvent.String()) notifyListener.Notify(serviceEvent) } } - + sleepWait(n) } } @@ -440,6 +458,10 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen ) r.listenerLock.Lock() + if r.configListener.isClosed { + r.listenerLock.Unlock() + return nil, perrors.New("configListener already been closed") + } zkListener = r.configListener r.listenerLock.Unlock() if r.listener == nil { @@ -461,7 +483,7 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen //Interested register to dataconfig. r.dataListener.AddInterestedURL(conf) for _, v := range strings.Split(conf.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") { - go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, conf.Service()), r.dataListener) + go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, url.QueryEscape(conf.Service())), r.dataListener) } return zkListener, nil diff --git a/remoting/zookeeper/listener_test.go b/remoting/zookeeper/listener_test.go index a90fbad05ae787f36d38607b0a73374d874e6994..aa627c7e8a53ef87fb39446b05d4001bcf18cf3f 100644 --- a/remoting/zookeeper/listener_test.go +++ b/remoting/zookeeper/listener_test.go @@ -18,6 +18,7 @@ package zookeeper import ( + "net/url" "sync" "testing" "time" @@ -122,3 +123,9 @@ func (m *mockDataListener) DataChange(eventType remoting.Event) bool { } return true } + +func TestZkPath(t *testing.T) { + zkPath := "io.grpc.examples.helloworld.GreeterGrpc$IGreeter" + zkPath = url.QueryEscape(zkPath) + assert.Equal(t, zkPath, "io.grpc.examples.helloworld.GreeterGrpc%24IGreeter") +}