diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 24f2858e091435cd73fba751c59693930378058b..89dbe313510f427dd8f7e0bb91f47d5236c77d21 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -157,9 +157,9 @@ func (r *etcdV3Registry) stop() { // close current client r.client.Close() - r.client = nil r.cltLock.Lock() + r.client = nil r.services = nil r.cltLock.Unlock() } diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go index bc781200ef4347dccbac7b8eb31b73e19e51a0a9..9391e5307f17bb36bf04ea3abbe50975fbab6081 100644 --- a/registry/etcdv3/registry_test.go +++ b/registry/etcdv3/registry_test.go @@ -28,7 +28,9 @@ func initRegistry(t *testing.T) *etcdV3Registry { t.Fatal(err) } - return reg.(*etcdV3Registry) + out := reg.(*etcdV3Registry) + out.client.CleanKV() + return out } func Test_Register(t *testing.T) { diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 4cac0c8ae8954a3f226dc94c7eee33b40b3b569a..dbc77a2f0544bbae7d9c57e4e8a8cf42ee2448f6 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -157,10 +157,8 @@ func (c *Client) clean() { // cancel ctx for raw client c.cancel() - // clean ctx cancel raw client + // clean raw client c.rawClient = nil - c.ctx = nil - c.cancel = nil } func (c *Client) stop() bool { @@ -290,6 +288,23 @@ func (c *Client) get(k string) (string, error) { return string(resp.Kvs[0].Value), nil } +func (c *Client) CleanKV()error{ + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return ErrNilETCDV3Client + } + + _, err := c.rawClient.Delete(c.ctx, "", clientv3.WithPrefix()) + if err != nil { + return err + } + + return nil +} + func (c *Client) getChildren(k string) ([]string, []string, error) { c.lock.RLock() diff --git a/remoting/etcdv3/client_test.go b/remoting/etcdv3/client_test.go index 848aefce6cbd01c89bc02ab374057064e452be99..8646adf262bf6163b67ea85e6a4ea6e153c6169e 100644 --- a/remoting/etcdv3/client_test.go +++ b/remoting/etcdv3/client_test.go @@ -55,6 +55,7 @@ func initClient(t *testing.T) *Client { if err != nil { t.Fatal(err) } + c.CleanKV() return c } @@ -79,7 +80,7 @@ func TestClient_Create(t *testing.T) { tests := tests c := initClient(t) - c.Close() + defer c.Close() for _, tc := range tests { @@ -110,7 +111,7 @@ func TestClient_Delete(t *testing.T) { tests := tests c := initClient(t) - c.Close() + defer c.Close() for _, tc := range tests { diff --git a/remoting/etcdv3/facede_test.go b/remoting/etcdv3/facede_test.go index ce0bfdc3c444df2028fda9c96b53ba43a957cfeb..590d30dc2fade430ce4d2f961e008faac8718042 100644 --- a/remoting/etcdv3/facede_test.go +++ b/remoting/etcdv3/facede_test.go @@ -56,14 +56,14 @@ func (r *mockFacade) IsAvailable() bool { func Test_Fascade(t *testing.T) { c := initClient(t) - defer c.Close() - url, err := common.NewURL(context.Background(), "mock://127.0.0.1") + url, err := common.NewURL(context.Background(), "mock://127.0.0.1:2379") if err != nil { t.Fatal(err) } mock := &mockFacade{client: c, URL: &url} go HandleClientRestart(mock) + c.Close() time.Sleep(2 * time.Second) } diff --git a/remoting/etcdv3/listener_test.go b/remoting/etcdv3/listener_test.go index 57a2b34452cccd760c518ec0436e87202241256e..22f8b70889faee52e9d3ee5571f03a77a02446f7 100644 --- a/remoting/etcdv3/listener_test.go +++ b/remoting/etcdv3/listener_test.go @@ -2,12 +2,12 @@ package etcdv3 import ( "testing" + "time" "github.com/apache/dubbo-go/remoting" "github.com/stretchr/testify/assert" ) - var changedData = ` dubbo.consumer.request_timeout=3s dubbo.consumer.connect_timeout=5s @@ -29,10 +29,11 @@ var changedData = ` dubbo.service.com.ikurento.user.UserProvider.warmup=100 dubbo.service.com.ikurento.user.UserProvider.cluster=failover ` + func TestListener(t *testing.T) { - var tests = []struct{ - input struct{ + var tests = []struct { + input struct { k string v string } @@ -47,31 +48,37 @@ func TestListener(t *testing.T) { defer c.Close() listener := NewEventListener(c) - dataListener := &mockDataListener{client: c, changedData: changedData} + dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)} listener.ListenServiceEvent("/dubbo", dataListener) - - for _, tc := range tests{ + // NOTICE: direct listen will lose create msg + time.Sleep(time.Second) + for _, tc := range tests { k := tc.input.k v := tc.input.v - if err := c.Create(k, v); err != nil{ + if err := c.Create(k, v); err != nil { t.Fatal(err) } + } - assert.Equal(t, changedData, dataListener.eventList[0].Content) + msg := <-dataListener.rc + assert.Equal(t, changedData, msg.Content) + } type mockDataListener struct { eventList []remoting.Event client *Client changedData string + + rc chan remoting.Event } func (m *mockDataListener) DataChange(eventType remoting.Event) bool { m.eventList = append(m.eventList, eventType) if eventType.Content == m.changedData { - //m.client.Close() + m.rc <- eventType } return true }