From cb3ca1ab7c47362730fe9477bdae8b1c5a1d55b0 Mon Sep 17 00:00:00 2001 From: bigsheeper <yihao.dai@zilliz.com> Date: Mon, 4 Jan 2021 14:17:59 +0800 Subject: [PATCH] Refactor kvPath param and unify the segment flush kvPath in write node Signed-off-by: bigsheeper <yihao.dai@zilliz.com> --- configs/milvus.yaml | 3 ++- internal/master/param_table.go | 18 +++++++++++---- internal/writenode/client/client.go | 4 ++-- internal/writenode/meta_table.go | 8 +++---- internal/writenode/param_table.go | 34 +++++++++++++++++++++++------ 5 files changed, 49 insertions(+), 18 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 055309840..f6910e166 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -21,7 +21,8 @@ etcd: rootPath: by-dev metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath - writeNodeKvSubPath: writer + writeNodeSegKvSubPath: writer/segment + writeNodeDDLKvSubPath: writer/ddl segThreshold: 10000 minio: diff --git a/internal/master/param_table.go b/internal/master/param_table.go index 2fa2f59d1..89f3a6345 100644 --- a/internal/master/param_table.go +++ b/internal/master/param_table.go @@ -16,10 +16,11 @@ type ParamTable struct { Address string Port int - EtcdAddress string - MetaRootPath string - KvRootPath string - PulsarAddress string + EtcdAddress string + MetaRootPath string + KvRootPath string + WriteNodeSegKvSubPath string + PulsarAddress string // nodeID ProxyIDList []typeutil.UniqueID @@ -68,6 +69,7 @@ func (p *ParamTable) Init() { p.initEtcdAddress() p.initMetaRootPath() p.initKvRootPath() + p.initWriteNodeSegKvSubPath() p.initPulsarAddress() p.initProxyIDList() @@ -147,6 +149,14 @@ func (p *ParamTable) initKvRootPath() { p.KvRootPath = rootPath + "/" + subPath } +func (p *ParamTable) initWriteNodeSegKvSubPath() { + subPath, err := p.Load("etcd.writeNodeSegKvSubPath") + if err != nil { + panic(err) + } + p.WriteNodeSegKvSubPath = subPath + "/" +} + func (p *ParamTable) initTopicNum() { iRangeStr, err := p.Load("msgChannel.channelRange.insert") if err != nil { diff --git a/internal/writenode/client/client.go b/internal/writenode/client/client.go index e6b99280f..3b67c4a90 100644 --- a/internal/writenode/client/client.go +++ b/internal/writenode/client/client.go @@ -25,7 +25,7 @@ type Client struct { flushStream msgstream.MsgStream } -func NewWriterClient(etcdAddress string, kvRootPath string, writeNodeKvSubPath string, flushStream msgstream.MsgStream) (*Client, error) { +func NewWriterClient(etcdAddress string, kvRootPath string, writeNodeSegKvSubPath string, flushStream msgstream.MsgStream) (*Client, error) { // init kv client etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) if err != nil { @@ -35,7 +35,7 @@ func NewWriterClient(etcdAddress string, kvRootPath string, writeNodeKvSubPath s return &Client{ kvClient: kvClient, - kvPrefix: writeNodeKvSubPath, + kvPrefix: writeNodeSegKvSubPath, flushStream: flushStream, }, nil } diff --git a/internal/writenode/meta_table.go b/internal/writenode/meta_table.go index 67596e852..ab5ac0446 100644 --- a/internal/writenode/meta_table.go +++ b/internal/writenode/meta_table.go @@ -105,12 +105,12 @@ func (mt *metaTable) saveDDLFlushMeta(meta *pb.DDLFlushMeta) error { mt.collID2DdlMeta[meta.CollectionID] = meta - return mt.client.Save("/writer/ddl/"+strconv.FormatInt(meta.CollectionID, 10), value) + return mt.client.Save(Params.WriteNodeDDLKvSubPath+strconv.FormatInt(meta.CollectionID, 10), value) } func (mt *metaTable) reloadDdlMetaFromKV() error { mt.collID2DdlMeta = make(map[UniqueID]*pb.DDLFlushMeta) - _, values, err := mt.client.LoadWithPrefix("writer/ddl") + _, values, err := mt.client.LoadWithPrefix(Params.WriteNodeDDLKvSubPath) if err != nil { return err } @@ -132,13 +132,13 @@ func (mt *metaTable) saveSegFlushMeta(meta *pb.SegmentFlushMeta) error { mt.segID2FlushMeta[meta.SegmentID] = *meta - return mt.client.Save("/writer/segment/"+strconv.FormatInt(meta.SegmentID, 10), value) + return mt.client.Save(Params.WriteNodeSegKvSubPath+strconv.FormatInt(meta.SegmentID, 10), value) } func (mt *metaTable) reloadSegMetaFromKV() error { mt.segID2FlushMeta = make(map[UniqueID]pb.SegmentFlushMeta) - _, values, err := mt.client.LoadWithPrefix("writer/segment") + _, values, err := mt.client.LoadWithPrefix(Params.WriteNodeSegKvSubPath) if err != nil { return err } diff --git a/internal/writenode/param_table.go b/internal/writenode/param_table.go index c1e9bae2e..654f6032f 100644 --- a/internal/writenode/param_table.go +++ b/internal/writenode/param_table.go @@ -36,13 +36,15 @@ type ParamTable struct { DefaultPartitionTag string SliceIndex int - EtcdAddress string - MetaRootPath string - MinioAddress string - MinioAccessKeyID string - MinioSecretAccessKey string - MinioUseSSL bool - MinioBucketName string + EtcdAddress string + MetaRootPath string + WriteNodeSegKvSubPath string + WriteNodeDDLKvSubPath string + MinioAddress string + MinioAccessKeyID string + MinioSecretAccessKey string + MinioUseSSL bool + MinioBucketName string FlushInsertBufSize int FlushDdBufSize int @@ -78,6 +80,8 @@ func (p *ParamTable) Init() { p.initPulsarAddress() p.initEtcdAddress() p.initMetaRootPath() + p.initWriteNodeSegKvSubPath() + p.initWriteNodeDDLKvSubPath() p.initInsertLogRootPath() p.initDdLogRootPath() @@ -299,6 +303,22 @@ func (p *ParamTable) initMetaRootPath() { p.MetaRootPath = rootPath + "/" + subPath } +func (p *ParamTable) initWriteNodeSegKvSubPath() { + subPath, err := p.Load("etcd.writeNodeSegKvSubPath") + if err != nil { + panic(err) + } + p.WriteNodeSegKvSubPath = subPath + "/" +} + +func (p *ParamTable) initWriteNodeDDLKvSubPath() { + subPath, err := p.Load("etcd.writeNodeDDLKvSubPath") + if err != nil { + panic(err) + } + p.WriteNodeDDLKvSubPath = subPath + "/" +} + func (p *ParamTable) initInsertLogRootPath() { rootPath, err := p.Load("etcd.rootPath") if err != nil { -- GitLab