diff --git a/README_CN.md b/README_CN.md index 552685c7bba9888e089f635f5812d93355e31715..b76d8983deae427f9317c4f930f0e06da479f484 100644 --- a/README_CN.md +++ b/README_CN.md @@ -180,7 +180,7 @@ go test ./... -coverprofile=coverage.txt -covermode=atomic ## [User List](https://github.com/apache/dubbo-go/issues/2) -若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者向对其做改进,请忝列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓之。 +若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者想对其做改进,请添列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓。 <div> <table> diff --git a/before_ut.sh b/before_ut.sh index 210e9e723ba9e2118cf642729359808b78fddb8d..b55e424ef72b33181b2ea40fdb37ac319110aec0 100755 --- a/before_ut.sh +++ b/before_ut.sh @@ -36,5 +36,8 @@ cp ${zkJar} cluster/router/chain/zookeeper-4unittest/contrib/fatjar mkdir -p cluster/router/condition/zookeeper-4unittest/contrib/fatjar cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar +mkdir -p cluster/router/tag/zookeeper-4unittest/contrib/fatjar +cp ${zkJar} cluster/router/tag/zookeeper-4unittest/contrib/fatjar + mkdir -p metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar cp ${zkJar} metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar \ No newline at end of file diff --git a/cluster/router/condition/app_router_test.go b/cluster/router/condition/app_router_test.go index 8b38f2dd6136b4d31f46e7214c0ad1359537b198..ea186049642d8dbe42ff11997e0c154ff298ce6c 100644 --- a/cluster/router/condition/app_router_test.go +++ b/cluster/router/condition/app_router_test.go @@ -24,7 +24,6 @@ import ( ) import ( - _ "github.com/apache/dubbo-go/config_center/zookeeper" "github.com/stretchr/testify/assert" ) @@ -34,6 +33,7 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config_center" + _ "github.com/apache/dubbo-go/config_center/zookeeper" "github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting/zookeeper" ) diff --git a/cluster/router/condition/listenable_router.go b/cluster/router/condition/listenable_router.go index 4ccc19e95521d03ae1f663ec276646cf30926533..7f4f14a8e47173253e2e5b7f4eed5db2bed64958 100644 --- a/cluster/router/condition/listenable_router.go +++ b/cluster/router/condition/listenable_router.go @@ -85,7 +85,7 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) { return l, nil } -// Process Process config change event , generate routers and set them to the listenableRouter instance +// Process Process config change event, generate routers and set them to the listenableRouter instance func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) { logger.Infof("Notification of condition rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) if remoting.EventTypeDel == event.ConfigType { diff --git a/cluster/router/tag/file.go b/cluster/router/tag/file.go index 8144c83203dbe98778dd6bb8dcdb9888be664b3b..433abcb72eb6e201e64790af932e847d8faba8af 100644 --- a/cluster/router/tag/file.go +++ b/cluster/router/tag/file.go @@ -42,7 +42,7 @@ type FileTagRouter struct { force bool } -// NewFileTagRouter Create file tag router instance with content ( from config file) +// NewFileTagRouter Create file tag router instance with content (from config file) func NewFileTagRouter(content []byte) (*FileTagRouter, error) { fileRouter := &FileTagRouter{} rule, err := getRule(string(content)) diff --git a/cluster/router/tag/router_rule.go b/cluster/router/tag/router_rule.go index 926446dcb2f18fa2fd4639a9246a85f435d75d45..5fb7ab151c1e89960152b6c8562ef08f173127df 100644 --- a/cluster/router/tag/router_rule.go +++ b/cluster/router/tag/router_rule.go @@ -22,9 +22,27 @@ import ( "github.com/apache/dubbo-go/common/yaml" ) +/** + * %YAML1.2 + * --- + * force: true + * runtime: false + * enabled: true + * priority: 1 + * key: demo-provider + * tags: + * - name: tag1 + * addresses: [ip1, ip2] + * - name: tag2 + * addresses: [ip3, ip4] + * ... + */ // RouterRule RouterRule config read from config file or config center type RouterRule struct { router.BaseRouterRule `yaml:",inline""` + Tags []Tag + addressToTagNames map[string][]string + tagNameToAddresses map[string][]string } func getRule(rawRule string) (*RouterRule, error) { @@ -34,5 +52,58 @@ func getRule(rawRule string) (*RouterRule, error) { return r, err } r.RawRule = rawRule + r.init() return r, nil } + +func (t *RouterRule) init() { + t.addressToTagNames = make(map[string][]string, 8) + t.tagNameToAddresses = make(map[string][]string, 8) + for _, tag := range t.Tags { + for _, address := range tag.Addresses { + t.addressToTagNames[address] = append(t.addressToTagNames[address], tag.Name) + } + t.tagNameToAddresses[tag.Name] = tag.Addresses + } +} + +func (t *RouterRule) getAddresses() []string { + var result = make([]string, 0, 8*len(t.Tags)) + for _, tag := range t.Tags { + result = append(result, tag.Addresses...) + } + return result +} + +func (t *RouterRule) getTagNames() []string { + var result = make([]string, 0, len(t.Tags)) + for _, tag := range t.Tags { + result = append(result, tag.Name) + } + return result +} + +func (t *RouterRule) hasTag(tag string) bool { + for _, t := range t.Tags { + if tag == t.Name { + return true + } + } + return false +} + +func (t *RouterRule) getAddressToTagNames() map[string][]string { + return t.addressToTagNames +} + +func (t *RouterRule) getTagNameToAddresses() map[string][]string { + return t.tagNameToAddresses +} + +func (t *RouterRule) getTags() []Tag { + return t.Tags +} + +func (t *RouterRule) setTags(tags []Tag) { + t.Tags = tags +} diff --git a/cluster/router/tag/router_rule_test.go b/cluster/router/tag/router_rule_test.go index 2df65193f9d0cf607258f3080e22b42cd6e9b16a..4e0f5b729ee3b74c60ff0e97b98ea0b26763bb89 100644 --- a/cluster/router/tag/router_rule_test.go +++ b/cluster/router/tag/router_rule_test.go @@ -22,19 +22,56 @@ import ( ) import ( - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" ) -func TestGetRule(t *testing.T) { +type RuleTestSuite struct { + suite.Suite + rule *RouterRule +} + +func (suite *RuleTestSuite) SetupTest() { + var err error yml := ` scope: application -runtime: true force: true +runtime: false +enabled: true +priority: 1 +key: demo-provider +tags: + - name: tag1 + addresses: [ip1, ip2] + - name: tag2 + addresses: [ip3, ip4] ` - rule, e := getRule(yml) - assert.Nil(t, e) - assert.NotNil(t, rule) - assert.Equal(t, true, rule.Force) - assert.Equal(t, true, rule.Runtime) - assert.Equal(t, "application", rule.Scope) + suite.rule, err = getRule(yml) + suite.Nil(err) +} + +func (suite *RuleTestSuite) TestGetRule() { + var err error + suite.Equal(true, suite.rule.Force) + suite.Equal(false, suite.rule.Runtime) + suite.Equal("application", suite.rule.Scope) + suite.Equal(1, suite.rule.Priority) + suite.Equal("demo-provider", suite.rule.Key) + suite.Nil(err) +} + +func (suite *RuleTestSuite) TestGetTagNames() { + suite.Equal([]string{"tag1", "tag2"}, suite.rule.getTagNames()) +} + +func (suite *RuleTestSuite) TestGetAddresses() { + suite.Equal([]string{"ip1", "ip2", "ip3", "ip4"}, suite.rule.getAddresses()) +} + +func (suite *RuleTestSuite) TestHasTag() { + suite.Equal(true, suite.rule.hasTag("tag1")) + suite.Equal(false, suite.rule.hasTag("tag404")) +} + +func TestRuleTestSuite(t *testing.T) { + suite.Run(t, new(RuleTestSuite)) } diff --git a/cluster/router/tag/tag.go b/cluster/router/tag/tag.go new file mode 100644 index 0000000000000000000000000000000000000000..73d10b5db4a1664c4d9d014045b475fac757684b --- /dev/null +++ b/cluster/router/tag/tag.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tag + +type Tag struct { + Name string + Addresses []string +} + +func (t *Tag) getName() string { + return t.Name +} + +func (t *Tag) setName(name string) { + t.Name = name +} + +func (t *Tag) getAddresses() []string { + return t.Addresses +} + +func (t *Tag) setAddresses(addresses []string) { + t.Addresses = addresses +} diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index e1376fd96a88246e7ad0e26d2e3c34693c88a77c..ece950ebc0cf6973431d80c2c98e33e3c690cfc9 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -18,7 +18,10 @@ package tag import ( + "errors" + "net" "strconv" + "strings" ) import ( @@ -27,15 +30,21 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/remoting" ) // tagRouter defines url, enable and the priority type tagRouter struct { - url *common.URL - enabled bool - priority int64 + url *common.URL + tagRouterRule *RouterRule + enabled bool + priority int64 + application string } // NewTagRouter returns a tagRouter instance if url is not nil @@ -55,6 +64,15 @@ func (c *tagRouter) isEnabled() bool { return c.enabled } +func (c *tagRouter) SetApplication(app string) { + c.application = app +} + +func (c *tagRouter) tagRouterRuleCopy() RouterRule { + routerRule := *c.tagRouterRule + return routerRule +} + // Route gets a list of invoker func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { if !c.isEnabled() { @@ -63,7 +81,152 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati if len(invokers) == 0 { return invokers } - return filterUsingStaticTag(invokers, url, invocation) + if c.tagRouterRule == nil || !c.tagRouterRule.Valid || !c.tagRouterRule.Enabled { + return filterUsingStaticTag(invokers, url, invocation) + } + // since the rule can be changed by config center, we should copy one to use. + tagRouterRuleCopy := c.tagRouterRuleCopy() + tag, ok := invocation.Attachments()[constant.Tagkey] + if !ok { + tag = url.GetParam(constant.Tagkey, "") + } + var ( + result []protocol.Invoker + addresses []string + ) + // if we are requesting for a Provider with a specific tag + if len(tag) > 0 { + addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag] + // filter by dynamic tag group first + if len(addresses) > 0 { + filterAddressMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + if len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) { + return true + } + return false + } + result = filterInvoker(invokers, filterAddressMatches) + if len(result) > 0 || tagRouterRuleCopy.Force { + return result + } + } else { + // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by + // dynamic tag group but force=false. check static tag + filter := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParam(constant.Tagkey, "") == tag { + return true + } + return false + } + result = filterInvoker(invokers, filter) + } + // If there's no tagged providers that can match the current tagged request. force.tag is set by default + // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed. + if len(result) > 0 || isForceUseTag(url, invocation) { + return result + } else { + // FAILOVER: return all Providers without any tags. + filterAddressNotMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + if len(addresses) == 0 || !checkAddressMatch(tagRouterRuleCopy.getAddresses(), url.Ip, url.Port) { + return true + } + return false + } + filterTagIsEmpty := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParam(constant.Tagkey, "") == "" { + return true + } + return false + } + return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty) + } + } else { + // return all addresses in dynamic tag group. + addresses = tagRouterRuleCopy.getAddresses() + if len(addresses) > 0 { + filterAddressNotMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + if len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) { + return true + } + return false + } + result = filterInvoker(invokers, filterAddressNotMatches) + // 1. all addresses are in dynamic tag group, return empty list. + if len(result) == 0 { + return result + } + } + // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the + // static tag group. + filter := func(invoker protocol.Invoker) bool { + localTag := invoker.GetUrl().GetParam(constant.Tagkey, "") + return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag)) + } + return filterInvoker(result, filter) + } +} + +func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { + logger.Infof("Notification of tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) + if remoting.EventTypeDel == event.ConfigType { + c.tagRouterRule = nil + return + } else { + content, ok := event.Value.(string) + if !ok { + logger.Errorf("Convert event content fail,raw content:[%s] ", event.Value) + return + } + + routerRule, err := getRule(content) + if err != nil { + logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err) + return + } + c.tagRouterRule = routerRule + return + } +} + +func (c *tagRouter) Notify(invokers []protocol.Invoker) { + if len(invokers) == 0 { + return + } + invoker := invokers[0] + url := invoker.GetUrl() + providerApplication := url.GetParam(constant.RemoteApplicationKey, "") + if providerApplication == "" { + logger.Error("TagRouter must getConfig from or subscribe to a specific application, but the application " + + "in this TagRouter is not specified.") + return + } + dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration() + if dynamicConfiguration == nil { + logger.Error("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please") + return + } + + if providerApplication != c.application { + dynamicConfiguration.RemoveListener(c.application+constant.TagRouterRuleSuffix, c) + } + + routerKey := providerApplication + constant.TagRouterRuleSuffix + dynamicConfiguration.AddListener(routerKey, c) + //get rule + rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP)) + if len(rule) == 0 || err != nil { + logger.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err) + return + } + if rule != "" { + c.Process(&config_center.ConfigChangeEvent{ + Key: routerKey, + Value: rule, + ConfigType: remoting.EventTypeUpdate}) + } } // URL gets the url of tagRouter @@ -76,7 +239,7 @@ func (c *tagRouter) Priority() int64 { return c.priority } -// filterUsingStaticTag gets a list of invoker using static tag +// filterUsingStaticTag gets a list of invoker using static tag, If there's no dynamic tag rule being set, use static tag in URL func filterUsingStaticTag(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { if tag, ok := invocation.Attachments()[constant.Tagkey]; ok { result := make([]protocol.Invoker, 0, 8) @@ -100,3 +263,163 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool { } return false } + +type filter func(protocol.Invoker) bool + +func filterInvoker(invokers []protocol.Invoker, filters ...filter) []protocol.Invoker { + var res []protocol.Invoker +OUTER: + for _, invoker := range invokers { + for _, filter := range filters { + if !filter(invoker) { + continue OUTER + } + } + res = append(res, invoker) + } + return res +} + +// TODO: need move to dubbogo/gost +func checkAddressMatch(addresses []string, host, port string) bool { + for _, address := range addresses { + if matchIp(address, host, port) { + return true + } + if address == net.JoinHostPort(constant.ANYHOST_VALUE, port) { + return true + } + } + return false +} + +func matchIp(pattern, host, port string) bool { + // if the pattern is subnet format, it will not be allowed to config port param in pattern. + if strings.Contains(pattern, "/") { + _, subnet, _ := net.ParseCIDR(pattern) + if subnet != nil && subnet.Contains(net.ParseIP(host)) { + return true + } + return false + } + return matchIpRange(pattern, host, port) +} + +func matchIpRange(pattern, host, port string) bool { + if pattern == "" || host == "" { + logger.Error("Illegal Argument pattern or hostName. Pattern:" + pattern + ", Host:" + host) + return false + } + + pattern = strings.TrimSpace(pattern) + if "*.*.*.*" == pattern || "*" == pattern { + return true + } + + isIpv4 := true + ip4 := net.ParseIP(host).To4() + + if ip4 == nil { + isIpv4 = false + } + + hostAndPort := getPatternHostAndPort(pattern, isIpv4) + if hostAndPort[1] != "" && hostAndPort[1] != port { + return false + } + + pattern = hostAndPort[0] + // TODO 常量化 + splitCharacter := "." + if !isIpv4 { + splitCharacter = ":" + } + + mask := strings.Split(pattern, splitCharacter) + // check format of pattern + if err := checkHostPattern(pattern, mask, isIpv4); err != nil { + logger.Error(err) + return false + } + + if pattern == host { + return true + } + + // short name condition + if !ipPatternContains(pattern) { + return pattern == host + } + + ipAddress := strings.Split(host, splitCharacter) + for i := 0; i < len(mask); i++ { + if "*" == mask[i] || mask[i] == ipAddress[i] { + continue + } else if strings.Contains(mask[i], "-") { + rangeNumStrs := strings.Split(mask[i], "-") + if len(rangeNumStrs) != 2 { + logger.Error("There is wrong format of ip Address: " + mask[i]) + return false + } + min := getNumOfIpSegment(rangeNumStrs[0], isIpv4) + max := getNumOfIpSegment(rangeNumStrs[1], isIpv4) + ip := getNumOfIpSegment(ipAddress[i], isIpv4) + if ip < min || ip > max { + return false + } + } else if "0" == ipAddress[i] && "0" == mask[i] || "00" == mask[i] || "000" == mask[i] || "0000" == mask[i] { + continue + } else if mask[i] != ipAddress[i] { + return false + } + } + return true +} + +func ipPatternContains(pattern string) bool { + return strings.Contains(pattern, "*") || strings.Contains(pattern, "-") +} + +func checkHostPattern(pattern string, mask []string, isIpv4 bool) error { + if !isIpv4 { + if len(mask) != 8 && ipPatternContains(pattern) { + return errors.New("If you config ip expression that contains '*' or '-', please fill qualified ip pattern like 234e:0:4567:0:0:0:3d:*. ") + } + if len(mask) != 8 && !strings.Contains(pattern, "::") { + return errors.New("The host is ipv6, but the pattern is not ipv6 pattern : " + pattern) + } + } else { + if len(mask) != 4 { + return errors.New("The host is ipv4, but the pattern is not ipv4 pattern : " + pattern) + } + } + return nil +} + +func getPatternHostAndPort(pattern string, isIpv4 bool) []string { + result := make([]string, 2) + if strings.HasPrefix(pattern, "[") && strings.Contains(pattern, "]:") { + end := strings.Index(pattern, "]:") + result[0] = pattern[1:end] + result[1] = pattern[end+2:] + } else if strings.HasPrefix(pattern, "[") && strings.HasSuffix(pattern, "]") { + result[0] = pattern[1 : len(pattern)-1] + result[1] = "" + } else if isIpv4 && strings.Contains(pattern, ":") { + end := strings.Index(pattern, ":") + result[0] = pattern[:end] + result[1] = pattern[end+1:] + } else { + result[0] = pattern + } + return result +} + +func getNumOfIpSegment(ipSegment string, isIpv4 bool) int { + if isIpv4 { + ipSeg, _ := strconv.Atoi(ipSegment) + return ipSeg + } + ipSeg, _ := strconv.ParseInt(ipSegment, 0, 16) + return int(ipSeg) +} diff --git a/cluster/router/tag/tag_router_test.go b/cluster/router/tag/tag_router_test.go index 000b3ec6724d85590c86456a009d5194c4e71e03..e5ddc2890c1512baa84d6a2195ecfda243f1d66b 100644 --- a/cluster/router/tag/tag_router_test.go +++ b/cluster/router/tag/tag_router_test.go @@ -19,25 +19,43 @@ package tag import ( "context" + "fmt" + "github.com/stretchr/testify/suite" "testing" + "time" ) import ( + "github.com/dubbogo/go-zookeeper/zk" "github.com/stretchr/testify/assert" ) import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" + _ "github.com/apache/dubbo-go/config_center/zookeeper" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" + "github.com/apache/dubbo-go/remoting/zookeeper" ) const ( - tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou" - tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai" - tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing" - tagRouterTestUserConsumer = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true" - tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true" + tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou&remote.application=test-tag" + tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20002/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai&remote.application=test-tag" + tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing&remote.application=test-tag" + tagRouterTestEnabledBeijingUrl = "dubbo://127.0.0.1:20004/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=false&dubbo.tag=beijing&remote.application=test-tag" + tagRouterTestUserConsumer = "dubbo://127.0.0.1:20005/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true&remote.application=test-tag" + + tagRouterTestDynamicIpv4Provider1 = "dubbo://127.0.0.1:20001/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestDynamicIpv4Provider2 = "dubbo://127.0.0.1:20002/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestDynamicIpv4Provider3 = "dubbo://127.0.0.1:20003/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestDynamicIpv4Provider4 = "dubbo://127.0.0.1:20004/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag&dubbo.tag=tag4" + tagRouterTestDynamicIpv4Provider5 = "dubbo://127.0.0.1:20005/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag&dubbo.tag=tag5" tagRouterTestDubboTag = "dubbo.tag" tagRouterTestDubboForceTag = "dubbo.force.tag" @@ -45,6 +63,15 @@ const ( tagRouterTestGuangZhou = "guangzhou" tagRouterTestFalse = "false" tagRouterTestTrue = "true" + + routerPath = "/dubbo/config/dubbo/test-tag.tag-router" + routerLocalIP = "127.0.0.1" + routerZk = "zookeeper" +) + +var ( + zkFormat = "zookeeper://%s:%d" + conditionFormat = "condition://%s/com.foo.BarService" ) // MockInvoker is only mock the Invoker to support test tagRouter @@ -160,3 +187,211 @@ func TestTagRouterRouteNoForce(t *testing.T) { invRst2 := tagRouter.Route(invokers, &u1, inv) assert.Equal(t, 3, len(invRst2)) } + +func TestFilterInvoker(t *testing.T) { + u2, e2 := common.NewURL(tagRouterTestHangZhouUrl) + u3, e3 := common.NewURL(tagRouterTestShangHaiUrl) + u4, e4 := common.NewURL(tagRouterTestBeijingUrl) + u5, e5 := common.NewURL(tagRouterTestEnabledBeijingUrl) + assert.Nil(t, e2) + assert.Nil(t, e3) + assert.Nil(t, e4) + assert.Nil(t, e5) + inv2 := NewMockInvoker(u2) + inv3 := NewMockInvoker(u3) + inv4 := NewMockInvoker(u4) + inv5 := NewMockInvoker(u5) + var invokers []protocol.Invoker + invokers = append(invokers, inv2, inv3, inv4, inv5) + filterTag := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParam(constant.Tagkey, "") == "beijing" { + return true + } + return false + } + res := filterInvoker(invokers, filterTag) + assert.Equal(t, []protocol.Invoker{inv4, inv5}, res) + flag := true + filterEnabled := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParamBool(constant.RouterEnabled, false) == flag { + return true + } + return false + } + res2 := filterInvoker(invokers, filterTag, filterEnabled) + assert.Equal(t, []protocol.Invoker{inv4}, res2) +} + +type DynamicTagRouter struct { + suite.Suite + rule *RouterRule + + route *tagRouter + zkClient *zookeeper.ZookeeperClient + testCluster *zk.TestCluster + invokers []protocol.Invoker + url *common.URL +} + +func TestDynamicTagRouter(t *testing.T) { + dtg := &DynamicTagRouter{} + u1, _ := common.NewURL(tagRouterTestDynamicIpv4Provider1) + u2, _ := common.NewURL(tagRouterTestDynamicIpv4Provider2) + u3, _ := common.NewURL(tagRouterTestDynamicIpv4Provider3) + u4, _ := common.NewURL(tagRouterTestDynamicIpv4Provider4) + u5, _ := common.NewURL(tagRouterTestDynamicIpv4Provider5) + inv1 := NewMockInvoker(u1) + inv2 := NewMockInvoker(u2) + inv3 := NewMockInvoker(u3) + inv4 := NewMockInvoker(u4) + inv5 := NewMockInvoker(u5) + dtg.invokers = append(dtg.invokers, inv1, inv2, inv3, inv4, inv5) + suite.Run(t, dtg) +} + +func (suite *DynamicTagRouter) SetupTest() { + var err error + testYML := `enabled: true +scope: application +force: true +runtime: false +valid: true +priority: 1 +key: demo-provider +tags: + - name: tag1 + addresses: ["127.0.0.1:20001"] + - name: tag2 + addresses: ["127.0.0.1:20002"] + - name: tag3 + addresses: ["127.0.0.1:20003", "127.0.0.1:20004"] +` + ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) + suite.NoError(err) + err = z.Create(routerPath) + suite.NoError(err) + + suite.zkClient = z + suite.testCluster = ts + + _, err = z.Conn.Set(routerPath, []byte(testYML), 0) + suite.NoError(err) + + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, suite.testCluster.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) + + suite.Nil(err) + suite.NotNil(configuration) + + url, e1 := common.NewURL(tagRouterTestUserConsumerTag) + suite.Nil(e1) + + tagRouter, err := NewTagRouter(&url) + suite.Nil(err) + suite.NotNil(tagRouter) + suite.route = tagRouter + suite.url = &url +} + +func (suite *DynamicTagRouter) TearDownTest() { + suite.zkClient.Close() + suite.testCluster.Stop() +} + +func (suite *DynamicTagRouter) TestDynamicTagRouterSetByIPv4() { + invokers := suite.invokers + suite.route.Notify(invokers) + suite.NotNil(suite.route.tagRouterRule) + + consumer := &invocation.RPCInvocation{} + consumer.SetAttachments(tagRouterTestDubboTag, "tag1") + targetInvokers := suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[0]) + + consumer.SetAttachments(tagRouterTestDubboTag, "tag3") + targetInvokers = suite.route.Route(invokers, suite.url, consumer) + suite.Equal(2, len(targetInvokers)) + suite.Equal(targetInvokers, []protocol.Invoker{suite.invokers[2], suite.invokers[3]}) +} + +func (suite *DynamicTagRouter) TestDynamicTagRouterStaticTag() { + invokers := suite.invokers + consumer := &invocation.RPCInvocation{} + consumer.SetAttachments(tagRouterTestDubboTag, "tag4") + targetInvokers := suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[3]) +} + +// Teas no tag and return a address are not in dynamic tag group +func (suite *DynamicTagRouter) TestDynamicTagRouterByNoTagAndAddressMatch() { + invokers := suite.invokers + suite.route.Notify(invokers) + suite.NotNil(suite.route.tagRouterRule) + consumer := &invocation.RPCInvocation{} + targetInvokers := suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[4]) + // test if there are some addresses that are not in any dynamic tag group, continue to filter using the static tag group. + consumer.SetAttachments(tagRouterTestDubboTag, "tag5") + targetInvokers = suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[4]) +} + +func (suite *DynamicTagRouter) TestTODO() { + testYML := `enabled: true +scope: application +force: true +runtime: false +valid: true +priority: 1 +key: demo-provider +tags: + - name: tag1 + addresses: ["127.0.0.1:20001"] + - name: tag2 + addresses: ["127.0.0.1:20002"] + - name: tag3 + addresses: ["127.0.0.1:20003", "127.0.0.1:20004"] +` + _, err := suite.zkClient.Conn.Set(routerPath, []byte(testYML), 1) + suite.NoError(err) + + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, suite.testCluster.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) +} + +func TestProcess(t *testing.T) { + u1, err := common.NewURL(tagRouterTestUserConsumerTag) + assert.Nil(t, err) + tagRouter, e := NewTagRouter(&u1) + assert.Nil(t, e) + assert.NotNil(t, tagRouter) + + testYML := ` +scope: application +force: true +runtime: false +enabled: true +valid: true +priority: 1 +key: demo-provider +tags: + - name: beijing + addresses: [192.168.1.1, 192.168.1.2] + - name: hangzhou + addresses: [192.168.1.3, 192.168.1.4] +` + tagRouter.Process(&config_center.ConfigChangeEvent{Value: testYML, ConfigType: remoting.EventTypeAdd}) + assert.NotNil(t, tagRouter.tagRouterRule) + assert.Equal(t, []string{"beijing", "hangzhou"}, tagRouter.tagRouterRule.getTagNames()) + assert.Equal(t, []string{"192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getAddresses()) + assert.Equal(t, []string{"192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getTagNameToAddresses()["hangzhou"]) + assert.Equal(t, []string{"beijing"}, tagRouter.tagRouterRule.getAddressToTagNames()["192.168.1.1"]) + tagRouter.Process(&config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeDel}) + assert.Nil(t, tagRouter.tagRouterRule) +} diff --git a/common/constant/default.go b/common/constant/default.go index c1c404e089ea90899d2b599b01cd5980c3e92ab1..629aa32392a0151046eaaea67287618eae02158d 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -37,6 +37,7 @@ const ( DEFAULT_RETRIES_INT = 2 DEFAULT_PROTOCOL = "dubbo" DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_REG_TTL = "15m" DEFAULT_CLUSTER = "failover" DEFAULT_FAILBACK_TIMES = "3" DEFAULT_FAILBACK_TIMES_INT = 3 diff --git a/common/constant/key.go b/common/constant/key.go index f33c2ac5b78926452d027035e89b4a7d32f7b1fe..f7189b7c2589d1461e333828c5a18c8d6cc6bbd0 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -102,6 +102,7 @@ const ( ZONE_KEY = "zone" REGISTRY_ZONE = "registry_zone" REGISTRY_ZONE_FORCE = "registry_zone_force" + REGISTRY_TTL_KEY = "registry.ttl" ) const ( @@ -192,6 +193,9 @@ const ( HealthCheckRouterName = "health_check" // TagRouterName Specify the name of TagRouter TagRouterName = "tag" + // TagRouterRuleSuffix Specify tag router suffix + TagRouterRuleSuffix = ".tag-router" + RemoteApplicationKey = "remote.application" // ConditionRouterRuleSuffix Specify condition router suffix ConditionRouterRuleSuffix = ".condition-router" diff --git a/config/registry_config.go b/config/registry_config.go index 53c0f9f5d24476f5b6cd396013d95d5b77befde8..2adae5d0379dd1571cf923001b5ed15cee219564 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -39,6 +39,7 @@ type RegistryConfig struct { // I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second Group string `yaml:"group" json:"group,omitempty" property:"group"` + TTL string `yaml:"ttl" default:"10m" json:"ttl,omitempty" property:"ttl"` // unit: minute // for registry Address string `yaml:"address" json:"address,omitempty" property:"address"` Username string `yaml:"username" json:"username,omitempty" property:"username"` @@ -130,6 +131,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { urlMap.Set(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, strconv.FormatBool(c.Preferred)) urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, c.Zone) urlMap.Set(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, strconv.FormatInt(c.Weight, 10)) + urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL) for k, v := range c.Params { urlMap.Set(k, v) } diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index 6fe5c4d7df28a7693c732543140ed74f959dc77e..9a09b713fad11afdc06310a2d0072454342ccb0b 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -167,7 +167,6 @@ func initNacosConfigClient(nacosAddrs []string, timeout time.Duration, url commo "serverConfigs": svrConfList, "clientConfig": nacosconst.ClientConfig{ TimeoutMs: uint64(int32(timeout / time.Millisecond)), - ListenInterval: uint64(int32(timeout / time.Millisecond)), NotLoadCacheAtStart: true, LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, logDir), CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""), diff --git a/go.mod b/go.mod index 7a472daef7481be390c3debfa00b1d1cd5207c47..c19627378261221b8a9565730ed62f7cc367e5e2 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/mitchellh/hashstructure v1.0.0 // indirect github.com/mitchellh/mapstructure v1.2.3 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd - github.com/nacos-group/nacos-sdk-go v0.3.3-0.20200617023039-50c7537d6a5f + github.com/nacos-group/nacos-sdk-go v1.0.0 github.com/opentracing/opentracing-go v1.1.0 github.com/pierrec/lz4 v2.2.6+incompatible // indirect github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 424a11087bb9cad0c62488e8e3e96d64feec6d7d..aa6ecc86e25276eae2747aa5291f718713d164bd 100644 --- a/go.sum +++ b/go.sum @@ -516,8 +516,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/nacos-group/nacos-sdk-go v0.3.3-0.20200617023039-50c7537d6a5f h1:gid5/0AkHvINWK69Fgbidb3BVIXqlf1YEm7wO0NVPsw= -github.com/nacos-group/nacos-sdk-go v0.3.3-0.20200617023039-50c7537d6a5f/go.mod h1:fti1GlX/EB6RDKvzK/P7Vuibqj0JMPJHQwrcTU1tLXk= +github.com/nacos-group/nacos-sdk-go v1.0.0 h1:CufUF7DZca2ZzIrJtMMCDih1sA58BWCglArLMCZArUc= +github.com/nacos-group/nacos-sdk-go v1.0.0/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h1:TLb2Sg7HQcgGdloNxkrmtgDNR9uVYF3lfdFIN4Ro6Sk= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= diff --git a/protocol/rest/server/server_impl/go_restful_server.go b/protocol/rest/server/server_impl/go_restful_server.go index c7d971fcaa5ada0ba02cc436b5ae6705793887ef..6fb9ee8daa7383580b9144ea25954f8ead974dcc 100644 --- a/protocol/rest/server/server_impl/go_restful_server.go +++ b/protocol/rest/server/server_impl/go_restful_server.go @@ -48,8 +48,8 @@ var filterSlice []restful.FilterFunction // GoRestfulServer a rest server implement by go-restful type GoRestfulServer struct { - srv *http.Server - container *restful.Container + srv *http.Server + ws *restful.WebService } // NewGoRestfulServer a constructor of GoRestfulServer @@ -60,13 +60,17 @@ func NewGoRestfulServer() server.RestServer { // Start go-restful server // It will add all go-restful filters func (grs *GoRestfulServer) Start(url common.URL) { - grs.container = restful.NewContainer() + container := restful.NewContainer() for _, filter := range filterSlice { - grs.container.Filter(filter) + container.Filter(filter) } grs.srv = &http.Server{ - Handler: grs.container, + Handler: container, } + grs.ws = &restful.WebService{} + grs.ws.Path("/") + grs.ws.SetDynamicRoutes(true) + container.Add(grs.ws) ln, err := net.Listen("tcp", url.Location) if err != nil { panic(perrors.New(fmt.Sprintf("Restful Server start error:%v", err))) @@ -83,23 +87,21 @@ func (grs *GoRestfulServer) Start(url common.URL) { // Publish a http api in go-restful server // The routeFunc should be invoked when the server receive a request func (grs *GoRestfulServer) Deploy(restMethodConfig *config.RestMethodConfig, routeFunc func(request server.RestServerRequest, response server.RestServerResponse)) { - ws := &restful.WebService{} + rf := func(req *restful.Request, resp *restful.Response) { routeFunc(NewGoRestfulRequestAdapter(req), resp) } - ws.Path(restMethodConfig.Path). + grs.ws.Route(grs.ws.Method(restMethodConfig.MethodType). Produces(strings.Split(restMethodConfig.Produces, ",")...). Consumes(strings.Split(restMethodConfig.Consumes, ",")...). - Route(ws.Method(restMethodConfig.MethodType).To(rf)) - grs.container.Add(ws) - + Path(restMethodConfig.Path).To(rf)) } // Delete a http api in go-restful server func (grs *GoRestfulServer) UnDeploy(restMethodConfig *config.RestMethodConfig) { ws := new(restful.WebService) ws.Path(restMethodConfig.Path) - err := grs.container.Remove(ws) + err := grs.ws.RemoveRoute(restMethodConfig.Path, restMethodConfig.MethodType) if err != nil { logger.Warnf("[Go restful] Remove web service error:%v", err) } diff --git a/protocol/rest/server/server_impl/go_restful_server_test.go b/protocol/rest/server/server_impl/go_restful_server_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b1e66063bf11f1fa805a83b0633a0e1aa8b38b0f --- /dev/null +++ b/protocol/rest/server/server_impl/go_restful_server_test.go @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server_impl + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol/rest/config" + "github.com/apache/dubbo-go/protocol/rest/server" +) + +func TestGoRestfulServerDeploySameUrl(t *testing.T) { + grs := NewGoRestfulServer() + url, err := common.NewURL("http://127.0.0.1:43121") + assert.NoError(t, err) + grs.Start(url) + rmc := &config.RestMethodConfig{ + Produces: "*/*", + Consumes: "*/*", + MethodType: "POST", + Path: "/test", + } + f := func(request server.RestServerRequest, response server.RestServerResponse) {} + grs.Deploy(rmc, f) + rmc1 := &config.RestMethodConfig{ + Produces: "*/*", + Consumes: "*/*", + MethodType: "GET", + Path: "/test", + } + grs.Deploy(rmc1, f) + grs.UnDeploy(rmc) + grs.UnDeploy(rmc1) + grs.Destroy() +} diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 51d3e2f56abac8e4ab8b966870f1ff5bb79c4171..411090820c7682ab9c3b5576ea8ad5207c2c899f 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -59,6 +59,7 @@ func init() { type nacosRegistry struct { *common.URL namingClient naming_client.INamingClient + registryUrls []common.URL } func getCategory(url common.URL) string { @@ -128,6 +129,36 @@ func (nr *nacosRegistry) Register(url common.URL) error { if !isRegistry { return perrors.New("registry [" + serviceName + "] to nacos failed") } + nr.registryUrls = append(nr.registryUrls, url) + return nil +} + +func createDeregisterParam(url common.URL, serviceName string) vo.DeregisterInstanceParam { + if len(url.Ip) == 0 { + url.Ip = localIP + } + if len(url.Port) == 0 || url.Port == "0" { + url.Port = "80" + } + port, _ := strconv.Atoi(url.Port) + return vo.DeregisterInstanceParam{ + Ip: url.Ip, + Port: uint64(port), + ServiceName: serviceName, + Ephemeral: true, + } +} + +func (nr *nacosRegistry) DeRegister(url common.URL) error { + serviceName := getServiceName(url) + param := createDeregisterParam(url, serviceName) + isDeRegistry, err := nr.namingClient.DeregisterInstance(param) + if err != nil { + return err + } + if !isDeRegistry { + return perrors.New("DeRegistry [" + serviceName + "] to nacos failed") + } return nil } @@ -193,6 +224,13 @@ func (nr *nacosRegistry) IsAvailable() bool { // nolint func (nr *nacosRegistry) Destroy() { + for _, url := range nr.registryUrls { + err := nr.DeRegister(url) + logger.Infof("DeRegister Nacos URL:%+v", url) + if err != nil { + logger.Errorf("Deregister URL:%+v err:%v", url, err.Error()) + } + } return } @@ -209,6 +247,7 @@ func newNacosRegistry(url *common.URL) (registry.Registry, error) { registry := &nacosRegistry{ URL: url, namingClient: client, + registryUrls: []common.URL{}, } return registry, nil } diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 63d92d70fd5e1a00f0ce1ca95b1926fb9c36c84b..0e5ad8e6990856aeb0dfdde72f9c3f7fdae3e985 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -60,11 +60,20 @@ type nacosServiceDiscovery struct { // namingClient is the Nacos' client namingClient naming_client.INamingClient + // cache registry instances + registryInstances []registry.ServiceInstance } // Destroy will close the service discovery. // Actually, it only marks the naming client as null and then return func (n *nacosServiceDiscovery) Destroy() error { + for _, inst := range n.registryInstances { + err := n.Unregister(inst) + logger.Infof("Unregister nacos instance:%+v", inst) + if err != nil { + logger.Errorf("Unregister nacos instance:%+v, err:%+v", inst, err) + } + } n.namingClient = nil return nil } @@ -76,6 +85,7 @@ func (n *nacosServiceDiscovery) Register(instance registry.ServiceInstance) erro if err != nil || !ok { return perrors.WithMessage(err, "Could not register the instance. "+instance.GetServiceName()) } + n.registryInstances = append(n.registryInstances, instance) return nil } @@ -118,8 +128,8 @@ func (n *nacosServiceDiscovery) GetServices() *gxset.HashSet { return res } - for _, e := range services { - res.Add(e.Name) + for _, e := range services.Doms { + res.Add(e) } return res } @@ -334,8 +344,9 @@ func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address) return &nacosServiceDiscovery{ - group: group, - namingClient: client, - descriptor: descriptor, + group: group, + namingClient: client, + descriptor: descriptor, + registryInstances: []registry.ServiceInstance{}, }, nil } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 8f2ac1023b8ad34938b9996b480e3bbc4adbaaea..e8ee51beb70b5a08ec60b213c5342ef52972c59f 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -243,6 +243,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen var zkListener *RegistryConfigurationListener dataListener := r.dataListener + ttl := r.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL) + conf.SetParam(constant.REGISTRY_TTL_KEY, ttl) dataListener.mutex.Lock() defer dataListener.mutex.Unlock() if r.dataListener.subscribed[conf.ServiceKey()] != nil { diff --git a/remoting/kubernetes/listener_test.go b/remoting/kubernetes/listener_test.go index 1f398485b2f16defddf44ce1a08a7ecfd9760dd1..0b05b6e6e09501dfd25164e865a3eb6eef91dc9f 100644 --- a/remoting/kubernetes/listener_test.go +++ b/remoting/kubernetes/listener_test.go @@ -19,6 +19,7 @@ package kubernetes import ( "testing" + "time" ) import ( @@ -87,6 +88,7 @@ func TestListener(t *testing.T) { listener := NewEventListener(c) dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)} listener.ListenServiceEvent("/dubbo", dataListener) + time.Sleep(1e9) for _, tc := range tests { diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 9a4874db24696d90e4fcc7d9d987f5888f1be599..4f50c18ab61ba6faf373dfd0f831c14ae7ab6d5d 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -37,6 +37,10 @@ import ( "github.com/apache/dubbo-go/remoting" ) +var ( + defaultTTL = 15 * time.Minute +) + // nolint type ZkEventListener struct { client *ZookeeperClient @@ -197,10 +201,20 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen var ( failTimes int + ttl time.Duration event chan struct{} zkEvent zk.Event ) event = make(chan struct{}, 4) + ttl = defaultTTL + if conf != nil { + timeout, err := time.ParseDuration(conf.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL)) + if err == nil { + ttl = timeout + } else { + logger.Warnf("wrong configuration for registry ttl, error:=%+v, using default value %v instead", err, defaultTTL) + } + } defer close(event) for { // get current children for a zkPath @@ -302,18 +316,29 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen }(dubboPath, listener) } } - select { - case zkEvent = <-childEventCh: - logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", - zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) - if zkEvent.Type != zk.EventNodeChildrenChanged { - continue + // Periodically update provider information + ticker := time.NewTicker(ttl) + WATCH: + for { + select { + case <-ticker.C: + l.handleZkNodeEvent(zkEvent.Path, children, listener) + case zkEvent = <-childEventCh: + logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", + zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) + ticker.Stop() + if zkEvent.Type != zk.EventNodeChildrenChanged { + break WATCH + } + l.handleZkNodeEvent(zkEvent.Path, children, listener) + break WATCH + case <-l.client.Done(): + logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) + ticker.Stop() + return } - l.handleZkNodeEvent(zkEvent.Path, children, listener) - case <-l.client.Done(): - logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) - return } + } }