Skip to content
Snippets Groups Projects
Unverified Commit 7cbe7c38 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #55 from dubbo/develop

Rft: refactoring code
parents e17bddf7 8962cdd9
No related branches found
No related tags found
No related merge requests found
Showing
with 762 additions and 422 deletions
......@@ -18,7 +18,7 @@ classes
# Gopkg.lock
# vendor/
vendor/
logs/
language: go
go:
- "1.11"
- "1.12"
env:
- GO111MODULE=on
install: true
script:
- go fmt ./... && [[ -z `git status -s` ]]
- go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic
after_success:
- bash <(curl -s https://codecov.io/bash)
# GO for Apache Dubbo #
# Go for Apache Dubbo [中文](./README_CN.md) #
[![Build Status](https://travis-ci.com/dubbo/go-for-apache-dubbo.svg?branch=master)](https://travis-ci.com/dubbo/go-for-apache-dubbo)
[![codecov](https://codecov.io/gh/dubbo/go-for-apache-dubbo/branch/master/graph/badge.svg)](https://codecov.io/gh/dubbo/go-for-apache-dubbo)
---
Apache Dubbo Golang Implementation.
Apache Dubbo Go Implementation.
## License
Apache License, Version 2.0
## Code design ##
Extension module and layered code design based on dubbo (include protocol layer,registry layer,cluster layer,config layer and so on), Our goal is: you can implement these layered interfaces in a new way, and override the default implementation of dubbo-go[same go-for-apache-dubbo] by calling 'extension.SetXXX' of extension, and complete your special needs without modifying the source code. At the same time, you are welcome to contribute implementation of useful expansion to the community.
![frame design](https://raw.githubusercontent.com/wiki/dubbo/dubbo-go/dubbo-go%E4%BB%A3%E7%A0%81%E5%88%86%E5%B1%82%E8%AE%BE%E8%AE%A1.png)
About detail design please refer to [code layered design](https://github.com/dubbo/go-for-apache-dubbo/wiki/dubbo-go-V2.6-design)
## Feature list ##
+ 1 Transport: HTTP(√)
+ 2 Codec: JsonRPC(√), Hessian(√)
+ 3 Service discovery:Service Register(√), Service Watch(√)
+ 4 Registry: ZooKeeper(√), Etcd(X), Redis(X)
+ 5 Strategy: Failover(√), Failfast(√)
+ 6 Load Balance: Random(√), RoundRobin(√)
+ 7 Role: Consumer(√), Provider(√)
Finished List:
- Role: Consumer(√), Provider(√)
- Transport: HTTP(√), TCP(√)
- Codec: JsonRPC v2(√), Hessian v2(√)
- Registry: ZooKeeper(√)
- Cluster Strategy: Failover(√)
- Load Balance: Random(√)
- Filter: Echo Health Check(√)
Working List:
- Cluster Strategy: Failfast/Failsafe/Failback/Forking
- Load Balance: RoundRobin/LeastActive/ConsistentHash
- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter
- Registry: etcd/k8s/consul
Todo List:
- routing rule (dubbo v2.6.x)
- monitoring (dubbo v2.6.x)
- dynamic configuration (dubbo v2.7.x)
- metrics (dubbo v2.7.x) waiting dubbo's quota
You can know more about dubbo-go by its [roadmap](https://github.com/dubbo/go-for-apache-dubbo/wiki/Roadmap).
## Quick Start
The subdirectory examples shows how to use dubbo-go. Please read the [examples/README.md](https://github.com/dubbo/go-for-apache-dubbo/blob/develop/examples/README.md) carefully to learn how to dispose the configuration and compile the program.
## Code Example
## Benchmark
The subdirectory examples shows how to use dubbo-go. Please read the examples/readme.md carefully to learn how to dispose the configuration and compile the program.
Benchmark project please refer to [go-for-apache-dubbo-benchmark](https://github.com/dubbogo/go-for-apache-dubbo-benchmark)
About dubbo-go benchmarking report, please refer to [dubbo benchmarking report](https://github.com/dubbo/go-for-apache-dubbo/wiki/pressure-test-report-for-dubbo) & [jsonrpc benchmarking report](https://github.com/dubbo/go-for-apache-dubbo/wiki/pressure-test-report-for-jsonrpc)
## Todo list
## [User List](https://github.com/dubbo/go-for-apache-dubbo/issues/2)
- [ ] Tcp Transport and Hessian2 protocol
- [ ] Network
- [ ] Fuse
- [ ] Rate Limit
- [ ] Trace
- [ ] Metrics
- [ ] Load Balance
![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png)
# Go for Apache Dubbo [English](./README.md) #
[![Build Status](https://travis-ci.com/dubbo/go-for-apache-dubbo.svg?branch=master)](https://travis-ci.com/dubbo/go-for-apache-dubbo)
[![codecov](https://codecov.io/gh/dubbo/go-for-apache-dubbo/branch/master/graph/badge.svg)](https://codecov.io/gh/dubbo/go-for-apache-dubbo)
---
Apache Dubbo Go 语言实现
## 证书 ##
Apache License, Version 2.0
## 代码设计 ##
基于dubbo的extension模块和分层的代码设计(包括 protocol layer, registry layer, cluster layer, config 等等)。我们的目标是:你可以对这些分层接口进行新的实现,并通过调用 extension 模块的“ extension.SetXXX ”方法来覆盖 dubbo-go [同 go-for-apache-dubbo ]的默认实现,以完成自己的特殊需求而无需修改源代码。同时,欢迎你为社区贡献有用的拓展实现。
![框架设计](https://raw.githubusercontent.com/wiki/dubbo/dubbo-go/dubbo-go%E4%BB%A3%E7%A0%81%E5%88%86%E5%B1%82%E8%AE%BE%E8%AE%A1.png)
关于详细设计请阅读 [code layered design](https://github.com/dubbo/go-for-apache-dubbo/wiki/dubbo-go-V2.6-design)
## 功能列表 ##
实现列表:
- Role: Consumer(√), Provider(√)
- Transport: HTTP(√), TCP(√)
- Codec: JsonRPC v2(√), Hessian v2(√)
- Registry: ZooKeeper(√)
- Cluster Strategy: Failover(√)
- Load Balance: Random(√)
- Filter: Echo Health Check(√)
开发中列表:
- Cluster Strategy: Failfast/Failsafe/Failback/Forking
- Load Balance: RoundRobin/LeastActive/ConsistentHash
- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter
- Registry: etcd/k8s/consul
任务列表:
- routing rule (dubbo v2.6.x)
- monitoring (dubbo v2.6.x)
- dynamic configuration (dubbo v2.7.x)
- metrics (dubbo v2.7.x) waiting dubbo's quota
你可以通过访问 [roadmap](https://github.com/dubbo/go-for-apache-dubbo/wiki/Roadmap) 知道更多关于 dubbo-go 的信息
## 快速开始 ##
这个子目录下的例子展示了如何使用 dubbo-go 。请仔细阅读 [examples/README.md](https://github.com/dubbo/go-for-apache-dubbo/blob/develop/examples/README.md) 学习如何处理配置并编译程序。
## 性能测试 ##
性能测试项目是 [go-for-apache-dubbo-benchmark](https://github.com/dubbogo/go-for-apache-dubbo-benchmark)
关于 dubbo-go 性能测试报告,请阅读 [dubbo benchmarking report](https://github.com/dubbo/go-for-apache-dubbo/wiki/pressure-test-report-for-dubbo) & [jsonrpc benchmarking report](https://github.com/dubbo/go-for-apache-dubbo/wiki/pressure-test-report-for-jsonrpc)
## [User List](https://github.com/dubbo/go-for-apache-dubbo/issues/2)
![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png)
package client
import (
"context"
)
import (
"github.com/dubbo/go-for-apache-dubbo/registry"
)
type Transport interface {
Call(ctx context.Context, url registry.ServiceURL, request Request, resp interface{}) error
NewRequest(conf registry.ServiceConfig, method string, args interface{}) (Request, error)
}
//////////////////////////////////////////////
// Request
//////////////////////////////////////////////
type Request interface {
ServiceConfig() registry.ServiceConfig
}
package invoker
import (
"context"
"sync"
"time"
)
import (
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/go-for-apache-dubbo/client"
"github.com/dubbo/go-for-apache-dubbo/client/selector"
"github.com/dubbo/go-for-apache-dubbo/dubbo"
"github.com/dubbo/go-for-apache-dubbo/jsonrpc"
"github.com/dubbo/go-for-apache-dubbo/registry"
)
const RegistryConnDelay = 3
type Options struct {
ServiceTTL time.Duration
selector selector.Selector
//TODO:we should provider a transport client interface
HttpClient *jsonrpc.HTTPClient
DubboClient *dubbo.Client
}
type Option func(*Options)
func WithServiceTTL(ttl time.Duration) Option {
return func(o *Options) {
o.ServiceTTL = ttl
}
}
func WithHttpClient(client *jsonrpc.HTTPClient) Option {
return func(o *Options) {
o.HttpClient = client
}
}
func WithDubboClient(client *dubbo.Client) Option {
return func(o *Options) {
o.DubboClient = client
}
}
func WithLBSelector(selector selector.Selector) Option {
return func(o *Options) {
o.selector = selector
}
}
type Invoker struct {
Options
cacheServiceMap map[string]*ServiceArray
registry registry.Registry
listenerLock sync.Mutex
}
func NewInvoker(registry registry.Registry, opts ...Option) (*Invoker, error) {
options := Options{
//default 300s
ServiceTTL: time.Duration(300e9),
selector: selector.NewRandomSelector(),
}
for _, opt := range opts {
opt(&options)
}
if options.HttpClient == nil && options.DubboClient == nil {
return nil, jerrors.New("Must specify the transport client!")
}
invoker := &Invoker{
Options: options,
cacheServiceMap: make(map[string]*ServiceArray),
registry: registry,
}
go invoker.listen()
return invoker, nil
}
func (ivk *Invoker) listen() {
for {
if ivk.registry.IsClosed() {
log.Warn("event listener game over.")
return
}
listener, err := ivk.registry.Subscribe()
if err != nil {
if ivk.registry.IsClosed() {
log.Warn("event listener game over.")
return
}
log.Warn("getListener() = err:%s", jerrors.ErrorStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for {
if serviceEvent, err := listener.Next(); err != nil {
log.Warn("Selector.watch() = error{%v}", jerrors.ErrorStack(err))
listener.Close()
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
return
} else {
ivk.update(serviceEvent)
}
}
}
}
func (ivk *Invoker) update(res *registry.ServiceEvent) {
if res == nil || res.Service == nil {
return
}
log.Debug("registry update, result{%s}", res)
registryKey := res.Service.ServiceConfig().Key()
ivk.listenerLock.Lock()
defer ivk.listenerLock.Unlock()
svcArr, ok := ivk.cacheServiceMap[registryKey]
log.Debug("registry name:%s, its current member lists:%+v", registryKey, svcArr)
switch res.Action {
case registry.ServiceAdd:
if ok {
svcArr.add(res.Service, ivk.ServiceTTL)
} else {
ivk.cacheServiceMap[registryKey] = newServiceArray([]registry.ServiceURL{res.Service})
}
case registry.ServiceDel:
if ok {
svcArr.del(res.Service, ivk.ServiceTTL)
if len(svcArr.arr) == 0 {
delete(ivk.cacheServiceMap, registryKey)
log.Warn("delete registry %s from registry map", registryKey)
}
}
log.Error("selector delete registryURL{%s}", res.Service)
}
}
func (ivk *Invoker) getService(registryConf registry.ServiceConfig) (*ServiceArray, error) {
defer ivk.listenerLock.Unlock()
registryKey := registryConf.Key()
ivk.listenerLock.Lock()
svcArr, sok := ivk.cacheServiceMap[registryKey]
log.Debug("r.svcArr[registryString{%v}] = svcArr{%s}", registryKey, svcArr)
if sok && time.Since(svcArr.birth) < ivk.Options.ServiceTTL {
return svcArr, nil
}
ivk.listenerLock.Unlock()
svcs, err := ivk.registry.GetService(registryConf)
ivk.listenerLock.Lock()
if err != nil {
log.Error("Registry.get(conf:%+v) = {err:%s, svcs:%+v}",
registryConf, jerrors.ErrorStack(err), svcs)
return nil, jerrors.Trace(err)
}
newSvcArr := newServiceArray(svcs)
ivk.cacheServiceMap[registryKey] = newSvcArr
return newSvcArr, nil
}
func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, req client.Request, resp interface{}) error {
serviceConf := req.ServiceConfig()
registryArray, err := ivk.getService(serviceConf)
if err != nil {
return err
}
if len(registryArray.arr) == 0 {
return jerrors.New("cannot find svc " + serviceConf.String())
}
url, err := ivk.selector.Select(reqId, registryArray)
if err != nil {
return err
}
if err = ivk.HttpClient.Call(ctx, url, req, resp); err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
return err
}
log.Info("response result:%s", resp)
return nil
}
func (ivk *Invoker) DubboCall(reqId int64, registryConf registry.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error {
registryArray, err := ivk.getService(registryConf)
if err != nil {
return err
}
if len(registryArray.arr) == 0 {
return jerrors.New("cannot find svc " + registryConf.String())
}
url, err := ivk.selector.Select(reqId, registryArray)
if err != nil {
return err
}
//TODO:这里要改一下call方法改为接收指针类型
if err = ivk.DubboClient.Call(url.Ip()+":"+url.Port(), url, method, args, reply, opts...); err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
return err
}
log.Info("response result:%s", reply)
return nil
}
func (ivk *Invoker) Close() {
ivk.DubboClient.Close()
}
package invoker
import (
"fmt"
"strings"
"time"
)
import (
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/go-for-apache-dubbo/registry"
)
//////////////////////////////////////////
// registry array
// should be returned by registry ,will be used by client & waiting to selector
//////////////////////////////////////////
var (
ErrServiceArrayEmpty = jerrors.New("registryArray empty")
ErrServiceArrayTimeout = jerrors.New("registryArray timeout")
)
type ServiceArray struct {
arr []registry.ServiceURL
birth time.Time
idx int64
}
func newServiceArray(arr []registry.ServiceURL) *ServiceArray {
return &ServiceArray{
arr: arr,
birth: time.Now(),
}
}
func (s *ServiceArray) GetIdx() *int64 {
return &s.idx
}
func (s *ServiceArray) GetSize() int64 {
return int64(len(s.arr))
}
func (s *ServiceArray) GetService(i int64) registry.ServiceURL {
return s.arr[i]
}
func (s *ServiceArray) String() string {
var builder strings.Builder
builder.WriteString(fmt.Sprintf("birth:%s, idx:%d, arr len:%d, arr:{", s.birth, s.idx, len(s.arr)))
for i := range s.arr {
builder.WriteString(fmt.Sprintf("%d:%s, ", i, s.arr[i]))
}
builder.WriteString("}")
return builder.String()
}
func (s *ServiceArray) add(registry registry.ServiceURL, ttl time.Duration) {
s.arr = append(s.arr, registry)
s.birth = time.Now().Add(ttl)
}
func (s *ServiceArray) del(registry registry.ServiceURL, ttl time.Duration) {
for i, svc := range s.arr {
if svc.PrimitiveURL() == registry.PrimitiveURL() {
s.arr = append(s.arr[:i], s.arr[i+1:]...)
s.birth = time.Now().Add(ttl)
break
}
}
}
package selector
import (
"math/rand"
"sync/atomic"
)
import (
"github.com/dubbo/go-for-apache-dubbo/client"
"github.com/dubbo/go-for-apache-dubbo/registry"
)
type RandomSelector struct{}
func NewRandomSelector() Selector {
return &RandomSelector{}
}
func (s *RandomSelector) Select(ID int64, array client.ServiceArrayIf) (registry.ServiceURL, error) {
if array.GetSize() == 0 {
return nil, ServiceArrayEmpty
}
idx := atomic.AddInt64(array.GetIdx(), 1)
idx = ((int64)(rand.Int()) + ID) % array.GetSize()
return array.GetService(idx), nil
}
package selector
import (
"sync/atomic"
)
import (
"github.com/dubbo/go-for-apache-dubbo/client"
"github.com/dubbo/go-for-apache-dubbo/registry"
)
type RoundRobinSelector struct{}
func NewRoundRobinSelector() Selector {
return &RoundRobinSelector{}
}
func (s *RoundRobinSelector) Select(ID int64, array client.ServiceArrayIf) (registry.ServiceURL, error) {
if array.GetSize() == 0 {
return nil, ServiceArrayEmpty
}
idx := atomic.AddInt64(array.GetIdx(), 1)
idx = (ID + idx) % array.GetSize()
return array.GetService(idx), nil
}
package selector
import (
"fmt"
)
import (
"github.com/dubbo/go-for-apache-dubbo/client"
"github.com/dubbo/go-for-apache-dubbo/registry"
)
var (
ServiceArrayEmpty = fmt.Errorf("emtpy service array")
)
type Selector interface {
Select(ID int64, array client.ServiceArrayIf) (registry.ServiceURL, error)
}
package client
import "github.com/dubbo/go-for-apache-dubbo/registry"
type ServiceArrayIf interface {
GetIdx() *int64
GetSize() int64
GetService(i int64) registry.ServiceURL
}
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster
import "github.com/dubbo/go-for-apache-dubbo/protocol"
type Cluster interface {
Join(Directory) protocol.Invoker
}
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster_impl
import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
"github.com/dubbo/go-for-apache-dubbo/cluster"
"github.com/dubbo/go-for-apache-dubbo/common"
"github.com/dubbo/go-for-apache-dubbo/common/utils"
"github.com/dubbo/go-for-apache-dubbo/protocol"
"github.com/dubbo/go-for-apache-dubbo/version"
)
type baseClusterInvoker struct {
directory cluster.Directory
availablecheck bool
destroyed *atomic.Bool
}
func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
return baseClusterInvoker{
directory: directory,
availablecheck: true,
destroyed: atomic.NewBool(false),
}
}
func (invoker *baseClusterInvoker) GetUrl() common.URL {
return invoker.directory.GetUrl()
}
func (invoker *baseClusterInvoker) Destroy() {
//this is must atom operation
if invoker.destroyed.CAS(false, true) {
invoker.directory.Destroy()
}
}
func (invoker *baseClusterInvoker) IsAvailable() bool {
//TODO:sticky connection
return invoker.directory.IsAvailable()
}
//check invokers availables
func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error {
if len(invokers) == 0 {
ip, _ := utils.GetLocalIP()
return perrors.Errorf("Failed to invoke the method %v. No provider available for the service %v from "+
"registry %v on the consumer %v using the dubbo version %v .Please check if the providers have been started and registered.",
invocation.MethodName(), invoker.directory.GetUrl().SubURL.Key(), invoker.directory.GetUrl().String(), ip, version.Version)
}
return nil
}
//check cluster invoker is destroyed or not
func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
if invoker.destroyed.Load() {
ip, _ := utils.GetLocalIP()
return perrors.Errorf("Rpc cluster invoker for %v on consumer %v use dubbo version %v is now destroyed! can not invoke any more. ",
invoker.directory.GetUrl().Service(), ip, version.Version)
}
return nil
}
func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
//todo:ticky connect 粘纸连接
if len(invokers) == 1 {
return invokers[0]
}
selectedInvoker := lb.Select(invokers, invocation)
//judge to if the selectedInvoker is invoked
if !selectedInvoker.IsAvailable() || !invoker.availablecheck || isInvoked(selectedInvoker, invoked) {
// do reselect
var reslectInvokers []protocol.Invoker
for _, invoker := range invokers {
if !invoker.IsAvailable() {
continue
}
if !isInvoked(invoker, invoked) {
reslectInvokers = append(reslectInvokers, invoker)
}
}
if len(reslectInvokers) > 0 {
return lb.Select(reslectInvokers, invocation)
} else {
return nil
}
}
return selectedInvoker
}
func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
if len(invoked) > 0 {
for _, i := range invoked {
if i == selectedInvoker {
return true
}
}
}
return false
}
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster_impl
import (
"github.com/dubbo/go-for-apache-dubbo/cluster"
"github.com/dubbo/go-for-apache-dubbo/common/extension"
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
type failoverCluster struct {
}
const name = "failover"
func init() {
extension.SetCluster(name, NewFailoverCluster)
}
func NewFailoverCluster() cluster.Cluster {
return &failoverCluster{}
}
func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailoverClusterInvoker(directory)
}
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster_impl
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/dubbo/go-for-apache-dubbo/cluster"
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/common/extension"
"github.com/dubbo/go-for-apache-dubbo/common/utils"
"github.com/dubbo/go-for-apache-dubbo/protocol"
"github.com/dubbo/go-for-apache-dubbo/version"
)
type failoverClusterInvoker struct {
baseClusterInvoker
}
func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &failoverClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}
url := invokers[0].GetUrl()
methodName := invocation.MethodName()
//Get the service loadbalance config
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
lb = v
}
loadbalance := extension.GetLoadbalance(lb)
//get reties
retries := url.GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
//Get the service method loadbalance config if have
if v := url.GetMethodParamInt(methodName, constant.RETRIES_KEY, 0); v != 0 {
retries = v
}
invoked := []protocol.Invoker{}
providers := []string{}
var result protocol.Result
for i := int64(0); i < retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 {
err := invoker.checkWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
}
invokers = invoker.directory.List(invocation)
err = invoker.checkInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}
}
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
invoked = append(invoked, ivk)
//DO INVOKE
result = ivk.Invoke(invocation)
if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key())
continue
} else {
return result
}
}
ip, _ := utils.GetLocalIP()
return &protocol.RPCResult{Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of "+
"the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, version.Version, result.Error().Error(),
)}
}
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster_impl
import (
"context"
"fmt"
"net/url"
"testing"
)
import (
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
import (
"github.com/dubbo/go-for-apache-dubbo/cluster/directory"
"github.com/dubbo/go-for-apache-dubbo/cluster/loadbalance"
"github.com/dubbo/go-for-apache-dubbo/common"
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/common/extension"
"github.com/dubbo/go-for-apache-dubbo/common/logger"
"github.com/dubbo/go-for-apache-dubbo/protocol"
"github.com/dubbo/go-for-apache-dubbo/protocol/invocation"
)
/////////////////////////////
// mock invoker
/////////////////////////////
type MockInvoker struct {
url common.URL
available bool
destroyed bool
successCount int
}
func NewMockInvoker(url common.URL, successCount int) *MockInvoker {
return &MockInvoker{
url: url,
available: true,
destroyed: false,
successCount: successCount,
}
}
func (bi *MockInvoker) GetUrl() common.URL {
return bi.url
}
func (bi *MockInvoker) IsAvailable() bool {
return bi.available
}
func (bi *MockInvoker) IsDestroyed() bool {
return bi.destroyed
}
type rest struct {
tried int
success bool
}
func (bi *MockInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
count++
var success bool
var err error = nil
if count >= bi.successCount {
success = true
} else {
err = perrors.New("error")
}
result := &protocol.RPCResult{Err: err, Rest: rest{tried: count, success: success}}
return result
}
func (bi *MockInvoker) Destroy() {
logger.Infof("Destroy invoker: %v", bi.GetUrl().String())
bi.destroyed = true
bi.available = false
}
var count int
func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster()
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i), common.WithParams(urlParam))
invokers = append(invokers, NewMockInvoker(url, successCount))
}
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := failoverCluster.Join(staticDir)
if len(invocations) > 0 {
return clusterInvoker.Invoke(invocations[0])
}
return clusterInvoker.Invoke(&invocation.RPCInvocation{})
}
func Test_FailoverInvokeSuccess(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(t, 2, urlParams)
assert.NoError(t, result.Error())
count = 0
}
func Test_FailoverInvokeFail(t *testing.T) {
urlParams := url.Values{}
result := normalInvoke(t, 3, urlParams)
assert.Errorf(t, result.Error(), "error")
count = 0
}
func Test_FailoverInvoke1(t *testing.T) {
urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "3")
result := normalInvoke(t, 3, urlParams)
assert.NoError(t, result.Error())
count = 0
}
func Test_FailoverInvoke2(t *testing.T) {
urlParams := url.Values{}
urlParams.Set(constant.RETRIES_KEY, "2")
urlParams.Set("methods.test."+constant.RETRIES_KEY, "3")
ivc := &invocation.RPCInvocation{}
ivc.SetMethod("test")
result := normalInvoke(t, 3, urlParams, ivc)
assert.NoError(t, result.Error())
count = 0
}
func Test_FailoverDestroy(t *testing.T) {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failoverCluster := NewFailoverCluster()
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invokers = append(invokers, NewMockInvoker(url, 1))
}
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := failoverCluster.Join(staticDir)
assert.Equal(t, true, clusterInvoker.IsAvailable())
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.NoError(t, result.Error())
count = 0
clusterInvoker.Destroy()
assert.Equal(t, false, clusterInvoker.IsAvailable())
}
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster_impl
import (
"github.com/dubbo/go-for-apache-dubbo/cluster"
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
type mockCluster struct {
}
func NewMockCluster() cluster.Cluster {
return &mockCluster{}
}
func (cluster *mockCluster) Join(directory cluster.Directory) protocol.Invoker {
return protocol.NewBaseInvoker(directory.GetUrl())
}
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster_impl
import (
"github.com/dubbo/go-for-apache-dubbo/cluster"
"github.com/dubbo/go-for-apache-dubbo/common/extension"
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
type registryAwareCluster struct {
}
func init() {
extension.SetCluster("registryAware", NewRegistryAwareCluster)
}
func NewRegistryAwareCluster() cluster.Cluster {
return &registryAwareCluster{}
}
func (cluster *registryAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newRegistryAwareClusterInvoker(directory)
}
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster_impl
import (
"github.com/dubbo/go-for-apache-dubbo/cluster"
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
type registryAwareClusterInvoker struct {
baseClusterInvoker
}
func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &registryAwareClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
func (invoker *registryAwareClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
//First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key.
for _, invoker := range invokers {
if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" {
return invoker.Invoke(invocation)
}
}
//If none of the invokers has a local signal, pick the first one available.
for _, invoker := range invokers {
if invoker.IsAvailable() {
return invoker.Invoke(invocation)
}
}
return nil
}
// Copyright 2016-2019 hxmhlt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster_impl
import (
"context"
"fmt"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/dubbo/go-for-apache-dubbo/cluster/directory"
"github.com/dubbo/go-for-apache-dubbo/common"
"github.com/dubbo/go-for-apache-dubbo/protocol"
"github.com/dubbo/go-for-apache-dubbo/protocol/invocation"
)
func Test_RegAwareInvokeSuccess(t *testing.T) {
regAwareCluster := NewRegistryAwareCluster()
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invokers = append(invokers, NewMockInvoker(url, 1))
}
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := regAwareCluster.Join(staticDir)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.NoError(t, result.Error())
count = 0
}
func TestDestroy(t *testing.T) {
regAwareCluster := NewRegistryAwareCluster()
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
invokers = append(invokers, NewMockInvoker(url, 1))
}
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := regAwareCluster.Join(staticDir)
assert.Equal(t, true, clusterInvoker.IsAvailable())
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.NoError(t, result.Error())
count = 0
clusterInvoker.Destroy()
assert.Equal(t, false, clusterInvoker.IsAvailable())
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment