diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go index e1886b77aaaf0b7e7d58de657b2e13b5ea58676e..a683e4a28f79e99bc612ede3cbade3305ec34399 100644 --- a/cmd/masterservice/main.go +++ b/cmd/masterservice/main.go @@ -36,6 +36,9 @@ func main() { psc.Params.Init() log.Printf("proxy service address : %s", psc.Params.ServiceAddress) proxyService := psc.NewClient(psc.Params.ServiceAddress) + if err = proxyService.Init(); err != nil { + panic(err) + } for cnt = 0; cnt < reTryCnt; cnt++ { pxStates, err := proxyService.GetComponentStates() diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go index 83400a8ea206af8f18e99faaab15545048f5d81a..48827b965be801df7f105dcbcdad57a77548c0fb 100644 --- a/internal/datanode/allocator.go +++ b/internal/datanode/allocator.go @@ -24,7 +24,7 @@ func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl { func (alloc *allocatorImpl) allocID() (UniqueID, error) { resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kShowCollections, + MsgType: commonpb.MsgType_kRequestID, MsgID: 1, // GOOSE TODO Timestamp: 0, // GOOSE TODO SourceID: Params.NodeID, diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 1cbdf38d3c6c994ae577548db6819518fb4772f3..7de7101c190304d4e28d5bdac0ee70672e22dcb9 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -1,20 +1,15 @@ package datanode import ( - "context" - "fmt" "log" "math/rand" "os" "strconv" "testing" - "time" "go.etcd.io/etcd/clientv3" - "go.uber.org/zap" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" - "github.com/zilliztech/milvus-distributed/internal/master" ) func makeNewChannelNames(names []string, suffix string) []string { @@ -36,52 +31,10 @@ func refreshChannelNames() { Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix) } -func startMaster(ctx context.Context) { - master.Init() - etcdAddr := master.Params.EtcdAddress - metaRootPath := master.Params.MetaRootPath - - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) - if err != nil { - panic(err) - } - _, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix()) - if err != nil { - panic(err) - } - - masterPort := 53101 - master.Params.Port = masterPort - svr, err := master.CreateServer(ctx) - if err != nil { - log.Print("create server failed", zap.Error(err)) - } - if err := svr.Run(int64(master.Params.Port)); err != nil { - log.Fatal("run server failed", zap.Error(err)) - } - - fmt.Println("Waiting for server!", svr.IsServing()) - Params.MasterAddress = master.Params.Address + ":" + strconv.Itoa(masterPort) -} - func TestMain(m *testing.M) { Params.Init() refreshChannelNames() - const ctxTimeInMillisecond = 2000 - const closeWithDeadline = true - var ctx context.Context - - if closeWithDeadline { - var cancel context.CancelFunc - d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, cancel = context.WithDeadline(context.Background(), d) - defer cancel() - } else { - ctx = context.Background() - } - - startMaster(ctx) exitCode := m.Run() os.Exit(exitCode) } diff --git a/internal/datanode/meta_table.go b/internal/datanode/meta_table.go index 898a6aa64a0426d5f578d32e1a808882f953d8c6..6a9128ff8690a40e149db17f082ad3a21b2c2e6a 100644 --- a/internal/datanode/meta_table.go +++ b/internal/datanode/meta_table.go @@ -12,9 +12,9 @@ import ( ) type metaTable struct { - client kv.TxnBase // + client kv.Base // segID2FlushMeta map[UniqueID]*datapb.SegmentFlushMeta - collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta // GOOSE TODO: addDDLFlush and has DDLFlush + collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta lock sync.RWMutex } @@ -36,24 +36,6 @@ func NewMetaTable(kv kv.TxnBase) (*metaTable, error) { return mt, nil } -func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error { - mt.lock.Lock() - defer mt.lock.Unlock() - - _, ok := mt.collID2DdlMeta[collID] - if !ok { - mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{ - CollectionID: collID, - BinlogPaths: make([]string, 0), - } - } - - meta := mt.collID2DdlMeta[collID] - meta.BinlogPaths = append(meta.BinlogPaths, paths...) - - return mt.saveDDLFlushMeta(meta) -} - func (mt *metaTable) AppendSegBinlogPaths(segmentID UniqueID, fieldID int64, dataPaths []string) error { _, ok := mt.segID2FlushMeta[segmentID] if !ok { @@ -97,31 +79,23 @@ func (mt *metaTable) CompleteFlush(segmentID UniqueID) error { return mt.saveSegFlushMeta(meta) } -// metaTable.lock.Lock() before call this function -func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error { - value := proto.MarshalTextString(meta) - - mt.collID2DdlMeta[meta.CollectionID] = meta - prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10)) - - return mt.client.Save(prefix, value) -} +func (mt *metaTable) reloadSegMetaFromKV() error { + mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta) -func (mt *metaTable) reloadDdlMetaFromKV() error { - mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta) - _, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath) + _, values, err := mt.client.LoadWithPrefix(Params.SegFlushMetaSubPath) if err != nil { return err } for _, value := range values { - ddlMeta := &datapb.DDLFlushMeta{} - err = proto.UnmarshalText(value, ddlMeta) + flushMeta := &datapb.SegmentFlushMeta{} + err = proto.UnmarshalText(value, flushMeta) if err != nil { return err } - mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta + mt.segID2FlushMeta[flushMeta.SegmentID] = flushMeta } + return nil } @@ -135,26 +109,6 @@ func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error { return mt.client.Save(prefix, value) } -func (mt *metaTable) reloadSegMetaFromKV() error { - mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta) - - _, values, err := mt.client.LoadWithPrefix(Params.SegFlushMetaSubPath) - if err != nil { - return err - } - - for _, value := range values { - flushMeta := &datapb.SegmentFlushMeta{} - err = proto.UnmarshalText(value, flushMeta) - if err != nil { - return err - } - mt.segID2FlushMeta[flushMeta.SegmentID] = flushMeta - } - - return nil -} - func (mt *metaTable) addSegmentFlush(segmentID UniqueID) error { mt.lock.Lock() defer mt.lock.Unlock() @@ -197,6 +151,61 @@ func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string, return ret, nil } +// --- DDL --- +func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error { + mt.lock.Lock() + defer mt.lock.Unlock() + + _, ok := mt.collID2DdlMeta[collID] + if !ok { + mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{ + CollectionID: collID, + BinlogPaths: make([]string, 0), + } + } + + meta := mt.collID2DdlMeta[collID] + meta.BinlogPaths = append(meta.BinlogPaths, paths...) + + return mt.saveDDLFlushMeta(meta) +} + +func (mt *metaTable) hasDDLFlushMeta(collID UniqueID) bool { + mt.lock.RLock() + defer mt.lock.RUnlock() + + _, ok := mt.collID2DdlMeta[collID] + return ok +} + +// metaTable.lock.Lock() before call this function +func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error { + value := proto.MarshalTextString(meta) + + mt.collID2DdlMeta[meta.CollectionID] = meta + prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10)) + + return mt.client.Save(prefix, value) +} + +func (mt *metaTable) reloadDdlMetaFromKV() error { + mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta) + _, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath) + if err != nil { + return err + } + + for _, value := range values { + ddlMeta := &datapb.DDLFlushMeta{} + err = proto.UnmarshalText(value, ddlMeta) + if err != nil { + return err + } + mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta + } + return nil +} + func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) (map[UniqueID][]string, error) { mt.lock.RLock() defer mt.lock.RUnlock() diff --git a/internal/datanode/meta_table_test.go b/internal/datanode/meta_table_test.go index dd5a4251dc5baaf8c98a9727e5d4165d76edc187..247cbff51f82542f7ca7e5597787686abfb856e7 100644 --- a/internal/datanode/meta_table_test.go +++ b/internal/datanode/meta_table_test.go @@ -1,26 +1,16 @@ package datanode import ( - "context" "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" - "go.etcd.io/etcd/clientv3" + memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" ) -func TestMetaTable_all(t *testing.T) { +func TestMetaTable_SegmentFlush(t *testing.T) { - etcdAddr := Params.EtcdAddress - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) - require.NoError(t, err) - etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/meta/root") - - _, err = cli.Delete(context.TODO(), "/etcd/test/meta/root", clientv3.WithPrefix()) - require.NoError(t, err) - - meta, err := NewMetaTable(etcdKV) + kvMock := memkv.NewMemoryKV() + meta, err := NewMetaTable(kvMock) assert.NoError(t, err) defer meta.client.Close() @@ -65,8 +55,37 @@ func TestMetaTable_all(t *testing.T) { ret) }) + t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) { + + var segmentID UniqueID = 401 + + err := meta.addSegmentFlush(segmentID) + assert.NoError(t, err) + + ret, err := meta.checkFlushComplete(segmentID) + assert.NoError(t, err) + assert.Equal(t, false, ret) + + meta.CompleteFlush(segmentID) + + ret, err = meta.checkFlushComplete(segmentID) + assert.NoError(t, err) + assert.Equal(t, true, ret) + }) + +} + +func TestMetaTable_DDLFlush(t *testing.T) { + kvMock := memkv.NewMemoryKV() + meta, err := NewMetaTable(kvMock) + assert.NoError(t, err) + defer meta.client.Close() + t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) { + assert.False(t, meta.hasDDLFlushMeta(301)) + assert.False(t, meta.hasDDLFlushMeta(302)) + collID2Paths := map[UniqueID][]string{ 301: {"a", "b", "c"}, 302: {"c", "b", "a"}, @@ -84,24 +103,8 @@ func TestMetaTable_all(t *testing.T) { assert.Nil(t, err) assert.Equal(t, map[UniqueID][]string{k: v}, ret) } - }) - - t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) { - - var segmentID UniqueID = 401 - - err := meta.addSegmentFlush(segmentID) - assert.NoError(t, err) - - ret, err := meta.checkFlushComplete(segmentID) - assert.NoError(t, err) - assert.Equal(t, false, ret) - meta.CompleteFlush(segmentID) - - ret, err = meta.checkFlushComplete(segmentID) - assert.NoError(t, err) - assert.Equal(t, true, ret) + assert.True(t, meta.hasDDLFlushMeta(301)) + assert.True(t, meta.hasDDLFlushMeta(302)) }) - } diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 7617b078135306c92b814eb3d845e63bac69474c..b295e685fe4dd4b0278cf6e9bb94fd2e3506774f 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -123,7 +123,7 @@ func (s *Server) init() error { }() s.wg.Add(1) - s.startGrpcLoop(Params.Port) + go s.startGrpcLoop(Params.Port) // wait for grpc server loop start err = <-s.grpcErrChan if err != nil { diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index ece32912be16795f2c5d1c6ba768c341136a744f..035321e55f3ad4338505acde6e2074fab345f7cf 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -59,7 +59,7 @@ func (s *Server) init() error { proxyservice.Params.Init() s.wg.Add(1) - s.startGrpcLoop(Params.ServicePort) + go s.startGrpcLoop(Params.ServicePort) // wait for grpc server loop start if err := <-s.grpcErrChan; err != nil { return err diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 89d7b10749f3fd24b0405c386ded475d4fa1e5ba..d2d8ab7613d93607d6df72f2dab4e3b891d59db4 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -247,7 +247,9 @@ func (c *Core) checkInit() error { if c.DataNodeSegmentFlushCompletedChan == nil { return errors.Errorf("DataNodeSegmentFlushCompletedChan is nil") } - log.Printf("master node id = %d\n", Params.NodeID) + log.Printf("master node id = %d", Params.NodeID) + log.Printf("master dd channel name = %s", Params.DdChannel) + log.Printf("master time ticke channel name = %s", Params.TimeTickChannel) return nil } @@ -607,6 +609,7 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { return err } Params.ProxyTimeTickChannel = rsp + log.Printf("proxy time tick channel name = %s", Params.ProxyTimeTickChannel) c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error { err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{ @@ -633,6 +636,8 @@ func (c *Core) SetDataService(s DataServiceInterface) error { return err } Params.DataServiceSegmentChannel = rsp + log.Printf("data service segment channel name = %s", Params.DataServiceSegmentChannel) + c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { ts, err := c.tsoAllocator.Alloc(1) if err != nil { diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index fe975d9fc4ffb5b5ee9ca28f2c18199877a3d420..6f6287931d09480bd48de6fb2e450964db95bd4a 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -39,8 +39,9 @@ func (s *ServiceImpl) fillNodeInitParams() error { getConfigContentByName := func(fileName string) []byte { _, fpath, _, _ := runtime.Caller(0) - configFile := path.Dir(fpath) + "/../../../configs/" + fileName + configFile := path.Dir(fpath) + "/../../configs/" + fileName _, err := os.Stat(configFile) + log.Printf("configFile = %s", configFile) if os.IsNotExist(err) { runPath, err := os.Getwd() if err != nil { diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index d91ee4d3266287f7661667a27356342ff4d50493..0a105d0d97dc7311ddbb6146ccde17a1d4c6eaff 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -18,7 +18,7 @@ go test -race -cover "${MILVUS_DIR}/kv/..." -failfast # TODO: remove to distributed #go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast #go test -race -cover "${MILVUS_DIR}/writenode/..." -failfast -#go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast +go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast #go test -race -cover "${MILVUS_DIR}/master/..." -failfast #go test -race -cover "${MILVUS_DIR}/indexnode/..." -failfast #go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast