Skip to content
Snippets Groups Projects
Unverified Commit 1f667729 authored by Ming Deng's avatar Ming Deng Committed by GitHub
Browse files

Merge pull request #605 from watermelo/featue/addCommentForRemoting

Mod: add comments for remoting
parents 93a655f4 75375359
No related branches found
No related tags found
No related merge requests found
......@@ -57,17 +57,17 @@ type etcdV3Registry struct {
configListener *configurationListener
}
// Client get the etcdv3 client
// Client gets the etcdv3 client
func (r *etcdV3Registry) Client() *etcdv3.Client {
return r.client
}
//SetClient set the etcdv3 client
// SetClient sets the etcdv3 client
func (r *etcdV3Registry) SetClient(client *etcdv3.Client) {
r.client = client
}
//
// ClientLock returns lock for client
func (r *etcdV3Registry) ClientLock() *sync.Mutex {
return &r.cltLock
}
......
......@@ -45,13 +45,12 @@ const (
)
var (
// ErrNilETCDV3Client ...
// Defines related errors
ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR
// ErrKVPairNotFound ...
ErrKVPairNotFound = perrors.New("k/v pair not found")
ErrKVPairNotFound = perrors.New("k/v pair not found")
)
// Options ...
// nolint
type Options struct {
name string
endpoints []string
......@@ -60,38 +59,38 @@ type Options struct {
heartbeat int // heartbeat second
}
// Option ...
// Option will define a function of handling Options
type Option func(*Options)
// WithEndpoints ...
// WithEndpoints sets etcd client endpoints
func WithEndpoints(endpoints ...string) Option {
return func(opt *Options) {
opt.endpoints = endpoints
}
}
// WithName ...
// WithName sets etcd client name
func WithName(name string) Option {
return func(opt *Options) {
opt.name = name
}
}
// WithTimeout ...
// WithTimeout sets etcd client timeout
func WithTimeout(timeout time.Duration) Option {
return func(opt *Options) {
opt.timeout = timeout
}
}
// WithHeartbeat ...
// WithHeartbeat sets etcd client heartbeat
func WithHeartbeat(heartbeat int) Option {
return func(opt *Options) {
opt.heartbeat = heartbeat
}
}
// ValidateClient ...
// ValidateClient validates client and sets options
func ValidateClient(container clientFacade, opts ...Option) error {
options := &Options{
......@@ -131,7 +130,7 @@ func ValidateClient(container clientFacade, opts ...Option) error {
return nil
}
// Client ...
// Client represents etcd client Configuration
type Client struct {
lock sync.RWMutex
......@@ -142,7 +141,7 @@ type Client struct {
heartbeat int
ctx context.Context // if etcd server connection lose, the ctx.Done will be sent msg
cancel context.CancelFunc // cancel the ctx, all watcher will stopped
cancel context.CancelFunc // cancel the ctx, all watcher will stopped
rawClient *clientv3.Client
exit chan struct{}
......@@ -206,7 +205,7 @@ func (c *Client) stop() bool {
return false
}
// Close ...
// nolint
func (c *Client) Close() {
if c == nil {
......@@ -265,8 +264,7 @@ func (c *Client) maintenanceStatusLoop(s *concurrency.Session) {
}
}
// if k not exist will put k/v in etcd
// if k is already exist in etcd, return nil
// if k not exist will put k/v in etcd, otherwise return nil
func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
c.lock.RLock()
......@@ -325,7 +323,7 @@ func (c *Client) get(k string) (string, error) {
return string(resp.Kvs[0].Value), nil
}
// CleanKV ...
// nolint
func (c *Client) CleanKV() error {
c.lock.RLock()
......@@ -425,12 +423,12 @@ func (c *Client) keepAliveKV(k string, v string) error {
return nil
}
// Done ...
// nolint
func (c *Client) Done() <-chan struct{} {
return c.exit
}
// Valid ...
// nolint
func (c *Client) Valid() bool {
select {
case <-c.exit:
......@@ -447,7 +445,7 @@ func (c *Client) Valid() bool {
return true
}
// Create ...
// nolint
func (c *Client) Create(k string, v string) error {
err := c.put(k, v)
......@@ -457,7 +455,7 @@ func (c *Client) Create(k string, v string) error {
return nil
}
// Delete ...
// nolint
func (c *Client) Delete(k string) error {
err := c.delete(k)
......@@ -468,7 +466,7 @@ func (c *Client) Delete(k string) error {
return nil
}
// RegisterTemp ...
// RegisterTemp registers a temporary node
func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
completeKey := path.Join(basePath, node)
......@@ -481,7 +479,7 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
return completeKey, nil
}
// GetChildrenKVList ...
// GetChildrenKVList gets children kv list by @k
func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
kList, vList, err := c.getChildren(k)
......@@ -491,7 +489,7 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
return kList, vList, nil
}
// Get ...
// Get gets value by @k
func (c *Client) Get(k string) (string, error) {
v, err := c.get(k)
......@@ -502,7 +500,7 @@ func (c *Client) Get(k string) (string, error) {
return v, nil
}
// Watch ...
// Watch watches on spec key
func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
wc, err := c.watch(k)
......@@ -512,7 +510,7 @@ func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
return wc, nil
}
// WatchWithPrefix ...
// WatchWithPrefix watches on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
wc, err := c.watchWithPrefix(prefix)
......
......@@ -43,7 +43,7 @@ type clientFacade interface {
common.Node
}
// HandleClientRestart ...
// HandleClientRestart keeps the connection between client and server
func HandleClientRestart(r clientFacade) {
var (
......
......@@ -33,7 +33,7 @@ import (
"github.com/apache/dubbo-go/remoting"
)
// EventListener ...
// nolint
type EventListener struct {
client *Client
keyMapLock sync.Mutex
......@@ -41,7 +41,7 @@ type EventListener struct {
wg sync.WaitGroup
}
// NewEventListener ...
// NewEventListener returns a EventListener instance
func NewEventListener(client *Client) *EventListener {
return &EventListener{
client: client,
......@@ -92,12 +92,10 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
}
}
}
return false
}
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
// return true means the event type is DELETE
// return false means the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool {
logger.Infof("got a etcd event {type: %s, key: %s}", event.Type, event.Kv.Key)
......@@ -135,7 +133,7 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin
panic("unreachable")
}
// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix
// ListenServiceNodeEventWithPrefix listens on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
defer l.wg.Done()
for {
......@@ -151,12 +149,12 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener
logger.Warnf("etcd client stopped")
return
// client ctx stop
// client ctx stop
case <-l.client.ctx.Done():
logger.Warnf("etcd client ctx cancel")
return
// etcd event stream
// etcd event stream
case e, ok := <-wc:
if !ok {
......@@ -230,7 +228,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
}(key)
}
// Close ...
// nolint
func (l *EventListener) Close() {
l.wg.Wait()
}
......@@ -46,8 +46,7 @@ type Client struct {
controller *dubboRegistryController
}
// newClient
// new a client for registry
// newClient returns Client instance for registry
func newClient(url common.URL) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background())
......@@ -75,8 +74,7 @@ func newClient(url common.URL) (*Client, error) {
return c, nil
}
// Create
// create k/v pair in watcher-set
// Create creates k/v pair in watcher-set
func (c *Client) Create(k, v string) error {
// the read current pod must be lock, protect every
......@@ -92,8 +90,7 @@ func (c *Client) Create(k, v string) error {
return nil
}
// GetChildren
// get k children list from kubernetes-watcherSet
// GetChildren gets k children list from kubernetes-watcherSet
func (c *Client) GetChildren(k string) ([]string, []string, error) {
objectList, err := c.controller.watcherSet.Get(k, true)
......@@ -112,8 +109,7 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) {
return kList, vList, nil
}
// Watch
// watch on spec key
// Watch watches on spec key
func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.controller.watcherSet.Watch(k, false)
......@@ -124,8 +120,7 @@ func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error)
return w.ResultChan(), w.done(), nil
}
// Watch
// watch on spec prefix
// WatchWithPrefix watches on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.controller.watcherSet.Watch(prefix, true)
......@@ -136,9 +131,7 @@ func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan st
return w.ResultChan(), w.done(), nil
}
// Valid
// Valid the client
// if return false, the client is die
// if returns false, the client is die
func (c *Client) Valid() bool {
select {
......@@ -151,14 +144,12 @@ func (c *Client) Valid() bool {
return c.controller != nil
}
// Done
// read the client status
// nolint
func (c *Client) Done() <-chan struct{} {
return c.ctx.Done()
}
// Stop
// read the client status
// nolint
func (c *Client) Close() {
select {
......@@ -174,8 +165,7 @@ func (c *Client) Close() {
// so, just wait
}
// ValidateClient
// validate the kubernetes client
// ValidateClient validates the kubernetes client
func ValidateClient(container clientFacade) error {
client := container.Client()
......@@ -194,8 +184,7 @@ func ValidateClient(container clientFacade) error {
return nil
}
// NewMockClient
// export for registry package test
// NewMockClient exports for registry package test
func NewMockClient(podList *v1.PodList) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background())
......
......@@ -45,8 +45,8 @@ func NewEventListener(client *Client) *EventListener {
}
// Listen on a spec key
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
// this method returns true when spec key deleted,
// this method returns false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
defer l.wg.Done()
for {
......@@ -83,8 +83,8 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
}
}
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
// return true means the event type is DELETE
// return false means the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool {
logger.Infof("got a kubernetes-watcherSet event {type: %d, key: %s}", event.EventType, event.Key)
......
......@@ -72,8 +72,7 @@ var (
ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")
)
// dubboRegistryController
// work like a kubernetes controller
// dubboRegistryController works like a kubernetes controller
type dubboRegistryController struct {
// clone from client
......@@ -364,8 +363,7 @@ func (c *dubboRegistryController) processNextWorkItem() bool {
return true
}
// handleWatchedPodEvent
// handle watched pod event
// handleWatchedPodEvent handles watched pod event
func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
logger.Debugf("get @type = %s event from @pod = %s", eventType, p.GetName())
......@@ -402,8 +400,7 @@ func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType wat
}
}
// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
// unmarshalRecord unmarshals the kubernetes dubbo annotation value
func (c *dubboRegistryController) unmarshalRecord(record string) ([]*WatcherEvent, error) {
if len(record) == 0 {
......@@ -453,8 +450,7 @@ func (c *dubboRegistryController) initCurrentPod() error {
return nil
}
// patch current pod
// write new meta for current pod
// patchCurrentPod writes new meta for current pod
func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error) {
updatedPod, err := c.kc.CoreV1().Pods(c.namespace).Patch(c.name, types.StrategicMergePatchType, patch)
if err != nil {
......@@ -463,7 +459,7 @@ func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error)
return updatedPod, nil
}
// assemble the dubbo kubernetes label
// assembleDUBBOLabel assembles the dubbo kubernetes label
// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label
func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Pod, error) {
var (
......@@ -498,7 +494,7 @@ func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Po
return oldPod, newPod, nil
}
// assemble the dubbo kubernetes annotations
// assembleDUBBOAnnotations assembles the dubbo kubernetes annotations
// accord the current pod && (k,v) assemble the old-pod, new-pod
func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
......@@ -528,8 +524,7 @@ func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentP
return
}
// getPatch
// get the kubernetes pod patch bytes
// getPatch gets the kubernetes pod patch bytes
func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
oldData, err := json.Marshal(oldPod)
if err != nil {
......@@ -548,8 +543,7 @@ func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, erro
return patchBytes, nil
}
// marshalRecord
// marshal the kubernetes dubbo annotation value
// marshalRecord marshals the kubernetes dubbo annotation value
func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, error) {
msg, err := json.Marshal(ol)
if err != nil {
......@@ -558,7 +552,7 @@ func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, err
return base64.URLEncoding.EncodeToString(msg), nil
}
// read from kubernetes-env current pod status
// readCurrentPod reads from kubernetes-env current pod status
func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) {
currentPod, err := c.kc.CoreV1().Pods(c.namespace).Get(c.name, metav1.GetOptions{})
if err != nil {
......@@ -567,7 +561,7 @@ func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) {
return currentPod, nil
}
// add annotation for current pod
// addAnnotationForCurrentPod adds annotation for current pod
func (c *dubboRegistryController) addAnnotationForCurrentPod(k string, v string) error {
c.lock.Lock()
......
......@@ -140,14 +140,12 @@ func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
return s.addWatcher(key, prefix)
}
// Done
// get the watcher-set status
// Done gets the watcher-set status
func (s *watcherSetImpl) Done() <-chan struct{} {
return s.ctx.Done()
}
// Put
// put the watch event to watcher-set
// Put puts the watch event to watcher-set
func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error {
blockSendMsg := func(object *WatcherEvent, w *watcher) {
......@@ -243,8 +241,7 @@ func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
return w, nil
}
// Get
// get elements from watcher-set
// Get gets elements from watcher-set
func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
s.lock.RLock()
......@@ -297,19 +294,17 @@ type watcher struct {
exit chan struct{}
}
// ResultChan
// nolint
func (w *watcher) ResultChan() <-chan *WatcherEvent {
return w.ch
}
// ID
// the watcher's id
// nolint
func (w *watcher) ID() string {
return strconv.FormatUint(w.id, 10)
}
// stop
// stop the watcher
// nolint
func (w *watcher) stop() {
// double close will panic
......@@ -318,14 +313,12 @@ func (w *watcher) stop() {
})
}
// done
// check watcher status
// done checks watcher status
func (w *watcher) done() <-chan struct{} {
return w.exit
}
// newWatcherSet
// new watcher set from parent context
// newWatcherSet returns new watcher set from parent context
func newWatcherSet(ctx context.Context) WatcherSet {
s := &watcherSetImpl{
ctx: ctx,
......
......@@ -21,7 +21,7 @@ import (
"fmt"
)
// DataListener ...
// DataListener defines common data listener interface
type DataListener interface {
DataChange(eventType Event) bool //bool is return for interface implement is interesting
}
......@@ -30,15 +30,15 @@ type DataListener interface {
// event type
//////////////////////////////////////////
// SourceObjectEventType ...
// EventType means SourceObjectEventType
type EventType int
const (
// EventTypeAdd ...
// EventTypeAdd means add event
EventTypeAdd = iota
// EventTypeDel ...
// EventTypeDel means del event
EventTypeDel
// EventTypeUpdate ...
// EventTypeUpdate means update event
EventTypeUpdate
)
......@@ -56,7 +56,7 @@ func (t EventType) String() string {
// service event
//////////////////////////////////////////
// Event ...
// Event defines common elements for service event
type Event struct {
Path string
Action EventType
......
......@@ -47,7 +47,7 @@ var (
errNilNode = perrors.Errorf("node does not exist")
)
// ZookeeperClient ...
// ZookeeperClient represents zookeeper client Configuration
type ZookeeperClient struct {
name string
ZkAddrs []string
......@@ -59,7 +59,7 @@ type ZookeeperClient struct {
eventRegistry map[string][]*chan struct{}
}
// StateToString ...
// nolint
func StateToString(state zk.State) string {
switch state {
case zk.StateDisconnected:
......@@ -89,7 +89,7 @@ func StateToString(state zk.State) string {
}
}
// Options ...
// nolint
type Options struct {
zkName string
client *ZookeeperClient
......@@ -97,17 +97,17 @@ type Options struct {
ts *zk.TestCluster
}
// Option ...
// Option will define a function of handling Options
type Option func(*Options)
// WithZkName ...
// WithZkName sets zk client name
func WithZkName(name string) Option {
return func(opt *Options) {
opt.zkName = name
}
}
// ValidateZookeeperClient ...
// ValidateZookeeperClient validates client and sets options
func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
var err error
options := &Options{}
......@@ -187,14 +187,14 @@ func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*
return z, nil
}
// WithTestCluster ...
// WithTestCluster sets test cluser for zk client
func WithTestCluster(ts *zk.TestCluster) Option {
return func(opt *Options) {
opt.ts = ts
}
}
// NewMockZookeeperClient ...
// NewMockZookeeperClient returns a mock client instance
func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
var (
err error
......@@ -226,21 +226,15 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option)
}
}
//callbackChan := make(chan zk.Event)
//f := func(event zk.Event) {
// callbackChan <- event
//}
z.Conn, event, err = ts.ConnectWithOptions(timeout)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
//z.wait.Add(1)
return ts, z, event, nil
}
// HandleZkEvent ...
// HandleZkEvent handles zookeeper events
func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
var (
state int
......@@ -301,7 +295,7 @@ LOOP:
}
}
// RegisterEvent ...
// RegisterEvent registers zookeeper events
func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" || event == nil {
return
......@@ -316,7 +310,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
z.Unlock()
}
// UnregisterEvent ...
// UnregisterEvent unregisters zookeeper events
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" {
return
......@@ -343,7 +337,7 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
}
}
// Done ...
// nolint
func (z *ZookeeperClient) Done() <-chan struct{} {
return z.exit
}
......@@ -359,7 +353,7 @@ func (z *ZookeeperClient) stop() bool {
return false
}
// ZkConnValid ...
// ZkConnValid validates zookeeper connection
func (z *ZookeeperClient) ZkConnValid() bool {
select {
case <-z.exit:
......@@ -377,7 +371,7 @@ func (z *ZookeeperClient) ZkConnValid() bool {
return valid
}
// Close ...
// nolint
func (z *ZookeeperClient) Close() {
if z == nil {
return
......@@ -436,7 +430,7 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {
return nil
}
// Delete ...
// nolint
func (z *ZookeeperClient) Delete(basePath string) error {
var (
err error
......@@ -453,7 +447,7 @@ func (z *ZookeeperClient) Delete(basePath string) error {
return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath)
}
// RegisterTemp ...
// RegisterTemp registers temporary node by @basePath and @node
func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
var (
err error
......@@ -472,7 +466,6 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
}
//if err != nil && err != zk.ErrNodeExists {
if err != nil {
logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err))
return zkPath, perrors.WithStack(err)
......@@ -482,7 +475,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
return tmpPath, nil
}
// RegisterTempSeq ...
// RegisterTempSeq register temporary sequence node by @basePath and @data
func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
var (
err error
......@@ -513,7 +506,7 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
return tmpPath, nil
}
// GetChildrenW ...
// GetChildrenW gets children watch by @path
func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) {
var (
err error
......@@ -550,7 +543,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event,
return children, watcher.EvtCh, nil
}
// GetChildren ...
// GetChildren gets children by @path
func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
var (
err error
......@@ -583,7 +576,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
return children, nil
}
// ExistW ...
// ExistW to judge watch whether it exists or not by @zkPath
func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
var (
exist bool
......@@ -611,7 +604,7 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
return watcher.EvtCh, nil
}
// GetContent ...
// GetContent gets content by @zkPath
func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
return z.Conn.Get(zkPath)
}
......@@ -40,7 +40,7 @@ type zkClientFacade interface {
common.Node
}
// HandleClientRestart ...
// HandleClientRestart keeps the connection between client and server
func HandleClientRestart(r zkClientFacade) {
var (
err error
......
......@@ -37,7 +37,7 @@ import (
"github.com/apache/dubbo-go/remoting"
)
// ZkEventListener ...
// nolint
type ZkEventListener struct {
client *ZookeeperClient
pathMapLock sync.Mutex
......@@ -45,7 +45,7 @@ type ZkEventListener struct {
wg sync.WaitGroup
}
// NewZkEventListener ...
// NewZkEventListener returns a EventListener instance
func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
......@@ -53,12 +53,12 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
}
}
// SetClient ...
// nolint
func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
l.client = client
}
// ListenServiceNodeEvent ...
// nolint
func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()
var zkEvent zk.Event
......@@ -312,7 +312,7 @@ func (l *ZkEventListener) valid() bool {
return l.client.ZkConnValid()
}
// Close ...
// nolint
func (l *ZkEventListener) Close() {
l.wg.Wait()
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment