Skip to content
Snippets Groups Projects
Commit 28437d47 authored by Ian Luo's avatar Ian Luo
Browse files

Merge branch 'develop' into bitmap-router

parents 6686481b 279aaa25
No related branches found
No related tags found
No related merge requests found
Showing
with 202 additions and 15 deletions
Apache Dubbo Go
Apache Dubbo-go
Copyright 2018-2020 The Apache Software Foundation
This product includes software developed at
......
......@@ -34,7 +34,7 @@ Both extension module and layered project architecture is according to Apache Du
![dubbo go extend](./doc/pic/arch/dubbo-go-ext.png)
If you wanna know more about dubbo-go, please visit this reference [Project Architeture design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design)
If you wanna know more about dubbo-go, please visit this reference [Project Architecture design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design)
## Feature list ##
......
......@@ -38,6 +38,7 @@ func NewAvailableCluster() cluster.Cluster {
return &availableCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluser *availableCluster) Join(directory cluster.Directory) protocol.Invoker {
return NewAvailableClusterInvoker(directory)
}
......@@ -17,6 +17,10 @@
package cluster_impl
import (
"context"
)
import (
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
......@@ -36,6 +40,7 @@ type baseClusterInvoker struct {
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
interceptor cluster.ClusterInterceptor
}
func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
......@@ -146,6 +151,20 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
return selectedInvoker
}
func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
if invoker.interceptor != nil {
invoker.interceptor.BeforeInvoker(ctx, invocation)
result := invoker.interceptor.DoInvoke(ctx, invocation)
invoker.interceptor.AfterInvoker(ctx, invocation)
return result
}
return nil
}
func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
for _, i := range invoked {
if i == selectedInvoker {
......
......@@ -39,6 +39,7 @@ func NewBroadcastCluster() cluster.Cluster {
return &broadcastCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newBroadcastClusterInvoker(directory)
}
......@@ -36,6 +36,7 @@ func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
// nolint
func (invoker *broadcastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
......
......@@ -39,6 +39,7 @@ func NewFailbackCluster() cluster.Cluster {
return &failbackCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailbackClusterInvoker(directory)
}
......@@ -126,6 +126,7 @@ func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err
}
}
// nolint
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
if err := invoker.checkInvokers(invokers, invocation); err != nil {
......
......@@ -39,6 +39,7 @@ func NewFailFastCluster() cluster.Cluster {
return &failfastCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailFastClusterInvoker(directory)
}
......@@ -35,6 +35,7 @@ func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
// nolint
func (invoker *failfastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
......
......@@ -19,16 +19,15 @@ package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
type failoverCluster struct{}
const name = "failover"
func init() {
extension.SetCluster(name, NewFailoverCluster)
extension.SetCluster(constant.FAILOVER_CLUSTER_NAME, NewFailoverCluster)
}
// NewFailoverCluster returns a failover cluster instance
......@@ -40,6 +39,7 @@ func NewFailoverCluster() cluster.Cluster {
return &failoverCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailoverClusterInvoker(directory)
}
......@@ -19,6 +19,7 @@ package cluster_impl
import (
"context"
"fmt"
"strconv"
)
......@@ -44,6 +45,7 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
// nolint
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.Result
......@@ -91,8 +93,10 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
invokerSvc := invoker.GetUrl().Service()
invokerUrl := invoker.directory.GetUrl()
return &protocol.RPCResult{
Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invokerSvc, retries, providers, len(providers), len(invokers), invokerUrl, ip, constant.Version, result.Error().Error(),
Err: perrors.Wrap(result.Error(), fmt.Sprintf("Failed to invoke the method %v in the service %v. "+
"Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. "+
"Last error is %+v.", methodName, invokerSvc, retries, providers, len(providers), len(invokers),
invokerUrl, ip, constant.Version, result.Error().Error()),
)}
}
......
......@@ -43,6 +43,7 @@ import (
// mock invoker
// ///////////////////////////
// nolint
type MockInvoker struct {
url common.URL
available bool
......@@ -51,6 +52,7 @@ type MockInvoker struct {
successCount int
}
// nolint
func NewMockInvoker(url common.URL, successCount int) *MockInvoker {
return &MockInvoker{
url: url,
......@@ -60,23 +62,28 @@ func NewMockInvoker(url common.URL, successCount int) *MockInvoker {
}
}
// nolint
func (bi *MockInvoker) GetUrl() common.URL {
return bi.url
}
// nolint
func (bi *MockInvoker) IsAvailable() bool {
return bi.available
}
// nolint
func (bi *MockInvoker) IsDestroyed() bool {
return bi.destroyed
}
// nolint
type rest struct {
tried int
success bool
}
// nolint
func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation) protocol.Result {
count++
var (
......@@ -93,14 +100,17 @@ func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation)
return result
}
// nolint
func (bi *MockInvoker) Destroy() {
logger.Infof("Destroy invoker: %v", bi.GetUrl().String())
bi.destroyed = true
bi.available = false
}
// nolint
var count int
// nolint
func normalInvoke(successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster()
......@@ -119,6 +129,7 @@ func normalInvoke(successCount int, urlParam url.Values, invocations ...*invocat
return clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
}
// nolint
func TestFailoverInvokeSuccess(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(3, urlParams)
......@@ -126,6 +137,7 @@ func TestFailoverInvokeSuccess(t *testing.T) {
count = 0
}
// nolint
func TestFailoverInvokeFail(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(4, urlParams)
......@@ -133,6 +145,7 @@ func TestFailoverInvokeFail(t *testing.T) {
count = 0
}
// nolint
func TestFailoverInvoke1(t *testing.T) {
urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "3")
......@@ -141,6 +154,7 @@ func TestFailoverInvoke1(t *testing.T) {
count = 0
}
// nolint
func TestFailoverInvoke2(t *testing.T) {
urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "2")
......@@ -152,6 +166,7 @@ func TestFailoverInvoke2(t *testing.T) {
count = 0
}
// nolint
func TestFailoverDestroy(t *testing.T) {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster()
......
......@@ -39,6 +39,7 @@ func NewFailsafeCluster() cluster.Cluster {
return &failsafeCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *failsafeCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailsafeClusterInvoker(directory)
}
......@@ -45,6 +45,7 @@ func newFailsafeClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
// nolint
func (invoker *failsafeClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
......
......@@ -39,6 +39,7 @@ func NewForkingCluster() cluster.Cluster {
return &forkingCluster{}
}
// Join returns a baseClusterInvoker instance
func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker {
return newForkingClusterInvoker(directory)
}
......@@ -44,7 +44,7 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
// Invoke ...
// nolint
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
if err := invoker.checkWhetherDestroyed(); err != nil {
return &protocol.RPCResult{Err: err}
......
......@@ -33,6 +33,7 @@ func NewMockCluster() cluster.Cluster {
return &mockCluster{}
}
// nolint
func (cluster *mockCluster) Join(directory cluster.Directory) protocol.Invoker {
return protocol.NewBaseInvoker(directory.GetUrl())
}
......@@ -19,21 +19,26 @@ package cluster_impl
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
type registryAwareCluster struct{}
type zoneAwareCluster struct{}
func init() {
extension.SetCluster("registryAware", NewRegistryAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, NewZoneAwareCluster)
}
// NewRegistryAwareCluster returns a registry aware cluster instance
func NewRegistryAwareCluster() cluster.Cluster {
return &registryAwareCluster{}
// NewZoneAwareCluster returns a zoneaware cluster instance.
//
// More than one registry for subscription.
// Usually it is used for choose between registries.
func NewZoneAwareCluster() cluster.Cluster {
return &zoneAwareCluster{}
}
func (cluster *registryAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newRegistryAwareClusterInvoker(directory)
// Join returns a zoneAwareClusterInvoker instance
func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newZoneAwareClusterInvoker(directory)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cluster_impl
import (
"context"
"fmt"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)
// When there're more than one registry for subscription.
//
// This extension provides a strategy to decide how to distribute traffics among them:
// 1. registry marked as 'preferred=true' has the highest priority.
// 2. check the zone the current request belongs, pick the registry that has the same zone first.
// 3. Evenly balance traffic between all registries based on each registry's weight.
// 4. Pick anyone that's available.
type zoneAwareClusterInvoker struct {
baseClusterInvoker
}
func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoke := &zoneAwareClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
// add self to interceptor
invoke.interceptor = invoke
return invoke
}
// nolint
func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}
// First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'preferred' key.
for _, invoker := range invokers {
key := constant.REGISTRY_KEY + "." + constant.PREFERRED_KEY
if invoker.IsAvailable() && matchParam("true", key, "false", invoker) {
return invoker.Invoke(ctx, invocation)
}
}
// providers in the registry with the same zone
key := constant.REGISTRY_KEY + "." + constant.ZONE_KEY
zone := invocation.AttachmentsByKey(key, "")
if "" != zone {
for _, invoker := range invokers {
if invoker.IsAvailable() && matchParam(zone, key, "", invoker) {
return invoker.Invoke(ctx, invocation)
}
}
force := invocation.AttachmentsByKey(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "")
if "true" == force {
return &protocol.RPCResult{
Err: fmt.Errorf("no registry instance in zone or "+
"no available providers in the registry, zone: %v, "+
" registries: %v", zone, invoker.GetUrl()),
}
}
}
// load balance among all registries, with registry weight count in.
loadBalance := getLoadBalance(invokers[0], invocation)
ivk := invoker.doSelect(loadBalance, invocation, invokers, nil)
if ivk != nil && ivk.IsAvailable() {
return ivk.Invoke(ctx, invocation)
}
// If none of the invokers has a preferred signal or is picked by the loadBalancer, pick the first one available.
for _, invoker := range invokers {
if invoker.IsAvailable() {
return invoker.Invoke(ctx, invocation)
}
}
return &protocol.RPCResult{
Err: fmt.Errorf("no provider available in %v", invokers),
}
}
func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invocation protocol.Invocation) {
key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY
force := ctx.Value(key)
if force != nil {
switch value := force.(type) {
case bool:
if value {
invocation.SetAttachments(key, "true")
}
case string:
if "true" == value {
invocation.SetAttachments(key, "true")
}
default:
// ignore
}
}
}
func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) {
}
func matchParam(target, key, def string, invoker protocol.Invoker) bool {
return target == invoker.GetUrl().GetParam(key, def)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment