Skip to content
Snippets Groups Projects
Select Git revision
  • 51f669f1cb7c8c5d51dda343084afd7740ed4d32
  • master default protected
  • benchmark protected
  • v2.0.0-rc4
  • v2.0.0-rc2
  • v2.0.0-rc1
  • v1.1.1
  • v1.1.0
  • v1.0.0
  • v0.10.6
  • v0.10.5
  • v0.10.4
  • v0.10.3
  • v0.10.2
  • v0.10.1
  • v0.8.1
  • v0.10.0
  • v0.9.1
  • v0.9.0
  • v0.8.0
  • v0.7.1
  • v0.7.0
  • v0.6.0
23 results

data_node.go

Blame
  • user avatar
    XuanYang-cn authored and yefu.chen committed
    Signed-off-by: default avatarXuanYang-cn <xuan.yang@zilliz.com>
    51f669f1
    History
    data_node.go 6.16 KiB
    package datanode
    
    import (
    	"context"
    	"errors"
    	"fmt"
    	"io"
    	"math/rand"
    	"sync/atomic"
    	"time"
    
    	"go.uber.org/zap"
    
    	"github.com/zilliztech/milvus-distributed/internal/log"
    	"github.com/zilliztech/milvus-distributed/internal/msgstream"
    	"github.com/zilliztech/milvus-distributed/internal/types"
    	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
    
    	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
    	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
    	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
    	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
    )
    
    // RPCConnectionTimeout is a fixed 30-second duration.
    // NOTE(review): judging by the name it bounds RPC connection setup; its
    // call sites are outside this view — confirm before relying on that.
    const RPCConnectionTimeout = 30 * time.Second
    
    // DataNode holds the state of a single data node instance. NewDataNode
    // populates the context, role, watch channel and message-stream factory;
    // every other field is left at its zero value by the constructor.
    type DataNode struct {
    	// ctx and cancel are the pair returned by context.WithCancel on the
    	// context handed to NewDataNode.
    	ctx     context.Context
    	cancel  context.CancelFunc
    	// NodeID is not assigned by NewDataNode.
    	NodeID  UniqueID
    	// Role is set to typeutil.DataNodeRole by NewDataNode.
    	Role    string
    	// State is an atomically accessed value. NewDataNode calls
    	// UpdateStateCode(internalpb.StateCode_Abnormal); presumably that method
    	// stores the state code here — its body is not visible in this chunk.
    	State   atomic.Value // internalpb.StateCode_Initializing
    	// watchDm is an unbuffered signalling channel created in NewDataNode.
    	// NOTE(review): sender and receiver are outside this view.
    	watchDm chan struct{}
    
    	// Both are nil after NewDataNode returns.
    	dataSyncService *dataSyncService
    	metaService     *metaService
    
    	// Service handles; both are nil after NewDataNode returns.
    	masterService types.MasterService
    	dataService   types.DataService
    
    	// flushChan is not created by NewDataNode; replica is nil after
    	// NewDataNode returns.
    	flushChan chan *flushMsg
    	replica   Replica
    
    	// closer is not set by NewDataNode.
    	// NOTE(review): presumably closed on shutdown — confirm against Stop.
    	closer io.Closer
    
    	// msFactory is the msgstream.Factory passed to NewDataNode.
    	msFactory msgstream.Factory
    }
    
    // NewDataNode builds a DataNode bound to a cancellable child of ctx.
    //
    // The returned node has its role set to typeutil.DataNodeRole, a fresh
    // unbuffered watchDm channel, and factory stored as its message-stream
    // factory. All remaining fields (services, replica, flush channel, ...)
    // keep their zero values. Before returning, the node's state is updated to
    // internalpb.StateCode_Abnormal via UpdateStateCode.
    //
    // As a side effect, the global math/rand source is seeded with the current
    // wall-clock time in nanoseconds.
    func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
    	rand.Seed(time.Now().UnixNano())

    	nodeCtx, stop := context.WithCancel(ctx)

    	dn := new(DataNode)
    	dn.ctx = nodeCtx
    	dn.cancel = stop
    	dn.Role = typeutil.DataNodeRole
    	dn.watchDm = make(chan struct{})
    	dn.msFactory = factory
    	// dataSyncService, metaService, masterService, dataService and replica
    	// are intentionally left nil (their zero value).

    	dn.UpdateStateCode(internalpb.StateCode_Abnormal)
    	return dn
    }