Skip to content
Snippets Groups Projects
Unverified Commit 52a7d2a5 authored by lni's avatar lni Committed by GitHub
Browse files

hakeeper: internally retry HAKeeper client requests (#4194)

parent fce28392
No related branches found
No related tags found
No related merge requests found
......@@ -109,12 +109,14 @@ func (c *managedHAKeeperClient) Close() error {
func (c *managedHAKeeperClient) GetClusterDetails(ctx context.Context) (pb.ClusterDetails, error) {
for {
if err := c.prepareClient(ctx); err != nil {
return pb.ClusterDetails{}, err
}
cd, err := c.client.getClusterDetails(ctx)
if err != nil {
c.resetClient()
}
if c.isInternalError(err) {
if cerr := c.prepareClient(ctx); cerr != nil {
plog.Errorf("failed to prepare HAKeeper client %v", cerr)
return pb.ClusterDetails{}, cerr
}
continue
}
return cd, err
......@@ -124,12 +126,14 @@ func (c *managedHAKeeperClient) GetClusterDetails(ctx context.Context) (pb.Clust
func (c *managedHAKeeperClient) SendCNHeartbeat(ctx context.Context,
hb pb.CNStoreHeartbeat) error {
for {
if err := c.prepareClient(ctx); err != nil {
return err
}
err := c.client.sendCNHeartbeat(ctx, hb)
if err != nil {
c.resetClient()
}
if c.isInternalError(err) {
if cerr := c.prepareClient(ctx); cerr != nil {
plog.Errorf("failed to prepare HAKeeper client %v", cerr)
return cerr
}
continue
}
return err
......@@ -139,12 +143,14 @@ func (c *managedHAKeeperClient) SendCNHeartbeat(ctx context.Context,
func (c *managedHAKeeperClient) SendDNHeartbeat(ctx context.Context,
hb pb.DNStoreHeartbeat) (pb.CommandBatch, error) {
for {
if err := c.prepareClient(ctx); err != nil {
return pb.CommandBatch{}, err
}
cb, err := c.client.sendDNHeartbeat(ctx, hb)
if err != nil {
c.resetClient()
}
if c.isInternalError(err) {
if cerr := c.prepareClient(ctx); cerr != nil {
plog.Errorf("failed to prepare HAKeeper client %v", cerr)
return pb.CommandBatch{}, cerr
}
continue
}
return cb, err
......@@ -154,12 +160,14 @@ func (c *managedHAKeeperClient) SendDNHeartbeat(ctx context.Context,
func (c *managedHAKeeperClient) SendLogHeartbeat(ctx context.Context,
hb pb.LogStoreHeartbeat) (pb.CommandBatch, error) {
for {
if err := c.prepareClient(ctx); err != nil {
return pb.CommandBatch{}, err
}
cb, err := c.client.sendLogHeartbeat(ctx, hb)
if err != nil {
c.resetClient()
}
if c.isInternalError(err) {
if cerr := c.prepareClient(ctx); cerr != nil {
plog.Errorf("failed to prepare HAKeeper client %v", cerr)
return pb.CommandBatch{}, cerr
}
continue
}
return cb, err
......@@ -170,14 +178,24 @@ func (c *managedHAKeeperClient) isInternalError(err error) bool {
return errors.Is(err, ErrNotHAKeeper)
}
func (c *managedHAKeeperClient) resetClient() {
if c.client != nil {
cc := c.client
c.client = nil
if err := cc.close(); err != nil {
plog.Errorf("failed to close client %v", err)
}
}
}
func (c *managedHAKeeperClient) prepareClient(ctx context.Context) error {
if c.client != nil {
return nil
}
cc, err := newHAKeeperClient(ctx, c.cfg)
if err != nil {
return err
}
if err := c.client.close(); err != nil {
plog.Errorf("failed to close the client %v", err)
}
c.client = cc
return nil
}
......@@ -233,7 +251,14 @@ func newHAKeeperClient(ctx context.Context,
}
func (c *hakeeperClient) close() error {
return c.client.Close()
if c == nil {
panic("!!!")
}
if c.client != nil {
return c.client.Close()
}
return nil
}
func (c *hakeeperClient) getClusterDetails(ctx context.Context) (pb.ClusterDetails, error) {
......
......@@ -78,6 +78,12 @@ func TestHAKeeperClientSendCNHeartbeat(t *testing.T) {
defer func() {
assert.NoError(t, c1.Close())
}()
// should be transparently handled
cc := c1.(*managedHAKeeperClient)
assert.NoError(t, cc.client.close())
cc.client = nil
hb := pb.CNStoreHeartbeat{
UUID: s.ID(),
ServiceAddress: "addr1",
......@@ -89,6 +95,12 @@ func TestHAKeeperClientSendCNHeartbeat(t *testing.T) {
defer func() {
assert.NoError(t, c2.Close())
}()
// should be transparently handled
cc = c2.(*managedHAKeeperClient)
assert.NoError(t, cc.client.close())
cc.client = nil
hb2 := pb.DNStoreHeartbeat{
UUID: s.ID(),
ServiceAddress: "addr2",
......@@ -97,6 +109,11 @@ func TestHAKeeperClientSendCNHeartbeat(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 0, len(cb.Commands))
// should be transparently handled
cc = c1.(*managedHAKeeperClient)
assert.NoError(t, cc.client.close())
cc.client = nil
cd, err := c1.GetClusterDetails(ctx)
require.NoError(t, err)
cn := pb.CNNode{
......@@ -160,6 +177,12 @@ func TestHAKeeperClientSendLogHeartbeat(t *testing.T) {
defer func() {
assert.NoError(t, c.Close())
}()
// should be transparently handled
cc := c.(*managedHAKeeperClient)
assert.NoError(t, cc.client.close())
cc.client = nil
hb := s.store.getHeartbeatMessage()
cb, err := c.SendLogHeartbeat(ctx, hb)
require.NoError(t, err)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment