Skip to content
Snippets Groups Projects
Unverified Commit 94ddbacf authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #22 from hxmhlt/feature/registry_upgrade

upgrade registry interface definition & fix some bug
parents 3285cb40 ffca25a0
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,8 @@ import (
"github.com/dubbo/dubbo-go/registry"
)
const RegistryConnDelay = 3
type Options struct {
ServiceTTL time.Duration
selector selector.Selector
......@@ -74,25 +76,38 @@ func NewInvoker(registry registry.Registry, opts ...Option) (*Invoker, error) {
cacheServiceMap: make(map[string]*ServiceArray),
registry: registry,
}
invoker.Listen()
go invoker.listen()
return invoker, nil
}
func (ivk *Invoker) Listen() {
go ivk.listen()
}
func (ivk *Invoker) listen() {
for {
ch := ivk.registry.GetListenEvent()
if ivk.registry.IsClosed() {
log.Warn("event listener game over.")
return
}
listener, err := ivk.registry.Subscribe()
if err != nil {
if ivk.registry.IsClosed() {
log.Warn("event listener game over.")
return
}
log.Warn("getListener() = err:%s", jerrors.ErrorStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for {
e, isOpen := <-ch
if !isOpen {
log.Warn("registry closed!")
break
if serviceEvent, err := listener.Next(); err != nil {
log.Warn("Selector.watch() = error{%v}", jerrors.ErrorStack(err))
listener.Close()
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
return
} else {
ivk.update(serviceEvent)
}
ivk.update(e)
}
}
......@@ -131,7 +146,7 @@ func (ivk *Invoker) update(res *registry.ServiceEvent) {
}
}
func (ivk *Invoker) getService(registryConf *registry.ServiceConfig) (*ServiceArray, error) {
func (ivk *Invoker) getService(registryConf registry.ServiceConfig) (*ServiceArray, error) {
defer ivk.listenerLock.Unlock()
registryKey := registryConf.Key()
......@@ -159,7 +174,7 @@ func (ivk *Invoker) getService(registryConf *registry.ServiceConfig) (*ServiceAr
return newSvcArr, nil
}
func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, registryConf *registry.ServiceConfig, req jsonrpc.Request, resp interface{}) error {
func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, registryConf registry.ServiceConfig, req jsonrpc.Request, resp interface{}) error {
registryArray, err := ivk.getService(registryConf)
if err != nil {
......@@ -180,7 +195,7 @@ func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, registryConf *reg
return nil
}
func (ivk *Invoker) DubboCall(reqId int64, registryConf *registry.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error {
func (ivk *Invoker) DubboCall(reqId int64, registryConf registry.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error {
registryArray, err := ivk.getService(registryConf)
if err != nil {
......@@ -201,3 +216,7 @@ func (ivk *Invoker) DubboCall(reqId int64, registryConf *registry.ServiceConfig,
log.Info("response result:%s", reply)
return nil
}
func (ivk *Invoker) Close() {
ivk.DubboClient.Close()
}
......@@ -57,7 +57,7 @@ func testDubborpc(clientConfig *examples.ClientConfig, userKey string) {
user = new(DubboUser)
defer clientInvoker.DubboClient.Close()
err = clientInvoker.DubboCall(1, &conf, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Dubbo))
err = clientInvoker.DubboCall(1, conf, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Dubbo))
// Call service
if err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
......
......@@ -40,7 +40,7 @@ func main() {
initProfiling(clientConfig)
initClient(clientConfig)
time.Sleep(3e9)
time.Sleep(10e9)
gxlog.CInfo("\n\n\nstart to test jsonrpc")
testJsonrpc(clientConfig, "A003", "GetUser")
......
......@@ -59,7 +59,7 @@ func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method str
user = new(JsonRPCUser)
err = clientInvoker.HttpCall(ctx, 1, &conf, req, user)
err = clientInvoker.HttpCall(ctx, 1, conf, req, user)
if err != nil {
panic(err)
} else {
......
......@@ -23,8 +23,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dubbogo/hessian2 v0.0.0-20190330063706-e01b2c027961 h1:nGlTGXqzalnFtNqDsar2k/Gle/7pILm0MeH65fJSE7Y=
github.com/dubbogo/hessian2 v0.0.0-20190330063706-e01b2c027961/go.mod h1:v+gfInE8fm/k3Fjkb2oUCKSO9LKbWvf+PtweEI89BmI=
github.com/dubbogo/hessian2 v0.0.0-20190331022028-ade83b794bf2 h1:5kv4/4ptZTNcG2dzfHqXPiBHZcPPR3jshgxpHvlidew=
github.com/dubbogo/hessian2 v0.0.0-20190331022028-ade83b794bf2/go.mod h1:v+gfInE8fm/k3Fjkb2oUCKSO9LKbWvf+PtweEI89BmI=
github.com/fatih/set v0.2.1/go.mod h1:+RKtMCH+favT2+3YecHGxcc0b4KyVWA1QWWJUs4E0CI=
......
......@@ -11,11 +11,18 @@ type Registry interface {
RegisterProvider(ServiceConfigIf) error
//used for service consumer calling , register services cared about ,for dubbo's admin monitoring
RegisterConsumer(ServiceConfigIf) error
//used for service consumer ,start listen goroutine
GetListenEvent() chan *ServiceEvent
//used for service consumer ,start subscribe service event from registry
Subscribe() (Listener, error)
//input the serviceConfig , registry should return serviceUrlArray with multi location(provider nodes) available
GetService(*ServiceConfig) ([]*ServiceURL, error)
GetService(ServiceConfig) ([]*ServiceURL, error)
//close the registry for Elegant closing
Close()
//return if the registry is closed for consumer subscribing
IsClosed() bool
}
type Listener interface {
Next() (*ServiceEvent, error)
Close()
}
......@@ -2,7 +2,6 @@ package zookeeper
import (
"fmt"
"time"
)
import (
......@@ -48,18 +47,14 @@ func (r *ZkRegistry) RegisterConsumer(regConf registry.ServiceConfigIf) error {
listener = r.listener
r.listenerLock.Unlock()
if listener != nil {
go listener.listenServiceEvent(&conf)
go listener.listenServiceEvent(conf)
}
return nil
}
func (r *ZkRegistry) GetListenEvent() chan *registry.ServiceEvent {
return r.outerEventCh
}
// name: service@protocol
func (r *ZkRegistry) GetService(conf *registry.ServiceConfig) ([]*registry.ServiceURL, error) {
func (r *ZkRegistry) GetService(conf registry.ServiceConfig) ([]*registry.ServiceURL, error) {
var (
ok bool
err error
......@@ -68,7 +63,7 @@ func (r *ZkRegistry) GetService(conf *registry.ServiceConfig) ([]*registry.Servi
listener *zkEventListener
serviceURL *registry.ServiceURL
serviceConfIf registry.ServiceConfigIf
serviceConf *registry.ServiceConfig
serviceConf registry.ServiceConfig
)
r.listenerLock.Lock()
listener = r.listener
......@@ -84,7 +79,7 @@ func (r *ZkRegistry) GetService(conf *registry.ServiceConfig) ([]*registry.Servi
if !ok {
return nil, jerrors.Errorf("Service{%s} has not been registered", conf.Key())
}
serviceConf, ok = serviceConfIf.(*registry.ServiceConfig)
serviceConf, ok = serviceConfIf.(registry.ServiceConfig)
if !ok {
return nil, jerrors.Errorf("Service{%s}: failed to get serviceConfigIf type", conf.Key())
}
......@@ -129,45 +124,16 @@ func (r *ZkRegistry) GetService(conf *registry.ServiceConfig) ([]*registry.Servi
return services, nil
}
func (r *ZkRegistry) listen() {
defer r.wg.Done()
for {
if r.isClosed() {
log.Warn("event listener game over.")
return
}
listener, err := r.getListener()
if err != nil {
if r.isClosed() {
log.Warn("event listener game over.")
return
}
log.Warn("getListener() = err:%s", jerrors.ErrorStack(err))
time.Sleep(timeSecondDuration(RegistryConnDelay))
continue
}
if err = listener.listenEvent(r); err != nil {
log.Warn("Selector.watch() = error{%v}", jerrors.ErrorStack(err))
r.listenerLock.Lock()
r.listener = nil
r.listenerLock.Unlock()
listener.close()
time.Sleep(timeSecondDuration(RegistryConnDelay))
continue
}
}
func (r *ZkRegistry) Subscribe() (registry.Listener, error) {
r.wg.Add(1)
return r.getListener()
}
func (r *ZkRegistry) getListener() (*zkEventListener, error) {
var (
ok bool
zkListener *zkEventListener
serviceConf *registry.ServiceConfig
serviceConf registry.ServiceConfig
)
r.listenerLock.Lock()
......@@ -185,7 +151,7 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) {
}
// new client & listener
zkListener = newZkEventListener(client)
zkListener = newZkEventListener(r, client)
r.listenerLock.Lock()
r.listener = zkListener
......@@ -194,7 +160,7 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) {
// listen
r.cltLock.Lock()
for _, svs := range r.services {
if serviceConf, ok = svs.(*registry.ServiceConfig); ok {
if serviceConf, ok = svs.(registry.ServiceConfig); ok {
go zkListener.listenServiceEvent(serviceConf)
}
}
......
......@@ -36,11 +36,13 @@ type zkEventListener struct {
serviceMapLock sync.Mutex
serviceMap map[string]struct{}
wg sync.WaitGroup
registry *ZkRegistry
}
func newZkEventListener(client *zookeeperClient) *zkEventListener {
func newZkEventListener(registry *ZkRegistry, client *zookeeperClient) *zkEventListener {
return &zkEventListener{
client: client,
registry: registry,
events: make(chan zkEvent, 32),
serviceMap: make(map[string]struct{}),
}
......@@ -80,7 +82,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool {
return false
}
func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf *registry.ServiceConfig) {
func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf registry.ServiceConfig) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
......@@ -154,7 +156,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co
}
}
func (l *zkEventListener) listenDirEvent(zkPath string, conf *registry.ServiceConfig) {
func (l *zkEventListener) listenDirEvent(zkPath string, conf registry.ServiceConfig) {
l.wg.Add(1)
defer l.wg.Done()
......@@ -220,7 +222,7 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf *registry.ServiceCo
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
// |
// --------> listenServiceNodeEvent
func (l *zkEventListener) listenServiceEvent(conf *registry.ServiceConfig) {
func (l *zkEventListener) listenServiceEvent(conf registry.ServiceConfig) {
var (
err error
zkPath string
......@@ -277,29 +279,27 @@ func (l *zkEventListener) listenServiceEvent(conf *registry.ServiceConfig) {
}
log.Info("listen dubbo path{%s}", zkPath)
go func(zkPath string, conf *registry.ServiceConfig) {
go func(zkPath string, conf registry.ServiceConfig) {
l.listenDirEvent(zkPath, conf)
log.Warn("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, conf)
}
func (l *zkEventListener) listenEvent(r *ZkRegistry) error {
func (l *zkEventListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.client.done():
log.Warn("listener's zk client connection is broken, so zk event listener exit now.")
l.close()
return jerrors.New("listener stopped")
return nil, jerrors.New("listener stopped")
case <-r.done:
case <-l.registry.done:
log.Warn("zk consumer register has quit, so zk event listener exit asap now.")
l.close()
return jerrors.New("listener stopped")
return nil, jerrors.New("listener stopped")
case e := <-l.events:
log.Debug("got zk event %s", e)
if e.err != nil {
return jerrors.Trace(e.err)
return nil, jerrors.Trace(e.err)
}
if e.res.Action == registry.ServiceDel && !l.valid() {
log.Warn("update @result{%s}. But its connection to registry is invalid", e.res)
......@@ -307,7 +307,8 @@ func (l *zkEventListener) listenEvent(r *ZkRegistry) error {
}
//r.update(e.res)
//write to invoker
r.outerEventCh <- e.res
//r.outerEventCh <- e.res
return e.res, nil
}
}
}
......@@ -316,7 +317,10 @@ func (l *zkEventListener) valid() bool {
return l.client.zkConnValid()
}
func (l *zkEventListener) close() {
func (l *zkEventListener) Close() {
l.registry.listenerLock.Lock()
l.client.Close()
l.registry.listenerLock.Unlock()
l.registry.wg.Done()
l.wg.Wait()
}
......@@ -85,8 +85,7 @@ type ZkRegistry struct {
listener *zkEventListener
//for provider
zkPath map[string]int // key = protocol://ip:port/interface
outerEventCh chan *registry.ServiceEvent
zkPath map[string]int // key = protocol://ip:port/interface
}
func NewZkRegistry(opts ...registry.RegistryOption) (registry.Registry, error) {
......@@ -96,11 +95,10 @@ func NewZkRegistry(opts ...registry.RegistryOption) (registry.Registry, error) {
)
r = &ZkRegistry{
birth: time.Now().UnixNano(),
done: make(chan struct{}),
services: make(map[string]registry.ServiceConfigIf),
zkPath: make(map[string]int),
outerEventCh: make(chan *registry.ServiceEvent),
birth: time.Now().UnixNano(),
done: make(chan struct{}),
services: make(map[string]registry.ServiceConfigIf),
zkPath: make(map[string]int),
}
for _, opt := range opts {
......@@ -134,10 +132,10 @@ func NewZkRegistry(opts ...registry.RegistryOption) (registry.Registry, error) {
r.wg.Add(1)
go r.handleZkRestart()
if r.DubboType == registry.CONSUMER {
r.wg.Add(1)
go r.listen()
}
//if r.DubboType == registry.CONSUMER {
// r.wg.Add(1)
// go r.listen()
//}
return r, nil
}
......@@ -417,11 +415,9 @@ func (r *ZkRegistry) closeRegisters() {
r.client.Close()
r.client = nil
r.services = nil
//关闭outerListenerEvent
close(r.outerEventCh)
}
func (r *ZkRegistry) isClosed() bool {
func (r *ZkRegistry) IsClosed() bool {
select {
case <-r.done:
return true
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment