Select Git revision
data_node.go
data_node.go 6.16 KiB
package datanode
import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"sync/atomic"
"time"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
)
const (
RPCConnectionTimeout = 30 * time.Second
)
type DataNode struct {
ctx context.Context
cancel context.CancelFunc
NodeID UniqueID
Role string
State atomic.Value // internalpb.StateCode_Initializing
watchDm chan struct{}
dataSyncService *dataSyncService
metaService *metaService
masterService types.MasterService
dataService types.DataService
flushChan chan *flushMsg
replica Replica
closer io.Closer
msFactory msgstream.Factory
}
func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
rand.Seed(time.Now().UnixNano())
ctx2, cancel2 := context.WithCancel(ctx)
node := &DataNode{
ctx: ctx2,
cancel: cancel2,
Role: typeutil.DataNodeRole,
watchDm: make(chan struct{}),
dataSyncService: nil,
metaService: nil,
masterService: nil,
dataService: nil,
replica: nil,
msFactory: factory,
}
node.UpdateStateCode(internalpb.StateCode_Abnormal)
return node
}