diff --git a/cluster/router/selfDiscovery/factory.go b/cluster/router/selfDiscovery/factory.go new file mode 100644 index 0000000000000000000000000000000000000000..dd035a6659b5f0ed3ad3e8330359427b84308019 --- /dev/null +++ b/cluster/router/selfDiscovery/factory.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package self_disc + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" +) + +func init() { + extension.SetRouterFactory(constant.SelfDiscoveryRouterName, newSelfDiscRouteFactory) +} + +// SelfDiscRouteFactory +type SelfDiscRouteFactory struct { +} + +// newHealthCheckRouteFactory construct a new HealthCheckRouteFactory +func newSelfDiscRouteFactory() router.PriorityRouterFactory { + return &SelfDiscRouteFactory{} +} + +// NewPriorityRouter construct a new NewHealthCheckRouter via url +func (f *SelfDiscRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { + return NewSelfDiscRouter(url) +} diff --git a/cluster/router/selfDiscovery/factory_test.go b/cluster/router/selfDiscovery/factory_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4fcdd32f16b21e8b8e44267392a2e9b2a40bdbb5 --- /dev/null +++ b/cluster/router/selfDiscovery/factory_test.go @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package self_disc + +import ( + "context" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +// nolint +type MockInvoker struct { + url *common.URL +} + +// nolint +func NewMockInvoker(url *common.URL) *MockInvoker { + return &MockInvoker{ + url: url, + } +} + +// nolint +func (bi *MockInvoker) GetUrl() *common.URL { + return bi.url +} + +// nolint +func (bi *MockInvoker) IsAvailable() bool { + return true +} + +// nolint +func (bi *MockInvoker) IsDestroyed() bool { + return true +} + +// nolint +func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result { + return nil +} + +// nolint +func (bi *MockInvoker) Destroy() { +} + +// nolint +func TestHealthCheckRouteFactory(t *testing.T) { + factory := newSelfDiscRouteFactory() + assert.NotNil(t, factory) +} diff --git a/cluster/router/selfDiscovery/self_disc_route.go b/cluster/router/selfDiscovery/self_disc_route.go new file mode 100644 index 0000000000000000000000000000000000000000..ca697bfc43fac77e65391904d9e2f87ce183260f --- /dev/null +++ b/cluster/router/selfDiscovery/self_disc_route.go @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package self_disc + +import ( + "github.com/RoaringBitmap/roaring" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/utils" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +const ( + HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled" + selfDesc = "self-desc" + name = "health-check-router" +) + +// SelfDiscRouter provides a health-first routing mechanism through HealthChecker +type SelfDiscRouter struct { + url *common.URL + localIP string +} + +// NewSelfDiscRouter construct an HealthCheckRouter via url +func NewSelfDiscRouter(url *common.URL) (router.PriorityRouter, error) { + r := &SelfDiscRouter{ + url: url, + localIP: url.Ip, + } + return r, nil +} + +// Route gets a list of healthy invoker +func (r *SelfDiscRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { + addrPool := cache.FindAddrPool(r) + // Add healthy invoker to the list + selectedInvokers := utils.JoinIfNotEqual(addrPool[selfDesc], invokers) + // If all invokers are considered unhealthy, downgrade to all invoker + if selectedInvokers.IsEmpty() { + logger.Warnf(" Now all invokers are not match, so downgraded to all! Service: [%s]", url.ServiceKey()) + return invokers + } + return selectedInvokers +} + +// Pool separates healthy invokers from others. +func (r *SelfDiscRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) { + rb := make(router.AddrPool, 8) + rb[selfDesc] = roaring.NewBitmap() + selfDescFound := false + logger.Debug("local ip = ", r.localIP) + for i, invoker := range invokers { + if invoker.GetUrl().Ip == r.localIP { + rb[selfDesc].Add(uint32(i)) + selfDescFound = true + } + } + if selfDescFound { + // found self desc + logger.Debug("found self desc ") + return rb, nil + } + for i, _ := range invokers { + rb[selfDesc].Add(uint32(i)) + } + return rb, nil +} + +// ShouldPool will always return true to make sure healthy check constantly. +func (r *SelfDiscRouter) ShouldPool() bool { + return true +} + +func (r *SelfDiscRouter) Name() string { + return name +} + +// Priority +func (r *SelfDiscRouter) Priority() int64 { + return 0 +} + +// URL Return URL in router +func (r *SelfDiscRouter) URL() *common.URL { + return r.url +} diff --git a/cluster/router/selfDiscovery/self_disc_route_test.go b/cluster/router/selfDiscovery/self_disc_route_test.go new file mode 100644 index 0000000000000000000000000000000000000000..fcb90948402097b5735b834ba57bdd5675817f1d --- /dev/null +++ b/cluster/router/selfDiscovery/self_disc_route_test.go @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package self_disc + +import ( + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/chain" + "github.com/apache/dubbo-go/cluster/router/utils" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +const ( + healthCheckRoute1010IP = "192.168.10.10" + healthCheckRoute1011IP = "192.168.10.11" + healthCheckRoute1012IP = "192.168.10.12" + healthCheckRouteMethodNameTest = "test" + healthCheck1001URL = "dubbo://192.168.10.1/com.ikurento.user.UserProvider" + healthCheckRouteUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" +) + +func TestHealthCheckRouterRoute(t *testing.T) { + defer protocol.CleanAllStatus() + consumerURL, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP)) + url1, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP)) + url2, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1011IP)) + url3, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1012IP)) + hcr, _ := NewSelfDiscRouter(consumerURL) + + var invokers []protocol.Invoker + invoker1 := NewMockInvoker(url1) + invoker2 := NewMockInvoker(url2) + invoker3 := NewMockInvoker(url3) + invokers = append(invokers, invoker1, invoker2, invoker3) + inv := invocation.NewRPCInvocation(healthCheckRouteMethodNameTest, nil, nil) + res := hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*SelfDiscRouter), invokers), consumerURL, inv) + // now all invokers are healthy + assert.True(t, len(res.ToArray()) == 1) + + invokers = invokers[1:] + res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*SelfDiscRouter), invokers), consumerURL, inv) + assert.True(t, len(res.ToArray()) == 2) +} + +func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache { + pool, info := r.Pool(addrs) + cache := chain.BuildCache(addrs) + cache.SetAddrMeta(r.Name(), info) + cache.SetAddrPool(r.Name(), pool) + return cache +} diff --git a/common/constant/key.go b/common/constant/key.go index 0515094f285a4bf598b04e2ea1ef376325de7ac1..13ff1eeb5531c677790509a8abbf6da05bad2888 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -214,6 +214,8 @@ const ( ListenableRouterName = "listenable" // HealthCheckRouterName Specify the name of HealthCheckRouter HealthCheckRouterName = "health_check" + // SelfDiscoveryRouterName Specify the name of HealthCheckRouter + SelfDiscoveryRouterName = "self_disc" // TagRouterName Specify the name of TagRouter TagRouterName = "tag" // TagRouterRuleSuffix Specify tag router suffix