diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index adedbee98e812358c5ea297da7df04f616b79fab..a11c7fccb01cccebaf4a371515bf4c58a16c6650 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -13,6 +13,8 @@ msgChannel: # channel name generation rule: ${namePrefix}-${ChannelIdx} chanNamePrefix: dataDefinition: "data-definition" + masterTimeTick: "master-timetick" + masterStatistics: "master-statistics" insert: "insert" delete: "delete" search: "search" diff --git a/configs/advanced/common.yaml b/configs/advanced/common.yaml index 367aa3dc65e78f20e9e89fe2fdfe6f9a900c2a7c..f1f5a25ffe18921de80dc34c28f32bb57d083cc8 100644 --- a/configs/advanced/common.yaml +++ b/configs/advanced/common.yaml @@ -10,4 +10,6 @@ # or implied. See the License for the specific language governing permissions and limitations under the License. common: - defaultPartitionTag: _default + defaultPartitionTag: _default #TODO, remove + defaultPartitionName: "_default" + defaultIndexName: "_default_idx" diff --git a/configs/advanced/master.yaml b/configs/advanced/master.yaml index b6386554bb5c44203f66e4292fded7463b5f5a93..69474322ae7cca630a3823294effab00227af124 100644 --- a/configs/advanced/master.yaml +++ b/configs/advanced/master.yaml @@ -22,4 +22,5 @@ master: # old name: segmentExpireDuration: 2000 IDAssignExpiration: 2000 # ms - maxPartitionNum: 4096 \ No newline at end of file + maxPartitionNum: 4096 + nodeID: 100 \ No newline at end of file diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 3d3219a65eac4df3dd2f37f99ecaa2b8f0965ad9..933447d2da88412eb645f41452dd76e162c72fc2 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math/rand" "regexp" + "sync" "testing" "time" @@ -22,12 +23,15 @@ func TestGrpcService(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() - cms.Params.Address = "127.0.0.1" + //cms.Params.Address = "127.0.0.1" cms.Params.Port = (randVal % 100) + 10000 - cms.Params.NodeID = 0 - cms.Params.PulsarAddress = "pulsar://127.0.0.1:6650" - cms.Params.EtcdAddress = "127.0.0.1:2379" + svr, err := NewGrpcServer() + assert.Nil(t, err) + + // cms.Params.NodeID = 0 + //cms.Params.PulsarAddress = "pulsar://127.0.0.1:6650" + //cms.Params.EtcdAddress = "127.0.0.1:2379" cms.Params.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) cms.Params.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal) cms.Params.ProxyTimeTickChannel = fmt.Sprintf("proxyTimeTick%d", randVal) @@ -43,9 +47,6 @@ func TestGrpcService(t *testing.T) { t.Logf("master service port = %d", cms.Params.Port) - svr, err := NewGrpcServer() - assert.Nil(t, err) - core := svr.core.(*cms.Core) err = svr.Init(&cms.InitParams{ProxyTimeTickChannel: fmt.Sprintf("proxyTimeTick%d", randVal)}) @@ -94,8 +95,11 @@ func TestGrpcService(t *testing.T) { return []string{"file1", "file2", "file3"}, nil } + var binlogLock sync.Mutex binlogPathArray := make([]string, 0, 16) core.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) { + binlogLock.Lock() + defer binlogLock.Unlock() binlogPathArray = append(binlogPathArray, binlog...) return 2000, nil } @@ -109,7 +113,7 @@ func TestGrpcService(t *testing.T) { err = svr.Start() assert.Nil(t, err) - cli, err := NewGrpcClient(fmt.Sprintf("127.0.0.1:%d", cms.Params.Port), 3*time.Second) + cli, err := NewGrpcClient(fmt.Sprintf("%s:%d", cms.Params.Address, cms.Params.Port), 3*time.Second) assert.Nil(t, err) err = cli.Init(&cms.InitParams{ProxyTimeTickChannel: fmt.Sprintf("proxyTimeTick%d", randVal)}) @@ -403,6 +407,8 @@ func TestGrpcService(t *testing.T) { rsp, err := cli.CreateIndex(req) assert.Nil(t, err) assert.Equal(t, rsp.ErrorCode, commonpb.ErrorCode_SUCCESS) + binlogLock.Lock() + defer binlogLock.Unlock() assert.Equal(t, 3, len(binlogPathArray)) assert.ElementsMatch(t, binlogPathArray, []string{"file1", "file2", "file3"}) diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 33b5a47bb714b6806c6ddbddc2b843a88cc752f8..ae27748381b870d4313c14e85bea54eeea88fca9 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -96,14 +96,17 @@ type Interface interface { // master core type Core struct { - //TODO DataService Interface - //TODO IndexService Interface - //TODO ProxyServiceClient Interface, get proxy service time tick channel,InvalidateCollectionMetaCache + /* + ProxyServiceClient Interface: + get proxy service time tick channel,InvalidateCollectionMetaCache - //TODO Segment States Channel, from DataService, if create new segment, data service should put the segment id into this channel, and let the master add the segment id to the collection meta + DataService Interface: + Segment States Channel, from DataService, if create new segment, data service should put the segment id into this channel, and let the master add the segment id to the collection meta + Segment Flush Watcher, monitor if segment has flushed into disk - //TODO Segment Flush Watcher, monitor if segment has flushed into disk - //TODO indexBuilder Sch, tell index service to build index + IndexService Interface: + indexBuilder Sch, tell index service to build index + */ MetaTable *metaTable //id allocator @@ -485,8 +488,8 @@ func (c *Core) setMsgStreams() error { } // receive time tick from msg stream + c.ProxyTimeTickChan = make(chan typeutil.Timestamp, 1024) go func() { - c.ProxyTimeTickChan = make(chan typeutil.Timestamp, 1024) for { select { case <-c.ctx.Done(): @@ -515,6 +518,7 @@ func (c *Core) setMsgStreams() error { dataServiceStream.CreatePulsarConsumers([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024) dataServiceStream.Start() c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024) + c.DataNodeSegmentFlushCompletedChan = make(chan typeutil.UniqueID, 1024) // receive segment info from msg stream go func() { @@ -536,7 +540,7 @@ func (c *Core) setMsgStreams() error { if ok { c.DataNodeSegmentFlushCompletedChan <- flushMsg.SegmentFlushCompletedMsg.SegmentID } else { - log.Printf("receiver unexpected msg from data service stream, value = %v", segm) + log.Printf("receive unexpected msg from data service stream, value = %v", segm) } } } diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index 3e0e4d8b30201825db1c05be2c237a82d5d1719d..24486a3d0dfb49b5ba4b845be88b512264f5f956 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -18,7 +18,8 @@ import ( func TestMetaTable(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() - etcdAddr := "127.0.0.1:2379" + Params.Init() + etcdAddr := Params.EtcdAddress rootPath := fmt.Sprintf("/test/meta/%d", randVal) etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index 7d15e119e80fdddb23f485228e7db33767fc668b..61dde4ba0958e846a48c56d0ed6df0c0d9a193e2 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -17,14 +17,149 @@ type ParamTable struct { EtcdAddress string MetaRootPath string KvRootPath string - ProxyTimeTickChannel string + ProxyTimeTickChannel string //get from proxy client MsgChannelSubName string TimeTickChannel string DdChannel string StatisticsChannel string - DataServiceSegmentChannel string // data service create segment, or data node flush segment + DataServiceSegmentChannel string // get from data service, data service create segment, or data node flush segment MaxPartitionNum int64 DefaultPartitionName string DefaultIndexName string } + +func (p *ParamTable) Init() { + // load yaml + p.BaseTable.Init() + err := p.LoadYaml("advanced/master.yaml") + if err != nil { + panic(err) + } + + p.initAddress() + p.initPort() + p.initNodeID() + + p.initPulsarAddress() + p.initEtcdAddress() + p.initMetaRootPath() + p.initKvRootPath() + + p.initMsgChannelSubName() + p.initTimeTickChannel() + p.initDdChannelName() + p.initStatisticsChannelName() + + p.initMaxPartitionNum() + p.initDefaultPartitionName() + p.initDefaultIndexName() +} + +func (p *ParamTable) initAddress() { + masterAddress, err := p.Load("master.address") + if err != nil { + panic(err) + } + p.Address = masterAddress +} + +func (p *ParamTable) initPort() { + p.Port = p.ParseInt("master.port") +} + +func (p *ParamTable) initNodeID() { + p.NodeID = uint64(p.ParseInt64("master.nodeID")) +} + +func (p *ParamTable) initPulsarAddress() { + addr, err := p.Load("_PulsarAddress") + if err != nil { + panic(err) + } + p.PulsarAddress = addr +} + +func (p *ParamTable) initEtcdAddress() { + addr, err := p.Load("_EtcdAddress") + if err != nil { + panic(err) + } + p.EtcdAddress = addr +} + +func (p *ParamTable) initMetaRootPath() { + rootPath, err := p.Load("etcd.rootPath") + if err != nil { + panic(err) + } + subPath, err := p.Load("etcd.metaSubPath") + if err != nil { + panic(err) + } + p.MetaRootPath = rootPath + "/" + subPath +} + +func (p *ParamTable) initKvRootPath() { + rootPath, err := p.Load("etcd.rootPath") + if err != nil { + panic(err) + } + subPath, err := p.Load("etcd.kvSubPath") + if err != nil { + panic(err) + } + p.KvRootPath = rootPath + "/" + subPath +} + +func (p *ParamTable) initMsgChannelSubName() { + name, err := p.Load("msgChannel.subNamePrefix.masterSubNamePrefix") + if err != nil { + panic(err) + } + p.MsgChannelSubName = name +} + +func (p *ParamTable) initTimeTickChannel() { + channel, err := p.Load("msgChannel.chanNamePrefix.masterTimeTick") + if err != nil { + panic(err) + } + p.TimeTickChannel = channel +} + +func (p *ParamTable) initDdChannelName() { + channel, err := p.Load("msgChannel.chanNamePrefix.dataDefinition") + if err != nil { + panic(err) + } + p.DdChannel = channel +} + +func (p *ParamTable) initStatisticsChannelName() { + channel, err := p.Load("msgChannel.chanNamePrefix.masterStatistics") + if err != nil { + panic(err) + } + p.StatisticsChannel = channel +} + +func (p *ParamTable) initMaxPartitionNum() { + p.MaxPartitionNum = p.ParseInt64("master.maxPartitionNum") +} + +func (p *ParamTable) initDefaultPartitionName() { + name, err := p.Load("common.defaultPartitionName") + if err != nil { + panic(err) + } + p.DefaultPartitionName = name +} + +func (p *ParamTable) initDefaultIndexName() { + name, err := p.Load("common.defaultIndexName") + if err != nil { + panic(err) + } + p.DefaultIndexName = name +} diff --git a/internal/masterservice/param_table_test.go b/internal/masterservice/param_table_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2c2b071e7c35202240bedb45202a66fdf36b3590 --- /dev/null +++ b/internal/masterservice/param_table_test.go @@ -0,0 +1,53 @@ +package masterservice + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + assert.NotEqual(t, Params.Address, "") + t.Logf("master address = %s", Params.Address) + + assert.NotEqual(t, Params.Port, 0) + t.Logf("master port = %d", Params.Port) + + assert.NotEqual(t, Params.NodeID, 0) + t.Logf("master node ID = %d", Params.NodeID) + + assert.NotEqual(t, Params.PulsarAddress, "") + t.Logf("pulsar address = %s", Params.PulsarAddress) + + assert.NotEqual(t, Params.EtcdAddress, "") + t.Logf("etcd address = %s", Params.EtcdAddress) + + assert.NotEqual(t, Params.MetaRootPath, "") + t.Logf("meta root path = %s", Params.MetaRootPath) + + assert.NotEqual(t, Params.KvRootPath, "") + t.Logf("kv root path = %s", Params.KvRootPath) + + assert.NotEqual(t, Params.MsgChannelSubName, "") + t.Logf("msg channel sub name = %s", Params.MsgChannelSubName) + + assert.NotEqual(t, Params.TimeTickChannel, "") + t.Logf("master time tick channel = %s", Params.TimeTickChannel) + + assert.NotEqual(t, Params.DdChannel, "") + t.Logf("master dd channel = %s", Params.DdChannel) + + assert.NotEqual(t, Params.StatisticsChannel, "") + t.Logf("master statistics channel = %s", Params.StatisticsChannel) + + assert.NotEqual(t, Params.MaxPartitionNum, 0) + t.Logf("master initMaxPartitionNum = %d", Params.MaxPartitionNum) + + assert.NotEqual(t, Params.DefaultPartitionName, "") + t.Logf("default partition name = %s", Params.DefaultPartitionName) + + assert.NotEqual(t, Params.DefaultIndexName, "") + t.Logf("default index name = %s", Params.DefaultIndexName) +} diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 48438fa2a756d206f7557bd8e6f79cf208003b29..fc37534e360267e272c2264a231106699a89b565 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -20,4 +20,5 @@ go test -race -cover "${MILVUS_DIR}/writenode/..." -failfast go test -race -cover "${MILVUS_DIR}/master/..." -failfast go test -race -cover "${MILVUS_DIR}/indexnode/..." -failfast go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast +go test -race -cover -v "${MILVUS_DIR}/masterservice" "${MILVUS_DIR}/distributed/masterservice" -failfast #go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast