Skip to content
Snippets Groups Projects
Commit 5601b2cb authored by vito.he's avatar vito.he
Browse files

Mod: DataListener move to remoting

parent e016b8c2
No related branches found
No related tags found
No related merge requests found
......@@ -18,11 +18,9 @@
package config_center
import (
"github.com/apache/dubbo-go/remoting"
"time"
)
import (
"github.com/apache/dubbo-go/common"
)
//////////////////////////////////////////
// DynamicConfiguration
......@@ -31,8 +29,8 @@ const DEFAULT_GROUP = "dubbo"
const DEFAULT_CONFIG_TIMEOUT = "10s"
type DynamicConfiguration interface {
AddListener(string, common.ConfigurationListener, ...Option)
RemoveListener(string, common.ConfigurationListener, ...Option)
AddListener(string, remoting.ConfigurationListener, ...Option)
RemoveListener(string, remoting.ConfigurationListener, ...Option)
GetConfig(string, ...Option) string
GetConfigs(string, ...Option) string
}
......
......@@ -18,6 +18,7 @@
package zookeeper
import (
"github.com/apache/dubbo-go/remoting"
"sync"
)
import (
......@@ -61,11 +62,11 @@ func NewZookeeperDynamicConfiguration(url common.URL) (config_center.DynamicConf
}
func (*ZookeeperDynamicConfiguration) AddListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) {
func (*ZookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
}
func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) {
func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
}
......
......@@ -18,6 +18,7 @@
package directory
import (
"github.com/apache/dubbo-go/remoting"
"sync"
"time"
)
......@@ -130,10 +131,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action {
case common.Add:
case remoting.Add:
//dir.cacheService.Add(res.Path, dir.serviceTTL)
dir.cacheInvoker(res.Service)
case common.Del:
case remoting.Del:
//dir.cacheService.Del(res.Path, dir.serviceTTL)
dir.uncacheInvoker(res.Service)
logger.Infof("selector delete service url{%s}", res.Service)
......
......@@ -19,6 +19,7 @@ package directory
import (
"context"
"github.com/apache/dubbo-go/remoting"
"net/url"
"strconv"
"testing"
......@@ -50,7 +51,7 @@ func TestSubscribe_Delete(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir()
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: common.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
}
......@@ -80,7 +81,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap.Set(constant.GROUP_KEY, "group1")
urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
common.WithParams(urlmap))})
}
//for group2
......@@ -88,7 +89,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap2.Set(constant.GROUP_KEY, "group2")
urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
common.WithParams(urlmap2))})
}
......@@ -128,7 +129,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) {
go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice"))
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
}
return registryDirectory, mockRegistry.(*registry.MockRegistry)
}
......@@ -19,6 +19,7 @@ package registry
import (
"fmt"
"github.com/apache/dubbo-go/remoting"
"math/rand"
"time"
)
......@@ -36,7 +37,7 @@ func init() {
//////////////////////////////////////////
type ServiceEvent struct {
Action common.EventType
Action remoting.EventType
Service common.URL
}
......
......@@ -19,6 +19,7 @@ package zookeeper
import (
"context"
"github.com/apache/dubbo-go/remoting"
)
import (
perrors "github.com/pkg/errors"
......@@ -42,7 +43,7 @@ func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
func (l *RegistryDataListener) DataChange(eventType common.Event) bool {
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
serviceURL, err := common.NewURL(context.TODO(), eventType.Content)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Content, err)
......@@ -50,7 +51,7 @@ func (l *RegistryDataListener) DataChange(eventType common.Event) bool {
}
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(&common.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
l.listener.Process(&remoting.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
return true
}
}
......@@ -61,14 +62,14 @@ func (l *RegistryDataListener) DataChange(eventType common.Event) bool {
type RegistryConfigurationListener struct {
client *zk.ZookeeperClient
registry *zkRegistry
events chan *common.ConfigChangeEvent
events chan *remoting.ConfigChangeEvent
}
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
reg.wg.Add(1)
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *common.ConfigChangeEvent, 32)}
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)}
}
func (l *RegistryConfigurationListener) Process(configType *common.ConfigChangeEvent) {
func (l *RegistryConfigurationListener) Process(configType *remoting.ConfigChangeEvent) {
l.events <- configType
}
......@@ -85,7 +86,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
case e := <-l.events:
logger.Debugf("got zk event %s", e)
if e.ConfigType == common.Del && !l.valid() {
if e.ConfigType == remoting.Del && !l.valid() {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
continue
}
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package common
package remoting
import "fmt"
......
......@@ -18,6 +18,7 @@
package zookeeper
import (
"github.com/apache/dubbo-go/remoting"
"path"
"sync"
"time"
......@@ -83,7 +84,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool {
return false
}
func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener common.DataListener) {
func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener remoting.DataListener) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
......@@ -111,7 +112,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newNode = path.Join(zkPath, n)
logger.Infof("add zkNode{%s}", newNode)
if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: n}) {
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
continue
}
// listen l service node
......@@ -119,7 +120,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
logger.Infof("delete zkNode{%s}", node)
if l.listenServiceNodeEvent(node) {
logger.Infof("delete content{%s}", n)
listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: n})
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode)
......@@ -134,7 +135,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: n}) {
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
continue
}
logger.Warnf("delete content{%s}", n)
......@@ -142,11 +143,11 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: n})
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
}
}
func (l *ZkEventListener) listenDirEvent(zkPath string, listener common.DataListener) {
func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
......@@ -216,7 +217,7 @@ func timeSecondDuration(sec int) time.Duration {
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
// |
// --------> listenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.DataListener) {
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
var (
err error
dubboPath string
......@@ -244,7 +245,7 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.Data
}
for _, c := range children {
if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: c}) {
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: c}) {
continue
}
......@@ -254,14 +255,14 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.Data
go func(zkPath string, serviceURL common.URL) {
if l.listenServiceNodeEvent(dubboPath) {
logger.Debugf("delete serviceUrl{%s}", serviceURL)
listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: c})
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: c})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, serviceURL)
}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener common.DataListener) {
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment