Skip to content
Snippets Groups Projects
Select Git revision
  • a8b78f11c36ee39f238d4eeffc8977c50d8d8d92
  • master default protected
  • benchmark protected
  • v2.0.0-rc4
  • v2.0.0-rc2
  • v2.0.0-rc1
  • v1.1.1
  • v1.1.0
  • v1.0.0
  • v0.10.6
  • v0.10.5
  • v0.10.4
  • v0.10.3
  • v0.10.2
  • v0.10.1
  • v0.8.1
  • v0.10.0
  • v0.9.1
  • v0.9.0
  • v0.8.0
  • v0.7.1
  • v0.7.0
  • v0.6.0
23 results

service_msg.pb.cc

Blame
  • indexnode_test.go 2.93 KiB
    package indexnode
    
    import (
    	"context"
    	"fmt"
    	"log"
    	"os"
    	"strconv"
    	"testing"
    	"time"
    
    	"go.etcd.io/etcd/clientv3"
    
    	"go.uber.org/zap"
    
    	"github.com/stretchr/testify/assert"
    	indexnodeclient "github.com/zilliztech/milvus-distributed/internal/indexnode/client"
    	"github.com/zilliztech/milvus-distributed/internal/master"
    	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
    )
    
    // Shared fixtures for every test in this package.
    var (
    	// ctx and cancel are assigned in setup; shutdown invokes cancel.
    	ctx    context.Context
    	cancel func()
    
    	// buildClient is the client the tests use to issue requests.
    	buildClient *indexnodeclient.Client
    
    	// builderServer is the builder created by startBuilder.
    	builderServer *Builder
    
    	// masterPort is the port the test master server is run on.
    	masterPort = 53101
    	// masterServer is the master created by startMaster.
    	masterServer *master.Master
    )
    
    // makeMasterAddress returns the loopback "host:port" address for the given
    // port, with the port rendered in base 10.
    func makeMasterAddress(port int64) string {
    	const loopbackPrefix = "127.0.0.1:"
    	return loopbackPrefix + strconv.FormatInt(port, 10)
    }
    
    // refreshMasterAddress points both parameter tables at the test master:
    // the index node's Params.MasterAddress gets the loopback address built
    // from masterPort, and master.Params.Port gets masterPort itself.
    func refreshMasterAddress() {
    	Params.MasterAddress = makeMasterAddress(int64(masterPort))
    	master.Params.Port = masterPort
    }
    
    // startMaster initialises the master parameters, deletes every etcd key
    // under master.Params.MetaRootPath, then creates and runs a master server
    // on master.Params.Port and stores it in masterServer.
    //
    // Any failure aborts the test binary: etcd errors panic, and server
    // creation or run errors go through log.Fatal. masterServer is only
    // assigned once creation has succeeded.
    func startMaster(ctx context.Context) {
    	master.Init()
    	refreshMasterAddress()
    	etcdAddr := master.Params.EtcdAddress
    	metaRootPath := master.Params.MetaRootPath
    
    	etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
    	if err != nil {
    		panic(err)
    	}
    	// This client is only used for the cleanup below, so release it when
    	// the function returns. The Close error is ignored on purpose: there
    	// is nothing useful to do with it during test setup.
    	defer etcdCli.Close()
    
    	if _, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix()); err != nil {
    		panic(err)
    	}
    
    	svr, err := master.CreateServer(ctx)
    	if err != nil {
    		// Stop here instead of calling Run on a server that was never
    		// successfully created.
    		log.Fatal("create server failed", zap.Error(err))
    	}
    	masterServer = svr
    	if err := svr.Run(int64(master.Params.Port)); err != nil {
    		log.Fatal("run server failed", zap.Error(err))
    	}
    
    	fmt.Println("Waiting for server!", svr.IsServing())
    }
    
    // startBuilder creates the builder under test, stores it in builderServer
    // and starts it. A failure in either step terminates the test binary via
    // log.Fatal, so Start is never called on a builder that failed to be
    // created.
    func startBuilder(ctx context.Context) {
    	var err error
    	builderServer, err = CreateBuilder(ctx)
    	if err != nil {
    		log.Fatal("create builder failed", zap.Error(err))
    	}
    
    	// TODO: change to wait until master is ready
    	if err := builderServer.Start(); err != nil {
    		log.Fatal("run builder failed", zap.Error(err))
    	}
    }
    
    // setup prepares the shared fixtures: it initialises Params, creates the
    // cancellable package context, starts the master and the builder, and
    // finally connects buildClient to Params.Address.
    //
    // It panics if the client cannot be created; the panic message carries
    // the underlying error so the cause is visible in the test output.
    func setup() {
    	Params.Init()
    	ctx, cancel = context.WithCancel(context.Background())
    	startMaster(ctx)
    	startBuilder(ctx)
    
    	var err error
    	buildClient, err = indexnodeclient.NewBuildIndexClient(ctx, Params.Address)
    	if err != nil {
    		panic(fmt.Sprintf("Create buildClient Failed! %v", err))
    	}
    }
    
    // shutdown tears down the fixtures in a fixed order: it cancels the shared
    // context first, then closes the builder server, and finally closes the
    // master server.
    func shutdown() {
    	cancel()
    	builderServer.Close()
    	masterServer.Close()
    }
    
    // TestMain wraps the package's tests with setup and shutdown. The result
    // of m.Run is captured before shutdown so it can be handed to os.Exit once
    // teardown has finished.
    func TestMain(m *testing.M) {
    	setup()
    	exitCode := m.Run()
    	shutdown()
    	os.Exit(exitCode)
    }
    
    // TestBuilder_GRPC submits a build-index request through buildClient and,
    // after a three second pause, checks that the index is reported as
    // IndexState_INPROGRESS under the returned ID and that the first entry of
    // the file-path result is nil.
    //
    // The test returns early when a call fails or yields no entries, so a
    // failed assertion is reported as a test failure instead of being followed
    // by a nil dereference or an out-of-range index.
    func TestBuilder_GRPC(t *testing.T) {
    	typeParams := map[string]string{"a": "1"}
    	indexParams := map[string]string{"b": "2"}
    	columnDataPaths := []string{"dataA", "dataB"}
    	indexID, err := buildClient.BuildIndex(columnDataPaths, typeParams, indexParams)
    	if !assert.Nil(t, err) {
    		return
    	}
    
    	time.Sleep(time.Second * 3)
    
    	description, err := buildClient.GetIndexStates([]UniqueID{indexID})
    	if !assert.Nil(t, err) || !assert.NotNil(t, description) {
    		return
    	}
    	// States[0] is read below; make sure it exists first.
    	if !assert.NotEmpty(t, description.States) {
    		return
    	}
    	assert.Equal(t, commonpb.IndexState_INPROGRESS, description.States[0].State)
    	assert.Equal(t, indexID, description.States[0].IndexID)
    
    	indexDataPaths, err := buildClient.GetIndexFilePaths([]UniqueID{indexID})
    	if !assert.Nil(t, err) || !assert.NotEmpty(t, indexDataPaths) {
    		return
    	}
    	assert.Nil(t, indexDataPaths[0])
    }