diff --git a/cmd/datanode/main.go b/cmd/datanode/main.go
index 42e8de857ca864f43ce095ca5cf3bf1e10c9db25..14f5468bf576361145d7e3ea543729f26add1c04 100644
--- a/cmd/datanode/main.go
+++ b/cmd/datanode/main.go
@@ -2,12 +2,16 @@ package main

 import (
 	"context"
-	"log"
 	"os"
 	"os/signal"
 	"syscall"

+	"go.uber.org/zap"
+
+	dn "github.com/zilliztech/milvus-distributed/internal/datanode"
+
 	distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components"
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 )

@@ -17,6 +21,8 @@ func main() {
 	defer cancel()

 	msFactory := pulsarms.NewFactory()
+	dn.Params.Init()
+	log.SetupLogger(&dn.Params.Log)

 	dn, err := distributed.NewDataNode(ctx, msFactory)
 	if err != nil {
@@ -34,7 +40,7 @@ func main() {
 		syscall.SIGQUIT)
 	sig := <-sc
-	log.Println("Got signal to exit signal:", sig.String())
+	log.Debug("Got signal to exit", zap.String("signal", sig.String()))

 	err = dn.Stop()
 	if err != nil {
diff --git a/cmd/distributed/components/data_node.go b/cmd/distributed/components/data_node.go
index eba3c0761be2cf67068be6f04a8ae1a3d123a90e..840a8179ad71f7c32366c6ce9ce8c97438ca1095 100644
--- a/cmd/distributed/components/data_node.go
+++ b/cmd/distributed/components/data_node.go
@@ -2,9 +2,9 @@ package components

 import (
 	"context"
-	"log"

 	grpcdatanode "github.com/zilliztech/milvus-distributed/internal/distributed/datanode"
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 )

@@ -30,7 +30,7 @@ func (d *DataNode) Run() error {
 	if err := d.svr.Run(); err != nil {
 		panic(err)
 	}
-	log.Println("Data node successfully started ...")
+	log.Debug("Datanode successfully started")
 	return nil
 }
diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go
index 568295a09cd82a6611b328c30084bdf63ec8eb90..740afb781748f68c367d324b2b04a22289964a3d 100644
--- a/internal/datanode/collection_replica.go
+++ b/internal/datanode/collection_replica.go
@@ -1,10 +1,12 @@
 package datanode

 import (
-	"log"
 	"sync"

+	"go.uber.org/zap"
+
 	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
 )
@@ -80,7 +82,7 @@ func (replica *ReplicaImpl) addSegment(
 	replica.mu.Lock()
 	defer replica.mu.Unlock()
-	log.Println("Add Segment", segmentID)
+	log.Debug("Add Segment", zap.Int64("Segment ID", segmentID))

 	position := &internalpb2.MsgPosition{
 		ChannelName: channelName,
@@ -105,7 +107,7 @@ func (replica *ReplicaImpl) removeSegment(segmentID UniqueID) error {

 	for index, ele := range replica.segments {
 		if ele.segmentID == segmentID {
-			log.Println("Removing segment:", segmentID)
+			log.Debug("Removing segment", zap.Int64("Segment ID", segmentID))
 			numOfSegs := len(replica.segments)
 			replica.segments[index] = replica.segments[numOfSegs-1]
 			replica.segments = replica.segments[:numOfSegs-1]
@@ -133,7 +135,7 @@ func (replica *ReplicaImpl) updateStatistics(segmentID UniqueID, numRows int64)

 	for _, ele := range replica.segments {
 		if ele.segmentID == segmentID {
-			log.Printf("updating segment(%v) row nums: (%v)", segmentID, numRows)
+			log.Debug("updating segment", zap.Int64("Segment ID", segmentID), zap.Int64("numRows", numRows))
 			ele.memorySize = 0
 			ele.numRows += numRows
 			return nil
@@ -187,7 +189,7 @@ func (replica *ReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
 	}

 	replica.collections[collectionID] = newCollection
-	log.Println("Create collection:", newCollection.GetName())
+	log.Debug("Create collection", zap.String("collection name", newCollection.GetName()))

 	return nil
 }
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 8909e17f521e6500ec7877663cc7a3f7047e9228..7c040874e646e616eeff3888d330e31dcd7cb677 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -4,11 +4,13 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"log"
 	"sync/atomic"
 	"time"

+	"go.uber.org/zap"
+
 	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
@@ -139,7 +141,7 @@ func (node *DataNode) Init() error {
 	case <-time.After(RPCConnectionTimeout):
 		return errors.New("Get DmChannels failed in 30 seconds")
 	case <-node.watchDm:
-		log.Println("insert channel names set")
+		log.Debug("insert channel names set")
 	}

 	for _, kv := range resp.InitParams.StartParams {
@@ -209,7 +211,7 @@
 }

 func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) {
-	log.Println("DataNode current state:", node.State.Load())
+	log.Debug("DataNode current state", zap.Any("State", node.State.Load()))
 	states := &internalpb2.ComponentStates{
 		State: &internalpb2.ComponentInfo{
 			NodeID: Params.NodeID,
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index f0626a857143976e577233e72103b3357bb681ed..83f68beb539eda73123fc8a7891b7398be97dfdd 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -3,7 +3,6 @@ package datanode
 import (
 	"bytes"
 	"encoding/binary"
-	"log"
 	"math"
 	"math/rand"
 	"os"
@@ -11,9 +10,11 @@ import (
 	"testing"

 	"go.etcd.io/etcd/clientv3"
+	"go.uber.org/zap"

 	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
@@ -74,7 +75,7 @@ func clearEtcd(rootPath string) error {
 	if err != nil {
 		return err
 	}
-	log.Println("Clear ETCD with prefix writer/segment ")
+	log.Debug("Clear ETCD with prefix writer/segment")

 	err = etcdKV.RemoveWithPrefix("writer/ddl")
 	if err != nil {
@@ -84,7 +85,7 @@ func clearEtcd(rootPath string) error {
 	if err != nil {
 		return err
 	}
-	log.Println("Clear ETCD with prefix writer/ddl")
+	log.Debug("Clear ETCD with prefix writer/ddl")
 	return nil
 }
@@ -324,7 +325,7 @@ func GenRowData() (rawData []byte) {
 		panic(err)
 	}
 	rawData = append(rawData, bfloat64.Bytes()...)
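Editor's note: the pattern this patch applies throughout is the same everywhere — replace an unleveled, unstructured log.Println with a leveled call carrying typed zap fields. The standalone sketch below shows the before/after shape; it uses go.uber.org/zap directly as a stand-in, since the repo's internal/log package wraps the same zap field API behind package-level Debug/Warn/Error.

package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	// Assumption: a development logger stands in for internal/log's SetupLogger.
	logger, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	defer logger.Sync()

	segmentID := int64(42)
	// Before: log.Println("Add Segment", segmentID) — no level, no structure.
	// After: a leveled message with typed fields that log pipelines can index.
	logger.Debug("Add Segment", zap.Int64("Segment ID", segmentID))
	logger.Error("add segment failed", zap.Error(errors.New("segment already exists")))
}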
- log.Println("Rawdata length:", len(rawData)) + log.Debug("Rawdata length:", zap.Int("Length of rawData", len(rawData))) return } diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 3ca4820644fc05f5c00f3982e5d5e5163923b1a2..4542f79505721322a65c10fe937ea904508d94e4 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -2,12 +2,14 @@ package datanode import ( "context" - "log" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" "go.etcd.io/etcd/clientv3" + + "go.uber.org/zap" ) type dataSyncService struct { @@ -34,7 +36,7 @@ func newDataSyncService(ctx context.Context, flushChan chan *flushMsg, func (dsService *dataSyncService) init() { if len(Params.InsertChannelNames) == 0 { - log.Println("InsertChannels not readly, init datasync service failed") + log.Error("InsertChannels not readly, init datasync service failed") return } @@ -42,7 +44,7 @@ func (dsService *dataSyncService) init() { } func (dsService *dataSyncService) start() { - log.Println("Data Sync Service Start Successfully") + log.Debug("Data Sync Service Start Successfully") dsService.fg.Start() } @@ -100,7 +102,8 @@ func (dsService *dataSyncService) initNodes() { []string{filterDmNode.Name()}, ) if err != nil { - log.Fatal("set edges failed in node:", dmStreamNode.Name()) + log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err)) + panic("set edges faild in the node") } // ddStreamNode @@ -109,7 +112,8 @@ func (dsService *dataSyncService) initNodes() { []string{ddNode.Name()}, ) if err != nil { - log.Fatal("set edges failed in node:", ddStreamNode.Name()) + log.Error("set edges failed in node", zap.String("name", ddStreamNode.Name()), zap.Error(err)) + panic("set edges faild in the node") } // filterDmNode @@ -118,7 +122,8 @@ func (dsService *dataSyncService) initNodes() { []string{insertBufferNode.Name()}, ) if err != nil { - log.Fatal("set edges failed in node:", filterDmNode.Name()) + log.Error("set edges failed in node", zap.String("name", filterDmNode.Name()), zap.Error(err)) + panic("set edges faild in the node") } // ddNode @@ -127,7 +132,8 @@ func (dsService *dataSyncService) initNodes() { []string{filterDmNode.Name()}, ) if err != nil { - log.Fatal("set edges failed in node:", ddNode.Name()) + log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err)) + panic("set edges faild in the node") } // insertBufferNode @@ -136,7 +142,8 @@ func (dsService *dataSyncService) initNodes() { []string{gcNode.Name()}, ) if err != nil { - log.Fatal("set edges failed in node:", insertBufferNode.Name()) + log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err)) + panic("set edges faild in the node") } // gcNode @@ -144,6 +151,7 @@ func (dsService *dataSyncService) initNodes() { []string{insertBufferNode.Name()}, []string{}) if err != nil { - log.Fatal("set edges failed in node:", gcNode.Name()) + log.Error("set edges failed in node", zap.String("name", gcNode.Name()), zap.Error(err)) + panic("set edges faild in the node") } } diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 0135ee4ba09ff736ebb10d453577a6f617a1b3e3..f89e4132a1d2f05eba35e5ddaffa8eed75c032eb 100644 --- 
diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index 0135ee4ba09ff736ebb10d453577a6f617a1b3e3..f89e4132a1d2f05eba35e5ddaffa8eed75c032eb 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -3,14 +3,16 @@ package datanode
 import (
 	"context"
 	"errors"
-	"log"
 	"path"
 	"sort"
 	"strconv"

 	"github.com/golang/protobuf/proto"
+	"go.uber.org/zap"
+
 	"github.com/zilliztech/milvus-distributed/internal/kv"
 	miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
@@ -67,22 +69,21 @@ func (ddNode *ddNode) Name() string {
 }

 func (ddNode *ddNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
-	//fmt.Println("Do filterDdNode operation")
 	if len(in) != 1 {
-		log.Println("Invalid operate message input in ddNode, input length = ", len(in))
+		log.Error("Invalid operate message input in ddNode", zap.Int("input length", len(in)))
 		// TODO: add error handling
 	}

 	msMsg, ok := in[0].(*MsgStreamMsg)
 	if !ok {
-		log.Println("type assertion failed for MsgStreamMsg")
+		log.Error("type assertion failed for MsgStreamMsg")
 		// TODO: add error handling
 	}

 	ddNode.ddMsg = &ddMsg{
-		collectionRecords: make(map[string][]metaOperateRecord),
-		partitionRecords:  make(map[string][]metaOperateRecord),
+		collectionRecords: make(map[UniqueID][]*metaOperateRecord),
+		partitionRecords:  make(map[UniqueID][]*metaOperateRecord),
 		timeRange: TimeRange{
 			timestampMin: msMsg.TimestampMin(),
 			timestampMax: msMsg.TimestampMax(),
@@ -112,13 +113,13 @@
 		case commonpb.MsgType_kDropPartition:
 			ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg))
 		default:
-			log.Println("Not supporting message type:", msg.Type())
+			log.Error("Unsupported message type", zap.Any("Type", msg.Type()))
 		}
 	}

 	select {
 	case fmsg := <-ddNode.inFlushCh:
-		log.Println(". receive flush message, flushing ...")
+		log.Debug(". receive flush message, flushing ...")
 		localSegs := make([]UniqueID, 0)
 		for _, segID := range fmsg.segmentIDs {
 			if ddNode.replica.hasSegment(segID) {
@@ -136,7 +137,7 @@ func (ddNode *ddNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {

 	// generate binlog
 	if ddNode.ddBuffer.full() {
-		log.Println(". dd buffer full, auto flushing ...")
+		log.Debug(". dd buffer full, auto flushing ...")
 		ddNode.flush()
 	}

@@ -144,6 +145,17 @@ func (ddNode *ddNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 	return []Msg{res}, ctx
 }

+/*
+flush() will do the following:
+    generate binlogs for all buffer data in ddNode,
+    store the generated binlogs to minIO/S3,
+    store the keys (paths in minIO/S3) of the binlogs to etcd.
+
+The keys of the binlogs are generated as below:
+    ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
+    ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
+
+*/
 func (ddNode *ddNode) flush() {
 	// generate binlog
 	ddCodec := &storage.DataDefinitionCodec{}
@@ -151,49 +163,46 @@ func (ddNode *ddNode) flush() {
 		// buffer data to binlog
 		binLogs, err := ddCodec.Serialize(data.timestamps, data.ddRequestString, data.eventTypes)
 		if err != nil {
-			log.Println(err)
+			log.Error("Codec serialize failed", zap.Error(err))
 			continue
 		}
 		if len(binLogs) != 2 {
-			log.Println("illegal binLogs")
+			log.Error("illegal binLogs")
 			continue
 		}

 		// binLogs -> minIO/S3
 		if len(data.ddRequestString) != len(data.timestamps) ||
 			len(data.timestamps) != len(data.eventTypes) {
-			log.Println("illegal ddBuffer, failed to save binlog")
+			log.Error("illegal ddBuffer, failed to save binlog")
 			continue
 		} else {
-			log.Println(".. dd buffer flushing ...")
-			// Blob key example:
-			// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
-			// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
+			log.Debug(".. dd buffer flushing ...")
 			keyCommon := path.Join(Params.DdBinlogRootPath, strconv.FormatInt(collectionID, 10))

 			// save ts binlog
 			timestampLogIdx, err := ddNode.idAllocator.allocID()
 			if err != nil {
-				log.Println(err)
+				log.Error("ID allocation failed", zap.Error(err))
 			}
 			timestampKey := path.Join(keyCommon, binLogs[0].GetKey(), strconv.FormatInt(timestampLogIdx, 10))
 			err = ddNode.kv.Save(timestampKey, string(binLogs[0].GetValue()))
 			if err != nil {
-				log.Println(err)
+				log.Error("Save to MinIO/S3 failed", zap.Error(err))
 			}
-			log.Println("save ts binlog, key = ", timestampKey)
+			log.Debug("save ts binlog", zap.String("key", timestampKey))

 			// save dd binlog
 			ddLogIdx, err := ddNode.idAllocator.allocID()
 			if err != nil {
-				log.Println(err)
+				log.Error("ID allocation failed", zap.Error(err))
 			}
 			ddKey := path.Join(keyCommon, binLogs[1].GetKey(), strconv.FormatInt(ddLogIdx, 10))
 			err = ddNode.kv.Save(ddKey, string(binLogs[1].GetValue()))
 			if err != nil {
-				log.Println(err)
+				log.Error("Save to MinIO/S3 failed", zap.Error(err))
 			}
-			log.Println("save dd binlog, key = ", ddKey)
+			log.Debug("save dd binlog", zap.String("key", ddKey))

 			ddNode.flushMeta.AppendDDLBinlogPaths(collectionID, []string{timestampKey, ddKey})
 		}
@@ -209,7 +218,7 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
 	// add collection
 	if _, ok := ddNode.ddRecords.collectionRecords[collectionID]; ok {
 		err := errors.New("collection " + strconv.FormatInt(collectionID, 10) + " is already exists")
-		log.Println(err)
+		log.Error("collection already exists", zap.Error(err))
 		return
 	}
 	ddNode.ddRecords.collectionRecords[collectionID] = nil
@@ -219,20 +228,19 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
 	var schema schemapb.CollectionSchema
 	err := proto.Unmarshal(msg.Schema, &schema)
 	if err != nil {
-		log.Println(err)
+		log.Error("proto unmarshal failed", zap.Error(err))
 		return
 	}

 	// add collection
 	err = ddNode.replica.addCollection(collectionID, &schema)
 	if err != nil {
-		log.Println(err)
+		log.Error("replica add collection failed", zap.Error(err))
 		return
 	}

-	collectionName := schema.Name
-	ddNode.ddMsg.collectionRecords[collectionName] = append(ddNode.ddMsg.collectionRecords[collectionName],
-		metaOperateRecord{
+	ddNode.ddMsg.collectionRecords[collectionID] = append(ddNode.ddMsg.collectionRecords[collectionID],
+		&metaOperateRecord{
 			createOrDrop: true,
 			timestamp:    msg.Base.Timestamp,
 		})
@@ -251,25 +259,21 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
 	ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.CreateCollectionEventType)
 }

+/*
+dropCollection will drop the collection in ddRecords but won't drop the collection in replica
+*/
 func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
 	collectionID := msg.CollectionID

-	//err := ddNode.replica.removeCollection(collectionID)
-	//if err != nil {
-	//	log.Println(err)
-	//}
-
 	// remove collection
 	if _, ok := ddNode.ddRecords.collectionRecords[collectionID]; !ok {
-		err := errors.New("cannot found collection " + strconv.FormatInt(collectionID, 10))
-		log.Println(err)
+		log.Error("Cannot find collection", zap.Int64("collection ID", collectionID))
 		return
 	}
 	delete(ddNode.ddRecords.collectionRecords, collectionID)

-	collectionName := msg.CollectionName
-	ddNode.ddMsg.collectionRecords[collectionName] = append(ddNode.ddMsg.collectionRecords[collectionName],
-		metaOperateRecord{
+	ddNode.ddMsg.collectionRecords[collectionID] = append(ddNode.ddMsg.collectionRecords[collectionID],
+		&metaOperateRecord{
 			createOrDrop: false,
 			timestamp:    msg.Base.Timestamp,
 		})
@@ -296,15 +300,13 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {

 	// add partition
 	if _, ok := ddNode.ddRecords.partitionRecords[partitionID]; ok {
-		err := errors.New("partition " + strconv.FormatInt(partitionID, 10) + " is already exists")
-		log.Println(err)
+		log.Error("partition already exists", zap.Int64("partition ID", partitionID))
 		return
 	}
 	ddNode.ddRecords.partitionRecords[partitionID] = nil

-	partitionName := msg.PartitionName
-	ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName],
-		metaOperateRecord{
+	ddNode.ddMsg.partitionRecords[partitionID] = append(ddNode.ddMsg.partitionRecords[partitionID],
+		&metaOperateRecord{
 			createOrDrop: true,
 			timestamp:    msg.Base.Timestamp,
 		})
@@ -334,15 +336,15 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {

 	// remove partition
 	if _, ok := ddNode.ddRecords.partitionRecords[partitionID]; !ok {
-		err := errors.New("cannot found partition " + strconv.FormatInt(partitionID, 10))
-		log.Println(err)
+		log.Error("cannot find partition", zap.Int64("partition ID", partitionID))
 		return
 	}
 	delete(ddNode.ddRecords.partitionRecords, partitionID)

-	partitionName := msg.PartitionName
-	ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName],
-		metaOperateRecord{
+	ddNode.ddMsg.partitionRecords[partitionID] = append(ddNode.ddMsg.partitionRecords[partitionID],
+		&metaOperateRecord{
 			createOrDrop: false,
 			timestamp:    msg.Base.Timestamp,
 		})
diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go
index 8fdac3ad6275cc979915109a5c86b277ffd896c3..05d17052c0b622ae46a7ddb39d2fec213bc57d4c 100644
--- a/internal/datanode/flow_graph_dd_node_test.go
+++ b/internal/datanode/flow_graph_dd_node_test.go
@@ -2,7 +2,6 @@ package datanode

 import (
 	"context"
-	"log"
 	"testing"
 	"time"

@@ -40,7 +39,6 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
 	replica := newReplica()
 	allocatorMock := NewAllocatorFactory()
 	ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica, allocatorMock)
-	log.Print()

 	collID := UniqueID(0)
 	collName := "col-test-0"
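Editor's note: the new doc comment on ddNode.flush() spells out the binlog key layout; the standalone sketch below shows the same path.Join construction the node performs. The literal root and IDs are hypothetical; in the node the root comes from Params.DdBinlogRootPath, the middle component ("ts" or "ddl") from the serialized blob's GetKey(), and the index from the ID allocator.

package main

import (
	"fmt"
	"path"
	"strconv"
)

// binlogKey mirrors: keyCommon := path.Join(root, collectionID); key := path.Join(keyCommon, kind, logIdx)
func binlogKey(root string, collectionID int64, kind string, logIdx int64) string {
	keyCommon := path.Join(root, strconv.FormatInt(collectionID, 10))
	return path.Join(keyCommon, kind, strconv.FormatInt(logIdx, 10))
}

func main() {
	// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
	fmt.Println(binlogKey("tenant1/data_definition_log", 407, "ts", 0))
	// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
	fmt.Println(binlogKey("tenant1/data_definition_log", 407, "ddl", 1))
}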
diff --git a/internal/datanode/flow_graph_filter_dm_node.go b/internal/datanode/flow_graph_filter_dm_node.go
index 752c9167255ec5ec15e954b8b3ea52ffcaa609bf..5e2ac9ca0222d62e133794a24f9f1846ad73fa68 100644
--- a/internal/datanode/flow_graph_filter_dm_node.go
+++ b/internal/datanode/flow_graph_filter_dm_node.go
@@ -2,9 +2,11 @@ package datanode

 import (
 	"context"
-	"log"
 	"math"

+	"go.uber.org/zap"
+
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -20,22 +22,21 @@ func (fdmNode *filterDmNode) Name() string {
 }

 func (fdmNode *filterDmNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
-	//fmt.Println("Do filterDmNode operation")
 	if len(in) != 2 {
-		log.Println("Invalid operate message input in filterDmNode, input length = ", len(in))
+		log.Error("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)))
 		// TODO: add error handling
 	}

 	msgStreamMsg, ok := in[0].(*MsgStreamMsg)
 	if !ok {
-		log.Println("type assertion failed for MsgStreamMsg")
+		log.Error("type assertion failed for MsgStreamMsg")
 		// TODO: add error handling
 	}

 	ddMsg, ok := in[1].(*ddMsg)
 	if !ok {
-		log.Println("type assertion failed for ddMsg")
+		log.Error("type assertion failed for ddMsg")
 		// TODO: add error handling
 	}

@@ -63,7 +64,7 @@ func (fdmNode *filterDmNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 		// 	case commonpb.MsgType_kDelete:
 		// 		dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask))
 		default:
-			log.Println("Non supporting message type:", msg.Type())
+			log.Error("Unsupported message type", zap.Any("Type", msg.Type()))
 		}
 	}

@@ -75,7 +76,7 @@ func (fdmNode *filterDmNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {

 func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
 	// No dd record, do all insert requests.
-	records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionName]
+	records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionID]
 	if !ok {
 		return msg
 	}
@@ -88,7 +89,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
 	// Filter insert requests before last record.
 	if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
 		// TODO: what if the messages are misaligned? Here, we ignore those messages and print error
-		log.Println("Error, misaligned messages detected")
+		log.Error("misaligned messages detected")
 		return nil
 	}
 	tmpTimestamps := make([]Timestamp, 0)
diff --git a/internal/datanode/flow_graph_gc_node.go b/internal/datanode/flow_graph_gc_node.go
index 1732dbb6eb7d4b851eabcad2dac4a856c7db85c9..396b752c21c246d70a435f59db374557caa63821 100644
--- a/internal/datanode/flow_graph_gc_node.go
+++ b/internal/datanode/flow_graph_gc_node.go
@@ -2,7 +2,10 @@ package datanode

 import (
 	"context"
-	"log"
+
+	"go.uber.org/zap"
+
+	"github.com/zilliztech/milvus-distributed/internal/log"
 )

 type gcNode struct {
@@ -15,16 +18,15 @@ func (gcNode *gcNode) Name() string {
 }

 func (gcNode *gcNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
-	//fmt.Println("Do gcNode operation")
 	if len(in) != 1 {
-		log.Println("Invalid operate message input in gcNode, input length = ", len(in))
+		log.Error("Invalid operate message input in gcNode", zap.Int("input length", len(in)))
 		// TODO: add error handling
 	}

 	gcMsg, ok := in[0].(*gcMsg)
 	if !ok {
-		log.Println("type assertion failed for gcMsg")
+		log.Error("type assertion failed for gcMsg")
 		// TODO: add error handling
 	}

@@ -32,7 +34,7 @@ func (gcNode *gcNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 	for _, collectionID := range gcMsg.gcRecord.collections {
 		err := gcNode.replica.removeCollection(collectionID)
 		if err != nil {
-			log.Println(err)
+			log.Error("replica remove collection failed", zap.Error(err))
 		}
 	}
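Editor's note: the semantic change above — keying ddMsg records by collection/partition ID instead of name — drives the lookup in filterInvalidInsertMessage. The sketch below reproduces just that lookup with simplified stand-in types (everything here is illustrative, not the node's actual types beyond the field names shown in the diff).

package main

import "fmt"

type UniqueID = int64

type metaOperateRecord struct {
	createOrDrop bool // true = create, false = drop
	timestamp    uint64
}

// passThrough mimics the early return: with no dd record for the insert's
// collection ID, the message is forwarded untouched; otherwise filtering applies.
func passThrough(records map[UniqueID][]*metaOperateRecord, collectionID UniqueID) bool {
	_, ok := records[collectionID]
	return !ok
}

func main() {
	records := map[UniqueID][]*metaOperateRecord{
		400: {{createOrDrop: true, timestamp: 10}},
	}
	fmt.Println(passThrough(records, 400)) // false: records exist, timestamps get filtered
	fmt.Println(passThrough(records, 401)) // true: no record, insert passes through
}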
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index c526946a61cc5c97bbb249717a142fedf40aa903..c7aea7fdba191945ce3f9c2a589b033f9bc9da02 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -4,21 +4,23 @@ import (
 	"bytes"
 	"context"
 	"encoding/binary"
-	"log"
 	"path"
 	"strconv"
 	"unsafe"

-	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	"go.uber.org/zap"

 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/kv"
 	miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/storage"
+
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
-	"github.com/zilliztech/milvus-distributed/internal/storage"
 )
@@ -86,16 +88,15 @@ func (ibNode *insertBufferNode) Name() string {
 }

 func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
-	// log.Println("=========== insert buffer Node Operating")
 	if len(in) != 1 {
-		log.Println("Error: Invalid operate message input in insertBuffertNode, input length = ", len(in))
+		log.Error("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in)))
 		// TODO: add error handling
 	}

 	iMsg, ok := in[0].(*insertMsg)
 	if !ok {
-		log.Println("Error: type assertion failed for insertMsg")
+		log.Error("type assertion failed for insertMsg")
 		// TODO: add error handling
 	}
@@ -109,20 +110,20 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 		if !ibNode.replica.hasSegment(currentSegID) {
 			err := ibNode.replica.addSegment(currentSegID, collID, partitionID, msg.GetChannelID())
 			if err != nil {
-				log.Println("Error: add segment error", err)
+				log.Error("add segment failed", zap.Error(err))
 			}
 		}

 		if !ibNode.flushMeta.hasSegmentFlush(currentSegID) {
 			err := ibNode.flushMeta.addSegmentFlush(currentSegID)
 			if err != nil {
-				log.Println("Error: add segment flush meta error", err)
+				log.Error("add segment flush meta failed", zap.Error(err))
 			}
 		}

 		err := ibNode.replica.updateStatistics(currentSegID, int64(len(msg.RowIDs)))
 		if err != nil {
-			log.Println("Error: update Segment Row number wrong, ", err)
+			log.Error("update Segment Row number failed", zap.Error(err))
 		}

 		if _, ok := uniqueSeg[currentSegID]; !ok {
@@ -138,11 +139,11 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 	if len(segIDs) > 0 {
 		switch {
 		case iMsg.startPositions == nil || len(iMsg.startPositions) <= 0:
-			log.Println("Warning: insert Msg StartPosition empty")
+			log.Error("insert Msg StartPosition empty")
 		default:
 			err := ibNode.updateSegStatistics(segIDs, iMsg.startPositions[0])
 			if err != nil {
-				log.Println("Error: update segment statistics error, ", err)
+				log.Error("update segment statistics failed", zap.Error(err))
 			}
 		}
 	}
@@ -151,7 +152,7 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 	// 1. iMsg -> buffer
 	for _, msg := range iMsg.insertMessages {
 		if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
-			log.Println("Error: misaligned messages detected")
+			log.Error("misaligned messages detected")
 			continue
 		}
 		currentSegID := msg.GetSegmentID()
@@ -168,7 +169,7 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 		collection, err := ibNode.replica.getCollectionByID(collectionID)
 		if err != nil {
 			// GOOSE TODO add error handler
-			log.Println("bbb, Get meta wrong:", err)
+			log.Error("Get meta failed", zap.Error(err))
 			continue
 		}
@@ -183,13 +184,14 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 				if t.Key == "dim" {
 					dim, err = strconv.Atoi(t.Value)
 					if err != nil {
-						log.Println("strconv wrong")
+						log.Error("strconv failed")
 					}
 					break
 				}
 			}
 			if dim <= 0 {
-				log.Println("invalid dim")
+				log.Error("invalid dim")
+				continue
 				// TODO: add error handling
 			}
@@ -210,7 +212,7 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 					var v float32
 					buf := bytes.NewBuffer(blob.GetValue()[pos+offset:])
 					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
-						log.Println("binary.read float32 err:", err)
+						log.Error("binary.Read float32 failed", zap.Error(err))
 					}
 					fieldData.Data = append(fieldData.Data, v)
 					offset += int(unsafe.Sizeof(*(&v)))
@@ -225,13 +227,13 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 				if t.Key == "dim" {
 					dim, err = strconv.Atoi(t.Value)
 					if err != nil {
-						log.Println("strconv wrong")
+						log.Error("strconv failed")
 					}
 					break
 				}
 			}
 			if dim <= 0 {
-				log.Println("invalid dim")
+				log.Error("invalid dim")
 				// TODO: add error handling
 			}
@@ -266,7 +268,7 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 			for _, blob := range msg.RowData {
 				buf := bytes.NewReader(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
-					log.Println("binary.Read bool failed:", err)
+					log.Error("binary.Read bool failed", zap.Error(err))
 				}
 				fieldData.Data = append(fieldData.Data, v)
@@ -287,7 +289,7 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 			for _, blob := range msg.RowData {
 				buf := bytes.NewReader(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
-					log.Println("binary.Read int8 failed:", err)
+					log.Error("binary.Read int8 failed", zap.Error(err))
 				}
 				fieldData.Data = append(fieldData.Data, v)
 			}
@@ -307,7 +309,7 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 			for _, blob := range msg.RowData {
 				buf := bytes.NewReader(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
-					log.Println("binary.Read int16 failed:", err)
+					log.Error("binary.Read int16 failed", zap.Error(err))
 				}
 				fieldData.Data = append(fieldData.Data, v)
 			}
@@ -327,7 +329,7 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 			for _, blob := range msg.RowData {
 				buf := bytes.NewReader(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
-					log.Println("binary.Read int32 failed:", err)
+					log.Error("binary.Read int32 failed", zap.Error(err))
 				}
 				fieldData.Data = append(fieldData.Data, v)
 			}
@@ -357,7 +359,7 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 			for _, blob := range msg.RowData {
 				buf := bytes.NewBuffer(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
-					log.Println("binary.Read int64 failed:", err)
+					log.Error("binary.Read int64 failed", zap.Error(err))
 				}
 				fieldData.Data = append(fieldData.Data, v)
 			}
@@ -378,7 +380,7 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 			for _, blob := range msg.RowData {
 				buf := bytes.NewBuffer(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
-					log.Println("binary.Read float32 failed:", err)
+					log.Error("binary.Read float32 failed", zap.Error(err))
 				}
 				fieldData.Data = append(fieldData.Data, v)
 			}
@@ -398,7 +400,7 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 			for _, blob := range msg.RowData {
 				buf := bytes.NewBuffer(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
-					log.Println("binary.Read float64 failed:", err)
+					log.Error("binary.Read float64 failed", zap.Error(err))
 				}
 				fieldData.Data = append(fieldData.Data, v)
 			}
@@ -414,24 +416,24 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 		// 1.4 if full
 		//   1.4.1 generate binlogs
 		if ibNode.insertBuffer.full(currentSegID) {
-			log.Printf(". Insert Buffer full, auto flushing (%v) rows of data...", ibNode.insertBuffer.size(currentSegID))
+			log.Debug(". Insert Buffer full, auto flushing", zap.Int32("num of rows", ibNode.insertBuffer.size(currentSegID)))
 			err = ibNode.flushSegment(currentSegID, msg.GetPartitionID(), collection.GetID())
 			if err != nil {
-				log.Printf("flush segment (%v) fail: %v", currentSegID, err)
+				log.Error("flush segment failed", zap.Int64("segmentID", currentSegID), zap.Error(err))
 			}
 		}
 	}

 	if len(iMsg.insertMessages) > 0 {
-		log.Println("---insert buffer status---")
+		log.Debug("---insert buffer status---")
 		var stopSign int = 0
 		for k := range ibNode.insertBuffer.insertData {
 			if stopSign >= 10 {
-				log.Printf("......")
+				log.Debug("......")
 				break
 			}
-			log.Printf("seg(%v) buffer size = (%v)", k, ibNode.insertBuffer.size(k))
+			log.Debug("seg buffer status", zap.Int64("segmentID", k), zap.Int32("buffer size", ibNode.insertBuffer.size(k)))
 			stopSign++
 		}
 	}
@@ -440,31 +442,31 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
 	// 1. insertBuffer(not empty) -> binLogs -> minIO/S3
 	for _, msg := range iMsg.flushMessages {
 		for _, currentSegID := range msg.segmentIDs {
-			log.Printf(". Receiving flush message segID(%v)...", currentSegID)
+			log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID))

 			if ibNode.insertBuffer.size(currentSegID) > 0 {
-				log.Println(".. Buffer not empty, flushing ...")
+				log.Debug(".. Buffer not empty, flushing ...")
 				seg, err := ibNode.replica.getSegmentByID(currentSegID)
 				if err != nil {
-					log.Printf("flush segment fail: %v", err)
+					log.Error("flush segment failed", zap.Error(err))
 					continue
 				}
 				err = ibNode.flushSegment(currentSegID, seg.partitionID, seg.collectionID)
 				if err != nil {
-					log.Printf("flush segment (%v) fail: %v", currentSegID, err)
+					log.Error("flush segment failed", zap.Int64("segmentID", currentSegID), zap.Error(err))
 					continue
 				}
 			}
 			err := ibNode.completeFlush(currentSegID)
 			if err != nil {
-				log.Println(err)
+				log.Error("complete flush failed", zap.Error(err))
 			}
-			log.Println("Flush completed")
+			log.Debug("Flush completed")
 		}
 	}

 	if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
-		log.Printf("Error: send hard time tick into pulsar channel failed, %s\n", err.Error())
+		log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
 	}

 	var res Msg = &gcMsg{
@@ -499,7 +501,7 @@ func (ibNode *insertBufferNode) flushSegment(segID UniqueID, partitionID UniqueID, collID UniqueID) error {

 	// clear buffer
 	delete(ibNode.insertBuffer.insertData, segID)
-	log.Println(".. Clearing buffer")
+	log.Debug(".. Clearing buffer")

 	// 1.5.2 binLogs -> minIO/S3
 	collIDStr := strconv.FormatInt(collID, 10)
@@ -507,7 +509,7 @@ func (ibNode *insertBufferNode) flushSegment(segID UniqueID, partitionID UniqueID, collID UniqueID) error {
 	segIDStr := strconv.FormatInt(segID, 10)
 	keyPrefix := path.Join(ibNode.minioPrefix, collIDStr, partitionIDStr, segIDStr)

-	log.Printf(".. Saving (%v) binlogs to MinIO ...", len(binLogs))
+	log.Debug(".. Saving binlogs to MinIO ...", zap.Int("number", len(binLogs)))
 	for index, blob := range binLogs {
 		uid, err := ibNode.idAllocator.allocID()
 		if err != nil {
@@ -525,7 +527,7 @@ func (ibNode *insertBufferNode) flushSegment(segID UniqueID, partitionID UniqueID, collID UniqueID) error {
 			return errors.Errorf("string to fieldID wrong, %v", err)
 		}

-		log.Println("... Appending binlog paths ...", index)
+		log.Debug("... Appending binlog paths ...", zap.Int("index", index))
 		ibNode.flushMeta.AppendSegBinlogPaths(segID, fieldID, []string{key})
 	}
 	return nil
@@ -575,12 +577,12 @@ func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
 }

 func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID, currentPosition *internalpb2.MsgPosition) error {
-	log.Println("Updating segments statistics...")
+	log.Debug("Updating segments statistics...")
 	statsUpdates := make([]*internalpb2.SegmentStatisticsUpdates, 0, len(segIDs))
 	for _, segID := range segIDs {
 		updates, err := ibNode.replica.getSegmentStatisticsUpdates(segID)
 		if err != nil {
-			log.Println("Error get segment", segID, "statistics updates", err)
+			log.Error("get segment statistics updates failed", zap.Int64("segmentID", segID), zap.Error(err))
 			continue
 		}
 		updates.StartPosition.Timestamp = currentPosition.GetTimestamp()
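Editor's note: the per-field decode loops above repeat one pattern for every scalar type — wrap the raw row bytes and let binary.Read fill a typed value in little-endian order. A minimal, self-contained sketch of that pattern, using a synthetic row blob:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	// Synthetic "row data": one float32 encoded little-endian.
	raw := new(bytes.Buffer)
	if err := binary.Write(raw, binary.LittleEndian, float32(3.5)); err != nil {
		panic(err)
	}

	var v float32
	buf := bytes.NewReader(raw.Bytes())
	if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
		fmt.Println("binary.Read float32 failed:", err)
		return
	}
	fmt.Println(v) // 3.5
}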
diff --git a/internal/datanode/flow_graph_message.go b/internal/datanode/flow_graph_message.go
index 983d621de1aee470fc94be4110db78692ddb4f58..ee768bef15c17022821faeaca55e19377d35bdcc 100644
--- a/internal/datanode/flow_graph_message.go
+++ b/internal/datanode/flow_graph_message.go
@@ -18,13 +18,11 @@ type (
 	}

 	ddMsg struct {
-		// TODO: use collection id
-		collectionRecords map[string][]metaOperateRecord
-		// TODO: use partition id
-		partitionRecords map[string][]metaOperateRecord
-		flushMessages    []*flushMsg
-		gcRecord         *gcRecord
-		timeRange        TimeRange
+		collectionRecords map[UniqueID][]*metaOperateRecord
+		partitionRecords  map[UniqueID][]*metaOperateRecord
+		flushMessages     []*flushMsg
+		gcRecord          *gcRecord
+		timeRange         TimeRange
 	}

 	metaOperateRecord struct {
diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go
index a7dab0883a064aa2fd850a1417af2219ccd03d54..a95e13d79de668393aa465b7b2c007172fd2dd24 100644
--- a/internal/datanode/meta_service.go
+++ b/internal/datanode/meta_service.go
@@ -3,10 +3,12 @@ package datanode
 import (
 	"context"
 	"fmt"
-	"log"
 	"reflect"

+	"go.uber.org/zap"
+
 	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
@@ -27,10 +29,10 @@ func newMetaService(ctx context.Context, replica Replica, m MasterServiceInterface) *metaService {
 }

 func (mService *metaService) init() {
-	log.Println("Initing meta ...")
+	log.Debug("Initializing meta ...")
 	err := mService.loadCollections()
 	if err != nil {
-		log.Fatal("metaService init failed:", err)
+		log.Error("metaService init failed", zap.Error(err))
 	}
 }

@@ -68,7 +70,7 @@ func (mService *metaService) getCollectionNames() ([]string, error) {
 }

 func (mService *metaService) createCollection(name string) error {
-	log.Println("Describing collections")
+	log.Debug("Describing collections")
 	req := &milvuspb.DescribeCollectionRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_kDescribeCollection,
diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go
index 20f99a16721e69d07aac0d2d2f5262b1dc021da1..5a2c71a4a42ed56b655c8c1d063be263efe836b4 100644
--- a/internal/datanode/param_table.go
+++ b/internal/datanode/param_table.go
@@ -1,12 +1,12 @@
 package datanode

 import (
-	"log"
 	"os"
 	"path"
 	"strconv"
 	"sync"

+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
 )

@@ -26,12 +26,12 @@ type ParamTable struct {
 	FlushDdBufferSize    int32
 	InsertBinlogRootPath string
 	DdBinlogRootPath     string
+	Log                  log.Config

 	// === DataNode External Components Configs ===
-	// --- External Client Address ---
-	//MasterAddress string
-	//ServiceAddress string // GOOSE TODO: init from config file
-
 	// --- Pulsar ---
 	PulsarAddress string

@@ -86,6 +83,7 @@ func (p *ParamTable) Init() {
 	p.initFlushDdBufferSize()
 	p.initInsertBinlogRootPath()
 	p.initDdBinlogRootPath()
+	p.initLogCfg()

 	// === DataNode External Components Configs ===
 	// --- Pulsar ---
@@ -192,7 +190,7 @@ func (p *ParamTable) initDDChannelNames() {
 func (p *ParamTable) initMsgChannelSubName() {
 	name, err := p.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix")
 	if err != nil {
-		log.Panic(err)
+		panic(err)
 	}
 	p.MsgChannelSubName = name + "-" + strconv.FormatInt(p.NodeID, 10)
 }
@@ -284,3 +282,34 @@ func (p *ParamTable) sliceIndex() int {
 	}
 	return -1
 }
+
+func (p *ParamTable) initLogCfg() {
+	p.Log = log.Config{}
+	format, err := p.Load("log.format")
+	if err != nil {
+		panic(err)
+	}
+	p.Log.Format = format
+	level, err := p.Load("log.level")
+	if err != nil {
+		panic(err)
+	}
+	p.Log.Level = level
+	devStr, err := p.Load("log.dev")
+	if err != nil {
+		panic(err)
+	}
+	dev, err := strconv.ParseBool(devStr)
+	if err != nil {
+		panic(err)
+	}
+	p.Log.Development = dev
+	p.Log.File.MaxSize = p.ParseInt("log.file.maxSize")
+	p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups")
+	p.Log.File.MaxDays = p.ParseInt("log.file.maxAge")
+	rootPath, err := p.Load("log.file.rootPath")
+	if err != nil {
+		panic(err)
+	}
+	p.Log.File.Filename = path.Join(rootPath, "datanode-"+strconv.FormatInt(p.NodeID, 10)+".log")
+}
diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go
index 3c2390c0cfee0099d431001464fe606232d46829..3658b72237455af6d34dc056bb272297e58bbf34 100644
--- a/internal/distributed/datanode/client/client.go
+++ b/internal/distributed/datanode/client/client.go
@@ -2,14 +2,15 @@ package grpcdatanodeclient

 import (
 	"context"
-	"log"
 	"time"

+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/util/retry"
+	"go.uber.org/zap"
 	"google.golang.org/grpc"
 )

@@ -30,7 +31,7 @@ func NewClient(address string) *Client {

 func (c *Client) Init() error {
 	connectGrpcFunc := func() error {
-		log.Println("DataNode connect czs::", c.address)
+		log.Debug("DataNode connecting", zap.String("address", c.address))
 		conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock())
 		if err != nil {
 			return err
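Editor's note: initLogCfg above assembles the log.Config that main.go hands to log.SetupLogger. The sketch below shows the resulting shape with literal values in place of paramtable lookups; the Config and FileLogConfig types here are simplified stand-ins mirroring only the fields the diff touches, and the concrete values and root path are hypothetical.

package main

import (
	"fmt"
	"path"
	"strconv"
)

type FileLogConfig struct {
	MaxSize, MaxBackups, MaxDays int
	Filename                     string
}

type Config struct {
	Format      string
	Level       string
	Development bool
	File        FileLogConfig
}

func main() {
	nodeID := int64(3)
	cfg := Config{Format: "text", Level: "debug", Development: true}
	cfg.File.MaxSize = 300 // MB; hypothetical values a config file might carry
	cfg.File.MaxBackups = 20
	cfg.File.MaxDays = 10
	// Per-node log file, as built by initLogCfg: datanode-<NodeID>.log under the root path.
	cfg.File.Filename = path.Join("/var/lib/milvus/logs", "datanode-"+strconv.FormatInt(nodeID, 10)+".log")
	fmt.Printf("%+v\n", cfg)
}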
"github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - - "google.golang.org/grpc" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" ) type Server struct { @@ -64,11 +64,11 @@ func (s *Server) startGrpcLoop(grpcPort int) { lis, err := net.Listen("tcp", addr) if err != nil { - log.Printf("DataNode GrpcServer:failed to listen: %v", err) + log.Warn("GrpcServer failed to listen", zap.Error(err)) s.grpcErrChan <- err return } - log.Println("DataNode:: addr:", addr) + log.Debug("DataNode address", zap.String("address", addr)) s.grpcServer = grpc.NewServer() datapb.RegisterDataNodeServer(s.grpcServer, s) @@ -78,7 +78,7 @@ func (s *Server) startGrpcLoop(grpcPort int) { go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) if err := s.grpcServer.Serve(lis); err != nil { - log.Println("DataNode Start Grpc Failed!!!!") + log.Warn("DataNode Start Grpc Failed!") s.grpcErrChan <- err } @@ -97,12 +97,12 @@ func (s *Server) Run() error { if err := s.init(); err != nil { return err } - log.Println("data node init done ...") + log.Debug("data node init done ...") if err := s.start(); err != nil { return err } - log.Println("data node start done ...") + log.Debug("data node start done ...") return nil } @@ -131,7 +131,7 @@ func (s *Server) init() error { Params.LoadFromEnv() Params.LoadFromArgs() - log.Println("DataNode, port:", Params.Port) + log.Debug("DataNode port", zap.Int("port", Params.Port)) s.wg.Add(1) go s.startGrpcLoop(Params.Port) // wait for grpc server loop start @@ -141,8 +141,8 @@ func (s *Server) init() error { } // --- Master Server Client --- - log.Println("Master service address:", Params.MasterAddress) - log.Println("Init master service client ...") + log.Debug("Master service address", zap.String("address", Params.MasterAddress)) + log.Debug("Init master service client ...") masterClient, err := msc.NewClient(Params.MasterAddress, 20*time.Second) if err != nil { panic(err) @@ -166,8 +166,8 @@ func (s *Server) init() error { } // --- Data Server Client --- - log.Println("Data service address: ", Params.DataServiceAddress) - log.Println("DataNode Init data service client ...") + log.Debug("Data service address", zap.String("address", Params.DataServiceAddress)) + log.Debug("DataNode Init data service client ...") dataService := dsc.NewClient(Params.DataServiceAddress) if err = dataService.Init(); err != nil { panic(err) @@ -206,7 +206,7 @@ func (s *Server) init() error { s.closer = closer if err := s.impl.Init(); err != nil { - log.Println("impl init error: ", err) + log.Warn("impl init error: ", zap.Error(err)) return err } return nil