Skip to content
Snippets Groups Projects
Commit f9aff5ac authored by 李平's avatar 李平
Browse files

reslove conficts

parents d530be14 014d1bc6
No related branches found
No related tags found
No related merge requests found
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"strconv"
)
import (
consul "github.com/hashicorp/consul/api"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/utils"
)
func buildId(url common.URL) string {
t := md5.Sum([]byte(url.String()))
return hex.EncodeToString(t[:])
}
func buildService(url common.URL) (*consul.AgentServiceRegistration, error) {
var err error
// id
id := buildId(url)
// address
if url.Ip == "" {
url.Ip, _ = utils.GetLocalIP()
}
// port
port, err := strconv.Atoi(url.Port)
if err != nil {
return nil, err
}
// tcp
tcp := fmt.Sprintf("%s:%d", url.Ip, port)
// tags
tags := make([]string, 0, 8)
for k := range url.Params {
tags = append(tags, k+"="+url.Params.Get(k))
}
tags = append(tags, "dubbo")
// meta
meta := make(map[string]string, 8)
meta["url"] = url.String()
// check
check := &consul.AgentServiceCheck{
TCP: tcp,
Interval: url.GetParam("consul-check-interval", "10s"),
Timeout: url.GetParam("consul-check-timeout", "1s"),
DeregisterCriticalServiceAfter: url.GetParam("consul-deregister-critical-service-after", "20s"),
}
service := &consul.AgentServiceRegistration{
Name: url.Service(),
ID: id,
Address: url.Ip,
Port: port,
Tags: tags,
Meta: meta,
Check: check,
}
return service, nil
}
func retrieveURL(service *consul.ServiceEntry) (common.URL, error) {
url, ok := service.Service.Meta["url"]
if !ok {
return common.URL{}, perrors.New("retrieve url fails with no url key in service meta")
}
url1, err := common.NewURL(context.Background(), url)
if err != nil {
return common.URL{}, perrors.WithStack(err)
}
return url1, nil
}
func in(url common.URL, urls []common.URL) bool {
for _, url1 := range urls {
if url.URLEqual(url1) {
return true
}
}
return false
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"strconv"
"sync"
"testing"
)
import (
"github.com/hashicorp/consul/agent"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)
var (
registryHost = "localhost"
registryPort = 8500
providerHost = "localhost"
providerPort = 8000
consumerHost = "localhost"
consumerPort = 8001
service = "HelloWorld"
protocol = "tcp"
)
func newProviderRegistryUrl(host string, port int) *common.URL {
url1 := common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)),
)
return url1
}
func newConsumerRegistryUrl(host string, port int) *common.URL {
url1 := common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)),
)
return url1
}
func newProviderUrl(host string, port int, service string, protocol string) common.URL {
url1 := common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithPath(service),
common.WithProtocol(protocol),
)
return *url1
}
func newConsumerUrl(host string, port int, service string, protocol string) common.URL {
url1 := common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithPath(service),
common.WithProtocol(protocol),
)
return *url1
}
type testConsulAgent struct {
dataDir string
testAgent *agent.TestAgent
}
func newConsulAgent(t *testing.T, port int) *testConsulAgent {
dataDir, _ := ioutil.TempDir("./", "agent")
hcl := `
ports {
http = ` + strconv.Itoa(port) + `
}
data_dir = "` + dataDir + `"
`
testAgent := &agent.TestAgent{Name: t.Name(), DataDir: dataDir, HCL: hcl}
testAgent.Start(t)
consulAgent := &testConsulAgent{
dataDir: dataDir,
testAgent: testAgent,
}
return consulAgent
}
func (consulAgent *testConsulAgent) close() {
consulAgent.testAgent.Shutdown()
os.RemoveAll(consulAgent.dataDir)
}
type testServer struct {
listener net.Listener
wg sync.WaitGroup
done chan struct{}
}
func newServer(host string, port int) *testServer {
addr := fmt.Sprintf("%s:%d", host, port)
tcpAddr, _ := net.ResolveTCPAddr("tcp", addr)
listener, _ := net.ListenTCP("tcp", tcpAddr)
server := &testServer{
listener: listener,
done: make(chan struct{}),
}
server.wg.Add(1)
go server.serve()
return server
}
func (server *testServer) serve() {
defer server.wg.Done()
for {
select {
case <-server.done:
return
default:
conn, err := server.listener.Accept()
if err != nil {
continue
}
conn.Write([]byte("Hello World"))
conn.Close()
}
}
}
func (server *testServer) close() {
close(server.done)
server.listener.Close()
server.wg.Wait()
}
type consulRegistryTestSuite struct {
t *testing.T
providerRegistry registry.Registry
consumerRegistry registry.Registry
listener registry.Listener
providerUrl common.URL
consumerUrl common.URL
}
func newConsulRegistryTestSuite(t *testing.T) *consulRegistryTestSuite {
suite := &consulRegistryTestSuite{t: t}
return suite
}
func (suite *consulRegistryTestSuite) close() {
suite.listener.Close()
suite.providerRegistry.Destroy()
suite.consumerRegistry.Destroy()
}
// register -> subscribe -> unregister
func test1(t *testing.T) {
consulAgent := newConsulAgent(t, registryPort)
defer consulAgent.close()
server := newServer(providerHost, providerPort)
defer server.close()
suite := newConsulRegistryTestSuite(t)
defer suite.close()
suite.testNewProviderRegistry()
suite.testRegister()
suite.testNewConsumerRegistry()
suite.testSubscribe()
suite.testListener(remoting.EventTypeAdd)
suite.testUnregister()
suite.testListener(remoting.EventTypeDel)
}
// subscribe -> register
func test2(t *testing.T) {
consulAgent := newConsulAgent(t, registryPort)
defer consulAgent.close()
server := newServer(providerHost, providerPort)
defer server.close()
suite := newConsulRegistryTestSuite(t)
defer suite.close()
suite.testNewConsumerRegistry()
suite.testSubscribe()
suite.testNewProviderRegistry()
suite.testRegister()
suite.testListener(remoting.EventTypeAdd)
}
func TestConsulRegistry(t *testing.T) {
t.Run("test1", test1)
t.Run("test2", test2)
}
......@@ -114,16 +114,13 @@ func (dir *registryDirectory) Subscribe(url common.URL) {
}
}
//subscribe service from registry , and update the cacheServices
//subscribe service from registry, and update the cacheServices
func (dir *registryDirectory) update(res *registry.ServiceEvent) {
if res == nil {
return
}
logger.Debugf("registry update, result{%s}", res)
logger.Debugf("update service name: %s!", res.Service)
dir.refreshInvokers(res)
}
......@@ -149,7 +146,6 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
}
func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
newInvokersList := []protocol.Invoker{}
groupInvokersMap := make(map[string][]protocol.Invoker)
groupInvokersList := []protocol.Invoker{}
......
......@@ -147,7 +147,7 @@ func (nl *nacosListener) Callback(services []model.SubscribeService, err error)
for i := range updateInstances {
newUrl := generateUrl(updateInstances[i])
if newUrl != nil {
nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EvnetTypeUpdate})
nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeUpdate})
}
}
}
......
......@@ -100,7 +100,7 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin
logger.Infof("etcd get event (key{%s}) = event{EventNodeDataChanged}", event.Kv.Key)
listener.DataChange(remoting.Event{
Path: string(event.Kv.Key),
Action: remoting.EvnetTypeUpdate,
Action: remoting.EventTypeUpdate,
Content: string(event.Kv.Value),
})
}
......
......@@ -46,7 +46,7 @@ type EventType int
const (
EventTypeAdd = iota
EventTypeDel
EvnetTypeUpdate
EventTypeUpdate
)
var serviceEventTypeStrings = [...]string{
......
......@@ -47,9 +47,11 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
pathMap: make(map[string]struct{}),
}
}
func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
l.client = client
}
func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
......@@ -70,9 +72,8 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remo
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
if len(listener) > 0 {
content, _, _ := l.client.Conn.Get(zkEvent.Path)
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EvnetTypeUpdate, Content: string(content)})
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeUpdate, Content: string(content)})
}
case zk.EventNodeCreated:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
if len(listener) > 0 {
......@@ -100,7 +101,6 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
return true
}
}
return false
}
......@@ -130,14 +130,14 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
continue
}
// listen l service node
go func(node string) {
go func(node string, zkPath string, listener remoting.DataListener) {
logger.Infof("delete zkNode{%s}", node)
if l.ListenServiceNodeEvent(node, listener) {
logger.Infof("delete content{%s}", node)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode)
}(newNode, zkPath, listener)
}
// old node was deleted
......@@ -205,11 +205,10 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
}
failTimes = 0
for _, c := range children {
// listen l service node
dubboPath := path.Join(zkPath, c)
//Save the path to avoid listen repeatly
//Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[dubboPath]
l.pathMapLock.Unlock()
......@@ -231,14 +230,14 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})
go func(zkPath string, listener remoting.DataListener) {
if l.ListenServiceNodeEvent(zkPath) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath)
}(dubboPath, listener)
//liten sub path recursive
//listen sub path recursive
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
......@@ -263,7 +262,7 @@ func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
// this func is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
// --------> ListenServiceNodeEvent
......@@ -294,7 +293,6 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.Da
}
for _, c := range children {
// listen l service node
dubboPath = path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
......@@ -305,12 +303,12 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.Da
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})
go func(zkPath string, listener remoting.DataListener) {
if l.ListenServiceNodeEvent(zkPath) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath)
}(dubboPath, listener)
}
logger.Infof("listen dubbo path{%s}", zkPath)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment