diff --git a/internal/allocator/global_id.go b/internal/allocator/global_id.go index 19cb984153bd2be313723dbc855a73561842dd9d..24710ebbae64d4be1763f2a2f8c15ba43894131e 100644 --- a/internal/allocator/global_id.go +++ b/internal/allocator/global_id.go @@ -17,7 +17,7 @@ type GlobalIDAllocator struct { allocator tso.Allocator } -func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator { +func NewGlobalIDAllocator(key string, base kv.TxnKV) *GlobalIDAllocator { allocator := tso.NewGlobalTSOAllocator(key, base) allocator.SetLimitMaxLogic(false) return &GlobalIDAllocator{ diff --git a/internal/datanode/binlog_meta.go b/internal/datanode/binlog_meta.go index 7513f6feeb2cde5ec7eaa7a30c76167e99468829..a0c83623d4173809ffd0d11bb7ae7d2823e20b39 100644 --- a/internal/datanode/binlog_meta.go +++ b/internal/datanode/binlog_meta.go @@ -14,11 +14,11 @@ import ( // segment binlog meta key: // ${prefix}/${segmentID}/${fieldID}/${idx} type binlogMeta struct { - client kv.TxnBase // etcd kv + client kv.TxnKV // etcd kv idAllocator allocatorInterface } -func NewBinlogMeta(kv kv.TxnBase, idAllocator allocatorInterface) (*binlogMeta, error) { +func NewBinlogMeta(kv kv.TxnKV, idAllocator allocatorInterface) (*binlogMeta, error) { mt := &binlogMeta{ client: kv, idAllocator: idAllocator, diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 7a7c397a9b7a2810a8f67d95953ec4a6fa66b539..3b192b8c05ea6f0ea70711015634ffdc02be9541 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -31,7 +31,7 @@ type ddNode struct { flushMap *sync.Map inFlushCh <-chan *flushMsg - kv kv.Base + kv kv.BaseKV replica Replica binlogMeta *binlogMeta } @@ -194,7 +194,7 @@ The keys of the binlogs are generated as below: */ func flushTxn(ddlData *sync.Map, - kv kv.Base, + kv kv.BaseKV, meta *binlogMeta) { // generate binlog ddCodec := &storage.DataDefinitionCodec{} diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 76b339a56f87ff9c1bec0e8a0d811464374a6fe0..b3e5cd9300698ea896cd620007a4f73f72baab93 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -42,7 +42,7 @@ type insertBufferNode struct { flushMeta *binlogMeta flushMap sync.Map - minIOKV kv.Base + minIOKV kv.BaseKV timeTickStream msgstream.MsgStream segmentStatisticsStream msgstream.MsgStream @@ -564,7 +564,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionID UniqueID, collID UniqueID, - insertData *sync.Map, meta *binlogMeta, kv kv.Base, finishCh chan<- bool) { + insertData *sync.Map, meta *binlogMeta, kv kv.BaseKV, finishCh chan<- bool) { clearFn := func(isSuccess bool) { finishCh <- isSuccess diff --git a/internal/dataservice/meta.go b/internal/dataservice/meta.go index e4b057d14af234b4e611eeea20c1dd9a98aae8be..4b0c54aac550af6724ab0d1d91004b1f4940965c 100644 --- a/internal/dataservice/meta.go +++ b/internal/dataservice/meta.go @@ -25,7 +25,7 @@ type errCollectionNotFound struct { } type meta struct { sync.RWMutex - client kv.TxnBase // client of a reliable kv service, i.e. etcd client + client kv.TxnKV // client of a reliable kv service, i.e. etcd client collections map[UniqueID]*datapb.CollectionInfo // collection id to collection info segments map[UniqueID]*datapb.SegmentInfo // segment id to segment info } @@ -46,7 +46,7 @@ func (err errCollectionNotFound) Error() string { return fmt.Sprintf("collection %d not found", err.collectionID) } -func newMeta(kv kv.TxnBase) (*meta, error) { +func newMeta(kv kv.TxnKV) (*meta, error) { mt := &meta{ client: kv, collections: make(map[UniqueID]*datapb.CollectionInfo), diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 3dfab913e18051074c305cc49e247fba0c455375..7f74de656e5ca80e01a67131dad08fde53672cf4 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -36,7 +36,7 @@ type IndexNode struct { sched *TaskScheduler - kv kv.Base + kv kv.BaseKV serviceClient types.IndexService // method factory diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 3189e6e67b8fff365b85e34b4328ef7db88cf5f2..25cef08698453c102c847ad586db292f4236f9c5 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -71,7 +71,7 @@ func (bt *BaseTask) Notify(err error) { type IndexBuildTask struct { BaseTask index Index - kv kv.Base + kv kv.BaseKV savePaths []string req *indexpb.BuildIndexRequest serviceClient types.IndexService diff --git a/internal/indexnode/task_scheduler.go b/internal/indexnode/task_scheduler.go index 6673a65c6d4c46917fc2f8c88e7b8080ccd8a8c4..d076f7c36de0b99794751734f97aa325e25d7360 100644 --- a/internal/indexnode/task_scheduler.go +++ b/internal/indexnode/task_scheduler.go @@ -164,14 +164,14 @@ type TaskScheduler struct { IndexBuildQueue TaskQueue buildParallel int - kv kv.Base + kv kv.BaseKV wg sync.WaitGroup ctx context.Context cancel context.CancelFunc } func NewTaskScheduler(ctx context.Context, - kv kv.Base) (*TaskScheduler, error) { + kv kv.BaseKV) (*TaskScheduler, error) { ctx1, cancel := context.WithCancel(ctx) s := &TaskScheduler{ kv: kv, diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index b9388e069b1cf90b364c3bc49cab6d096c5d3a5d..bb38c3b5bbe95dea273f7301e973b5684996978c 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -44,7 +44,7 @@ type IndexService struct { idAllocator *allocator.GlobalIDAllocator - kv kv.Base + kv kv.BaseKV metaTable *metaTable diff --git a/internal/indexservice/meta_table.go b/internal/indexservice/meta_table.go index 74871d79364c40aac59c44ae47df6a818c4c39bc..3d988271109d6d3e26fd21066cc76fad06edd7d6 100644 --- a/internal/indexservice/meta_table.go +++ b/internal/indexservice/meta_table.go @@ -26,13 +26,13 @@ import ( ) type metaTable struct { - client kv.TxnBase // client of a reliable kv service, i.e. etcd client + client kv.TxnKV // client of a reliable kv service, i.e. etcd client indexBuildID2Meta map[UniqueID]indexpb.IndexMeta // index build id to index meta lock sync.RWMutex } -func NewMetaTable(kv kv.TxnBase) (*metaTable, error) { +func NewMetaTable(kv kv.TxnKV) (*metaTable, error) { mt := &metaTable{ client: kv, lock: sync.RWMutex{}, diff --git a/internal/indexservice/task.go b/internal/indexservice/task.go index c0026685a70606e11bcb80c23e62cbe10d127dbf..8ab4d1845eefdb6d21e25077c98a06552d6bb626 100644 --- a/internal/indexservice/task.go +++ b/internal/indexservice/task.go @@ -65,7 +65,7 @@ type IndexAddTask struct { indexBuildID UniqueID idAllocator *allocator.GlobalIDAllocator buildQueue TaskQueue - kv kv.Base + kv kv.BaseKV builderClient types.IndexNode nodeClients *PriorityQueue buildClientNodeID UniqueID diff --git a/internal/indexservice/task_scheduler.go b/internal/indexservice/task_scheduler.go index 253dcfdda20fc5d2a5bf14e0568388823a57c5ec..9d5c00229f192a6edf7aba9d02414fa81e984de9 100644 --- a/internal/indexservice/task_scheduler.go +++ b/internal/indexservice/task_scheduler.go @@ -177,7 +177,7 @@ type TaskScheduler struct { idAllocator *allocator.GlobalIDAllocator metaTable *metaTable - kv kv.Base + kv kv.BaseKV wg sync.WaitGroup ctx context.Context @@ -186,7 +186,7 @@ type TaskScheduler struct { func NewTaskScheduler(ctx context.Context, idAllocator *allocator.GlobalIDAllocator, - kv kv.Base, + kv kv.BaseKV, table *metaTable) (*TaskScheduler, error) { ctx1, cancel := context.WithCancel(ctx) s := &TaskScheduler{ diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 332cb5015893389c90a4bf911750d54b88247622..170f9e5a0257ae9119a44791bd5e71c3e5327678 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -1,6 +1,6 @@ package kv -type Base interface { +type BaseKV interface { Load(key string) (string, error) MultiLoad(keys []string) ([]string, error) LoadWithPrefix(key string) ([]string, []string, error) @@ -13,8 +13,8 @@ type Base interface { Close() } -type TxnBase interface { - Base +type TxnKV interface { + BaseKV MultiSaveAndRemove(saves map[string]string, removals []string) error MultiRemoveWithPrefix(keys []string) error MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index f895ee21509d04f9c845c4ff6fa874f941ad31b7..6a0640e0b214d69d734f18bb1f67074c33ea11f8 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -32,7 +32,7 @@ const ( ) type metaTable struct { - client kv.TxnBase // client of a reliable kv service, i.e. etcd client + client kv.TxnKV // client of a reliable kv service, i.e. etcd client tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection_id -> meta @@ -50,7 +50,7 @@ type metaTable struct { ddLock sync.RWMutex } -func NewMetaTable(kv kv.TxnBase) (*metaTable, error) { +func NewMetaTable(kv kv.TxnKV) (*metaTable, error) { mt := &metaTable{ client: kv, tenantLock: sync.RWMutex{}, diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index 2a7289f2277d3f3617c5f6ed57b2af952b87749c..0f0b6ca806952be1e4a9e91c23ee23545ae93271 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -19,7 +19,7 @@ import ( ) type mockTestKV struct { - kv.TxnBase + kv.TxnKV loadWithPrefix func(key string) ([]string, []string, error) save func(key, value string) error diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go index 50769fc898eb854ad1a01ede8781848779db9154..b3644fbbfa94a5d596d24447b25d5f425cf87033 100644 --- a/internal/querynode/index_loader.go +++ b/internal/querynode/index_loader.go @@ -36,7 +36,7 @@ type indexLoader struct { masterService types.MasterService indexService types.IndexService - kv kv.Base // minio kv + kv kv.BaseKV // minio kv } func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) { diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 90726015fada128400f3d3713e6f255288aaeeb9..2d64d29a86f4da798cc0ebae5d50d40edce9606e 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -24,7 +24,7 @@ type segmentLoader struct { dataService types.DataService - kv kv.Base // minio kv + kv kv.BaseKV // minio kv indexLoader *indexLoader } diff --git a/internal/tso/global_allocator.go b/internal/tso/global_allocator.go index e68b6f2ac6348d0cf40eb7cf117653d6360ecc28..2546d82acb61563e7e1f7026f367eedb08979757 100644 --- a/internal/tso/global_allocator.go +++ b/internal/tso/global_allocator.go @@ -51,10 +51,10 @@ type GlobalTSOAllocator struct { } // NewGlobalTSOAllocator creates a new global TSO allocator. -func NewGlobalTSOAllocator(key string, kvBase kv.TxnBase) *GlobalTSOAllocator { +func NewGlobalTSOAllocator(key string, txnKV kv.TxnKV) *GlobalTSOAllocator { return &GlobalTSOAllocator{ tso: ×tampOracle{ - kvBase: kvBase, + txnKV: txnKV, saveInterval: 3 * time.Second, maxResetTSGap: func() time.Duration { return 3 * time.Second }, key: key, diff --git a/internal/tso/tso.go b/internal/tso/tso.go index 04f58beae4420e615c4cddffd58cec16f1f19a30..aa762783908b799efa39b26a9df623c0d5365b62 100644 --- a/internal/tso/tso.go +++ b/internal/tso/tso.go @@ -46,8 +46,8 @@ type atomicObject struct { // timestampOracle is used to maintain the logic of tso. type timestampOracle struct { - key string - kvBase kv.TxnBase + key string + txnKV kv.TxnKV // TODO: remove saveInterval saveInterval time.Duration @@ -58,7 +58,7 @@ type timestampOracle struct { } func (t *timestampOracle) loadTimestamp() (time.Time, error) { - strData, err := t.kvBase.Load(t.key) + strData, err := t.txnKV.Load(t.key) var binData []byte = []byte(strData) @@ -75,7 +75,7 @@ func (t *timestampOracle) loadTimestamp() (time.Time, error) { // otherwise, update it. func (t *timestampOracle) saveTimestamp(ts time.Time) error { data := typeutil.Uint64ToBytes(uint64(ts.UnixNano())) - err := t.kvBase.Save(t.key, string(data)) + err := t.txnKV.Save(t.key, string(data)) if err != nil { return errors.WithStack(err) } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 15aaf5ed03ebd331e534a61e109e964601a25ed1..6c84f6bef2e27671ec68ac70c805a5381842ddc3 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -56,7 +56,7 @@ func combKey(channelName string, id UniqueID) (string, error) { type rocksmq struct { store *gorocksdb.DB - kv kv.Base + kv kv.BaseKV idAllocator allocator.GIDAllocator channelMu map[string]*sync.Mutex