Skip to content
Snippets Groups Projects
Commit ad754714 authored by flycash's avatar flycash
Browse files

Merge develop

parents ba263095 e2033a8a
No related branches found
No related tags found
No related merge requests found
Showing
with 114 additions and 147 deletions
......@@ -176,5 +176,5 @@ About dubbo-go benchmarking report, please refer to [dubbo benchmarking report](
If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it helps you or want do some contributions to it, please add your company to to [the user list](https://github.com/apache/dubbo-go/issues/2) to let us know your needs.
![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png)
![Excellent Health Technology Group](https://raw.githubusercontent.com/dajiiu/photo/static/mirror/haozhuo_logo.png)
![Excellent Health Technology Group](https://user-images.githubusercontent.com/52339367/84628582-80512200-af1b-11ea-945a-c6b4b9ad31f2.png)
![tuya](https://raw.githubusercontent.com/pantianying/go-tool/master/picture/logo_2-removebg-preview.png)
......@@ -175,5 +175,5 @@ go test ./... -coverprofile=coverage.txt -covermode=atomic
若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者向对其做改进,请忝列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓之。
![ctrip](https://pic.c-ctrip.com/common/c_logo2013.png)
![Excellent Health Technology Group](https://raw.githubusercontent.com/dajiiu/photo/static/mirror/haozhuo_logo.png)
![Excellent Health Technology Group](https://user-images.githubusercontent.com/52339367/84628582-80512200-af1b-11ea-945a-c6b4b9ad31f2.png)
![tuya](https://raw.githubusercontent.com/pantianying/go-tool/master/picture/logo_2-removebg-preview.png)
......@@ -50,7 +50,7 @@ func SetAndInitGlobalDispatcher(name string) {
name = "direct"
}
if globalEventDispatcher != nil {
logger.Warnf("EventDispatcher already init. It will be replaced")
logger.Warnf("EventDispatcher has been initialized. It will be replaced")
}
if dp, ok := dispatchers[name]; !ok || dp == nil {
......
......@@ -42,10 +42,8 @@ var (
providerConfig *ProviderConfig
// baseConfig = providerConfig.BaseConfig or consumerConfig
baseConfig *BaseConfig
// baseConfigOnce is used to make sure that we only create it once.
baseConfigOnce sync.Once
// configAccessMutex is used to make sure that BaseConfig.xxxxConfig will only be created once if needed.
// configAccessMutex is used to make sure that xxxxConfig will only be created once if needed.
// it should be used combine with double-check to avoid the race condition
configAccessMutex sync.Mutex
......@@ -69,6 +67,8 @@ func init() {
log.Printf("[consumerInit] %#v", errCon)
consumerConfig = nil
} else {
// Even though baseConfig has been initialized, we override it
// because we think read from config file is correct config
baseConfig = &consumerConfig.BaseConfig
}
......@@ -76,6 +76,8 @@ func init() {
log.Printf("[providerInit] %#v", errPro)
providerConfig = nil
} else {
// Even though baseConfig has been initialized, we override it
// because we think read from config file is correct config
baseConfig = &providerConfig.BaseConfig
}
}
......@@ -308,7 +310,9 @@ func GetConsumerConfig() ConsumerConfig {
func GetBaseConfig() *BaseConfig {
if baseConfig == nil {
baseConfigOnce.Do(func() {
configAccessMutex.Lock()
defer configAccessMutex.Unlock()
if baseConfig == nil {
baseConfig = &BaseConfig{
MetricConfig: &MetricConfig{},
ConfigCenterConfig: &ConfigCenterConfig{},
......@@ -316,7 +320,7 @@ func GetBaseConfig() *BaseConfig {
ApplicationConfig: &ApplicationConfig{},
ServiceDiscoveries: make(map[string]*ServiceDiscoveryConfig, 0),
}
})
}
}
return baseConfig
}
......
......@@ -319,9 +319,7 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC
conn, err := p.get()
if err == nil && conn == nil {
// create new conn
var rpcClientConn *gettyRPCClient
rpcClientConn, err = newGettyRPCClientConn(p, protocol, addr)
return rpcClientConn, perrors.WithStack(err)
conn, err = newGettyRPCClientConn(p, protocol, addr)
}
return conn, perrors.WithStack(err)
}
......
......@@ -57,17 +57,17 @@ type etcdV3Registry struct {
configListener *configurationListener
}
// Client get the etcdv3 client
// Client gets the etcdv3 client
func (r *etcdV3Registry) Client() *etcdv3.Client {
return r.client
}
//SetClient set the etcdv3 client
// SetClient sets the etcdv3 client
func (r *etcdV3Registry) SetClient(client *etcdv3.Client) {
r.client = client
}
//
// ClientLock returns lock for client
func (r *etcdV3Registry) ClientLock() *sync.Mutex {
return &r.cltLock
}
......
......@@ -47,13 +47,12 @@ const (
)
var (
// ErrNilETCDV3Client ...
// Defines related errors
ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR
// ErrKVPairNotFound ...
ErrKVPairNotFound = perrors.New("k/v pair not found")
ErrKVPairNotFound = perrors.New("k/v pair not found")
)
// Options ...
// nolint
type Options struct {
name string
endpoints []string
......@@ -62,38 +61,38 @@ type Options struct {
heartbeat int // heartbeat second
}
// Option ...
// Option will define a function of handling Options
type Option func(*Options)
// WithEndpoints ...
// WithEndpoints sets etcd client endpoints
func WithEndpoints(endpoints ...string) Option {
return func(opt *Options) {
opt.endpoints = endpoints
}
}
// WithName ...
// WithName sets etcd client name
func WithName(name string) Option {
return func(opt *Options) {
opt.name = name
}
}
// WithTimeout ...
// WithTimeout sets etcd client timeout
func WithTimeout(timeout time.Duration) Option {
return func(opt *Options) {
opt.timeout = timeout
}
}
// WithHeartbeat ...
// WithHeartbeat sets etcd client heartbeat
func WithHeartbeat(heartbeat int) Option {
return func(opt *Options) {
opt.heartbeat = heartbeat
}
}
// ValidateClient ...
// ValidateClient validates client and sets options
func ValidateClient(container clientFacade, opts ...Option) error {
options := &Options{
......@@ -133,7 +132,7 @@ func ValidateClient(container clientFacade, opts ...Option) error {
return nil
}
// Client ...
// Client represents etcd client Configuration
type Client struct {
lock sync.RWMutex
......@@ -144,7 +143,7 @@ type Client struct {
heartbeat int
ctx context.Context // if etcd server connection lose, the ctx.Done will be sent msg
cancel context.CancelFunc // cancel the ctx, all watcher will stopped
cancel context.CancelFunc // cancel the ctx, all watcher will stopped
rawClient *clientv3.Client
exit chan struct{}
......@@ -208,7 +207,7 @@ func (c *Client) stop() bool {
return false
}
// Close ...
// nolint
func (c *Client) Close() {
if c == nil {
......@@ -267,8 +266,7 @@ func (c *Client) maintenanceStatusLoop(s *concurrency.Session) {
}
}
// if k not exist will put k/v in etcd
// if k is already exist in etcd, return nil
// if k not exist will put k/v in etcd, otherwise return nil
func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
c.lock.RLock()
......@@ -327,7 +325,7 @@ func (c *Client) get(k string) (string, error) {
return string(resp.Kvs[0].Value), nil
}
// CleanKV ...
// nolint
func (c *Client) CleanKV() error {
c.lock.RLock()
......@@ -427,12 +425,12 @@ func (c *Client) keepAliveKV(k string, v string) error {
return nil
}
// Done ...
// nolint
func (c *Client) Done() <-chan struct{} {
return c.exit
}
// Valid ...
// nolint
func (c *Client) Valid() bool {
select {
case <-c.exit:
......@@ -449,7 +447,7 @@ func (c *Client) Valid() bool {
return true
}
// Create ...
// nolint
func (c *Client) Create(k string, v string) error {
err := c.put(k, v)
......@@ -459,7 +457,7 @@ func (c *Client) Create(k string, v string) error {
return nil
}
// Delete ...
// nolint
func (c *Client) Delete(k string) error {
err := c.delete(k)
......@@ -470,7 +468,7 @@ func (c *Client) Delete(k string) error {
return nil
}
// RegisterTemp ...
// RegisterTemp registers a temporary node
func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
completeKey := path.Join(basePath, node)
......@@ -483,7 +481,7 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
return completeKey, nil
}
// GetChildrenKVList ...
// GetChildrenKVList gets children kv list by @k
func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
kList, vList, err := c.getChildren(k)
......@@ -493,7 +491,7 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
return kList, vList, nil
}
// Get ...
// Get gets value by @k
func (c *Client) Get(k string) (string, error) {
v, err := c.get(k)
......@@ -504,7 +502,7 @@ func (c *Client) Get(k string) (string, error) {
return v, nil
}
// Watch ...
// Watch watches on spec key
func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
wc, err := c.watch(k)
......@@ -514,7 +512,7 @@ func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
return wc, nil
}
// WatchWithPrefix ...
// WatchWithPrefix watches on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
wc, err := c.watchWithPrefix(prefix)
......
......@@ -43,7 +43,7 @@ type clientFacade interface {
common.Node
}
// HandleClientRestart ...
// HandleClientRestart keeps the connection between client and server
func HandleClientRestart(r clientFacade) {
var (
......
......@@ -33,7 +33,7 @@ import (
"github.com/apache/dubbo-go/remoting"
)
// EventListener ...
// nolint
type EventListener struct {
client *Client
keyMapLock sync.Mutex
......@@ -41,7 +41,7 @@ type EventListener struct {
wg sync.WaitGroup
}
// NewEventListener ...
// NewEventListener returns a EventListener instance
func NewEventListener(client *Client) *EventListener {
return &EventListener{
client: client,
......@@ -92,12 +92,10 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
}
}
}
return false
}
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
// return true means the event type is DELETE
// return false means the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool {
logger.Infof("got a etcd event {type: %s, key: %s}", event.Type, event.Kv.Key)
......@@ -135,7 +133,7 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin
panic("unreachable")
}
// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix
// ListenServiceNodeEventWithPrefix listens on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
defer l.wg.Done()
for {
......@@ -151,12 +149,12 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener
logger.Warnf("etcd client stopped")
return
// client ctx stop
// client ctx stop
case <-l.client.ctx.Done():
logger.Warnf("etcd client ctx cancel")
return
// etcd event stream
// etcd event stream
case e, ok := <-wc:
if !ok {
......@@ -230,7 +228,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
}(key)
}
// Close ...
// nolint
func (l *EventListener) Close() {
l.wg.Wait()
}
......@@ -46,8 +46,7 @@ type Client struct {
controller *dubboRegistryController
}
// newClient
// new a client for registry
// newClient returns Client instance for registry
func newClient(url common.URL) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background())
......@@ -75,8 +74,7 @@ func newClient(url common.URL) (*Client, error) {
return c, nil
}
// Create
// create k/v pair in watcher-set
// Create creates k/v pair in watcher-set
func (c *Client) Create(k, v string) error {
// the read current pod must be lock, protect every
......@@ -92,8 +90,7 @@ func (c *Client) Create(k, v string) error {
return nil
}
// GetChildren
// get k children list from kubernetes-watcherSet
// GetChildren gets k children list from kubernetes-watcherSet
func (c *Client) GetChildren(k string) ([]string, []string, error) {
objectList, err := c.controller.watcherSet.Get(k, true)
......@@ -112,8 +109,7 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) {
return kList, vList, nil
}
// Watch
// watch on spec key
// Watch watches on spec key
func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.controller.watcherSet.Watch(k, false)
......@@ -124,8 +120,7 @@ func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error)
return w.ResultChan(), w.done(), nil
}
// Watch
// watch on spec prefix
// WatchWithPrefix watches on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.controller.watcherSet.Watch(prefix, true)
......@@ -136,9 +131,7 @@ func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan st
return w.ResultChan(), w.done(), nil
}
// Valid
// Valid the client
// if return false, the client is die
// if returns false, the client is die
func (c *Client) Valid() bool {
select {
......@@ -151,14 +144,12 @@ func (c *Client) Valid() bool {
return c.controller != nil
}
// Done
// read the client status
// nolint
func (c *Client) Done() <-chan struct{} {
return c.ctx.Done()
}
// Stop
// read the client status
// nolint
func (c *Client) Close() {
select {
......@@ -174,8 +165,7 @@ func (c *Client) Close() {
// so, just wait
}
// ValidateClient
// validate the kubernetes client
// ValidateClient validates the kubernetes client
func ValidateClient(container clientFacade) error {
client := container.Client()
......@@ -194,8 +184,7 @@ func ValidateClient(container clientFacade) error {
return nil
}
// NewMockClient
// export for registry package test
// NewMockClient exports for registry package test
func NewMockClient(podList *v1.PodList) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background())
......
......@@ -45,8 +45,8 @@ func NewEventListener(client *Client) *EventListener {
}
// Listen on a spec key
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
// this method returns true when spec key deleted,
// this method returns false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
defer l.wg.Done()
for {
......@@ -83,8 +83,8 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
}
}
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
// return true means the event type is DELETE
// return false means the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool {
logger.Infof("got a kubernetes-watcherSet event {type: %d, key: %s}", event.EventType, event.Key)
......
......@@ -72,8 +72,7 @@ var (
ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")
)
// dubboRegistryController
// work like a kubernetes controller
// dubboRegistryController works like a kubernetes controller
type dubboRegistryController struct {
// clone from client
......@@ -364,8 +363,7 @@ func (c *dubboRegistryController) processNextWorkItem() bool {
return true
}
// handleWatchedPodEvent
// handle watched pod event
// handleWatchedPodEvent handles watched pod event
func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
logger.Debugf("get @type = %s event from @pod = %s", eventType, p.GetName())
......@@ -402,8 +400,7 @@ func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType wat
}
}
// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
// unmarshalRecord unmarshals the kubernetes dubbo annotation value
func (c *dubboRegistryController) unmarshalRecord(record string) ([]*WatcherEvent, error) {
if len(record) == 0 {
......@@ -453,8 +450,7 @@ func (c *dubboRegistryController) initCurrentPod() error {
return nil
}
// patch current pod
// write new meta for current pod
// patchCurrentPod writes new meta for current pod
func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error) {
updatedPod, err := c.kc.CoreV1().Pods(c.namespace).Patch(c.name, types.StrategicMergePatchType, patch)
if err != nil {
......@@ -463,7 +459,7 @@ func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error)
return updatedPod, nil
}
// assemble the dubbo kubernetes label
// assembleDUBBOLabel assembles the dubbo kubernetes label
// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label
func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Pod, error) {
var (
......@@ -498,7 +494,7 @@ func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Po
return oldPod, newPod, nil
}
// assemble the dubbo kubernetes annotations
// assembleDUBBOAnnotations assembles the dubbo kubernetes annotations
// accord the current pod && (k,v) assemble the old-pod, new-pod
func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
......@@ -528,8 +524,7 @@ func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentP
return
}
// getPatch
// get the kubernetes pod patch bytes
// getPatch gets the kubernetes pod patch bytes
func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
oldData, err := json.Marshal(oldPod)
if err != nil {
......@@ -548,8 +543,7 @@ func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, erro
return patchBytes, nil
}
// marshalRecord
// marshal the kubernetes dubbo annotation value
// marshalRecord marshals the kubernetes dubbo annotation value
func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, error) {
msg, err := json.Marshal(ol)
if err != nil {
......@@ -558,7 +552,7 @@ func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, err
return base64.URLEncoding.EncodeToString(msg), nil
}
// read from kubernetes-env current pod status
// readCurrentPod reads from kubernetes-env current pod status
func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) {
currentPod, err := c.kc.CoreV1().Pods(c.namespace).Get(c.name, metav1.GetOptions{})
if err != nil {
......@@ -567,7 +561,7 @@ func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) {
return currentPod, nil
}
// add annotation for current pod
// addAnnotationForCurrentPod adds annotation for current pod
func (c *dubboRegistryController) addAnnotationForCurrentPod(k string, v string) error {
c.lock.Lock()
......
......@@ -140,14 +140,12 @@ func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
return s.addWatcher(key, prefix)
}
// Done
// get the watcher-set status
// Done gets the watcher-set status
func (s *watcherSetImpl) Done() <-chan struct{} {
return s.ctx.Done()
}
// Put
// put the watch event to watcher-set
// Put puts the watch event to watcher-set
func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error {
blockSendMsg := func(object *WatcherEvent, w *watcher) {
......@@ -243,8 +241,7 @@ func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
return w, nil
}
// Get
// get elements from watcher-set
// Get gets elements from watcher-set
func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
s.lock.RLock()
......@@ -297,19 +294,17 @@ type watcher struct {
exit chan struct{}
}
// ResultChan
// nolint
func (w *watcher) ResultChan() <-chan *WatcherEvent {
return w.ch
}
// ID
// the watcher's id
// nolint
func (w *watcher) ID() string {
return strconv.FormatUint(w.id, 10)
}
// stop
// stop the watcher
// nolint
func (w *watcher) stop() {
// double close will panic
......@@ -318,14 +313,12 @@ func (w *watcher) stop() {
})
}
// done
// check watcher status
// done checks watcher status
func (w *watcher) done() <-chan struct{} {
return w.exit
}
// newWatcherSet
// new watcher set from parent context
// newWatcherSet returns new watcher set from parent context
func newWatcherSet(ctx context.Context) WatcherSet {
s := &watcherSetImpl{
ctx: ctx,
......
......@@ -21,7 +21,7 @@ import (
"fmt"
)
// DataListener ...
// DataListener defines common data listener interface
type DataListener interface {
DataChange(eventType Event) bool //bool is return for interface implement is interesting
}
......@@ -30,15 +30,15 @@ type DataListener interface {
// event type
//////////////////////////////////////////
// SourceObjectEventType ...
// EventType means SourceObjectEventType
type EventType int
const (
// EventTypeAdd ...
// EventTypeAdd means add event
EventTypeAdd = iota
// EventTypeDel ...
// EventTypeDel means del event
EventTypeDel
// EventTypeUpdate ...
// EventTypeUpdate means update event
EventTypeUpdate
)
......@@ -56,7 +56,7 @@ func (t EventType) String() string {
// service event
//////////////////////////////////////////
// Event ...
// Event defines common elements for service event
type Event struct {
Path string
Action EventType
......
......@@ -47,7 +47,7 @@ var (
errNilNode = perrors.Errorf("node does not exist")
)
// ZookeeperClient ...
// ZookeeperClient represents zookeeper client Configuration
type ZookeeperClient struct {
name string
ZkAddrs []string
......@@ -59,7 +59,7 @@ type ZookeeperClient struct {
eventRegistry map[string][]*chan struct{}
}
// StateToString ...
// nolint
func StateToString(state zk.State) string {
switch state {
case zk.StateDisconnected:
......@@ -89,7 +89,7 @@ func StateToString(state zk.State) string {
}
}
// Options ...
// nolint
type Options struct {
zkName string
client *ZookeeperClient
......@@ -97,17 +97,17 @@ type Options struct {
ts *zk.TestCluster
}
// Option ...
// Option will define a function of handling Options
type Option func(*Options)
// WithZkName ...
// WithZkName sets zk client name
func WithZkName(name string) Option {
return func(opt *Options) {
opt.zkName = name
}
}
// ValidateZookeeperClient ...
// ValidateZookeeperClient validates client and sets options
func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
var err error
options := &Options{}
......@@ -187,14 +187,14 @@ func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*
return z, nil
}
// WithTestCluster ...
// WithTestCluster sets test cluser for zk client
func WithTestCluster(ts *zk.TestCluster) Option {
return func(opt *Options) {
opt.ts = ts
}
}
// NewMockZookeeperClient ...
// NewMockZookeeperClient returns a mock client instance
func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
var (
err error
......@@ -226,21 +226,15 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option)
}
}
//callbackChan := make(chan zk.Event)
//f := func(event zk.Event) {
// callbackChan <- event
//}
z.Conn, event, err = ts.ConnectWithOptions(timeout)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
//z.wait.Add(1)
return ts, z, event, nil
}
// HandleZkEvent ...
// HandleZkEvent handles zookeeper events
func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
var (
state int
......@@ -301,7 +295,7 @@ LOOP:
}
}
// RegisterEvent ...
// RegisterEvent registers zookeeper events
func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" || event == nil {
return
......@@ -316,7 +310,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
z.Unlock()
}
// UnregisterEvent ...
// UnregisterEvent unregisters zookeeper events
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" {
return
......@@ -343,7 +337,7 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
}
}
// Done ...
// nolint
func (z *ZookeeperClient) Done() <-chan struct{} {
return z.exit
}
......@@ -359,7 +353,7 @@ func (z *ZookeeperClient) stop() bool {
return false
}
// ZkConnValid ...
// ZkConnValid validates zookeeper connection
func (z *ZookeeperClient) ZkConnValid() bool {
select {
case <-z.exit:
......@@ -377,7 +371,7 @@ func (z *ZookeeperClient) ZkConnValid() bool {
return valid
}
// Close ...
// nolint
func (z *ZookeeperClient) Close() {
if z == nil {
return
......@@ -436,7 +430,7 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {
return nil
}
// Delete ...
// nolint
func (z *ZookeeperClient) Delete(basePath string) error {
var (
err error
......@@ -451,7 +445,7 @@ func (z *ZookeeperClient) Delete(basePath string) error {
return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath)
}
// RegisterTemp ...
// RegisterTemp registers temporary node by @basePath and @node
func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
var (
err error
......@@ -468,7 +462,6 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
}
//if err != nil && err != zk.ErrNodeExists {
if err != nil {
logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err))
return zkPath, perrors.WithStack(err)
......@@ -478,7 +471,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
return tmpPath, nil
}
// RegisterTempSeq ...
// RegisterTempSeq register temporary sequence node by @basePath and @data
func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
var (
err error
......@@ -507,7 +500,7 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
return tmpPath, nil
}
// GetChildrenW ...
// GetChildrenW gets children watch by @path
func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) {
var (
err error
......@@ -542,7 +535,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event,
return children, watcher.EvtCh, nil
}
// GetChildren ...
// GetChildren gets children by @path
func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
var (
err error
......@@ -573,7 +566,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
return children, nil
}
// ExistW ...
// ExistW to judge watch whether it exists or not by @zkPath
func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
var (
exist bool
......@@ -599,7 +592,7 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
return watcher.EvtCh, nil
}
// GetContent ...
// GetContent gets content by @zkPath
func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
return z.Conn.Get(zkPath)
}
......
......@@ -40,7 +40,7 @@ type zkClientFacade interface {
common.Node
}
// HandleClientRestart ...
// HandleClientRestart keeps the connection between client and server
func HandleClientRestart(r zkClientFacade) {
var (
err error
......
......@@ -37,7 +37,7 @@ import (
"github.com/apache/dubbo-go/remoting"
)
// ZkEventListener ...
// nolint
type ZkEventListener struct {
client *ZookeeperClient
pathMapLock sync.Mutex
......@@ -45,7 +45,7 @@ type ZkEventListener struct {
wg sync.WaitGroup
}
// NewZkEventListener ...
// NewZkEventListener returns a EventListener instance
func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
......@@ -53,12 +53,12 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
}
}
// SetClient ...
// nolint
func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
l.client = client
}
// ListenServiceNodeEvent ...
// nolint
func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()
var zkEvent zk.Event
......
......@@ -25,7 +25,7 @@ import (
import (
hessian "github.com/apache/dubbo-go-hessian2"
_ "github.com/apache/dubbo-go/event/proxy/proxy_factory"
_ "github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/config"
_ "github.com/apache/dubbo-go/protocol/dubbo"
_ "github.com/apache/dubbo-go/registry/protocol"
......
......@@ -25,7 +25,7 @@ import (
hessian "github.com/apache/dubbo-go-hessian2"
_ "github.com/apache/dubbo-go/cluster/cluster_impl"
_ "github.com/apache/dubbo-go/cluster/loadbalance"
_ "github.com/apache/dubbo-go/event/proxy/proxy_factory"
_ "github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/config"
_ "github.com/apache/dubbo-go/filter/filter_impl"
_ "github.com/apache/dubbo-go/protocol/dubbo"
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment