Skip to content
Snippets Groups Projects
Select Git revision
  • b98b226d8c45d28809df8aae12debcfc12c87f51
  • master default protected
  • benchmark protected
  • v2.0.0-rc4
  • v2.0.0-rc2
  • v2.0.0-rc1
  • v1.1.1
  • v1.1.0
  • v1.0.0
  • v0.10.6
  • v0.10.5
  • v0.10.4
  • v0.10.3
  • v0.10.2
  • v0.10.1
  • v0.8.1
  • v0.10.0
  • v0.9.1
  • v0.9.0
  • v0.8.0
  • v0.7.1
  • v0.7.0
  • v0.6.0
23 results

proxy.go

Blame
  • proxy.go 5.75 KiB
    package proxynode
    
    import (
    	"context"
    	"fmt"
    	"io"
    	"log"
    	"math/rand"
    	"net"
    	"strconv"
    	"sync"
    	"time"
    
    	"github.com/opentracing/opentracing-go"
    	"github.com/uber/jaeger-client-go"
    	"github.com/uber/jaeger-client-go/config"
    
    	"google.golang.org/grpc"
    
    	"github.com/zilliztech/milvus-distributed/internal/allocator"
    	"github.com/zilliztech/milvus-distributed/internal/msgstream"
    	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
    	"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
    	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
    )
    
    type UniqueID = typeutil.UniqueID
    type Timestamp = typeutil.Timestamp
    
    type Proxy struct {
    	proxyLoopCtx    context.Context
    	proxyLoopCancel func()
    	proxyLoopWg     sync.WaitGroup
    
    	grpcServer   *grpc.Server
    	masterConn   *grpc.ClientConn
    	masterClient masterpb.MasterClient
    	sched        *TaskScheduler
    	tick         *timeTick
    
    	idAllocator  *allocator.IDAllocator
    	tsoAllocator *allocator.TimestampAllocator
    	segAssigner  *allocator.SegIDAssigner
    
    	manipulationMsgStream *msgstream.PulsarMsgStream
    	queryMsgStream        *msgstream.PulsarMsgStream
    
    	tracer opentracing.Tracer
    	closer io.Closer
    
    	// Add callback functions at different stages
    	startCallbacks []func()
    	closeCallbacks []func()
    }
    
    func Init() {
    	Params.Init()
    }
    
    func CreateProxy(ctx context.Context) (*Proxy, error) {
    	rand.Seed(time.Now().UnixNano())
    	ctx1, cancel := context.WithCancel(ctx)
    	var err error
    	p := &Proxy{
    		proxyLoopCtx:    ctx1,
    		proxyLoopCancel: cancel,
    	}
    
    	cfg := &config.Configuration{
    		ServiceName: "proxynode",
    		Sampler: &config.SamplerConfig{
    			Type:  "const",
    			Param: 1,
    		},
    		Reporter: &config.ReporterConfig{
    			LogSpans: true,
    		},
    	}
    	p.tracer, p.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
    	if err != nil {
    		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
    	}
    	opentracing.SetGlobalTracer(p.tracer)
    
    	pulsarAddress := Params.PulsarAddress()
    
    	p.queryMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamSearchBufSize())
    	p.queryMsgStream.SetPulsarClient(pulsarAddress)
    	p.queryMsgStream.CreatePulsarProducers(Params.SearchChannelNames())
    
    	masterAddr := Params.MasterAddress()
    	idAllocator, err := allocator.NewIDAllocator(p.proxyLoopCtx, masterAddr)
    
    	if err != nil {
    		return nil, err
    	}
    	p.idAllocator = idAllocator
    	p.idAllocator.PeerID = Params.ProxyID()
    
    	tsoAllocator, err := allocator.NewTimestampAllocator(p.proxyLoopCtx, masterAddr)
    	if err != nil {
    		return nil, err
    	}
    	p.tsoAllocator = tsoAllocator
    	p.tsoAllocator.PeerID = Params.ProxyID()
    
    	segAssigner, err := allocator.NewSegIDAssigner(p.proxyLoopCtx, masterAddr, p.lastTick)
    	if err != nil {
    		panic(err)
    	}
    	p.segAssigner = segAssigner
    	p.segAssigner.PeerID = Params.ProxyID()
    
    	p.manipulationMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamInsertBufSize())
    	p.manipulationMsgStream.SetPulsarClient(pulsarAddress)
    	p.manipulationMsgStream.CreatePulsarProducers(Params.InsertChannelNames())
    	repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
    		return insertRepackFunc(tsMsgs, hashKeys, p.segAssigner, false)
    	}
    	p.manipulationMsgStream.SetRepackFunc(repackFuncImpl)
    
    	p.sched, err = NewTaskScheduler(p.proxyLoopCtx, p.idAllocator, p.tsoAllocator)
    	if err != nil {
    		return nil, err
    	}
    
    	p.tick = newTimeTick(p.proxyLoopCtx, p.tsoAllocator, time.Millisecond*200, p.sched.TaskDoneTest)
    
    	return p, nil
    }
    
    // AddStartCallback adds a callback in the startServer phase.
    func (p *Proxy) AddStartCallback(callbacks ...func()) {
    	p.startCallbacks = append(p.startCallbacks, callbacks...)
    }
    
    func (p *Proxy) lastTick() Timestamp {
    	return p.tick.LastTick()
    }
    
    func (p *Proxy) startProxy() error {
    	err := p.connectMaster()
    	if err != nil {
    		return err
    	}
    	initGlobalMetaCache(p.proxyLoopCtx, p)
    	p.manipulationMsgStream.Start()
    	p.queryMsgStream.Start()
    	p.sched.Start()
    	p.idAllocator.Start()
    	p.tsoAllocator.Start()
    	p.segAssigner.Start()
    	p.tick.Start()
    
    	// Start callbacks
    	for _, cb := range p.startCallbacks {
    		cb()
    	}
    
    	p.proxyLoopWg.Add(1)
    	go p.grpcLoop()
    
    	return nil
    }
    
    // AddCloseCallback adds a callback in the Close phase.
    func (p *Proxy) AddCloseCallback(callbacks ...func()) {
    	p.closeCallbacks = append(p.closeCallbacks, callbacks...)
    }
    
    func (p *Proxy) grpcLoop() {
    	defer p.proxyLoopWg.Done()
    
    	lis, err := net.Listen("tcp", ":"+strconv.Itoa(Params.NetworkPort()))
    	if err != nil {
    		log.Fatalf("Proxy grpc server fatal error=%v", err)
    	}
    
    	p.grpcServer = grpc.NewServer()
    	servicepb.RegisterMilvusServiceServer(p.grpcServer, p)
    	if err = p.grpcServer.Serve(lis); err != nil {
    		log.Fatalf("Proxy grpc server fatal error=%v", err)
    	}
    }
    
    func (p *Proxy) connectMaster() error {
    	masterAddr := Params.MasterAddress()
    	log.Printf("Proxy connected to master, master_addr=%s", masterAddr)
    	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    	defer cancel()
    	conn, err := grpc.DialContext(ctx, masterAddr, grpc.WithInsecure(), grpc.WithBlock())
    	if err != nil {
    		log.Printf("Proxy connect to master failed, error= %v", err)
    		return err
    	}
    	log.Printf("Proxy connected to master, master_addr=%s", masterAddr)
    	p.masterConn = conn
    	p.masterClient = masterpb.NewMasterClient(conn)
    	return nil
    }
    
    func (p *Proxy) Start() error {
    	return p.startProxy()
    }
    
    func (p *Proxy) stopProxyLoop() {
    	p.proxyLoopCancel()
    
    	if p.grpcServer != nil {
    		p.grpcServer.GracefulStop()
    	}
    	p.tsoAllocator.Close()
    
    	p.idAllocator.Close()
    
    	p.segAssigner.Close()
    
    	p.sched.Close()
    
    	p.manipulationMsgStream.Close()
    
    	p.queryMsgStream.Close()
    
    	p.tick.Close()
    
    	p.proxyLoopWg.Wait()
    
    }
    
    // Close closes the server.
    func (p *Proxy) Close() {
    	p.stopProxyLoop()
    
    	if p.closer != nil {
    		p.closer.Close()
    	}
    
    	for _, cb := range p.closeCallbacks {
    		cb()
    	}
    	log.Print("proxynode closed.")
    }