Skip to content
Snippets Groups Projects
Commit 10c35d72 authored by fangyincheng's avatar fangyincheng
Browse files

Merge remote-tracking branch 'upstream/master' into imp

parents 04370a54 2b5f5e48
No related branches found
No related tags found
No related merge requests found
Showing
with 953 additions and 539 deletions
...@@ -31,7 +31,7 @@ Finished List: ...@@ -31,7 +31,7 @@ Finished List:
- Codec: JsonRPC v2(√), Hessian v2(√) - Codec: JsonRPC v2(√), Hessian v2(√)
- Registry: ZooKeeper(√) - Registry: ZooKeeper(√)
- Cluster Strategy: Failover(√) - Cluster Strategy: Failover(√)
- Load Balance: Random(√) - Load Balance: Random(√), RoundRobin(√), LeastActive(√)
- Filter: Echo Health Check(√) - Filter: Echo Health Check(√)
Working List: Working List:
......
# Apache Dubbo-go [English](./README.md) # # Apache Dubbo-go [English](./README.md) #
[![Build Status](https://travis-ci.org/apache/dubbo-go.svg?branch=master)](https://travis-ci.org/apache/dubbo-go) [![Build Status](https://travis-ci.org/apache/dubbo-go.svg?branch=master)](https://travis-ci.org/apache/dubbo-go)
[![codecov](https://codecov.io/gh/apache/dubbo-go/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/dubbo-go) [![codecov](https://codecov.io/gh/apache/dubbo-go/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/dubbo-go)
--- ---
Apache Dubbo Go 语言实现 Apache Dubbo Go 语言实现
## 证书 ## ## 证书 ##
Apache License, Version 2.0 Apache License, Version 2.0
## 发布日志 ## ## 发布日志 ##
[v1.0.0 - 2019年5月29日 兼容dubbo v2.6.5 版本](https://github.com/apache/dubbo-go/releases/tag/v1.0.0) [v1.0.0 - 2019年5月29日 兼容dubbo v2.6.5 版本](https://github.com/apache/dubbo-go/releases/tag/v1.0.0)
## 工程架构 ## ## 工程架构 ##
基于dubbo的extension模块和分层的代码设计(包括 protocol layer, registry layer, cluster layer, config 等等)。我们的目标是:你可以对这些分层接口进行新的实现,并通过调用 extension 模块的“ extension.SetXXX ”方法来覆盖 dubbo-go [同 go-for-apache-dubbo ]的默认实现,以完成自己的特殊需求而无需修改源代码。同时,欢迎你为社区贡献有用的拓展实现。 基于dubbo的extension模块和分层的代码设计(包括 protocol layer, registry layer, cluster layer, config 等等)。我们的目标是:你可以对这些分层接口进行新的实现,并通过调用 extension 模块的“ extension.SetXXX ”方法来覆盖 dubbo-go [同 go-for-apache-dubbo ]的默认实现,以完成自己的特殊需求而无需修改源代码。同时,欢迎你为社区贡献有用的拓展实现。
![框架设计](https://raw.githubusercontent.com/wiki/dubbo/dubbo-go/dubbo-go%E4%BB%A3%E7%A0%81%E5%88%86%E5%B1%82%E8%AE%BE%E8%AE%A1.png) ![框架设计](https://raw.githubusercontent.com/wiki/dubbo/dubbo-go/dubbo-go%E4%BB%A3%E7%A0%81%E5%88%86%E5%B1%82%E8%AE%BE%E8%AE%A1.png)
关于详细设计请阅读 [code layered design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design) 关于详细设计请阅读 [code layered design](https://github.com/apache/dubbo-go/wiki/dubbo-go-V1.0-design)
## 功能列表 ## ## 功能列表 ##
实现列表: 实现列表:
- Role: Consumer(√), Provider(√) - Role: Consumer(√), Provider(√)
- Transport: HTTP(√), TCP(√) - Transport: HTTP(√), TCP(√)
- Codec: JsonRPC v2(√), Hessian v2(√) - Codec: JsonRPC v2(√), Hessian v2(√)
- Registry: ZooKeeper(√) - Registry: ZooKeeper(√)
- Cluster Strategy: Failover(√) - Cluster Strategy: Failover(√)
- Load Balance: Random(√) - Load Balance: Random(√), RoundRobin(√), LeastActive(√)
- Filter: Echo Health Check(√) - Filter: Echo Health Check(√)
开发中列表: 开发中列表:
- Cluster Strategy: Failfast/Failsafe/Failback/Forking - Cluster Strategy: Failfast/Failsafe/Failback/Forking
- Load Balance: RoundRobin/LeastActive/ConsistentHash - Load Balance: RoundRobin/LeastActive/ConsistentHash
- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter - Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter
- Registry: etcd/k8s/consul - Registry: etcd/k8s/consul
任务列表: 任务列表:
- routing rule (dubbo v2.6.x) - routing rule (dubbo v2.6.x)
- metrics (dubbo v2.7.x) waiting dubbo's quota - metrics (dubbo v2.7.x) waiting dubbo's quota
- dynamic configuration center & metadata center (dubbo v2.7.x) - dynamic configuration center & metadata center (dubbo v2.7.x)
- tracing (dubbo ecosystem) - tracing (dubbo ecosystem)
你可以通过访问 [roadmap](https://github.com/apache/dubbo-go/wiki/Roadmap) 知道更多关于 dubbo-go 的信息 你可以通过访问 [roadmap](https://github.com/apache/dubbo-go/wiki/Roadmap) 知道更多关于 dubbo-go 的信息
## 快速开始 ## ## 快速开始 ##
这个子目录下的例子展示了如何使用 dubbo-go 。请仔细阅读 [examples/README.md](https://github.com/apache/dubbo-go/blob/develop/examples/README.md) 学习如何处理配置并编译程序。 这个子目录下的例子展示了如何使用 dubbo-go 。请仔细阅读 [examples/README.md](https://github.com/apache/dubbo-go/blob/develop/examples/README.md) 学习如何处理配置并编译程序。
## 性能测试 ## ## 性能测试 ##
性能测试项目是 [go-for-apache-dubbo-benchmark](https://github.com/dubbogo/go-for-apache-dubbo-benchmark) 性能测试项目是 [go-for-apache-dubbo-benchmark](https://github.com/dubbogo/go-for-apache-dubbo-benchmark)
关于 dubbo-go 性能测试报告,请阅读 [dubbo benchmarking report](https://github.com/apache/dubbo-go/wiki/pressure-test-report-for-dubbo) & [jsonrpc benchmarking report](https://github.com/apache/dubbo-go/wiki/pressure-test-report-for-jsonrpc) 关于 dubbo-go 性能测试报告,请阅读 [dubbo benchmarking report](https://github.com/apache/dubbo-go/wiki/pressure-test-report-for-dubbo) & [jsonrpc benchmarking report](https://github.com/apache/dubbo-go/wiki/pressure-test-report-for-jsonrpc)
## [User List](https://github.com/apache/dubbo-go/issues/2) ## [User List](https://github.com/apache/dubbo-go/issues/2)
![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png) ![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png)
## Stargazers ## Stargazers
[![Stargazers over time](https://starchart.cc/apache/dubbo-go.svg)](https://starchart.cc/apache/dubbo-go) [![Stargazers over time](https://starchart.cc/apache/dubbo-go.svg)](https://starchart.cc/apache/dubbo-go)
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// @author yiji@apache.org
package loadbalance
import (
"math/rand"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
const (
LeastActive = "leastactive"
)
func init() {
extension.SetLoadbalance(LeastActive, NewLeastActiveLoadBalance)
}
type leastActiveLoadBalance struct {
}
func NewLeastActiveLoadBalance() cluster.LoadBalance {
return &leastActiveLoadBalance{}
}
func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
count := len(invokers)
if count == 0 {
return nil
}
if count == 1 {
return invokers[0]
}
var (
leastActive int32 = -1 // The least active value of all invokers
totalWeight int64 = 0 // The number of invokers having the same least active value (LEAST_ACTIVE)
firstWeight int64 = 0 // Initial value, used for comparision
leastIndexes = make([]int, count) // The index of invokers having the same least active value (LEAST_ACTIVE)
leastCount = 0 // The number of invokers having the same least active value (LEAST_ACTIVE)
sameWeight = true // Every invoker has the same weight value?
)
for i := 0; i < count; i++ {
invoker := invokers[i]
// Active number
active := protocol.GetStatus(invoker.GetUrl(), invocation.MethodName()).GetActive()
// current weight (maybe in warmUp)
weight := GetWeight(invoker, invocation)
// There are smaller active services
if leastActive == -1 || active < leastActive {
leastActive = active
leastIndexes[0] = i
leastCount = 1 // next available leastIndex offset
totalWeight = weight
firstWeight = weight
sameWeight = true
} else if active == leastActive {
leastIndexes[leastCount] = i
totalWeight += weight
leastCount++
if sameWeight && (i > 0) && weight != firstWeight {
sameWeight = false
}
}
}
if leastCount == 1 {
return invokers[0]
}
if !sameWeight && totalWeight > 0 {
offsetWeight := rand.Int63n(totalWeight) + 1
for i := 0; i < leastCount; i++ {
leastIndex := leastIndexes[i]
offsetWeight -= GetWeight(invokers[i], invocation)
if offsetWeight <= 0 {
return invokers[leastIndex]
}
}
}
index := leastIndexes[rand.Intn(leastCount)]
return invokers[index]
}
package loadbalance
import (
"context"
"fmt"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestLeastActiveSelect(t *testing.T) {
loadBalance := NewLeastActiveLoadBalance()
var invokers []protocol.Invoker
url, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:20000/org.apache.demo.HelloService")
invokers = append(invokers, protocol.NewBaseInvoker(url))
i := loadBalance.Select(invokers, &invocation.RPCInvocation{})
assert.True(t, i.GetUrl().URLEqual(url))
for i := 1; i < 10; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
loadBalance.Select(invokers, &invocation.RPCInvocation{})
}
func TestLeastActiveByWeight(t *testing.T) {
loadBalance := NewLeastActiveLoadBalance()
var invokers []protocol.Invoker
loop := 3
for i := 1; i <= loop; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("test%v://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
inv := new(invocation.RPCInvocation)
inv.SetMethod("test")
protocol.BeginCount(invokers[2].GetUrl(), inv.MethodName())
loop = 10000
var (
firstCount int
secondCount int
)
for i := 1; i <= loop; i++ {
invoker := loadBalance.Select(invokers, inv)
if invoker.GetUrl().Protocol == "test1" {
firstCount++
} else if invoker.GetUrl().Protocol == "test2" {
secondCount++
}
}
assert.Equal(t, firstCount+secondCount, loop)
}
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package loadbalance
import (
"math"
"sync"
"sync/atomic"
"time"
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)
const (
RoundRobin = "roundrobin"
COMPLETE = 0
UPDATING = 1
)
var (
methodWeightMap sync.Map // [string]invokers
state int32 = COMPLETE // update lock acquired ?
recyclePeriod int64 = 60 * time.Second.Nanoseconds()
)
func init() {
extension.SetLoadbalance(RoundRobin, NewRoundRobinLoadBalance)
}
type roundRobinLoadBalance struct{}
func NewRoundRobinLoadBalance() cluster.LoadBalance {
return &roundRobinLoadBalance{}
}
func (lb *roundRobinLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
count := len(invokers)
if count == 0 {
return nil
}
if count == 1 {
return invokers[0]
}
key := invokers[0].GetUrl().Path + "." + invocation.MethodName()
cache, _ := methodWeightMap.LoadOrStore(key, &cachedInvokers{})
cachedInvokers := cache.(*cachedInvokers)
var (
clean = false
totalWeight = int64(0)
maxCurrentWeight = int64(math.MinInt64)
now = time.Now()
selectedInvoker protocol.Invoker
selectedWeightRobin *weightedRoundRobin
)
for _, invoker := range invokers {
var weight = GetWeight(invoker, invocation)
if weight < 0 {
weight = 0
}
identifier := invoker.GetUrl().Key()
loaded, found := cachedInvokers.LoadOrStore(identifier, &weightedRoundRobin{weight: weight})
weightRobin := loaded.(*weightedRoundRobin)
if !found {
clean = true
}
if weightRobin.Weight() != weight {
weightRobin.setWeight(weight)
}
currentWeight := weightRobin.increaseCurrent()
weightRobin.lastUpdate = &now
if currentWeight > maxCurrentWeight {
maxCurrentWeight = currentWeight
selectedInvoker = invoker
selectedWeightRobin = weightRobin
}
totalWeight += weight
}
cleanIfRequired(clean, cachedInvokers, &now)
if selectedWeightRobin != nil {
selectedWeightRobin.Current(totalWeight)
return selectedInvoker
}
// should never happen
return invokers[0]
}
func cleanIfRequired(clean bool, invokers *cachedInvokers, now *time.Time) {
if clean && atomic.CompareAndSwapInt32(&state, COMPLETE, UPDATING) {
defer atomic.CompareAndSwapInt32(&state, UPDATING, COMPLETE)
invokers.Range(func(identify, robin interface{}) bool {
weightedRoundRobin := robin.(*weightedRoundRobin)
elapsed := now.Sub(*weightedRoundRobin.lastUpdate).Nanoseconds()
if elapsed > recyclePeriod {
invokers.Delete(identify)
}
return true
})
}
}
// Record the weight of the invoker
type weightedRoundRobin struct {
weight int64
current int64
lastUpdate *time.Time
}
func (robin *weightedRoundRobin) Weight() int64 {
return atomic.LoadInt64(&robin.weight)
}
func (robin *weightedRoundRobin) setWeight(weight int64) {
robin.weight = weight
robin.current = 0
}
func (robin *weightedRoundRobin) increaseCurrent() int64 {
return atomic.AddInt64(&robin.current, robin.weight)
}
func (robin *weightedRoundRobin) Current(delta int64) {
atomic.AddInt64(&robin.current, -1*delta)
}
type cachedInvokers struct {
sync.Map /*[string]weightedRoundRobin*/
}
package loadbalance
import (
"context"
"fmt"
"strconv"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestRoundRobinSelect(t *testing.T) {
loadBalance := NewRoundRobinLoadBalance()
var invokers []protocol.Invoker
url, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:20000/org.apache.demo.HelloService")
invokers = append(invokers, protocol.NewBaseInvoker(url))
i := loadBalance.Select(invokers, &invocation.RPCInvocation{})
assert.True(t, i.GetUrl().URLEqual(url))
for i := 1; i < 10; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
loadBalance.Select(invokers, &invocation.RPCInvocation{})
}
func TestRoundRobinByWeight(t *testing.T) {
loadBalance := NewRoundRobinLoadBalance()
var invokers []protocol.Invoker
loop := 10
for i := 1; i <= loop; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
loop = (1 + loop) * loop / 2
selected := make(map[protocol.Invoker]int)
for i := 1; i <= loop; i++ {
invoker := loadBalance.Select(invokers, &invocation.RPCInvocation{})
selected[invoker]++
}
for _, i := range invokers {
w, _ := strconv.Atoi(i.GetUrl().GetParam("weight", "-1"))
assert.True(t, selected[i] == w)
}
}
...@@ -28,7 +28,8 @@ import ( ...@@ -28,7 +28,8 @@ import (
func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 { func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 {
url := invoker.GetUrl() url := invoker.GetUrl()
weight := url.GetMethodParamInt(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) weight := url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT)
if weight > 0 { if weight > 0 {
//get service register time an do warm up time //get service register time an do warm up time
now := time.Now().Unix() now := time.Now().Unix()
......
...@@ -67,3 +67,8 @@ const ( ...@@ -67,3 +67,8 @@ const (
OWNER_KEY = "owner" OWNER_KEY = "owner"
ENVIRONMENT_KEY = "environment" ENVIRONMENT_KEY = "environment"
) )
const (
CONFIG_NAMESPACE_KEY = "config.namespace"
CONFIG_TIMEOUT_KET = "config.timeout"
)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
)
var (
configCenters = make(map[string]func(config *common.URL) (config_center.DynamicConfiguration, error))
)
func SetConfigCenter(name string, v func(config *common.URL) (config_center.DynamicConfiguration, error)) {
configCenters[name] = v
}
func GetConfigCenter(name string, config *common.URL) (config_center.DynamicConfiguration, error) {
if configCenters[name] == nil {
panic("config center for " + name + " is not existing, make sure you have import the package.")
}
return configCenters[name](config)
}
...@@ -20,6 +20,7 @@ package common ...@@ -20,6 +20,7 @@ package common
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"net" "net"
"net/url" "net/url"
"strconv" "strconv"
...@@ -288,6 +289,15 @@ func (c URL) GetMethodParamInt(method string, key string, d int64) int64 { ...@@ -288,6 +289,15 @@ func (c URL) GetMethodParamInt(method string, key string, d int64) int64 {
return int64(r) return int64(r)
} }
func (c URL) GetMethodParamInt64(method string, key string, d int64) int64 {
r := c.GetMethodParamInt(method, key, math.MinInt64)
if r == math.MinInt64 {
return c.GetParamInt(key, d)
}
return r
}
func (c URL) GetMethodParam(method string, key string, d string) string { func (c URL) GetMethodParam(method string, key string, d string) string {
var r string var r string
if r = c.Params.Get("methods." + method + "." + key); r == "" { if r = c.Params.Get("methods." + method + "." + key); r == "" {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config_center
import (
"time"
)
import (
"github.com/apache/dubbo-go/remoting"
)
//////////////////////////////////////////
// DynamicConfiguration
//////////////////////////////////////////
const DEFAULT_GROUP = "dubbo"
const DEFAULT_CONFIG_TIMEOUT = "10s"
type DynamicConfiguration interface {
AddListener(string, remoting.ConfigurationListener, ...Option)
RemoveListener(string, remoting.ConfigurationListener, ...Option)
GetConfig(string, ...Option) string
GetConfigs(string, ...Option) string
}
type Options struct {
Group string
Timeout time.Duration
}
type Option func(*Options)
func WithGroup(group string) Option {
return func(opt *Options) {
opt.Group = group
}
}
func WithTimeout(time time.Duration) Option {
return func(opt *Options) {
opt.Timeout = time
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"sync"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
)
const ZK_CLIENT = "zk config_center"
type ZookeeperDynamicConfiguration struct {
url common.URL
rootPath string
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
client *zookeeper.ZookeeperClient
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
}
func NewZookeeperDynamicConfiguration(url common.URL) (config_center.DynamicConfiguration, error) {
c := &ZookeeperDynamicConfiguration{
url: url,
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
}
err := zookeeper.ValidateZookeeperClient(c, zookeeper.WithZkName(ZK_CLIENT))
if err != nil {
return nil, err
}
c.wg.Add(1)
go zookeeper.HandleClientRestart(c)
c.listener = zookeeper.NewZkEventListener(c.client)
//c.configListener = NewRegistryConfigurationListener(c.client, c)
//c.dataListener = NewRegistryDataListener(c.configListener)
return c, nil
}
func (*ZookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
}
func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
}
func (*ZookeeperDynamicConfiguration) GetConfig(key string, opions ...config_center.Option) string {
return ""
}
func (*ZookeeperDynamicConfiguration) GetConfigs(key string, opions ...config_center.Option) string {
return ""
}
func (r *ZookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
return r.client
}
func (r *ZookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
r.client = client
}
func (r *ZookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *ZookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *ZookeeperDynamicConfiguration) GetDone() chan struct{} {
return r.done
}
func (r *ZookeeperDynamicConfiguration) GetUrl() common.URL {
return r.url
}
func (r *ZookeeperDynamicConfiguration) Destroy() {
if r.listener != nil {
r.listener.Close()
}
close(r.done)
r.wg.Wait()
r.closeConfigs()
}
func (r *ZookeeperDynamicConfiguration) IsAvailable() bool {
select {
case <-r.done:
return false
default:
return true
}
}
func (r *ZookeeperDynamicConfiguration) closeConfigs() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
logger.Infof("begin to close provider zk client")
// 先关闭旧client,以关闭tmp node
r.client.Close()
r.client = nil
}
func (r *ZookeeperDynamicConfiguration) RestartCallBack() bool {
return true
}
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// @author yiji@apache.org
package impl
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
)
const active = "active"
func init() {
extension.SetFilter(active, GetActiveFilter)
}
type ActiveFilter struct {
}
func (ef *ActiveFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments()))
protocol.BeginCount(invoker.GetUrl(), invocation.MethodName())
return invoker.Invoke(invocation)
}
func (ef *ActiveFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
protocol.EndCount(invoker.GetUrl(), invocation.MethodName())
return result
}
func GetActiveFilter() filter.Filter {
return &ActiveFilter{}
}
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// @author yiji@apache.org
package protocol
import (
"sync"
"sync/atomic"
)
import (
"github.com/apache/dubbo-go/common"
)
var (
methodStatistics sync.Map // url -> { methodName : RpcStatus}
)
type RpcStatus struct {
active int32
}
func (rpc *RpcStatus) GetActive() int32 {
return atomic.LoadInt32(&rpc.active)
}
func GetStatus(url common.URL, methodName string) *RpcStatus {
identifier := url.Key()
methodMap, found := methodStatistics.Load(identifier)
if !found {
methodMap = &sync.Map{}
methodStatistics.Store(identifier, methodMap)
}
methodActive := methodMap.(*sync.Map)
rpcStatus, found := methodActive.Load(methodName)
if !found {
rpcStatus = &RpcStatus{}
methodActive.Store(methodName, rpcStatus)
}
status := rpcStatus.(*RpcStatus)
return status
}
func BeginCount(url common.URL, methodName string) {
beginCount0(GetStatus(url, methodName))
}
func EndCount(url common.URL, methodName string) {
endCount0(GetStatus(url, methodName))
}
// private methods
func beginCount0(rpcStatus *RpcStatus) {
atomic.AddInt32(&rpcStatus.active, 1)
}
func endCount0(rpcStatus *RpcStatus) {
atomic.AddInt32(&rpcStatus.active, -1)
}
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package directory package directory
import ( import (
"github.com/apache/dubbo-go/remoting"
"sync" "sync"
"time" "time"
) )
...@@ -130,10 +131,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { ...@@ -130,10 +131,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action { switch res.Action {
case registry.ServiceAdd: case remoting.Add:
//dir.cacheService.Add(res.Path, dir.serviceTTL) //dir.cacheService.Add(res.Path, dir.serviceTTL)
dir.cacheInvoker(res.Service) dir.cacheInvoker(res.Service)
case registry.ServiceDel: case remoting.Del:
//dir.cacheService.Del(res.Path, dir.serviceTTL) //dir.cacheService.Del(res.Path, dir.serviceTTL)
dir.uncacheInvoker(res.Service) dir.uncacheInvoker(res.Service)
logger.Infof("selector delete service url{%s}", res.Service) logger.Infof("selector delete service url{%s}", res.Service)
......
...@@ -19,6 +19,7 @@ package directory ...@@ -19,6 +19,7 @@ package directory
import ( import (
"context" "context"
"github.com/apache/dubbo-go/remoting"
"net/url" "net/url"
"strconv" "strconv"
"testing" "testing"
...@@ -50,7 +51,7 @@ func TestSubscribe_Delete(t *testing.T) { ...@@ -50,7 +51,7 @@ func TestSubscribe_Delete(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir() registryDirectory, mockRegistry := normalRegistryDir()
time.Sleep(1e9) time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3) assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceDel, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))}) mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
time.Sleep(1e9) time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2) assert.Len(t, registryDirectory.cacheInvokers, 2)
} }
...@@ -80,7 +81,7 @@ func TestSubscribe_Group(t *testing.T) { ...@@ -80,7 +81,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap.Set(constant.GROUP_KEY, "group1") urlmap.Set(constant.GROUP_KEY, "group1")
urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
common.WithParams(urlmap))}) common.WithParams(urlmap))})
} }
//for group2 //for group2
...@@ -88,7 +89,7 @@ func TestSubscribe_Group(t *testing.T) { ...@@ -88,7 +89,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap2.Set(constant.GROUP_KEY, "group2") urlmap2.Set(constant.GROUP_KEY, "group2")
urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
common.WithParams(urlmap2))}) common.WithParams(urlmap2))})
} }
...@@ -128,7 +129,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) { ...@@ -128,7 +129,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) {
go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice")) go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice"))
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))}) mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
} }
return registryDirectory, mockRegistry.(*registry.MockRegistry) return registryDirectory, mockRegistry.(*registry.MockRegistry)
} }
...@@ -25,38 +25,19 @@ import ( ...@@ -25,38 +25,19 @@ import (
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/remoting"
) )
func init() { func init() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
} }
//////////////////////////////////////////
// service url event type
//////////////////////////////////////////
type ServiceEventType int
const (
ServiceAdd = iota
ServiceDel
)
var serviceEventTypeStrings = [...]string{
"add service",
"delete service",
}
func (t ServiceEventType) String() string {
return serviceEventTypeStrings[t]
}
////////////////////////////////////////// //////////////////////////////////////////
// service event // service event
////////////////////////////////////////// //////////////////////////////////////////
type ServiceEvent struct { type ServiceEvent struct {
Action ServiceEventType Action remoting.EventType
Service common.URL Service common.URL
} }
......
...@@ -19,295 +19,64 @@ package zookeeper ...@@ -19,295 +19,64 @@ package zookeeper
import ( import (
"context" "context"
"fmt"
"path"
"sync"
"time"
) )
import ( import (
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
) )
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
zk "github.com/apache/dubbo-go/remoting/zookeeper"
) )
const ( type RegistryDataListener struct {
MaxFailTimes = 15 interestedURL []*common.URL
) listener *RegistryConfigurationListener
type zkEvent struct {
res *registry.ServiceEvent
err error
} }
func (e zkEvent) String() string { func NewRegistryDataListener(listener *RegistryConfigurationListener) *RegistryDataListener {
return fmt.Sprintf("err:%s, res:%s", e.err, e.res) return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}}
} }
func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
type zkEventListener struct { l.interestedURL = append(l.interestedURL, url)
client *zookeeperClient
events chan zkEvent
serviceMapLock sync.Mutex
serviceMap map[string]struct{}
wg sync.WaitGroup
registry *zkRegistry
} }
func newZkEventListener(registry *zkRegistry, client *zookeeperClient) *zkEventListener { func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
return &zkEventListener{ serviceURL, err := common.NewURL(context.TODO(), eventType.Content)
client: client, if err != nil {
registry: registry, logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Content, err)
events: make(chan zkEvent, 32), return false
serviceMap: make(map[string]struct{}),
} }
} for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool { l.listener.Process(&remoting.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
l.wg.Add(1) return true
defer l.wg.Done()
var zkEvent zk.Event
for {
keyEventCh, err := l.client.existW(zkPath)
if err != nil {
logger.Errorf("existW{key:%s} = error{%v}", zkPath, err)
return false
}
select {
case zkEvent = <-keyEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, stateToString(zkEvent.State), zkEvent.Err)
switch zkEvent.Type {
case zk.EventNodeDataChanged:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
case zk.EventNodeCreated:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
case zk.EventNotWatching:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
case zk.EventNodeDeleted:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
return true
}
case <-l.client.done():
return false
} }
} }
return false return false
} }
func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf common.URL) { type RegistryConfigurationListener struct {
contains := func(s []string, e string) bool { client *zk.ZookeeperClient
for _, a := range s { registry *zkRegistry
if a == e { events chan *remoting.ConfigChangeEvent
return true
}
}
return false
}
newChildren, err := l.client.getChildren(zkPath)
if err != nil {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
}
// a node was added -- listen the new node
var (
newNode string
serviceURL common.URL
)
for _, n := range newChildren {
if contains(children, n) {
continue
}
newNode = path.Join(zkPath, n)
logger.Infof("add zkNode{%s}", newNode)
//context.TODO
serviceURL, err = common.NewURL(context.TODO(), n)
if err != nil {
logger.Errorf("NewURL(%s) = error{%v}", n, perrors.WithStack(err))
continue
}
if !conf.URLEqual(serviceURL) {
logger.Warnf("serviceURL{%s} is not compatible with SubURL{%#v}", serviceURL.Key(), conf.Key())
continue
}
logger.Infof("add serviceURL{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil}
// listen l service node
go func(node string, serviceURL common.URL) {
logger.Infof("delete zkNode{%s}", node)
if l.listenServiceNodeEvent(node) {
logger.Infof("delete serviceURL{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode, serviceURL)
}
// old node was deleted
var oldNode string
for _, n := range children {
if contains(newChildren, n) {
continue
}
oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
serviceURL, err = common.NewURL(context.TODO(), n)
if !conf.URLEqual(serviceURL) {
logger.Warnf("serviceURL{%s} has been deleted is not compatible with SubURL{%#v}", serviceURL.Key(), conf.Key())
continue
}
logger.Warnf("delete serviceURL{%s}", serviceURL)
if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
}
} }
func (l *zkEventListener) listenDirEvent(zkPath string, conf common.URL) { func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
l.wg.Add(1) reg.wg.Add(1)
defer l.wg.Done() return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)}
var (
failTimes int
event chan struct{}
zkEvent zk.Event
)
event = make(chan struct{}, 4)
defer close(event)
for {
// get current children for a zkPath
children, childEventCh, err := l.client.getChildrenW(zkPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Errorf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
// clear the event channel
CLEAR:
for {
select {
case <-event:
default:
break CLEAR
}
}
l.client.registerEvent(zkPath, &event)
select {
case <-time.After(timeSecondDuration(failTimes * RegistryConnDelay)):
l.client.unregisterEvent(zkPath, &event)
continue
case <-l.client.done():
l.client.unregisterEvent(zkPath, &event)
logger.Warnf("client.done(), listen(path{%s}, ReferenceConfig{%#v}) goroutine exit now...", zkPath, conf)
return
case <-event:
logger.Infof("get zk.EventNodeDataChange notify event")
l.client.unregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, conf)
continue
}
}
failTimes = 0
select {
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, stateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
continue
}
l.handleZkNodeEvent(zkEvent.Path, children, conf)
case <-l.client.done():
logger.Warnf("client.done(), listen(path{%s}, ReferenceConfig{%#v}) goroutine exit now...", zkPath, conf)
return
}
}
} }
func (l *RegistryConfigurationListener) Process(configType *remoting.ConfigChangeEvent) {
// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener l.events <- configType
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
// |
// --------> listenServiceNodeEvent
func (l *zkEventListener) listenServiceEvent(conf common.URL) {
var (
err error
zkPath string
dubboPath string
children []string
serviceURL common.URL
)
zkPath = fmt.Sprintf("/dubbo%s/providers", conf.Path)
l.serviceMapLock.Lock()
_, ok := l.serviceMap[zkPath]
l.serviceMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
return
}
l.serviceMapLock.Lock()
l.serviceMap[zkPath] = struct{}{}
l.serviceMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.getChildren(zkPath)
if err != nil {
children = nil
logger.Errorf("fail to get children of zk path{%s}", zkPath)
}
for _, c := range children {
serviceURL, err = common.NewURL(context.TODO(), c)
if err != nil {
logger.Errorf("NewURL(r{%s}) = error{%v}", c, err)
continue
}
if !conf.URLEqual(serviceURL) {
logger.Warnf("serviceURL %v is not compatible with SubURL %v", serviceURL.Key(), conf.Key())
continue
}
logger.Debugf("add serviceUrl{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceAdd, Service: serviceURL}, nil}
// listen l service node
dubboPath = path.Join(zkPath, c)
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string, serviceURL common.URL) {
if l.listenServiceNodeEvent(dubboPath) {
logger.Debugf("delete serviceUrl{%s}", serviceURL)
l.events <- zkEvent{&registry.ServiceEvent{Action: registry.ServiceDel, Service: serviceURL}, nil}
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, serviceURL)
}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, conf common.URL) {
l.listenDirEvent(zkPath, conf)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, conf)
} }
func (l *zkEventListener) Next() (*registry.ServiceEvent, error) { func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
for { for {
select { select {
case <-l.client.done(): case <-l.client.Done():
logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.") logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.")
return nil, perrors.New("listener stopped") return nil, perrors.New("listener stopped")
...@@ -317,29 +86,21 @@ func (l *zkEventListener) Next() (*registry.ServiceEvent, error) { ...@@ -317,29 +86,21 @@ func (l *zkEventListener) Next() (*registry.ServiceEvent, error) {
case e := <-l.events: case e := <-l.events:
logger.Debugf("got zk event %s", e) logger.Debugf("got zk event %s", e)
if e.err != nil { if e.ConfigType == remoting.Del && !l.valid() {
return nil, perrors.WithStack(e.err) logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
}
if e.res.Action == registry.ServiceDel && !l.valid() {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.res)
continue continue
} }
//r.update(e.res) //r.update(e.res)
//write to invoker //write to invoker
//r.outerEventCh <- e.res //r.outerEventCh <- e.res
return e.res, nil return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
} }
} }
} }
func (l *RegistryConfigurationListener) Close() {
func (l *zkEventListener) valid() bool { l.registry.wg.Done()
return l.client.zkConnValid()
} }
func (l *zkEventListener) Close() { func (l *RegistryConfigurationListener) valid() bool {
l.registry.listenerLock.Lock() return l.client.ZkConnValid()
l.client.Close()
l.registry.listenerLock.Unlock()
l.registry.wg.Done()
l.wg.Wait()
} }
...@@ -40,13 +40,12 @@ import ( ...@@ -40,13 +40,12 @@ import (
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils" "github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting/zookeeper"
"github.com/apache/dubbo-go/version" "github.com/apache/dubbo-go/version"
) )
const ( const (
defaultTimeout = int64(10e9) RegistryZkClient = "zk registry"
RegistryZkClient = "zk registry"
RegistryConnDelay = 3
) )
var ( var (
...@@ -73,14 +72,16 @@ type zkRegistry struct { ...@@ -73,14 +72,16 @@ type zkRegistry struct {
done chan struct{} done chan struct{}
cltLock sync.Mutex cltLock sync.Mutex
client *zookeeperClient client *zookeeper.ZookeeperClient
services map[string]common.URL // service name + protocol -> service config services map[string]common.URL // service name + protocol -> service config
listenerLock sync.Mutex listenerLock sync.Mutex
listener *zkEventListener listener *zookeeper.ZkEventListener
dataListener *RegistryDataListener
configListener *RegistryConfigurationListener
//for provider //for provider
zkPath map[string]int // key = protocol://ip:port/interface zkPath map[string]int // key = protocol://ip:port/interface
} }
func newZkRegistry(url *common.URL) (registry.Registry, error) { func newZkRegistry(url *common.URL) (registry.Registry, error) {
...@@ -97,30 +98,28 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { ...@@ -97,30 +98,28 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
zkPath: make(map[string]int), zkPath: make(map[string]int),
} }
//if r.SubURL.Name == "" { err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
// r.SubURL.Name = RegistryZkClient
//}
//if r.Version == "" {
// r.Version = version.Version
//}
err = r.validateZookeeperClient()
if err != nil { if err != nil {
return nil, err return nil, err
} }
r.wg.Add(1) r.wg.Add(1)
go r.handleZkRestart() go zookeeper.HandleClientRestart(r)
//if r.RoleType == registry.CONSUMER { r.listener = zookeeper.NewZkEventListener(r.client)
// r.wg.Add(1) r.configListener = NewRegistryConfigurationListener(r.client, r)
// go r.listen() r.dataListener = NewRegistryDataListener(r.configListener)
//}
return r, nil return r, nil
} }
func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) { type Options struct {
client *zookeeper.ZookeeperClient
}
type Option func(*Options)
func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) {
var ( var (
err error err error
r *zkRegistry r *zkRegistry
...@@ -136,139 +135,78 @@ func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) { ...@@ -136,139 +135,78 @@ func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) {
zkPath: make(map[string]int), zkPath: make(map[string]int),
} }
c, r.client, _, err = newMockZookeeperClient("test", 15*time.Second) c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
r.wg.Add(1) r.wg.Add(1)
go r.handleZkRestart() go zookeeper.HandleClientRestart(r)
//if r.RoleType == registry.CONSUMER { r.listener = zookeeper.NewZkEventListener(r.client)
// r.wg.Add(1) r.configListener = NewRegistryConfigurationListener(r.client, r)
// go r.listen() r.dataListener = NewRegistryDataListener(r.configListener)
//}
return c, r, nil return c, r, nil
} }
func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient {
return r.client
}
func (r *zkRegistry) SetZkClient(client *zookeeper.ZookeeperClient) {
r.client = client
}
func (r *zkRegistry) ZkClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *zkRegistry) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *zkRegistry) GetDone() chan struct{} {
return r.done
}
func (r *zkRegistry) GetUrl() common.URL { func (r *zkRegistry) GetUrl() common.URL {
return *r.URL return *r.URL
} }
func (r *zkRegistry) Destroy() { func (r *zkRegistry) Destroy() {
if r.listener != nil { if r.configListener != nil {
r.listener.Close() r.configListener.Close()
} }
close(r.done) close(r.done)
r.wg.Wait() r.wg.Wait()
r.closeRegisters() r.closeRegisters()
} }
func (r *zkRegistry) validateZookeeperClient() error { func (r *zkRegistry) RestartCallBack() bool {
var (
err error
)
err = nil // copy r.services
r.cltLock.Lock() services := []common.URL{}
defer r.cltLock.Unlock() for _, confIf := range r.services {
if r.client == nil { services = append(services, confIf)
//in dubbp ,every registry only connect one node ,so this is []string{r.Address}
timeout, err := time.ParseDuration(r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
r.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.Location)
}
r.client, err = newZookeeperClient(RegistryZkClient, []string{r.Location}, timeout)
if err != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk addresss{%v}, timeout{%d}) = error{%v}",
RegistryZkClient, r.Location, timeout.String(), err)
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.Location)
}
}
if r.client.conn == nil {
var event <-chan zk.Event
r.client.conn, event, err = zk.Connect(r.client.zkAddrs, r.client.timeout)
if err == nil {
r.client.wait.Add(1)
go r.client.handleZkEvent(event)
}
} }
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", r.PrimitiveURL) flag := true
} for _, confIf := range services {
err := r.register(confIf)
func (r *zkRegistry) handleZkRestart() { if err != nil {
var ( logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
err error confIf, perrors.WithStack(err))
flag bool flag = false
failTimes int break
confIf common.URL
)
defer r.wg.Done()
LOOP:
for {
select {
case <-r.done:
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
// re-register all services
case <-r.client.done():
r.cltLock.Lock()
r.client.Close()
r.client = nil
r.cltLock.Unlock()
// 接zk,直至成功
failTimes = 0
for {
select {
case <-r.done:
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-time.After(time.Duration(1e9 * failTimes * RegistryConnDelay)): // 防止疯狂重连zk
}
err = r.validateZookeeperClient()
logger.Infof("ZkProviderRegistry.validateZookeeperClient(zkAddr{%s}) = error{%#v}",
r.client.zkAddrs, perrors.WithStack(err))
if err == nil {
// copy r.services
services := []common.URL{}
for _, confIf = range r.services {
services = append(services, confIf)
}
flag = true
for _, confIf = range services {
err = r.register(confIf)
if err != nil {
logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
flag = false
break
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
if flag {
break
}
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
}
} }
logger.Infof("success to re-register service :%v", confIf.Key())
} }
return flag
} }
func (r *zkRegistry) Register(conf common.URL) error { func (r *zkRegistry) Register(conf common.URL) error {
var ( var (
ok bool ok bool
err error err error
listener *zkEventListener
) )
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
switch role { switch role {
...@@ -291,12 +229,6 @@ func (r *zkRegistry) Register(conf common.URL) error { ...@@ -291,12 +229,6 @@ func (r *zkRegistry) Register(conf common.URL) error {
r.cltLock.Unlock() r.cltLock.Unlock()
logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf) logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
r.listenerLock.Lock()
listener = r.listener
r.listenerLock.Unlock()
if listener != nil {
go listener.listenServiceEvent(conf)
}
case common.PROVIDER: case common.PROVIDER:
// 检验服务是否已经注册过 // 检验服务是否已经注册过
...@@ -337,7 +269,7 @@ func (r *zkRegistry) register(c common.URL) error { ...@@ -337,7 +269,7 @@ func (r *zkRegistry) register(c common.URL) error {
//conf config.URL //conf config.URL
) )
err = r.validateZookeeperClient() err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
if err != nil { if err != nil {
return perrors.WithStack(err) return perrors.WithStack(err)
} }
...@@ -428,6 +360,7 @@ func (r *zkRegistry) register(c common.URL) error { ...@@ -428,6 +360,7 @@ func (r *zkRegistry) register(c common.URL) error {
dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.CONSUMER)).String()) dubboPath = fmt.Sprintf("/dubbo%s/%s", c.Path, (common.RoleType(common.CONSUMER)).String())
logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL) logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
default: default:
return perrors.Errorf("@c{%v} type is not referencer or provider", c) return perrors.Errorf("@c{%v} type is not referencer or provider", c)
} }
...@@ -464,44 +397,37 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { ...@@ -464,44 +397,37 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
} }
func (r *zkRegistry) Subscribe(conf common.URL) (registry.Listener, error) { func (r *zkRegistry) Subscribe(conf common.URL) (registry.Listener, error) {
r.wg.Add(1)
return r.getListener(conf) return r.getListener(conf)
} }
func (r *zkRegistry) getListener(conf common.URL) (*zkEventListener, error) { func (r *zkRegistry) getListener(conf common.URL) (*RegistryConfigurationListener, error) {
var ( var (
zkListener *zkEventListener zkListener *RegistryConfigurationListener
) )
r.listenerLock.Lock() r.listenerLock.Lock()
zkListener = r.listener zkListener = r.configListener
r.listenerLock.Unlock() r.listenerLock.Unlock()
if zkListener != nil { if r.listener == nil {
return zkListener, nil r.cltLock.Lock()
} client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("zk connection broken")
}
r.cltLock.Lock() // new client & listener
client := r.client listener := zookeeper.NewZkEventListener(r.client)
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("zk connection broken")
}
// new client & listener r.listenerLock.Lock()
zkListener = newZkEventListener(r, client) r.listener = listener
r.listenerLock.Unlock()
}
r.listenerLock.Lock() //注册到dataconfig的interested
r.listener = zkListener r.dataListener.AddInterestedURL(&conf)
r.listenerLock.Unlock()
// listen go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo%s/providers", conf.Path), r.dataListener)
r.cltLock.Lock()
for _, svs := range r.services {
if svs.URLEqual(conf) {
go zkListener.listenServiceEvent(svs)
}
}
r.cltLock.Unlock()
return zkListener, nil return zkListener, nil
} }
......
...@@ -31,6 +31,7 @@ import ( ...@@ -31,6 +31,7 @@ import (
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/remoting/zookeeper"
) )
func Test_Register(t *testing.T) { func Test_Register(t *testing.T) {
...@@ -40,7 +41,7 @@ func Test_Register(t *testing.T) { ...@@ -40,7 +41,7 @@ func Test_Register(t *testing.T) {
ts, reg, err := newMockZkRegistry(&regurl) ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop() defer ts.Stop()
err = reg.Register(url) err = reg.Register(url)
children, _ := reg.client.getChildren("/dubbo/com.ikurento.user.UserProvider/providers") children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children) assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children)
assert.NoError(t, err) assert.NoError(t, err)
} }
...@@ -49,7 +50,6 @@ func Test_Subscribe(t *testing.T) { ...@@ -49,7 +50,6 @@ func Test_Subscribe(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := newMockZkRegistry(&regurl) ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop()
//provider register //provider register
err = reg.Register(url) err = reg.Register(url)
...@@ -61,8 +61,8 @@ func Test_Subscribe(t *testing.T) { ...@@ -61,8 +61,8 @@ func Test_Subscribe(t *testing.T) {
//consumer register //consumer register
regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
_, reg2, err := newMockZkRegistry(&regurl) _, reg2, err := newMockZkRegistry(&regurl, zookeeper.WithTestCluster(ts))
reg2.client = reg.client
err = reg2.Register(url) err = reg2.Register(url)
listener, err := reg2.Subscribe(url) listener, err := reg2.Subscribe(url)
...@@ -71,8 +71,8 @@ func Test_Subscribe(t *testing.T) { ...@@ -71,8 +71,8 @@ func Test_Subscribe(t *testing.T) {
if err != nil { if err != nil {
return return
} }
assert.Regexp(t, ".*ServiceEvent{Action{add service}.*", serviceEvent.String()) assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
defer ts.Stop()
} }
func Test_ConsumerDestory(t *testing.T) { func Test_ConsumerDestory(t *testing.T) {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment