diff --git a/client/invoker/invoker.go b/client/invoker/invoker.go index beadea380d00edb8ef059824aef0cefc9354f368..728e42d08d898dbba8570e6cdadc31b2eecfcfcb 100644 --- a/client/invoker/invoker.go +++ b/client/invoker/invoker.go @@ -18,6 +18,8 @@ import ( "github.com/dubbo/dubbo-go/registry" ) +const RegistryConnDelay = 3 + type Options struct { ServiceTTL time.Duration selector selector.Selector @@ -74,25 +76,38 @@ func NewInvoker(registry registry.Registry, opts ...Option) (*Invoker, error) { cacheServiceMap: make(map[string]*ServiceArray), registry: registry, } - invoker.Listen() + go invoker.listen() return invoker, nil } -func (ivk *Invoker) Listen() { - go ivk.listen() -} - func (ivk *Invoker) listen() { for { - ch := ivk.registry.GetListenEvent() + if ivk.registry.IsClosed() { + log.Warn("event listener game over.") + return + } + + listener, err := ivk.registry.Subscribe() + if err != nil { + if ivk.registry.IsClosed() { + log.Warn("event listener game over.") + return + } + log.Warn("getListener() = err:%s", jerrors.ErrorStack(err)) + time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + continue + } for { - e, isOpen := <-ch - if !isOpen { - log.Warn("registry closed!") - break + if serviceEvent, err := listener.Next(); err != nil { + log.Warn("Selector.watch() = error{%v}", jerrors.ErrorStack(err)) + listener.Close() + time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + return + } else { + ivk.update(serviceEvent) } - ivk.update(e) + } } @@ -131,7 +146,7 @@ func (ivk *Invoker) update(res *registry.ServiceEvent) { } } -func (ivk *Invoker) getService(registryConf *registry.ServiceConfig) (*ServiceArray, error) { +func (ivk *Invoker) getService(registryConf registry.ServiceConfig) (*ServiceArray, error) { defer ivk.listenerLock.Unlock() registryKey := registryConf.Key() @@ -159,7 +174,7 @@ func (ivk *Invoker) getService(registryConf *registry.ServiceConfig) (*ServiceAr return newSvcArr, nil } -func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, registryConf *registry.ServiceConfig, req jsonrpc.Request, resp interface{}) error { +func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, registryConf registry.ServiceConfig, req jsonrpc.Request, resp interface{}) error { registryArray, err := ivk.getService(registryConf) if err != nil { @@ -180,7 +195,7 @@ func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, registryConf *reg return nil } -func (ivk *Invoker) DubboCall(reqId int64, registryConf *registry.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error { +func (ivk *Invoker) DubboCall(reqId int64, registryConf registry.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error { registryArray, err := ivk.getService(registryConf) if err != nil { @@ -201,3 +216,7 @@ func (ivk *Invoker) DubboCall(reqId int64, registryConf *registry.ServiceConfig, log.Info("response result:%s", reply) return nil } + +func (ivk *Invoker) Close() { + ivk.DubboClient.Close() +} diff --git a/examples/dubbo/go-client/app/test.go b/examples/dubbo/go-client/app/test.go index e881e7d17552c490f53f99e9dcd0b380a942b9c5..1154da0975f18a52cccf73f0c0079e4ffd968850 100644 --- a/examples/dubbo/go-client/app/test.go +++ b/examples/dubbo/go-client/app/test.go @@ -57,7 +57,7 @@ func testDubborpc(clientConfig *examples.ClientConfig, userKey string) { user = new(DubboUser) defer clientInvoker.DubboClient.Close() - err = clientInvoker.DubboCall(1, &conf, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Dubbo)) + err = clientInvoker.DubboCall(1, conf, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Dubbo)) // Call service if err != nil { log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) diff --git a/examples/jsonrpc/go-client/app/client.go b/examples/jsonrpc/go-client/app/client.go index ed394b223787d9365c51b6f867f1704a92dd86d1..502e37c7615b90fdd8788be32df50c42027b6c00 100644 --- a/examples/jsonrpc/go-client/app/client.go +++ b/examples/jsonrpc/go-client/app/client.go @@ -40,7 +40,7 @@ func main() { initProfiling(clientConfig) initClient(clientConfig) - time.Sleep(3e9) + time.Sleep(10e9) gxlog.CInfo("\n\n\nstart to test jsonrpc") testJsonrpc(clientConfig, "A003", "GetUser") diff --git a/examples/jsonrpc/go-client/app/test.go b/examples/jsonrpc/go-client/app/test.go index ac0053e0ae8847be086c51925021028ba249c039..2c662c9e9d86ce266caa5e42e6f1216fe3c75478 100644 --- a/examples/jsonrpc/go-client/app/test.go +++ b/examples/jsonrpc/go-client/app/test.go @@ -59,7 +59,7 @@ func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method str user = new(JsonRPCUser) - err = clientInvoker.HttpCall(ctx, 1, &conf, req, user) + err = clientInvoker.HttpCall(ctx, 1, conf, req, user) if err != nil { panic(err) } else { diff --git a/go.sum b/go.sum index 65abe2c5ec032af2cf72318250cbcf21f7452150..789f294efdcab00539c5fdf5bb336a112d1341d6 100644 --- a/go.sum +++ b/go.sum @@ -23,8 +23,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dubbogo/hessian2 v0.0.0-20190330063706-e01b2c027961 h1:nGlTGXqzalnFtNqDsar2k/Gle/7pILm0MeH65fJSE7Y= -github.com/dubbogo/hessian2 v0.0.0-20190330063706-e01b2c027961/go.mod h1:v+gfInE8fm/k3Fjkb2oUCKSO9LKbWvf+PtweEI89BmI= github.com/dubbogo/hessian2 v0.0.0-20190331022028-ade83b794bf2 h1:5kv4/4ptZTNcG2dzfHqXPiBHZcPPR3jshgxpHvlidew= github.com/dubbogo/hessian2 v0.0.0-20190331022028-ade83b794bf2/go.mod h1:v+gfInE8fm/k3Fjkb2oUCKSO9LKbWvf+PtweEI89BmI= github.com/fatih/set v0.2.1/go.mod h1:+RKtMCH+favT2+3YecHGxcc0b4KyVWA1QWWJUs4E0CI= diff --git a/registry/registry.go b/registry/registry.go index dc8bcea736e6308b577f87e0e8f46e820cb19850..0c07449de767892a6c24fd0d8230c683282ab799 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -11,11 +11,18 @@ type Registry interface { RegisterProvider(ServiceConfigIf) error //used for service consumer calling , register services cared about ,for dubbo's admin monitoring RegisterConsumer(ServiceConfigIf) error - //used for service consumer ,start listen goroutine - GetListenEvent() chan *ServiceEvent + //used for service consumer ,start subscribe service event from registry + Subscribe() (Listener, error) //input the serviceConfig , registry should return serviceUrlArray with multi location(provider nodes) available - GetService(*ServiceConfig) ([]*ServiceURL, error) + GetService(ServiceConfig) ([]*ServiceURL, error) + //close the registry for Elegant closing + Close() + //return if the registry is closed for consumer subscribing + IsClosed() bool +} +type Listener interface { + Next() (*ServiceEvent, error) Close() } diff --git a/registry/zookeeper/consumer.go b/registry/zookeeper/consumer.go index 9f4e9201cfe5274ba20e8329927c880e41404bd5..38bc3873552fe560a91f1fc66de14a445681a178 100644 --- a/registry/zookeeper/consumer.go +++ b/registry/zookeeper/consumer.go @@ -2,7 +2,6 @@ package zookeeper import ( "fmt" - "time" ) import ( @@ -48,18 +47,14 @@ func (r *ZkRegistry) RegisterConsumer(regConf registry.ServiceConfigIf) error { listener = r.listener r.listenerLock.Unlock() if listener != nil { - go listener.listenServiceEvent(&conf) + go listener.listenServiceEvent(conf) } return nil } -func (r *ZkRegistry) GetListenEvent() chan *registry.ServiceEvent { - return r.outerEventCh -} - // name: service@protocol -func (r *ZkRegistry) GetService(conf *registry.ServiceConfig) ([]*registry.ServiceURL, error) { +func (r *ZkRegistry) GetService(conf registry.ServiceConfig) ([]*registry.ServiceURL, error) { var ( ok bool err error @@ -68,7 +63,7 @@ func (r *ZkRegistry) GetService(conf *registry.ServiceConfig) ([]*registry.Servi listener *zkEventListener serviceURL *registry.ServiceURL serviceConfIf registry.ServiceConfigIf - serviceConf *registry.ServiceConfig + serviceConf registry.ServiceConfig ) r.listenerLock.Lock() listener = r.listener @@ -84,7 +79,7 @@ func (r *ZkRegistry) GetService(conf *registry.ServiceConfig) ([]*registry.Servi if !ok { return nil, jerrors.Errorf("Service{%s} has not been registered", conf.Key()) } - serviceConf, ok = serviceConfIf.(*registry.ServiceConfig) + serviceConf, ok = serviceConfIf.(registry.ServiceConfig) if !ok { return nil, jerrors.Errorf("Service{%s}: failed to get serviceConfigIf type", conf.Key()) } @@ -129,45 +124,16 @@ func (r *ZkRegistry) GetService(conf *registry.ServiceConfig) ([]*registry.Servi return services, nil } -func (r *ZkRegistry) listen() { - defer r.wg.Done() - - for { - if r.isClosed() { - log.Warn("event listener game over.") - return - } - - listener, err := r.getListener() - if err != nil { - if r.isClosed() { - log.Warn("event listener game over.") - return - } - log.Warn("getListener() = err:%s", jerrors.ErrorStack(err)) - time.Sleep(timeSecondDuration(RegistryConnDelay)) - continue - } - if err = listener.listenEvent(r); err != nil { - log.Warn("Selector.watch() = error{%v}", jerrors.ErrorStack(err)) - - r.listenerLock.Lock() - r.listener = nil - r.listenerLock.Unlock() - - listener.close() - - time.Sleep(timeSecondDuration(RegistryConnDelay)) - continue - } - } +func (r *ZkRegistry) Subscribe() (registry.Listener, error) { + r.wg.Add(1) + return r.getListener() } func (r *ZkRegistry) getListener() (*zkEventListener, error) { var ( ok bool zkListener *zkEventListener - serviceConf *registry.ServiceConfig + serviceConf registry.ServiceConfig ) r.listenerLock.Lock() @@ -185,7 +151,7 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) { } // new client & listener - zkListener = newZkEventListener(client) + zkListener = newZkEventListener(r, client) r.listenerLock.Lock() r.listener = zkListener @@ -194,7 +160,7 @@ func (r *ZkRegistry) getListener() (*zkEventListener, error) { // listen r.cltLock.Lock() for _, svs := range r.services { - if serviceConf, ok = svs.(*registry.ServiceConfig); ok { + if serviceConf, ok = svs.(registry.ServiceConfig); ok { go zkListener.listenServiceEvent(serviceConf) } } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 145e2e146fbf7f0503abdc1fb76e9282438b9691..cb3b150e0e3b06912ddbf42b258caf2d1e8c75f8 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -36,11 +36,13 @@ type zkEventListener struct { serviceMapLock sync.Mutex serviceMap map[string]struct{} wg sync.WaitGroup + registry *ZkRegistry } -func newZkEventListener(client *zookeeperClient) *zkEventListener { +func newZkEventListener(registry *ZkRegistry, client *zookeeperClient) *zkEventListener { return &zkEventListener{ client: client, + registry: registry, events: make(chan zkEvent, 32), serviceMap: make(map[string]struct{}), } @@ -80,7 +82,7 @@ func (l *zkEventListener) listenServiceNodeEvent(zkPath string) bool { return false } -func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf *registry.ServiceConfig) { +func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, conf registry.ServiceConfig) { contains := func(s []string, e string) bool { for _, a := range s { if a == e { @@ -154,7 +156,7 @@ func (l *zkEventListener) handleZkNodeEvent(zkPath string, children []string, co } } -func (l *zkEventListener) listenDirEvent(zkPath string, conf *registry.ServiceConfig) { +func (l *zkEventListener) listenDirEvent(zkPath string, conf registry.ServiceConfig) { l.wg.Add(1) defer l.wg.Done() @@ -220,7 +222,7 @@ func (l *zkEventListener) listenDirEvent(zkPath string, conf *registry.ServiceCo // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent // | // --------> listenServiceNodeEvent -func (l *zkEventListener) listenServiceEvent(conf *registry.ServiceConfig) { +func (l *zkEventListener) listenServiceEvent(conf registry.ServiceConfig) { var ( err error zkPath string @@ -277,29 +279,27 @@ func (l *zkEventListener) listenServiceEvent(conf *registry.ServiceConfig) { } log.Info("listen dubbo path{%s}", zkPath) - go func(zkPath string, conf *registry.ServiceConfig) { + go func(zkPath string, conf registry.ServiceConfig) { l.listenDirEvent(zkPath, conf) log.Warn("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) }(zkPath, conf) } -func (l *zkEventListener) listenEvent(r *ZkRegistry) error { +func (l *zkEventListener) Next() (*registry.ServiceEvent, error) { for { select { case <-l.client.done(): log.Warn("listener's zk client connection is broken, so zk event listener exit now.") - l.close() - return jerrors.New("listener stopped") + return nil, jerrors.New("listener stopped") - case <-r.done: + case <-l.registry.done: log.Warn("zk consumer register has quit, so zk event listener exit asap now.") - l.close() - return jerrors.New("listener stopped") + return nil, jerrors.New("listener stopped") case e := <-l.events: log.Debug("got zk event %s", e) if e.err != nil { - return jerrors.Trace(e.err) + return nil, jerrors.Trace(e.err) } if e.res.Action == registry.ServiceDel && !l.valid() { log.Warn("update @result{%s}. But its connection to registry is invalid", e.res) @@ -307,7 +307,8 @@ func (l *zkEventListener) listenEvent(r *ZkRegistry) error { } //r.update(e.res) //write to invoker - r.outerEventCh <- e.res + //r.outerEventCh <- e.res + return e.res, nil } } } @@ -316,7 +317,10 @@ func (l *zkEventListener) valid() bool { return l.client.zkConnValid() } -func (l *zkEventListener) close() { +func (l *zkEventListener) Close() { + l.registry.listenerLock.Lock() l.client.Close() + l.registry.listenerLock.Unlock() + l.registry.wg.Done() l.wg.Wait() } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index a65f68d56db2b07cd2d0c5935db7142b7116c38f..84816644b265aa945a0fde822f7929339c0eb6d2 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -85,8 +85,7 @@ type ZkRegistry struct { listener *zkEventListener //for provider - zkPath map[string]int // key = protocol://ip:port/interface - outerEventCh chan *registry.ServiceEvent + zkPath map[string]int // key = protocol://ip:port/interface } func NewZkRegistry(opts ...registry.RegistryOption) (registry.Registry, error) { @@ -96,11 +95,10 @@ func NewZkRegistry(opts ...registry.RegistryOption) (registry.Registry, error) { ) r = &ZkRegistry{ - birth: time.Now().UnixNano(), - done: make(chan struct{}), - services: make(map[string]registry.ServiceConfigIf), - zkPath: make(map[string]int), - outerEventCh: make(chan *registry.ServiceEvent), + birth: time.Now().UnixNano(), + done: make(chan struct{}), + services: make(map[string]registry.ServiceConfigIf), + zkPath: make(map[string]int), } for _, opt := range opts { @@ -134,10 +132,10 @@ func NewZkRegistry(opts ...registry.RegistryOption) (registry.Registry, error) { r.wg.Add(1) go r.handleZkRestart() - if r.DubboType == registry.CONSUMER { - r.wg.Add(1) - go r.listen() - } + //if r.DubboType == registry.CONSUMER { + // r.wg.Add(1) + // go r.listen() + //} return r, nil } @@ -417,11 +415,9 @@ func (r *ZkRegistry) closeRegisters() { r.client.Close() r.client = nil r.services = nil - //å…³é—outerListenerEvent - close(r.outerEventCh) } -func (r *ZkRegistry) isClosed() bool { +func (r *ZkRegistry) IsClosed() bool { select { case <-r.done: return true