Commit 9fe672ab authored by XuanYang-cn, committed by yefu.chen

Enhance datanode interface


Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
parent 5aec8bc5
Showing 397 additions and 261 deletions
...@@ -73,3 +73,7 @@ indexNode:
 indexServer:
   address: localhost
   port: 21118
+
+dataNode:
+  address: localhost
+  port: 21124
package datanode
import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
)
type (
allocator interface {
allocID() (UniqueID, error)
}
allocatorImpl struct {
masterService MasterServiceInterface
}
)
func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl {
return &allocatorImpl{
masterService: s,
}
}
func (alloc *allocatorImpl) allocID() (UniqueID, error) {
resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowCollections,
MsgID: 1, // GOOSE TODO add msg id
Timestamp: 0, // GOOSE TODO
SourceID: Params.NodeID,
},
Count: 1,
})
if err != nil {
return 0, err
}
return resp.ID, nil
}
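For orientation, the MasterServiceInterface that allocatorImpl depends on (defined later in this commit) has only three methods, so it can be stubbed out and allocID exercised without a running master. The sketch below is illustrative only: mockMaster is hypothetical, and it assumes masterpb.IDResponse exposes the ID field that allocID reads above.

package datanode

import (
	"sync/atomic"

	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
)

// mockMaster hands out increasing IDs without contacting a real master service.
type mockMaster struct {
	counter int64
}

func (m *mockMaster) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
	// One ID per call is enough for this sketch; in.Count is ignored.
	return &masterpb.IDResponse{ID: atomic.AddInt64(&m.counter, 1)}, nil
}

func (m *mockMaster) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
	return &milvuspb.ShowCollectionResponse{}, nil
}

func (m *mockMaster) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
	return &milvuspb.DescribeCollectionResponse{}, nil
}

// Usage: alloc := newAllocatorImpl(&mockMaster{}); id, err := alloc.allocID()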
...@@ -4,24 +4,23 @@ import ( ...@@ -4,24 +4,23 @@ import (
"testing" "testing"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/datanode/factory"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func newReplica() collectionReplica { func newReplica() collectionReplica {
collections := make([]*Collection, 0) collections := make([]*Collection, 0)
segments := make([]*Segment, 0)
var replica collectionReplica = &collectionReplicaImpl{ var replica collectionReplica = &collectionReplicaImpl{
collections: collections, collections: collections,
segments: segments,
} }
return replica return replica
} }
func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName string, collectionID UniqueID, segmentID UniqueID) { func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName string, collectionID UniqueID, segmentID UniqueID) {
Factory := &factory.MetaFactory{} Factory := &MetaFactory{}
collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
schemaBlob := proto.MarshalTextString(collectionMeta.Schema) schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
......
...@@ -5,14 +5,12 @@ import ( ...@@ -5,14 +5,12 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/datanode/factory"
) )
func TestCollection_newCollection(t *testing.T) { func TestCollection_newCollection(t *testing.T) {
collectionName := "collection0" collectionName := "collection0"
collectionID := UniqueID(1) collectionID := UniqueID(1)
Factory := &factory.MetaFactory{} Factory := &MetaFactory{}
collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
schemaBlob := proto.MarshalTextString(collectionMeta.Schema) schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
...@@ -26,7 +24,7 @@ func TestCollection_newCollection(t *testing.T) { ...@@ -26,7 +24,7 @@ func TestCollection_newCollection(t *testing.T) {
func TestCollection_deleteCollection(t *testing.T) { func TestCollection_deleteCollection(t *testing.T) {
collectionName := "collection0" collectionName := "collection0"
collectionID := UniqueID(1) collectionID := UniqueID(1)
Factory := &factory.MetaFactory{} Factory := &MetaFactory{}
collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
schemaBlob := proto.MarshalTextString(collectionMeta.Schema) schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
......
...@@ -4,51 +4,130 @@ import (
 	"context"
 	"io"
 	"log"
+	"time"

 	"github.com/opentracing/opentracing-go"
 	"github.com/uber/jaeger-client-go"
 	"github.com/uber/jaeger-client-go/config"
+	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
+	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )

-type DataNode struct {
-	ctx             context.Context
-	DataNodeID      uint64
-	dataSyncService *dataSyncService
-	metaService     *metaService
-	replica         collectionReplica
-
-	tracer opentracing.Tracer
-	closer io.Closer
-}
-
-func NewDataNode(ctx context.Context, dataNodeID uint64) *DataNode {
-
-	collections := make([]*Collection, 0)
-
-	var replica collectionReplica = &collectionReplicaImpl{
-		collections: collections,
-	}
-
+const (
+	RPCConnectionTimeout = 30 * time.Second
+)
+
+type (
+	Inteface interface {
+		typeutil.Service
+
+		GetComponentStates() (*internalpb2.ComponentStates, error)
+		WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error)
+		FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
+	}
+
+	DataServiceInterface interface {
+		RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
+	}
+
+	MasterServiceInterface interface {
+		AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error)
+		ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
+		DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
+	}
+
+	DataNode struct {
+		ctx    context.Context
+		NodeID UniqueID
+		Role   string
+		State  internalpb2.StateCode
+
+		dataSyncService *dataSyncService
+		metaService     *metaService
+
+		masterService MasterServiceInterface
+		dataService   DataServiceInterface
+
+		replica collectionReplica
+
+		tracer opentracing.Tracer
+		closer io.Closer
+	}
+)
+
+func NewDataNode(ctx context.Context, nodeID UniqueID, masterService MasterServiceInterface,
+	dataService DataServiceInterface) *DataNode {
+
+	Params.Init()
 	node := &DataNode{
 		ctx:             ctx,
-		DataNodeID:      dataNodeID,
+		NodeID:          nodeID,     // GOOSE TODO
+		Role:            "DataNode", // GOOSE TODO
+		State:           internalpb2.StateCode_INITIALIZING,
 		dataSyncService: nil,
-		// metaService: nil,
-		replica: replica,
+		metaService:     nil,
+		masterService:   masterService,
+		dataService:     dataService,
+		replica:         nil,
 	}

 	return node
 }
func (node *DataNode) Init() error { func (node *DataNode) Init() error {
Params.Init()
return nil
}
func (node *DataNode) Start() error { req := &datapb.RegisterNodeRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kNone, //GOOSE TODO
SourceID: node.NodeID,
},
Address: &commonpb.Address{
Ip: Params.IP,
Port: Params.Port,
},
}
resp, err := node.dataService.RegisterNode(req)
if err != nil {
return errors.Errorf("Init failed: %v", err)
}
for _, kv := range resp.InitParams.StartParams {
log.Println(kv)
switch kv.Key {
case "DDChannelName":
Params.DDChannelNames = []string{kv.Value}
case "SegmentStatisticsChannelName":
Params.SegmentStatisticsChannelName = kv.Value
case "TimeTickChannelName":
Params.TimeTickChannelName = kv.Value
case "CompleteFlushChannelName":
Params.CompleteFlushChannelName = kv.Value
default:
return errors.Errorf("Invalid key: %v", kv.Key)
}
}
var replica collectionReplica = &collectionReplicaImpl{
collections: make([]*Collection, 0),
segments: make([]*Segment, 0),
}
var alloc allocator = newAllocatorImpl(node.masterService)
chanSize := 100
flushChan := make(chan *flushMsg, chanSize)
node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc)
node.metaService = newMetaService(node.ctx, replica)
node.replica = replica
// Opentracing
cfg := &config.Configuration{ cfg := &config.Configuration{
ServiceName: "data_node", ServiceName: "data_node",
Sampler: &config.SamplerConfig{ Sampler: &config.SamplerConfig{
...@@ -59,24 +138,22 @@ func (node *DataNode) Start() error { ...@@ -59,24 +138,22 @@ func (node *DataNode) Start() error {
LogSpans: true, LogSpans: true,
}, },
} }
tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))
var err error
node.tracer, node.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
if err != nil { if err != nil {
log.Printf("ERROR: cannot init Jaeger: %v\n", err) return errors.Errorf("ERROR: cannot init Jaeger: %v\n", err)
} else {
opentracing.SetGlobalTracer(node.tracer)
} }
node.tracer = tracer
node.closer = closer
// TODO GOOSE Init Size?? opentracing.SetGlobalTracer(node.tracer)
chanSize := 100
flushChan := make(chan *flushMsg, chanSize)
node.dataSyncService = newDataSyncService(node.ctx, flushChan, node.replica) node.State = internalpb2.StateCode_HEALTHY
node.metaService = newMetaService(node.ctx, node.replica) return nil
}
func (node *DataNode) Start() error {
go node.dataSyncService.start() go node.dataSyncService.start()
// go node.flushSyncService.start()
node.metaService.start() node.metaService.start()
return nil return nil
...@@ -88,8 +165,16 @@ func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) error { ...@@ -88,8 +165,16 @@ func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) error {
} }
func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) { func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) {
// GOOSE TODO: Implement me states := &internalpb2.ComponentStates{
return nil, nil State: &internalpb2.ComponentInfo{
NodeID: Params.NodeID,
Role: node.Role,
StateCode: node.State,
},
SubcomponentStates: make([]*internalpb2.ComponentInfo, 0),
Status: &commonpb.Status{},
}
return states, nil
} }
func (node *DataNode) FlushSegments(in *datapb.FlushSegRequest) error { func (node *DataNode) FlushSegments(in *datapb.FlushSegRequest) error {
......
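Taken together, the commit settles the DataNode lifecycle: construct it with its two service dependencies, Init registers the node and wires the internal services, Start runs them, and GetComponentStates reports health. A rough same-package sketch of the intended call order, assuming mock implementations of the two interfaces:

package datanode

import (
	"context"
	"log"
)

// runDataNode is a hypothetical driver; masterMock and dataMock stand in for
// anything satisfying MasterServiceInterface and DataServiceInterface (the
// RegisterNode response must carry the channel names Init expects in
// InitParams.StartParams).
func runDataNode(masterMock MasterServiceInterface, dataMock DataServiceInterface) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	node := NewDataNode(ctx, 0, masterMock, dataMock)
	if err := node.Init(); err != nil { // registers with the data service, builds dataSyncService/metaService
		log.Fatal(err)
	}
	if err := node.Start(); err != nil { // starts the flowgraph and the meta service
		log.Fatal(err)
	}

	states, err := node.GetComponentStates() // should now report StateCode_HEALTHY
	if err == nil {
		log.Println(states.State.StateCode)
	}

	_ = node.Stop() // Stop comes from the typeutil.Service contract embedded in Inteface
}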
...@@ -26,6 +26,10 @@ func makeNewChannelNames(names []string, suffix string) []string { ...@@ -26,6 +26,10 @@ func makeNewChannelNames(names []string, suffix string) []string {
} }
func refreshChannelNames() { func refreshChannelNames() {
Params.DDChannelNames = []string{"datanode-test"}
Params.SegmentStatisticsChannelName = "segtment-statistics"
Params.CompleteFlushChannelName = "flush-completed"
Params.TimeTickChannelName = "hard-timetick"
suffix := "-test-data-node" + strconv.FormatInt(rand.Int63n(100), 10) suffix := "-test-data-node" + strconv.FormatInt(rand.Int63n(100), 10)
Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix) Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix)
Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix) Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
...@@ -81,28 +85,6 @@ func TestMain(m *testing.M) { ...@@ -81,28 +85,6 @@ func TestMain(m *testing.M) {
os.Exit(exitCode) os.Exit(exitCode)
} }
func newDataNode() *DataNode {
const ctxTimeInMillisecond = 2000
const closeWithDeadline = true
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
go func() {
<-ctx.Done()
cancel()
}()
} else {
ctx = context.Background()
}
svr := NewDataNode(ctx, 0)
return svr
}
func newMetaTable() *metaTable { func newMetaTable() *metaTable {
etcdClient, _ := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) etcdClient, _ := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
......
...@@ -10,20 +10,22 @@ import ( ...@@ -10,20 +10,22 @@ import (
) )
type dataSyncService struct { type dataSyncService struct {
ctx context.Context ctx context.Context
fg *flowgraph.TimeTickedFlowGraph fg *flowgraph.TimeTickedFlowGraph
flushChan chan *flushMsg flushChan chan *flushMsg
replica collectionReplica replica collectionReplica
idAllocator allocator
} }
func newDataSyncService(ctx context.Context, flushChan chan *flushMsg, func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
replica collectionReplica) *dataSyncService { replica collectionReplica, alloc allocator) *dataSyncService {
return &dataSyncService{ return &dataSyncService{
ctx: ctx, ctx: ctx,
fg: nil, fg: nil,
flushChan: flushChan, flushChan: flushChan,
replica: replica, replica: replica,
idAllocator: alloc,
} }
} }
...@@ -59,8 +61,8 @@ func (dsService *dataSyncService) initNodes() { ...@@ -59,8 +61,8 @@ func (dsService *dataSyncService) initNodes() {
var filterDmNode Node = newFilteredDmNode() var filterDmNode Node = newFilteredDmNode()
var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica) var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica, dsService.idAllocator)
var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica) var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.idAllocator)
var gcNode Node = newGCNode(dsService.replica) var gcNode Node = newGCNode(dsService.replica)
dsService.fg.AddNode(&dmStreamNode) dsService.fg.AddNode(&dmStreamNode)
......
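This constructor is the injection point the commit introduces: production code can pass an allocatorImpl backed by the master service, while the updated tests pass the in-package AllocatorFactory stub. A minimal illustration; the helper names and channel size here are arbitrary:

package datanode

import "context"

// newProductionSyncService wires the flowgraph with IDs allocated by the master service.
func newProductionSyncService(ctx context.Context, master MasterServiceInterface, replica collectionReplica) *dataSyncService {
	return newDataSyncService(ctx, make(chan *flushMsg, 100), replica, newAllocatorImpl(master))
}

// newTestSyncService wires the flowgraph with the local AllocatorFactory stub, as the tests below do.
func newTestSyncService(ctx context.Context) *dataSyncService {
	return newDataSyncService(ctx, make(chan *flushMsg, 100), newReplica(), AllocatorFactory{})
}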
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/datanode/factory"
"github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
...@@ -35,10 +34,16 @@ func TestDataSyncService_Start(t *testing.T) { ...@@ -35,10 +34,16 @@ func TestDataSyncService_Start(t *testing.T) {
// init data node // init data node
pulsarURL := Params.PulsarAddress pulsarURL := Params.PulsarAddress
Factory := &factory.MetaFactory{} Factory := &MetaFactory{}
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1") collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
node := NewDataNode(ctx, 0)
node.replica.addCollection(collMeta.ID, proto.MarshalTextString(collMeta.Schema)) chanSize := 100
flushChan := make(chan *flushMsg, chanSize)
replica := newReplica()
allocFactory := AllocatorFactory{}
sync := newDataSyncService(ctx, flushChan, replica, allocFactory)
sync.replica.addCollection(collMeta.ID, proto.MarshalTextString(collMeta.Schema))
go sync.start()
// test data generate // test data generate
// GOOSE TODO orgnize // GOOSE TODO orgnize
...@@ -204,10 +209,6 @@ func TestDataSyncService_Start(t *testing.T) { ...@@ -204,10 +209,6 @@ func TestDataSyncService_Start(t *testing.T) {
// dataSync // dataSync
Params.FlushInsertBufferSize = 1 Params.FlushInsertBufferSize = 1
node.dataSyncService = newDataSyncService(node.ctx, nil, node.replica)
go node.dataSyncService.start()
node.Stop()
<-ctx.Done() sync.close()
} }
package factory package datanode
import ( import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
) )
type ( type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
Factory interface { Factory interface {
} }
MetaFactory struct { MetaFactory struct {
} }
AllocatorFactory struct {
}
) )
func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta { func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
...@@ -152,3 +151,8 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa ...@@ -152,3 +151,8 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa
} }
return &collection return &collection
} }
func (alloc AllocatorFactory) allocID() (UniqueID, error) {
// GOOSE TODO: random ID generate
return UniqueID(0), nil
}
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"strconv" "strconv"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/kv"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream"
...@@ -25,7 +24,7 @@ type ddNode struct { ...@@ -25,7 +24,7 @@ type ddNode struct {
ddBuffer *ddBuffer ddBuffer *ddBuffer
inFlushCh chan *flushMsg inFlushCh chan *flushMsg
idAllocator *allocator.IDAllocator idAllocator allocator
kv kv.Base kv kv.Base
replica collectionReplica replica collectionReplica
flushMeta *metaTable flushMeta *metaTable
...@@ -174,7 +173,7 @@ func (ddNode *ddNode) flush() { ...@@ -174,7 +173,7 @@ func (ddNode *ddNode) flush() {
keyCommon := path.Join(Params.DdBinlogRootPath, strconv.FormatInt(collectionID, 10)) keyCommon := path.Join(Params.DdBinlogRootPath, strconv.FormatInt(collectionID, 10))
// save ts binlog // save ts binlog
timestampLogIdx, err := ddNode.idAllocator.AllocOne() timestampLogIdx, err := ddNode.idAllocator.allocID()
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
...@@ -186,7 +185,7 @@ func (ddNode *ddNode) flush() { ...@@ -186,7 +185,7 @@ func (ddNode *ddNode) flush() {
log.Println("save ts binlog, key = ", timestampKey) log.Println("save ts binlog, key = ", timestampKey)
// save dd binlog // save dd binlog
ddLogIdx, err := ddNode.idAllocator.AllocOne() ddLogIdx, err := ddNode.idAllocator.allocID()
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
...@@ -370,7 +369,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { ...@@ -370,7 +369,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
} }
func newDDNode(ctx context.Context, flushMeta *metaTable, func newDDNode(ctx context.Context, flushMeta *metaTable,
inFlushCh chan *flushMsg, replica collectionReplica) *ddNode { inFlushCh chan *flushMsg, replica collectionReplica, alloc allocator) *ddNode {
maxQueueLength := Params.FlowGraphMaxQueueLength maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism maxParallelism := Params.FlowGraphMaxParallelism
...@@ -397,15 +396,6 @@ func newDDNode(ctx context.Context, flushMeta *metaTable, ...@@ -397,15 +396,6 @@ func newDDNode(ctx context.Context, flushMeta *metaTable,
panic(err) panic(err)
} }
idAllocator, err := allocator.NewIDAllocator(ctx, Params.MasterAddress)
if err != nil {
panic(err)
}
err = idAllocator.Start()
if err != nil {
panic(err)
}
return &ddNode{ return &ddNode{
BaseNode: baseNode, BaseNode: baseNode,
ddRecords: ddRecords, ddRecords: ddRecords,
...@@ -413,10 +403,9 @@ func newDDNode(ctx context.Context, flushMeta *metaTable, ...@@ -413,10 +403,9 @@ func newDDNode(ctx context.Context, flushMeta *metaTable,
ddData: make(map[UniqueID]*ddData), ddData: make(map[UniqueID]*ddData),
maxSize: Params.FlushDdBufferSize, maxSize: Params.FlushDdBufferSize,
}, },
// outCh: outCh,
inFlushCh: inFlushCh, inFlushCh: inFlushCh,
idAllocator: idAllocator, idAllocator: alloc,
kv: minioKV, kv: minioKV,
replica: replica, replica: replica,
flushMeta: flushMeta, flushMeta: flushMeta,
......
...@@ -15,7 +15,7 @@ import ( ...@@ -15,7 +15,7 @@ import (
func TestFlowGraphDDNode_Operate(t *testing.T) { func TestFlowGraphDDNode_Operate(t *testing.T) {
const ctxTimeInMillisecond = 2000 const ctxTimeInMillisecond = 2000
const closeWithDeadline = false const closeWithDeadline = true
var ctx context.Context var ctx context.Context
if closeWithDeadline { if closeWithDeadline {
...@@ -37,7 +37,8 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { ...@@ -37,7 +37,8 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
Params.FlushDdBufferSize = 4 Params.FlushDdBufferSize = 4
replica := newReplica() replica := newReplica()
ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica) idFactory := AllocatorFactory{}
ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica, idFactory)
colID := UniqueID(0) colID := UniqueID(0)
colName := "col-test-0" colName := "col-test-0"
......
...@@ -15,7 +15,6 @@ import ( ...@@ -15,7 +15,6 @@ import (
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log" oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/kv"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
...@@ -45,7 +44,7 @@ type ( ...@@ -45,7 +44,7 @@ type (
minIOKV kv.Base minIOKV kv.Base
minioPrefix string minioPrefix string
idAllocator *allocator.IDAllocator idAllocator allocator
timeTickStream msgstream.MsgStream timeTickStream msgstream.MsgStream
segmentStatisticsStream msgstream.MsgStream segmentStatisticsStream msgstream.MsgStream
...@@ -514,7 +513,7 @@ func (ibNode *insertBufferNode) flushSegment(segID UniqueID, partitionID UniqueI ...@@ -514,7 +513,7 @@ func (ibNode *insertBufferNode) flushSegment(segID UniqueID, partitionID UniqueI
log.Printf(".. Saving (%v) binlogs to MinIO ...", len(binLogs)) log.Printf(".. Saving (%v) binlogs to MinIO ...", len(binLogs))
for index, blob := range binLogs { for index, blob := range binLogs {
uid, err := ibNode.idAllocator.AllocOne() uid, err := ibNode.idAllocator.allocID()
if err != nil { if err != nil {
return errors.Errorf("Allocate Id failed, %v", err) return errors.Errorf("Allocate Id failed, %v", err)
} }
...@@ -543,7 +542,7 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID) error { ...@@ -543,7 +542,7 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID) error {
MsgType: commonpb.MsgType_kSegmentFlushDone, MsgType: commonpb.MsgType_kSegmentFlushDone,
MsgID: 0, // GOOSE TODO MsgID: 0, // GOOSE TODO
Timestamp: 0, // GOOSE TODO Timestamp: 0, // GOOSE TODO
SourceID: Params.DataNodeID, SourceID: Params.NodeID,
}, },
SegmentID: segID, SegmentID: segID,
} }
...@@ -572,7 +571,7 @@ func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { ...@@ -572,7 +571,7 @@ func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
MsgType: commonpb.MsgType_kTimeTick, MsgType: commonpb.MsgType_kTimeTick,
MsgID: 0, // GOOSE TODO MsgID: 0, // GOOSE TODO
Timestamp: ts, // GOOSE TODO Timestamp: ts, // GOOSE TODO
SourceID: Params.DataNodeID, SourceID: Params.NodeID,
}, },
}, },
} }
...@@ -597,7 +596,7 @@ func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error { ...@@ -597,7 +596,7 @@ func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
MsgType: commonpb.MsgType_kSegmentStatistics, MsgType: commonpb.MsgType_kSegmentStatistics,
MsgID: UniqueID(0), // GOOSE TODO MsgID: UniqueID(0), // GOOSE TODO
Timestamp: Timestamp(0), // GOOSE TODO Timestamp: Timestamp(0), // GOOSE TODO
SourceID: Params.DataNodeID, SourceID: Params.NodeID,
}, },
SegStats: statsUpdates, SegStats: statsUpdates,
} }
...@@ -623,8 +622,8 @@ func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) ( ...@@ -623,8 +622,8 @@ func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (
return ret.schema, nil return ret.schema, nil
} }
func newInsertBufferNode(ctx context.Context, func newInsertBufferNode(ctx context.Context, flushMeta *metaTable,
flushMeta *metaTable, replica collectionReplica) *insertBufferNode { replica collectionReplica, alloc allocator) *insertBufferNode {
maxQueueLength := Params.FlowGraphMaxQueueLength maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism maxParallelism := Params.FlowGraphMaxParallelism
...@@ -654,42 +653,33 @@ func newInsertBufferNode(ctx context.Context, ...@@ -654,42 +653,33 @@ func newInsertBufferNode(ctx context.Context,
} }
minioPrefix := Params.InsertBinlogRootPath minioPrefix := Params.InsertBinlogRootPath
idAllocator, err := allocator.NewIDAllocator(ctx, Params.MasterAddress)
if err != nil {
panic(err)
}
err = idAllocator.Start()
if err != nil {
panic(err)
}
//input stream, data node time tick //input stream, data node time tick
wTt := pulsarms.NewPulsarMsgStream(ctx, 1024) wTt := pulsarms.NewPulsarMsgStream(ctx, 1024)
wTt.SetPulsarClient(Params.PulsarAddress) wTt.SetPulsarClient(Params.PulsarAddress)
wTt.CreatePulsarProducers([]string{Params.TimeTickChannelName}) wTt.CreatePulsarProducers([]string{Params.TimeTickChannelName})
var wTtMsgStream msgstream.MsgStream = wTt var wTtMsgStream msgstream.MsgStream = wTt
wTtMsgStream.Start() // GOOSE TODO remove wTtMsgStream.Start()
// update statistics channel // update statistics channel
segS := pulsarms.NewPulsarMsgStream(ctx, Params.SegmentStatisticsBufSize) segS := pulsarms.NewPulsarMsgStream(ctx, 1024)
segS.SetPulsarClient(Params.PulsarAddress) segS.SetPulsarClient(Params.PulsarAddress)
segS.CreatePulsarProducers([]string{Params.SegmentStatisticsChannelName}) segS.CreatePulsarProducers([]string{Params.SegmentStatisticsChannelName})
var segStatisticsMsgStream msgstream.MsgStream = segS var segStatisticsMsgStream msgstream.MsgStream = segS
segStatisticsMsgStream.Start() // GOOSE TODO remove segStatisticsMsgStream.Start()
// segment flush completed channel // segment flush completed channel
cf := pulsarms.NewPulsarMsgStream(ctx, 1024) cf := pulsarms.NewPulsarMsgStream(ctx, 1024)
cf.SetPulsarClient(Params.PulsarAddress) cf.SetPulsarClient(Params.PulsarAddress)
cf.CreatePulsarProducers([]string{Params.CompleteFlushChannelName}) cf.CreatePulsarProducers([]string{Params.CompleteFlushChannelName})
var completeFlushStream msgstream.MsgStream = cf var completeFlushStream msgstream.MsgStream = cf
completeFlushStream.Start() // GOOSE TODO remove completeFlushStream.Start()
return &insertBufferNode{ return &insertBufferNode{
BaseNode: baseNode, BaseNode: baseNode,
insertBuffer: iBuffer, insertBuffer: iBuffer,
minIOKV: minIOKV, minIOKV: minIOKV,
minioPrefix: minioPrefix, minioPrefix: minioPrefix,
idAllocator: idAllocator, idAllocator: alloc,
timeTickStream: wTtMsgStream, timeTickStream: wTtMsgStream,
segmentStatisticsStream: segStatisticsMsgStream, segmentStatisticsStream: segStatisticsMsgStream,
completeFlushStream: completeFlushStream, completeFlushStream: completeFlushStream,
......
...@@ -12,7 +12,6 @@ import ( ...@@ -12,7 +12,6 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/zilliztech/milvus-distributed/internal/datanode/factory"
"github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
...@@ -38,7 +37,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { ...@@ -38,7 +37,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
Params.MetaRootPath = testPath Params.MetaRootPath = testPath
Factory := &factory.MetaFactory{} Factory := &MetaFactory{}
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1") collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
schemaBlob := proto.MarshalTextString(collMeta.Schema) schemaBlob := proto.MarshalTextString(collMeta.Schema)
require.NotEqual(t, "", schemaBlob) require.NotEqual(t, "", schemaBlob)
...@@ -48,7 +47,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { ...@@ -48,7 +47,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Params.FlushInsertBufSize = 2 // Params.FlushInsertBufSize = 2
iBNode := newInsertBufferNode(ctx, newMetaTable(), replica) idFactory := AllocatorFactory{}
iBNode := newInsertBufferNode(ctx, newMetaTable(), replica, idFactory)
inMsg := genInsertMsg() inMsg := genInsertMsg()
var iMsg flowgraph.Msg = &inMsg var iMsg flowgraph.Msg = &inMsg
iBNode.Operate([]*flowgraph.Msg{&iMsg}) iBNode.Operate([]*flowgraph.Msg{&iMsg})
......
...@@ -35,18 +35,14 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode { ...@@ -35,18 +35,14 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
} }
func newDDInputNode(ctx context.Context) *flowgraph.InputNode { func newDDInputNode(ctx context.Context) *flowgraph.InputNode {
receiveBufSize := Params.DDReceiveBufSize
pulsarBufSize := Params.DDPulsarBufSize
msgStreamURL := Params.PulsarAddress
consumeChannels := Params.DDChannelNames consumeChannels := Params.DDChannelNames
consumeSubName := Params.MsgChannelSubName consumeSubName := Params.MsgChannelSubName
ddStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize) ddStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
ddStream.SetPulsarClient(msgStreamURL) ddStream.SetPulsarClient(Params.PulsarAddress)
unmarshalDispatcher := util.NewUnmarshalDispatcher() unmarshalDispatcher := util.NewUnmarshalDispatcher()
ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, 1024)
var stream msgstream.MsgStream = ddStream var stream msgstream.MsgStream = ddStream
......
package datanode
import (
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
)
type Interface interface {
Init() error
Start() error
Stop() error
WatchDmChannels(in *datapb.WatchDmChannelRequest) error
FlushSegments(req *datapb.FlushSegRequest) error
}
...@@ -16,7 +16,9 @@ type ParamTable struct { ...@@ -16,7 +16,9 @@ type ParamTable struct {
paramtable.BaseTable paramtable.BaseTable
// === DataNode Internal Components Configs === // === DataNode Internal Components Configs ===
DataNodeID UniqueID NodeID UniqueID
IP string // GOOSE TODO load from config file
Port int64
FlowGraphMaxQueueLength int32 FlowGraphMaxQueueLength int32
FlowGraphMaxParallelism int32 FlowGraphMaxParallelism int32
FlushInsertBufferSize int32 FlushInsertBufferSize int32
...@@ -25,8 +27,9 @@ type ParamTable struct { ...@@ -25,8 +27,9 @@ type ParamTable struct {
DdBinlogRootPath string DdBinlogRootPath string
// === DataNode External Components Configs === // === DataNode External Components Configs ===
// --- Master --- // --- External Client Address ---
MasterAddress string MasterAddress string
ServiceAddress string // GOOSE TODO: init from config file
// --- Pulsar --- // --- Pulsar ---
PulsarAddress string PulsarAddress string
...@@ -38,20 +41,20 @@ type ParamTable struct { ...@@ -38,20 +41,20 @@ type ParamTable struct {
InsertPulsarBufSize int64 InsertPulsarBufSize int64
// - dd channel - // - dd channel -
DDChannelNames []string DDChannelNames []string // GOOSE TODO, set after Init
DDReceiveBufSize int64 // DDReceiveBufSize int64
DDPulsarBufSize int64 // DDPulsarBufSize int64
// - seg statistics channel - // - seg statistics channel -
SegmentStatisticsChannelName string SegmentStatisticsChannelName string // GOOSE TODO, set after init
SegmentStatisticsBufSize int64 // SegmentStatisticsBufSize int64
SegmentStatisticsUpdateInterval int // GOOSE TODO remove // SegmentStatisticsUpdateInterval int // GOOSE TODO remove
// - timetick channel - // - timetick channel -
TimeTickChannelName string TimeTickChannelName string // GOOSE TODO: set after init
// - complete flush channel - // - complete flush channel -
CompleteFlushChannelName string CompleteFlushChannelName string // GOOSE TODO: set after init
// - channel subname - // - channel subname -
MsgChannelSubName string MsgChannelSubName string
...@@ -82,7 +85,9 @@ func (p *ParamTable) Init() { ...@@ -82,7 +85,9 @@ func (p *ParamTable) Init() {
} }
// === DataNode Internal Components Configs === // === DataNode Internal Components Configs ===
p.initDataNodeID() p.initNodeID()
p.initIP()
p.initPort()
p.initFlowGraphMaxQueueLength() p.initFlowGraphMaxQueueLength()
p.initFlowGraphMaxParallelism() p.initFlowGraphMaxParallelism()
p.initFlushInsertBufferSize() p.initFlushInsertBufferSize()
...@@ -104,20 +109,20 @@ func (p *ParamTable) Init() { ...@@ -104,20 +109,20 @@ func (p *ParamTable) Init() {
p.initInsertPulsarBufSize() p.initInsertPulsarBufSize()
// - dd channel - // - dd channel -
p.initDDChannelNames() // p.initDDChannelNames()
p.initDDReceiveBufSize() // p.initDDReceiveBufSize()
p.initDDPulsarBufSize() // p.initDDPulsarBufSize()
// - seg statistics channel - // - seg statistics channel -
p.initSegmentStatisticsChannelName() // p.initSegmentStatisticsChannelName()
p.initSegmentStatisticsBufSize() // p.initSegmentStatisticsBufSize()
p.initSegmentStatisticsUpdateInterval() // p.initSegmentStatisticsUpdateInterval()
// - timetick channel - // - timetick channel -
p.initTimeTickChannelName() // p.initTimeTickChannelName()
// - flush completed channel - // - flush completed channel -
p.initCompleteFlushChannelName() // p.initCompleteFlushChannelName()
// - channel subname - // - channel subname -
p.initMsgChannelSubName() p.initMsgChannelSubName()
...@@ -141,7 +146,7 @@ func (p *ParamTable) Init() { ...@@ -141,7 +146,7 @@ func (p *ParamTable) Init() {
} }
// ==== DataNode internal components configs ==== // ==== DataNode internal components configs ====
func (p *ParamTable) initDataNodeID() { func (p *ParamTable) initNodeID() {
p.dataNodeIDList = p.DataNodeIDList() p.dataNodeIDList = p.DataNodeIDList()
dataNodeIDStr := os.Getenv("DATA_NODE_ID") dataNodeIDStr := os.Getenv("DATA_NODE_ID")
if dataNodeIDStr == "" { if dataNodeIDStr == "" {
...@@ -156,7 +161,20 @@ func (p *ParamTable) initDataNodeID() { ...@@ -156,7 +161,20 @@ func (p *ParamTable) initDataNodeID() {
panic(err) panic(err)
} }
p.DataNodeID = p.ParseInt64("_dataNodeID") p.NodeID = p.ParseInt64("_dataNodeID")
}
func (p *ParamTable) initIP() {
addr, err := p.Load("dataNode.address")
if err != nil {
panic(err)
}
p.IP = addr
}
func (p *ParamTable) initPort() {
port := p.ParseInt64("dataNode.port")
p.Port = port
} }
// ---- flowgraph configs ---- // ---- flowgraph configs ----
...@@ -257,7 +275,7 @@ func (p *ParamTable) initInsertPulsarBufSize() { ...@@ -257,7 +275,7 @@ func (p *ParamTable) initInsertPulsarBufSize() {
p.InsertPulsarBufSize = p.ParseInt64("dataNode.msgStream.insert.pulsarBufSize") p.InsertPulsarBufSize = p.ParseInt64("dataNode.msgStream.insert.pulsarBufSize")
} }
// - dd channel - // - dd channel - GOOSE TODO: remove
func (p *ParamTable) initDDChannelNames() { func (p *ParamTable) initDDChannelNames() {
prefix, err := p.Load("msgChannel.chanNamePrefix.dataDefinition") prefix, err := p.Load("msgChannel.chanNamePrefix.dataDefinition")
if err != nil { if err != nil {
...@@ -276,31 +294,31 @@ func (p *ParamTable) initDDChannelNames() { ...@@ -276,31 +294,31 @@ func (p *ParamTable) initDDChannelNames() {
p.DDChannelNames = ret p.DDChannelNames = ret
} }
func (p *ParamTable) initDDReceiveBufSize() { // func (p *ParamTable) initDDReceiveBufSize() {
revBufSize, err := p.Load("dataNode.msgStream.dataDefinition.recvBufSize") // revBufSize, err := p.Load("dataNode.msgStream.dataDefinition.recvBufSize")
if err != nil { // if err != nil {
panic(err) // panic(err)
} // }
bufSize, err := strconv.Atoi(revBufSize) // bufSize, err := strconv.Atoi(revBufSize)
if err != nil { // if err != nil {
panic(err) // panic(err)
} // }
p.DDReceiveBufSize = int64(bufSize) // p.DDReceiveBufSize = int64(bufSize)
} // }
func (p *ParamTable) initDDPulsarBufSize() { // func (p *ParamTable) initDDPulsarBufSize() {
pulsarBufSize, err := p.Load("dataNode.msgStream.dataDefinition.pulsarBufSize") // pulsarBufSize, err := p.Load("dataNode.msgStream.dataDefinition.pulsarBufSize")
if err != nil { // if err != nil {
panic(err) // panic(err)
} // }
bufSize, err := strconv.Atoi(pulsarBufSize) // bufSize, err := strconv.Atoi(pulsarBufSize)
if err != nil { // if err != nil {
panic(err) // panic(err)
} // }
p.DDPulsarBufSize = int64(bufSize) // p.DDPulsarBufSize = int64(bufSize)
} // }
// - seg statistics channel - // - seg statistics channel - GOOSE TODO: remove
func (p *ParamTable) initSegmentStatisticsChannelName() { func (p *ParamTable) initSegmentStatisticsChannelName() {
channelName, err := p.Load("msgChannel.chanNamePrefix.dataNodeSegStatistics") channelName, err := p.Load("msgChannel.chanNamePrefix.dataNodeSegStatistics")
...@@ -311,28 +329,26 @@ func (p *ParamTable) initSegmentStatisticsChannelName() { ...@@ -311,28 +329,26 @@ func (p *ParamTable) initSegmentStatisticsChannelName() {
p.SegmentStatisticsChannelName = channelName p.SegmentStatisticsChannelName = channelName
} }
func (p *ParamTable) initSegmentStatisticsBufSize() { // func (p *ParamTable) initSegmentStatisticsBufSize() {
p.SegmentStatisticsBufSize = p.ParseInt64("dataNode.msgStream.segStatistics.recvBufSize") // p.SegmentStatisticsBufSize = p.ParseInt64("dataNode.msgStream.segStatistics.recvBufSize")
} // }
//
func (p *ParamTable) initSegmentStatisticsUpdateInterval() { // func (p *ParamTable) initSegmentStatisticsUpdateInterval() {
p.SegmentStatisticsUpdateInterval = p.ParseInt("dataNode.msgStream.segStatistics.updateInterval") // p.SegmentStatisticsUpdateInterval = p.ParseInt("dataNode.msgStream.segStatistics.updateInterval")
} // }
// - flush completed channel -
// - flush completed channel - GOOSE TODO: remove
func (p *ParamTable) initCompleteFlushChannelName() { func (p *ParamTable) initCompleteFlushChannelName() {
// GOOSE TODO
p.CompleteFlushChannelName = "flush-completed" p.CompleteFlushChannelName = "flush-completed"
} }
// - Timetick channel - // - Timetick channel - GOOSE TODO: remove
func (p *ParamTable) initTimeTickChannelName() { func (p *ParamTable) initTimeTickChannelName() {
channels, err := p.Load("msgChannel.chanNamePrefix.dataNodeTimeTick") channels, err := p.Load("msgChannel.chanNamePrefix.dataNodeTimeTick")
if err != nil { if err != nil {
panic(err) panic(err)
} }
p.TimeTickChannelName = channels + "-" + strconv.FormatInt(p.DataNodeID, 10) p.TimeTickChannelName = channels + "-" + strconv.FormatInt(p.NodeID, 10)
} }
// - msg channel subname - // - msg channel subname -
...@@ -341,7 +357,7 @@ func (p *ParamTable) initMsgChannelSubName() { ...@@ -341,7 +357,7 @@ func (p *ParamTable) initMsgChannelSubName() {
if err != nil { if err != nil {
log.Panic(err) log.Panic(err)
} }
p.MsgChannelSubName = name + "-" + strconv.FormatInt(p.DataNodeID, 10) p.MsgChannelSubName = name + "-" + strconv.FormatInt(p.NodeID, 10)
} }
func (p *ParamTable) initDefaultPartitionName() { func (p *ParamTable) initDefaultPartitionName() {
...@@ -431,8 +447,8 @@ func (p *ParamTable) initMinioBucketName() { ...@@ -431,8 +447,8 @@ func (p *ParamTable) initMinioBucketName() {
} }
func (p *ParamTable) sliceIndex() int { func (p *ParamTable) sliceIndex() int {
dataNodeID := p.DataNodeID dataNodeID := p.NodeID
dataNodeIDList := p.DataNodeIDList() dataNodeIDList := p.dataNodeIDList
for i := 0; i < len(dataNodeIDList); i++ { for i := 0; i < len(dataNodeIDList); i++ {
if dataNodeID == dataNodeIDList[i] { if dataNodeID == dataNodeIDList[i] {
return i return i
......
...@@ -9,9 +9,9 @@ func TestParamTable_DataNode(t *testing.T) { ...@@ -9,9 +9,9 @@ func TestParamTable_DataNode(t *testing.T) {
Params.Init() Params.Init()
t.Run("Test DataNodeID", func(t *testing.T) { t.Run("Test NodeID", func(t *testing.T) {
id := Params.DataNodeID id := Params.NodeID
log.Println("DataNodeID:", id) log.Println("NodeID:", id)
}) })
t.Run("Test flowGraphMaxQueueLength", func(t *testing.T) { t.Run("Test flowGraphMaxQueueLength", func(t *testing.T) {
...@@ -79,31 +79,11 @@ func TestParamTable_DataNode(t *testing.T) { ...@@ -79,31 +79,11 @@ func TestParamTable_DataNode(t *testing.T) {
log.Println("DDChannelNames:", names) log.Println("DDChannelNames:", names)
}) })
t.Run("Test DdMsgStreamReceiveBufSize", func(t *testing.T) {
bufSize := Params.DDReceiveBufSize
log.Println("DDReceiveBufSize:", bufSize)
})
t.Run("Test DdPulsarBufSize", func(t *testing.T) {
bufSize := Params.DDPulsarBufSize
log.Println("DDPulsarBufSize:", bufSize)
})
t.Run("Test SegmentStatisticsChannelName", func(t *testing.T) { t.Run("Test SegmentStatisticsChannelName", func(t *testing.T) {
name := Params.SegmentStatisticsChannelName name := Params.SegmentStatisticsChannelName
log.Println("SegmentStatisticsChannelName:", name) log.Println("SegmentStatisticsChannelName:", name)
}) })
t.Run("Test SegmentStatisticsBufSize", func(t *testing.T) {
size := Params.SegmentStatisticsBufSize
log.Println("SegmentStatisticsBufSize:", size)
})
t.Run("Test SegmentStatisticsUpdateInterval", func(t *testing.T) {
interval := Params.SegmentStatisticsUpdateInterval
log.Println("SegmentStatisticsUpdateInterval:", interval)
})
t.Run("Test timeTickChannelName", func(t *testing.T) { t.Run("Test timeTickChannelName", func(t *testing.T) {
name := Params.TimeTickChannelName name := Params.TimeTickChannelName
log.Println("TimeTickChannelName:", name) log.Println("TimeTickChannelName:", name)
......
...@@ -84,7 +84,7 @@ func (c *dataNodeCluster) WatchInsertChannels(groups []channelGroup) { ...@@ -84,7 +84,7 @@ func (c *dataNodeCluster) WatchInsertChannels(groups []channelGroup) {
defer c.mu.Unlock() defer c.mu.Unlock()
sort.Slice(c.nodes, func(i, j int) bool { return c.nodes[i].channelNum < c.nodes[j].channelNum }) sort.Slice(c.nodes, func(i, j int) bool { return c.nodes[i].channelNum < c.nodes[j].channelNum })
for i, group := range groups { for i, group := range groups {
err := c.nodes[i%len(c.nodes)].client.WatchDmChannels(&datapb.WatchDmChannelRequest{ _, err := c.nodes[i%len(c.nodes)].client.WatchDmChannels(&datapb.WatchDmChannelRequest{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeCollection, MsgType: commonpb.MsgType_kDescribeCollection,
MsgID: -1, // todo MsgID: -1, // todo
...@@ -119,7 +119,7 @@ func (c *dataNodeCluster) FlushSegment(request *datapb.FlushSegRequest) { ...@@ -119,7 +119,7 @@ func (c *dataNodeCluster) FlushSegment(request *datapb.FlushSegRequest) {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
for _, node := range c.nodes { for _, node := range c.nodes {
if err := node.client.FlushSegments(request); err != nil { if _, err := node.client.FlushSegments(request); err != nil {
log.Println(err.Error()) log.Println(err.Error())
continue continue
} }
......
...@@ -6,34 +6,55 @@ import ( ...@@ -6,34 +6,55 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"google.golang.org/grpc"
) )
type Client struct { type Client struct {
ctx context.Context ctx context.Context
grpc datapb.DataNodeClient
conn *grpc.ClientConn
address string
}
// GOOSE TODO: add DataNodeClient func NewClient(address string) *Client {
return &Client{
address: address,
}
} }
func (c *Client) Init() error { func (c *Client) Init() error {
panic("implement me") ctx, cancel := context.WithTimeout(context.Background(), RPCConnectionTimeout)
defer cancel()
var err error
for i := 0; i < Retry; i++ {
if c.conn, err = grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock()); err == nil {
break
}
}
if err != nil {
return err
}
c.grpc = datapb.NewDataNodeClient(c.conn)
return nil
} }
func (c *Client) Start() error { func (c *Client) Start() error {
panic("implement me") return nil
} }
func (c *Client) Stop() error { func (c *Client) Stop() error {
panic("implement me") return c.conn.Close()
} }
func (c *Client) GetComponentStates(empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { func (c *Client) GetComponentStates(empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
panic("implement me") return c.grpc.GetComponentStates(context.Background(), empty)
} }
func (c *Client) WatchDmChannels(in *datapb.WatchDmChannelRequest) error { func (c *Client) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
panic("implement me") return c.grpc.WatchDmChannels(context.Background(), in)
} }
func (c *Client) FlushSegments(in *datapb.FlushSegRequest) error { func (c *Client) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) {
panic("implement me") return c.grpc.FlushSegments(context.Background(), in)
} }
...@@ -2,9 +2,13 @@ package datanode ...@@ -2,9 +2,13 @@ package datanode
import ( import (
"context" "context"
"net"
"strconv"
"sync" "sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/datanode" dn "github.com/zilliztech/milvus-distributed/internal/datanode"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
...@@ -12,9 +16,13 @@ import ( ...@@ -12,9 +16,13 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
const (
RPCConnectionTimeout = 30 * time.Second
Retry = 3
)
type Server struct { type Server struct {
node datanode.Interface core *dn.DataNode
core datanode.DataNode
grpcServer *grpc.Server grpcServer *grpc.Server
grpcError error grpcError error
...@@ -24,11 +32,43 @@ type Server struct { ...@@ -24,11 +32,43 @@ type Server struct {
cancel context.CancelFunc cancel context.CancelFunc
} }
func NewGrpcServer() (*Server, error) { func New(masterService dn.MasterServiceInterface, dataService dn.DataServiceInterface) (*Server, error) {
panic("implement me") var s = &Server{}
s.ctx, s.cancel = context.WithCancel(context.Background())
s.core = dn.NewDataNode(s.ctx, 0, masterService, dataService)
s.grpcServer = grpc.NewServer()
datapb.RegisterDataNodeServer(s.grpcServer, s)
addr := dn.Params.IP + ":" + strconv.FormatInt(dn.Params.Port, 10)
lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
go func() {
if err = s.grpcServer.Serve(lis); err != nil {
s.grpcErrMux.Lock()
defer s.grpcErrMux.Unlock()
s.grpcError = err
}
}()
s.grpcErrMux.Lock()
err = s.grpcError
s.grpcErrMux.Unlock()
if err != nil {
return nil, err
}
return s, nil
} }
func (s *Server) Init() error { func (s *Server) Init() error {
err := s.core.Init()
if err != nil {
return errors.Errorf("Init failed: %v", err)
}
return s.core.Init() return s.core.Init()
} }
...@@ -41,13 +81,17 @@ func (s *Server) Stop() error { ...@@ -41,13 +81,17 @@ func (s *Server) Stop() error {
} }
func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return nil, nil return s.core.GetComponentStates()
} }
func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) error { func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
return s.core.WatchDmChannels(in) return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, s.core.WatchDmChannels(in)
} }
func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) error { func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) (*commonpb.Status, error) {
return s.core.FlushSegments(in) return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, s.core.FlushSegments(in)
} }