Skip to content
Snippets Groups Projects
Commit 275881db authored by 陈庆祥's avatar 陈庆祥 Committed by zhenshan.cao
Browse files

Remove address param (#5556)


Signed-off-by: default avatargodchen <qingxiang.chen@zilliz.com>
parent 41794ec3
No related branches found
No related tags found
No related merge requests found
Showing
with 178 additions and 273 deletions
......@@ -35,10 +35,9 @@ type UniqueID = typeutil.UniqueID
type IDAllocator struct {
Allocator
etcdAddr []string
metaRoot string
masterAddress string
masterClient types.MasterService
etcdAddr []string
metaRoot string
masterClient types.MasterService
countPerRPC uint32
......@@ -48,7 +47,7 @@ type IDAllocator struct {
PeerID UniqueID
}
func NewIDAllocator(ctx context.Context, masterAddr, metaRoot string, etcdAddr []string) (*IDAllocator, error) {
func NewIDAllocator(ctx context.Context, metaRoot string, etcdAddr []string) (*IDAllocator, error) {
ctx1, cancel := context.WithCancel(ctx)
a := &IDAllocator{
......@@ -57,10 +56,9 @@ func NewIDAllocator(ctx context.Context, masterAddr, metaRoot string, etcdAddr [
CancelFunc: cancel,
Role: "IDAllocator",
},
countPerRPC: IDCountPerRPC,
metaRoot: metaRoot,
etcdAddr: etcdAddr,
masterAddress: masterAddr,
countPerRPC: IDCountPerRPC,
metaRoot: metaRoot,
etcdAddr: etcdAddr,
}
a.TChan = &EmptyTicker{}
a.Allocator.SyncFunc = a.syncID
......@@ -74,7 +72,7 @@ func NewIDAllocator(ctx context.Context, masterAddr, metaRoot string, etcdAddr [
func (ia *IDAllocator) Start() error {
var err error
ia.masterClient, err = msc.NewClient(ia.masterAddress, ia.metaRoot, ia.etcdAddr, 20*time.Second)
ia.masterClient, err = msc.NewClient(ia.metaRoot, ia.etcdAddr, 3*time.Second)
if err != nil {
panic(err)
}
......
......@@ -163,18 +163,20 @@ func (node *DataNode) Init() error {
return err
}
for _, kv := range resp.InitParams.StartParams {
switch kv.Key {
case "DDChannelName":
Params.DDChannelNames = []string{kv.Value}
case "SegmentStatisticsChannelName":
Params.SegmentStatisticsChannelName = kv.Value
case "TimeTickChannelName":
Params.TimeTickChannelName = kv.Value
case "CompleteFlushChannelName":
Params.CompleteFlushChannelName = kv.Value
default:
return fmt.Errorf("Invalid key: %v", kv.Key)
if resp.InitParams != nil {
for _, kv := range resp.InitParams.StartParams {
switch kv.Key {
case "DDChannelName":
Params.DDChannelNames = []string{kv.Value}
case "SegmentStatisticsChannelName":
Params.SegmentStatisticsChannelName = kv.Value
case "TimeTickChannelName":
Params.TimeTickChannelName = kv.Value
case "CompleteFlushChannelName":
Params.CompleteFlushChannelName = kv.Value
default:
return fmt.Errorf("Invalid key: %v", kv.Key)
}
}
}
log.Debug("DataNode Init", zap.Any("DDChannelName", Params.DDChannelNames),
......
......@@ -81,14 +81,13 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro
flushCh: make(chan UniqueID, 1024),
}
s.dataClientCreator = func(addr string) (types.DataNode, error) {
return datanodeclient.NewClient(addr, 10*time.Second)
return datanodeclient.NewClient(addr, 3*time.Second)
}
s.masterClientCreator = func(addr string) (types.MasterService, error) {
return masterclient.NewClient(addr, Params.MetaRootPath,
return masterclient.NewClient(Params.MetaRootPath,
[]string{Params.EtcdAddress}, masterClientTimout)
}
log.Debug("DataService", zap.Any("State", s.state.Load()))
return s, nil
}
......@@ -261,7 +260,7 @@ func (s *Server) startStatsChannel(ctx context.Context) {
defer s.serverLoopWg.Done()
statsStream, _ := s.msFactory.NewMsgStream(ctx)
statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
(??) log.Debug("dataservice AsConsumer: " + Params.StatisticsChannelName + " : " + Params.DataServiceSubscriptionName)
log.Debug("dataservice AsConsumer: " + Params.StatisticsChannelName + " : " + Params.DataServiceSubscriptionName)
// try to restore last processed pos
pos, err := s.loadStreamLastPos(streamTypeStats)
log.Debug("load last pos of stats channel", zap.Any("pos", pos), zap.Error(err))
......
......@@ -36,22 +36,22 @@ type Client struct {
grpc datapb.DataNodeClient
conn *grpc.ClientConn
address string
addr string
timeout time.Duration
reconnTry int
recallTry int
}
func NewClient(address string, timeout time.Duration) (*Client, error) {
if address == "" {
func NewClient(addr string, timeout time.Duration) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("address is empty")
}
return &Client{
grpc: nil,
conn: nil,
address: address,
addr: addr,
ctx: context.Background(),
timeout: timeout,
recallTry: 3,
......@@ -62,10 +62,10 @@ func NewClient(address string, timeout time.Duration) (*Client, error) {
func (c *Client) Init() error {
tracer := opentracing.GlobalTracer()
connectGrpcFunc := func() error {
log.Debug("DataNodeClient try connect ", zap.String("address", c.address))
log.Debug("DataNodeClient try connect ", zap.String("address", c.addr))
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
conn, err := grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
......@@ -91,8 +91,10 @@ func (c *Client) reconnect() error {
tracer := opentracing.GlobalTracer()
var err error
connectGrpcFunc := func() error {
log.Debug("DataNodeClient try reconnect ", zap.String("address", c.address))
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
log.Debug("DataNode connect ", zap.String("address", c.addr))
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
......
......@@ -122,10 +122,10 @@ func TestRun(t *testing.T) {
Params.Init()
dnServer.newMasterServiceClient = func(s string) (types.MasterService, error) {
dnServer.newMasterServiceClient = func() (types.MasterService, error) {
return &mockMaster{}, nil
}
dnServer.newDataServiceClient = func(s, etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService {
dnServer.newDataServiceClient = func(etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService {
return &mockDataService{}
}
......
......@@ -55,8 +55,8 @@ type Server struct {
masterService types.MasterService
dataService types.DataService
newMasterServiceClient func(string) (types.MasterService, error)
newDataServiceClient func(string, string, string, time.Duration) types.DataService
newMasterServiceClient func() (types.MasterService, error)
newDataServiceClient func(string, string, time.Duration) types.DataService
closer io.Closer
}
......@@ -69,11 +69,11 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
cancel: cancel,
msFactory: factory,
grpcErrChan: make(chan error),
newMasterServiceClient: func(s string) (types.MasterService, error) {
return msc.NewClient(s, dn.Params.MetaRootPath, []string{dn.Params.EtcdAddress}, 20*time.Second)
newMasterServiceClient: func() (types.MasterService, error) {
return msc.NewClient(dn.Params.MetaRootPath, []string{dn.Params.EtcdAddress}, 3*time.Second)
},
newDataServiceClient: func(s, etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService {
return dsc.NewClient(Params.DataServiceAddress, etcdMetaRoot, []string{etcdAddress}, timeout)
newDataServiceClient: func(etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService {
return dsc.NewClient(etcdMetaRoot, []string{etcdAddress}, timeout)
},
}
......@@ -183,7 +183,7 @@ func (s *Server) init() error {
if s.newMasterServiceClient != nil {
log.Debug("Master service address", zap.String("address", Params.MasterAddress))
log.Debug("Init master service client ...")
masterServiceClient, err := s.newMasterServiceClient(Params.MasterAddress)
masterServiceClient, err := s.newMasterServiceClient()
if err != nil {
log.Debug("DataNode newMasterServiceClient failed", zap.Error(err))
panic(err)
......@@ -211,7 +211,7 @@ func (s *Server) init() error {
if s.newDataServiceClient != nil {
log.Debug("Data service address", zap.String("address", Params.DataServiceAddress))
log.Debug("DataNode Init data service client ...")
dataServiceClient := s.newDataServiceClient(Params.DataServiceAddress, dn.Params.MetaRootPath, dn.Params.EtcdAddress, 3*time.Second)
dataServiceClient := s.newDataServiceClient(dn.Params.MetaRootPath, dn.Params.EtcdAddress, 10*time.Second)
if err = dataServiceClient.Init(); err != nil {
log.Debug("DataNode newDataServiceClient failed", zap.Error(err))
panic(err)
......
......@@ -37,7 +37,8 @@ type Client struct {
ctx context.Context
addr string
sess *sessionutil.Session
sess *sessionutil.Session
timeout time.Duration
recallTry int
reconnTry int
......@@ -53,15 +54,14 @@ func getDataServiceAddress(sess *sessionutil.Session) (string, error) {
ms, ok := msess[key]
if !ok {
log.Debug("DataServiceClient, not existed in msess ", zap.Any("key", key), zap.Any("len of msess", len(msess)))
return "", fmt.Errorf("number of master service is incorrect, %d", len(msess))
return "", fmt.Errorf("number of dataservice is incorrect, %d", len(msess))
}
return ms.Address, nil
}
func NewClient(address, metaRoot string, etcdAddr []string, timeout time.Duration) *Client {
func NewClient(metaRoot string, etcdAddr []string, timeout time.Duration) *Client {
sess := sessionutil.NewSession(context.Background(), metaRoot, etcdAddr)
return &Client{
addr: address,
ctx: context.Background(),
sess: sess,
timeout: timeout,
......@@ -71,40 +71,10 @@ func NewClient(address, metaRoot string, etcdAddr []string, timeout time.Duratio
}
func (c *Client) Init() error {
tracer := opentracing.GlobalTracer()
log.Debug("DataServiceClient", zap.Any("c.addr", c.addr))
if c.addr != "" {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
log.Debug("DataServiceClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
if err != nil {
return err
}
c.conn = conn
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectGrpcFunc)
if err != nil {
log.Debug("DataServiceClient connect failed", zap.Error(err))
return err
}
} else {
return c.reconnect()
}
log.Debug("DataServiceClient connect success")
c.grpcClient = datapb.NewDataServiceClient(c.conn)
return nil
return c.connect()
}
func (c *Client) reconnect() error {
func (c *Client) connect() error {
tracer := opentracing.GlobalTracer()
var err error
getDataServiceAddressFn := func() error {
......@@ -150,7 +120,7 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
err = c.reconnect()
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
......
......@@ -90,7 +90,7 @@ func TestRun(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "listen tcp: address 1000000: invalid port")
dsServer.newMasterServiceClient = func(s string) (types.MasterService, error) {
dsServer.newMasterServiceClient = func() (types.MasterService, error) {
return &mockMaster{}, nil
}
......
......@@ -54,7 +54,7 @@ type Server struct {
grpcServer *grpc.Server
masterService types.MasterService
newMasterServiceClient func(string) (types.MasterService, error)
newMasterServiceClient func() (types.MasterService, error)
closer io.Closer
}
......@@ -68,8 +68,8 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
ctx: ctx1,
cancel: cancel,
grpcErrChan: make(chan error),
newMasterServiceClient: func(s string) (types.MasterService, error) {
return msc.NewClient(s, dataservice.Params.MetaRootPath, []string{dataservice.Params.EtcdAddress}, 10*time.Second)
newMasterServiceClient: func() (types.MasterService, error) {
return msc.NewClient(dataservice.Params.MetaRootPath, []string{dataservice.Params.EtcdAddress}, 3*time.Second)
},
}
s.dataService, err = dataservice.CreateServer(s.ctx, factory)
......@@ -107,7 +107,7 @@ func (s *Server) init() error {
if s.newMasterServiceClient != nil {
log.Debug("DataService try to new master service client", zap.String("address", Params.MasterAddress))
masterServiceClient, err := s.newMasterServiceClient(Params.MasterAddress)
masterServiceClient, err := s.newMasterServiceClient()
if err != nil {
log.Debug("DataService new master service client failed", zap.Error(err))
panic(err)
......@@ -166,8 +166,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(tracer)),
grpc.StreamInterceptor(
otgrpc.OpenTracingStreamServerInterceptor(tracer)),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor))
otgrpc.OpenTracingStreamServerInterceptor(tracer)))
//grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor))
datapb.RegisterDataServiceServer(s.grpcServer, s)
grpc_prometheus.Register(s.grpcServer)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
......
......@@ -34,19 +34,19 @@ type Client struct {
conn *grpc.ClientConn
ctx context.Context
address string
addr string
timeout time.Duration
reconnTry int
recallTry int
}
func NewClient(address string, timeout time.Duration) (*Client, error) {
if address == "" {
func NewClient(addr string, timeout time.Duration) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("address is empty")
}
return &Client{
address: address,
addr: addr,
ctx: context.Background(),
timeout: timeout,
recallTry: 3,
......@@ -59,8 +59,8 @@ func (c *Client) Init() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
log.Debug("IndexNodeClient try connect ", zap.String("address", c.address))
conn, err := grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
log.Debug("IndexNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
......@@ -76,7 +76,7 @@ func (c *Client) Init() error {
log.Debug("IndexNodeClient try connect failed", zap.Error(err))
return err
}
log.Debug("IndexNodeClient try connect success", zap.String("address", c.address))
log.Debug("IndexNodeClient try connect success", zap.String("address", c.addr))
c.grpcClient = indexpb.NewIndexNodeClient(c.conn)
return nil
}
......@@ -87,8 +87,8 @@ func (c *Client) reconnect() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
log.Debug("IndexNodeClient try reconnect ", zap.String("address", c.address))
conn, err := grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
log.Debug("IndexNodeClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
......@@ -105,7 +105,7 @@ func (c *Client) reconnect() error {
log.Debug("IndexNodeClient try reconnect failed", zap.Error(err))
return err
}
log.Debug("IndexNodeClient try reconnect success", zap.String("address", c.address))
log.Debug("IndexNodeClient try reconnect success", zap.String("address", c.addr))
c.grpcClient = indexpb.NewIndexNodeClient(c.conn)
return nil
}
......
......@@ -139,8 +139,7 @@ func (s *Server) init() error {
return err
}
indexServiceAddr := Params.IndexServerAddress
s.indexServiceClient = grpcindexserviceclient.NewClient(indexServiceAddr, indexnode.Params.MetaRootPath, []string{indexnode.Params.EtcdAddress}, 10*time.Second)
s.indexServiceClient = grpcindexserviceclient.NewClient(indexnode.Params.MetaRootPath, []string{indexnode.Params.EtcdAddress}, 3*time.Second)
err = s.indexServiceClient.Init()
if err != nil {
log.Debug("IndexNode indexSerticeClient init failed", zap.Error(err))
......@@ -152,7 +151,7 @@ func (s *Server) init() error {
log.Debug("IndexNode", zap.Any("State", internalpb.StateCode_Initializing))
err = s.indexnode.Init()
if err != nil {
log.Debug("IndexNode Init failed", zap.Error(err))
log.Debug("IndexNode Init failed", zap.Error(err))
return err
}
return nil
......
......@@ -39,14 +39,15 @@ type Client struct {
grpcClient indexpb.IndexServiceClient
conn *grpc.ClientConn
address string
sess *sessionutil.Session
addr string
sess *sessionutil.Session
timeout time.Duration
recallTry int
reconnTry int
}
func getIndexServiceAddress(sess *sessionutil.Session) (string, error) {
func getIndexServiceaddr(sess *sessionutil.Session) (string, error) {
key := typeutil.IndexServiceRole
msess, _, err := sess.GetSessions(key)
if err != nil {
......@@ -56,15 +57,14 @@ func getIndexServiceAddress(sess *sessionutil.Session) (string, error) {
ms, ok := msess[key]
if !ok {
log.Debug("IndexServiceClient msess key not existed", zap.Any("key", key), zap.Any("len of msess", len(msess)))
return "", fmt.Errorf("number of master service is incorrect, %d", len(msess))
return "", fmt.Errorf("number of indexservice is incorrect, %d", len(msess))
}
return ms.Address, nil
}
func NewClient(address, metaRoot string, etcdAddr []string, timeout time.Duration) *Client {
func NewClient(metaRoot string, etcdAddr []string, timeout time.Duration) *Client {
sess := sessionutil.NewSession(context.Background(), metaRoot, etcdAddr)
return &Client{
address: address,
ctx: context.Background(),
sess: sess,
timeout: timeout,
......@@ -74,48 +74,20 @@ func NewClient(address, metaRoot string, etcdAddr []string, timeout time.Duratio
}
func (c *Client) Init() error {
tracer := opentracing.GlobalTracer()
log.Debug("IndexServiceClient Init", zap.Any("c.address", c.address))
if c.address != "" {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
log.Debug("IndexServiceClient try connect ", zap.String("address", c.address))
conn, err := grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
if err != nil {
return err
}
c.conn = conn
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectGrpcFunc)
if err != nil {
return err
}
log.Debug("IndexServiceClient try connect failed", zap.Error(err))
} else {
return c.reconnect()
}
log.Debug("IndexServiceClient try connect success")
c.grpcClient = indexpb.NewIndexServiceClient(c.conn)
return nil
return c.connect()
}
func (c *Client) reconnect() error {
func (c *Client) connect() error {
tracer := opentracing.GlobalTracer()
var err error
getIndexServiceAddressFn := func() error {
c.address, err = getIndexServiceAddress(c.sess)
getIndexServiceaddrFn := func() error {
c.addr, err = getIndexServiceaddr(c.sess)
if err != nil {
return err
}
return nil
}
err = retry.Retry(c.reconnTry, 3*time.Second, getIndexServiceAddressFn)
err = retry.Retry(c.reconnTry, 3*time.Second, getIndexServiceaddrFn)
if err != nil {
log.Debug("IndexServiceClient getIndexServiceAddress failed", zap.Error(err))
return err
......@@ -124,8 +96,8 @@ func (c *Client) reconnect() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
log.Debug("IndexServiceClient try connect ", zap.String("address", c.address))
conn, err := grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
log.Debug("IndexServiceClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
......@@ -152,7 +124,7 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
err = c.reconnect()
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
......
......@@ -16,17 +16,17 @@ import (
"fmt"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/masterpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"google.golang.org/grpc"
)
......@@ -61,7 +61,7 @@ func getMasterServiceAddr(sess *sessionutil.Session) (string, error) {
return ms.Address, nil
}
func NewClient(addr string, metaRoot string, etcdAddr []string, timeout time.Duration) (*GrpcClient, error) {
func NewClient(metaRoot string, etcdAddr []string, timeout time.Duration) (*GrpcClient, error) {
sess := sessionutil.NewSession(context.Background(), metaRoot, etcdAddr)
if sess == nil {
err := fmt.Errorf("new session error, maybe can not connect to etcd")
......@@ -73,7 +73,6 @@ func NewClient(addr string, metaRoot string, etcdAddr []string, timeout time.Dur
grpcClient: nil,
conn: nil,
ctx: context.Background(),
addr: addr,
timeout: timeout,
reconnTry: 300,
recallTry: 3,
......@@ -81,26 +80,38 @@ func NewClient(addr string, metaRoot string, etcdAddr []string, timeout time.Dur
}, nil
}
func (c *GrpcClient) reconnect() error {
addr, err := getMasterServiceAddr(c.sess)
func (c *GrpcClient) connect() error {
tracer := opentracing.GlobalTracer()
var err error
getMasterServiceAddrFn := func() error {
c.addr, err = getMasterServiceAddr(c.sess)
if err != nil {
return err
}
return nil
}
err = retry.Retry(c.reconnTry, 3*time.Second, getMasterServiceAddrFn)
if err != nil {
log.Debug("MasterServiceClient getMasterServiceAddr failed", zap.Error(err))
return err
}
log.Debug("MasterServiceClient getMasterServiceAddr success")
tracer := opentracing.GlobalTracer()
for i := 0; i < c.reconnTry; i++ {
connectGrpcFunc := func() error {
log.Debug("MasterServiceClient try reconnect ", zap.String("address", c.addr))
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
if c.conn, err = grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock(),
defer cancelFunc()
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer))); err == nil {
cancelFunc()
break
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
if err != nil {
return err
}
cancelFunc()
c.conn = conn
return nil
}
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
if err != nil {
log.Debug("MasterServiceClient try reconnect failed", zap.Error(err))
return err
......@@ -111,32 +122,7 @@ func (c *GrpcClient) reconnect() error {
}
func (c *GrpcClient) Init() error {
tracer := opentracing.GlobalTracer()
var err error
log.Debug("MasterServiceClient Init", zap.Any("c.addr", c.addr))
if c.addr != "" {
for i := 0; i < 10000; i++ {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer))); err == nil {
cancelFunc()
break
}
cancelFunc()
}
if err != nil {
log.Debug("MasterServiceClient connect to master failed", zap.Error(err))
return fmt.Errorf("connect to specific address gprc error")
}
} else {
return c.reconnect()
}
log.Debug("MasterServiceClient connect to master success")
c.grpcClient = masterpb.NewMasterServiceClient(c.conn)
return nil
return c.connect()
}
func (c *GrpcClient) Start() error {
......@@ -157,7 +143,7 @@ func (c *GrpcClient) recall(caller func() (interface{}, error)) (interface{}, er
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
err = c.reconnect()
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
......
......@@ -247,12 +247,16 @@ func TestGrpcService(t *testing.T) {
return nil
}
cms.Params.Address = Params.Address
err = svr.masterService.Register()
assert.Nil(t, err)
err = svr.start()
assert.Nil(t, err)
svr.masterService.UpdateStateCode(internalpb.StateCode_Healthy)
cli, err := grpcmasterserviceclient.NewClient(Params.Address, cms.Params.MetaRootPath, []string{cms.Params.EtcdAddress}, 3*time.Second)
cli, err := grpcmasterserviceclient.NewClient(cms.Params.MetaRootPath, []string{cms.Params.EtcdAddress}, 3*time.Second)
assert.Nil(t, err)
err = cli.Init()
......@@ -812,6 +816,10 @@ func TestGrpcService(t *testing.T) {
err = svr.Stop()
assert.Nil(t, err)
_, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix())
assert.Nil(t, err)
}
type mockCore struct {
......@@ -924,14 +932,14 @@ func TestRun(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "listen tcp: address 1000000: invalid port")
svr.newDataServiceClient = func(s, metaRoot, address string, timeout time.Duration) types.DataService {
svr.newDataServiceClient = func(metaRoot, address string, timeout time.Duration) types.DataService {
return &mockDataService{}
}
svr.newIndexServiceClient = func(s, etcdAddress, metaRootPath string, timeout time.Duration) types.IndexService {
svr.newIndexServiceClient = func(etcdAddress, metaRootPath string, timeout time.Duration) types.IndexService {
return &mockIndex{}
}
svr.newQueryServiceClient = func(s, metaRootPath, etcdAddress string) (types.QueryService, error) {
return &mockQuery{}, nil
svr.newQueryServiceClient = func(metaRootPath, etcdAddress string, timeout time.Duration) types.QueryService {
return &mockQuery{}
}
Params.Port = rand.Int()%100 + 10000
......
......@@ -58,9 +58,9 @@ type Server struct {
indexService types.IndexService
queryService types.QueryService
newIndexServiceClient func(string, string, string, time.Duration) types.IndexService
newDataServiceClient func(string, string, string, time.Duration) types.DataService
newQueryServiceClient func(string, string, string) (types.QueryService, error)
newIndexServiceClient func(string, string, time.Duration) types.IndexService
newDataServiceClient func(string, string, time.Duration) types.DataService
newQueryServiceClient func(string, string, time.Duration) types.QueryService
closer io.Closer
}
......@@ -84,8 +84,8 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
func (s *Server) setClient() {
ctx := context.Background()
s.newDataServiceClient = func(s, etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService {
dsClient := dsc.NewClient(s, etcdMetaRoot, []string{etcdAddress}, timeout)
s.newDataServiceClient = func(etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService {
dsClient := dsc.NewClient(etcdMetaRoot, []string{etcdAddress}, timeout)
if err := dsClient.Init(); err != nil {
panic(err)
}
......@@ -97,8 +97,8 @@ func (s *Server) setClient() {
}
return dsClient
}
s.newIndexServiceClient = func(s, metaRootPath, etcdAddress string, timeout time.Duration) types.IndexService {
isClient := isc.NewClient(s, metaRootPath, []string{etcdAddress}, timeout)
s.newIndexServiceClient = func(metaRootPath, etcdAddress string, timeout time.Duration) types.IndexService {
isClient := isc.NewClient(metaRootPath, []string{etcdAddress}, timeout)
if err := isClient.Init(); err != nil {
panic(err)
}
......@@ -107,8 +107,8 @@ func (s *Server) setClient() {
}
return isClient
}
s.newQueryServiceClient = func(s, metaRootPath, etcdAddress string) (types.QueryService, error) {
qsClient, err := qsc.NewClient(context.Background(), s, metaRootPath, []string{etcdAddress}, 5*time.Second)
s.newQueryServiceClient = func(metaRootPath, etcdAddress string, timeout time.Duration) types.QueryService {
qsClient, err := qsc.NewClient(metaRootPath, []string{etcdAddress}, timeout)
if err != nil {
panic(err)
}
......@@ -118,7 +118,7 @@ func (s *Server) setClient() {
if err := qsClient.Start(); err != nil {
panic(err)
}
return qsClient, nil
return qsClient
}
}
......@@ -161,7 +161,7 @@ func (s *Server) init() error {
log.Debug("MasterService", zap.Any("State", internalpb.StateCode_Initializing))
s.masterService.SetNewProxyClient(
func(s *sessionutil.Session) (types.ProxyNode, error) {
cli := pnc.NewClient(ctx, s.Address, 10*time.Second)
cli := pnc.NewClient(s.Address, 3*time.Second)
if err := cli.Init(); err != nil {
return nil, err
}
......@@ -173,24 +173,24 @@ func (s *Server) init() error {
)
if s.newDataServiceClient != nil {
log.Debug("MasterService start to create DataService client", zap.String("address", Params.DataServiceAddress))
dataService := s.newDataServiceClient(Params.DataServiceAddress, cms.Params.MetaRootPath, cms.Params.EtcdAddress, 10*time.Second)
log.Debug("MasterService start to create DataService client")
dataService := s.newDataServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdAddress, 3*time.Second)
if err := s.masterService.SetDataService(ctx, dataService); err != nil {
panic(err)
}
s.dataService = dataService
}
if s.newIndexServiceClient != nil {
log.Debug("MasterService start to create IndexService client", zap.String("address", Params.IndexServiceAddress))
indexService := s.newIndexServiceClient(Params.IndexServiceAddress, cms.Params.MetaRootPath, cms.Params.EtcdAddress, 10*time.Second)
log.Debug("MasterService start to create IndexService client")
indexService := s.newIndexServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdAddress, 3*time.Second)
if err := s.masterService.SetIndexService(indexService); err != nil {
panic(err)
}
s.indexService = indexService
}
if s.newQueryServiceClient != nil {
log.Debug("MasterService start to create QueryService client", zap.String("address", Params.QueryServiceAddress))
queryService, _ := s.newQueryServiceClient(Params.QueryServiceAddress, cms.Params.MetaRootPath, cms.Params.EtcdAddress)
log.Debug("MasterService start to create QueryService client")
queryService := s.newQueryServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdAddress, 3*time.Second)
if err := s.masterService.SetQueryService(queryService); err != nil {
panic(err)
}
......
......@@ -32,16 +32,16 @@ type Client struct {
conn *grpc.ClientConn
ctx context.Context
address string
addr string
timeout time.Duration
reconnTry int
recallTry int
}
func NewClient(ctx context.Context, address string, timeout time.Duration) *Client {
func NewClient(addr string, timeout time.Duration) *Client {
return &Client{
address: address,
ctx: ctx,
addr: addr,
ctx: context.Background(),
timeout: timeout,
recallTry: 3,
reconnTry: 10,
......@@ -53,8 +53,8 @@ func (c *Client) Init() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
log.Debug("ProxyNodeClient try connect ", zap.String("address", c.address))
conn, err := grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
log.Debug("ProxyNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
......@@ -70,7 +70,7 @@ func (c *Client) Init() error {
log.Debug("ProxyNodeClient connect failed", zap.Error(err))
return err
}
log.Debug("ProxyNodeClient connect success", zap.String("address", c.address))
log.Debug("ProxyNodeClient connect success", zap.String("address", c.addr))
c.grpcClient = proxypb.NewProxyNodeServiceClient(c.conn)
return nil
}
......@@ -80,8 +80,8 @@ func (c *Client) reconnect() error {
connectGrpcFunc := func() error {
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
log.Debug("ProxyNodeClient try reconnect ", zap.String("address", c.address))
conn, err := grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
log.Debug("ProxyNodeClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
......
......@@ -182,7 +182,7 @@ func (s *Server) init() error {
masterServiceAddr := Params.MasterAddress
log.Debug("ProxyNode", zap.String("master address", masterServiceAddr))
timeout := 3 * time.Second
s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
s.masterServiceClient, err = grpcmasterserviceclient.NewClient(proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
if err != nil {
log.Debug("ProxyNode new masterServiceClient failed ", zap.Error(err))
return err
......@@ -203,7 +203,7 @@ func (s *Server) init() error {
dataServiceAddr := Params.DataServiceAddress
log.Debug("ProxyNode", zap.String("data service address", dataServiceAddr))
s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, 10*time.Second)
s.dataServiceClient = grpcdataserviceclient.NewClient(proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
err = s.dataServiceClient.Init()
if err != nil {
log.Debug("ProxyNode dataServiceClient init failed ", zap.Error(err))
......@@ -214,7 +214,7 @@ func (s *Server) init() error {
indexServiceAddr := Params.IndexServerAddress
log.Debug("ProxyNode", zap.String("index server address", indexServiceAddr))
s.indexServiceClient = grpcindexserviceclient.NewClient(indexServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, 10*time.Second)
s.indexServiceClient = grpcindexserviceclient.NewClient(proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
err = s.indexServiceClient.Init()
if err != nil {
log.Debug("ProxyNode indexServiceClient init failed ", zap.Error(err))
......@@ -225,25 +225,23 @@ func (s *Server) init() error {
queryServiceAddr := Params.QueryServiceAddress
log.Debug("ProxyNode", zap.String("query server address", queryServiceAddr))
s.queryServiceClient, err = grpcqueryserviceclient.NewClient(ctx, queryServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
s.queryServiceClient, err = grpcqueryserviceclient.NewClient(proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
if err != nil {
log.Debug("ProxyNode new queryServiceClient failed ", zap.Error(err))
return err
}
err = s.queryServiceClient.Init()
if err != nil {
log.Debug("ProxyNode queryServiceClient Init failed ", zap.Error(err))
return err
}
s.proxynode.SetQueryServiceClient(s.queryServiceClient)
log.Debug("set query service client ...")
s.proxynode.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("ProxyNode state",
zap.Any("State", internalpb.StateCode_Initializing))
log.Debug("proxynode",
zap.Any("state of proxynode", internalpb.StateCode_Initializing))
if err := s.proxynode.Init(); err != nil {
log.Debug("ProxyNode init failed", zap.Error(err))
log.Debug("proxynode", zap.String("proxynode init error", err.Error()))
return err
}
......
......@@ -33,21 +33,24 @@ type Client struct {
ctx context.Context
grpcClient querypb.QueryNodeClient
conn *grpc.ClientConn
addr string
addr string
timeout time.Duration
reconnTry int
recallTry int
}
func NewClient(address string) (*Client, error) {
if address == "" {
return nil, fmt.Errorf("address is empty")
func NewClient(addr string, timeout time.Duration) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("addr is empty")
}
return &Client{
ctx: context.Background(),
addr: address,
timeout: 3 * time.Second,
addr: addr,
timeout: timeout,
recallTry: 3,
reconnTry: 10,
}, nil
}
......
......@@ -105,7 +105,7 @@ func (s *Server) init() error {
}
// --- QueryService ---
log.Debug("QueryNode start to new QueryServiceClient", zap.Any("QueryServiceAddress", Params.QueryServiceAddress))
queryService, err := qsc.NewClient(ctx, Params.QueryServiceAddress, qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 20*time.Second)
queryService, err := qsc.NewClient(qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 3*time.Second)
if err != nil {
log.Debug("QueryNode new QueryServiceClient failed", zap.Error(err))
panic(err)
......@@ -138,7 +138,7 @@ func (s *Server) init() error {
addr := Params.MasterAddress
log.Debug("QueryNode start to new MasterServiceClient", zap.Any("QueryServiceAddress", addr))
masterService, err := msc.NewClient(addr, qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 20*time.Second)
masterService, err := msc.NewClient(qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 3*time.Second)
if err != nil {
log.Debug("QueryNode new MasterServiceClient failed", zap.Error(err))
panic(err)
......@@ -166,8 +166,8 @@ func (s *Server) init() error {
}
// --- IndexService ---
log.Debug("QueryNode start to new IndexServiceClient", zap.Any("IndexServiceAddress", Params.IndexServiceAddress))
indexService := isc.NewClient(Params.IndexServiceAddress, qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 10*time.Second)
log.Debug("Index service", zap.String("address", Params.IndexServiceAddress))
indexService := isc.NewClient(qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 3*time.Second)
if err := indexService.Init(); err != nil {
log.Debug("QueryNode IndexServiceClient Init failed", zap.Error(err))
......@@ -193,7 +193,7 @@ func (s *Server) init() error {
// --- DataService ---
log.Debug("QueryNode start to new DataServiceClient", zap.Any("DataServiceAddress", Params.DataServiceAddress))
dataService := dsc.NewClient(Params.DataServiceAddress, qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 10*time.Second)
dataService := dsc.NewClient(qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 3*time.Second)
if err = dataService.Init(); err != nil {
log.Debug("QueryNode DataServiceClient Init failed", zap.Error(err))
panic(err)
......
......@@ -53,20 +53,19 @@ func getQueryServiceAddress(sess *sessionutil.Session) (string, error) {
ms, ok := msess[key]
if !ok {
log.Debug("QueryServiceClient msess key not existed", zap.Any("key", key))
return "", fmt.Errorf("number of master service is incorrect, %d", len(msess))
return "", fmt.Errorf("number of queryservice is incorrect, %d", len(msess))
}
return ms.Address, nil
}
// NewClient creates a client for QueryService grpc call.
func NewClient(ctx context.Context, address, metaRootPath string, etcdAddr []string, timeout time.Duration) (*Client, error) {
func NewClient(metaRootPath string, etcdAddr []string, timeout time.Duration) (*Client, error) {
sess := sessionutil.NewSession(context.Background(), metaRootPath, etcdAddr)
return &Client{
ctx: ctx,
ctx: context.Background(),
grpcClient: nil,
conn: nil,
addr: address,
timeout: timeout,
reconnTry: 10,
recallTry: 3,
......@@ -75,40 +74,9 @@ func NewClient(ctx context.Context, address, metaRootPath string, etcdAddr []str
}
func (c *Client) Init() error {
tracer := opentracing.GlobalTracer()
log.Debug("QueryServiceClient try connect QueryService", zap.Any("c.addr", c.addr))
if c.addr != "" {
connectGrpcFunc := func() error {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
log.Debug("QueryServiceClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
if err != nil {
return err
}
c.conn = conn
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectGrpcFunc)
if err != nil {
log.Debug("QueryServiceClient try connect failed", zap.Error(err))
return err
}
} else {
return c.reconnect()
}
log.Debug("QueryServiceClient try connect success", zap.Any("QueryServiceAddr", c.addr))
c.grpcClient = querypb.NewQueryServiceClient(c.conn)
return nil
return c.connect()
}
func (c *Client) reconnect() error {
func (c *Client) connect() error {
tracer := opentracing.GlobalTracer()
var err error
getQueryServiceAddressFn := func() error {
......@@ -154,7 +122,7 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
err = c.reconnect()
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment