diff --git a/cmd/db-server/main.go b/cmd/db-server/main.go index 96659ef65a109161fa042e2d37909c7dcfa40eb9..6aa5082285db4fc70d5279342507b38b6a171fef 100644 --- a/cmd/db-server/main.go +++ b/cmd/db-server/main.go @@ -19,6 +19,7 @@ import ( "flag" "fmt" "github.com/BurntSushi/toml" + "github.com/matrixorigin/matrixone/pkg/util" "os" "os/signal" "syscall" @@ -67,10 +68,13 @@ func createMOServer(inputCtx context.Context, pu *config.ParameterUnit) { moServerCtx := context.WithValue(inputCtx, config.ParameterUnitKey, pu) mo = frontend.NewMOServer(moServerCtx, address, pu) { + if err := util.SetUUIDNodeID(nil); err != nil { + panic(err) + } // init trace/log/error framework if _, err := trace.Init(moServerCtx, trace.WithMOVersion(MoVersion), - trace.WithNode(0, trace.NodeTypeNode), + trace.WithNode("node_uuid", trace.NodeTypeNode), trace.EnableTracer(!pu.SV.DisableTrace), trace.WithBatchProcessMode(pu.SV.TraceBatchProcessor), trace.DebugMode(pu.SV.EnableTraceDebug), diff --git a/pkg/cnservice/server.go b/pkg/cnservice/server.go index 4fdebe1478378638721be9b973ee18e38b78f5f5..d2f309e0626abaa747b05babce3f9d0d97079137 100644 --- a/pkg/cnservice/server.go +++ b/pkg/cnservice/server.go @@ -164,7 +164,7 @@ func (s *service) createMOServer(inputCtx context.Context, pu *config.ParameterU // init trace/log/error framework if _, err := trace.Init(moServerCtx, trace.WithMOVersion(pu.SV.MoVersion), - trace.WithNode(0, trace.NodeTypeNode), + trace.WithNode("node_uuid", trace.NodeTypeCN), trace.EnableTracer(!pu.SV.DisableTrace), trace.WithBatchProcessMode(pu.SV.TraceBatchProcessor), trace.DebugMode(pu.SV.EnableTraceDebug), diff --git a/pkg/common/morpc/codec_header_test.go b/pkg/common/morpc/codec_header_test.go index cdaa715e7ef71371a379e7937324ba54cf9e3cb6..0154545b9eebe17c29b030ce595347602a6bdd3f 100644 --- a/pkg/common/morpc/codec_header_test.go +++ b/pkg/common/morpc/codec_header_test.go @@ -52,7 +52,7 @@ func TestDecodeContext(t *testing.T) { func TestEncodeAndDecodeTrace(t *testing.T) { hc := &traceCodec{} out := buf.NewByteBuf(8) - span := trace.SpanContextWithID(trace.TraceID(1)) + span := trace.SpanContextWithIDs(trace.TraceID{}, trace.SpanID{}) n, err := hc.Encode(&RPCMessage{Ctx: trace.ContextWithSpanContext(context.Background(), span)}, out) assert.Equal(t, 1+span.Size(), n) assert.NoError(t, err) diff --git a/pkg/frontend/mysql_cmd_executor.go b/pkg/frontend/mysql_cmd_executor.go index e12c3f5517cd15b17634824ecc4fdc83a803d4b0..2a054cea7653fe10cb77e0511a76b1bfe5fda96b 100644 --- a/pkg/frontend/mysql_cmd_executor.go +++ b/pkg/frontend/mysql_cmd_executor.go @@ -19,8 +19,6 @@ import ( "encoding/binary" goErrors "errors" "fmt" - "hash/fnv" - "os" "runtime/pprof" "sort" @@ -55,6 +53,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/process" "github.com/matrixorigin/matrixone/pkg/util/trace" + + "github.com/google/uuid" ) func onlyCreateStatementErrorInfo() string { @@ -148,35 +148,35 @@ func (mce *MysqlCmdExecutor) GetRoutineManager() *RoutineManager { return mce.routineMgr } -func (mce *MysqlCmdExecutor) RecordStatement(ctx context.Context, ses *Session, proc *process.Process, sql string, beginIns time.Time) context.Context { - statementId := util.Fastrand64() +func (mce *MysqlCmdExecutor) RecordStatement(ctx context.Context, ses *Session, proc *process.Process, cw ComputationWrapper, beginIns time.Time) context.Context { sessInfo := proc.SessionInfo - txnID := uint64(0) + var stmID uuid.UUID + copy(stmID[:], cw.GetUUID()) + var txnID uuid.UUID if handler := ses.GetTxnHandler(); handler.IsValidTxn() { - idBytes := handler.GetTxn().Txn().ID - hasher := fnv.New64() - if _, err := hasher.Write(idBytes); err != nil { - panic(err) - } - txnID = hasher.Sum64() + copy(txnID[:], handler.GetTxn().Txn().ID) } + var sesID uuid.UUID + copy(sesID[:], ses.GetUUID()) + fmtCtx := tree.NewFmtCtx(dialect.MYSQL) + cw.GetAst().Format(fmtCtx) trace.ReportStatement( ctx, &trace.StatementInfo{ - StatementID: statementId, - SessionID: sessInfo.GetConnectionID(), + StatementID: stmID, TransactionID: txnID, + SessionID: sesID, Account: "account", //fixme: sessInfo.GetAccount() User: sessInfo.GetUser(), Host: sessInfo.GetHost(), Database: sessInfo.GetDatabase(), - Statement: sql, + Statement: fmtCtx.String(), StatementFingerprint: "", // fixme StatementTag: "", // fixme RequestAt: util.NowNS(), }, ) - return trace.ContextWithSpanContext(ctx, trace.SpanContextWithID(trace.TraceID(statementId))) + return trace.ContextWithSpanContext(ctx, trace.SpanContextWithID(trace.TraceID(stmID))) } // outputPool outputs the data @@ -1528,13 +1528,17 @@ type TxnComputationWrapper struct { proc *process.Process ses *Session compile *compile.Compile + + uuid uuid.UUID } func InitTxnComputationWrapper(ses *Session, stmt tree.Statement, proc *process.Process) *TxnComputationWrapper { + uuid, _ := uuid.NewUUID() return &TxnComputationWrapper{ stmt: stmt, proc: proc, ses: ses, + uuid: uuid, } } @@ -1640,6 +1644,10 @@ func (cwft *TxnComputationWrapper) Compile(requestCtx context.Context, u interfa return cwft.compile, err } +func (cwft *TxnComputationWrapper) GetUUID() []byte { + return cwft.uuid[:] +} + func (cwft *TxnComputationWrapper) Run(ts uint64) error { return nil } @@ -1763,10 +1771,6 @@ func (mce *MysqlCmdExecutor) doComQuery(requestCtx context.Context, sql string) TimeZone: ses.timeZone, } - var rootCtx = mce.RecordStatement(trace.DefaultContext(), ses, proc, sql, beginInstant) - ctx, span := trace.Start(rootCtx, "doComQuery") - defer span.End() - cws, err := GetComputationWrapper(ses.GetDatabaseName(), sql, ses.GetUserName(), @@ -1795,6 +1799,7 @@ func (mce *MysqlCmdExecutor) doComQuery(requestCtx context.Context, sql string) for _, cw := range cws { ses.SetMysqlResultSet(&MysqlResultSet{}) stmt := cw.GetAst() + ctx := mce.RecordStatement(requestCtx, ses, proc, cw, beginInstant) /* if it is in an active or multi-statement transaction, we check the type of the statement. diff --git a/pkg/frontend/mysql_cmd_executor_test.go b/pkg/frontend/mysql_cmd_executor_test.go index 68dec6cabf0d05d1f8f0c0987e38b532a00c9bf0..474da897a0418ff405b8fb216bbb9c72edfd3631 100644 --- a/pkg/frontend/mysql_cmd_executor_test.go +++ b/pkg/frontend/mysql_cmd_executor_test.go @@ -63,6 +63,7 @@ func Test_mce(t *testing.T) { ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() use_t := mock_frontend.NewMockComputationWrapper(ctrl) + use_t.EXPECT().GetUUID().Return(make([]byte, 16)).AnyTimes() stmts, err := parsers.Parse(dialect.MYSQL, "use T") if err != nil { t.Error(err) @@ -78,6 +79,7 @@ func Test_mce(t *testing.T) { t.Error(err) } create_1.EXPECT().GetAst().Return(stmts[0]).AnyTimes() + create_1.EXPECT().GetUUID().Return(make([]byte, 16)).AnyTimes() create_1.EXPECT().SetDatabaseName(gomock.Any()).Return(nil).AnyTimes() create_1.EXPECT().Compile(gomock.Any(), gomock.Any()).Return(runner, nil).AnyTimes() create_1.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes() @@ -89,6 +91,7 @@ func Test_mce(t *testing.T) { t.Error(err) } select_1.EXPECT().GetAst().Return(stmts[0]).AnyTimes() + select_1.EXPECT().GetUUID().Return(make([]byte, 16)).AnyTimes() select_1.EXPECT().SetDatabaseName(gomock.Any()).Return(nil).AnyTimes() select_1.EXPECT().Compile(gomock.Any(), gomock.Any()).Return(runner, nil).AnyTimes() select_1.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes() @@ -166,6 +169,7 @@ func Test_mce(t *testing.T) { t.Error(err) } select_2.EXPECT().GetAst().Return(stmts[0]).AnyTimes() + select_2.EXPECT().GetUUID().Return(make([]byte, 16)).AnyTimes() select_2.EXPECT().SetDatabaseName(gomock.Any()).Return(nil).AnyTimes() select_2.EXPECT().Compile(gomock.Any(), gomock.Any()).Return(runner, nil).AnyTimes() select_2.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes() diff --git a/pkg/frontend/session.go b/pkg/frontend/session.go index b798753c685626e4d90a39eb4dfa4c1523172452..d5ff924569da81b01b8d18e71758aed74bb99c34 100644 --- a/pkg/frontend/session.go +++ b/pkg/frontend/session.go @@ -33,6 +33,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/moengine" "github.com/matrixorigin/matrixone/pkg/vm/mempool" "github.com/matrixorigin/matrixone/pkg/vm/mmu/guest" + + "github.com/google/uuid" ) const MaxPrepareNumberInOneSession = 64 @@ -110,6 +112,8 @@ type Session struct { tenant *TenantInfo + uuid uuid.UUID + timeZone *time.Location } @@ -140,6 +144,7 @@ func NewSession(proto Protocol, gm *guest.Mmu, mp *mempool.Mempool, PU *config.P outputCallback: getDataFromPipeline, timeZone: time.Local, } + ses.uuid, _ = uuid.NewUUID() ses.SetOptionBits(OPTION_AUTOCOMMIT) ses.txnCompileCtx.SetSession(ses) ses.txnHandler.SetSession(ses) @@ -215,6 +220,10 @@ func (ses *Session) GetTenantInfo() *TenantInfo { return ses.tenant } +func (ses *Session) GetUUID() []byte { + return ses.uuid[:] +} + func (ses *Session) SetTenantInfo(ti *TenantInfo) { ses.tenant = ti } diff --git a/pkg/frontend/test/types_mock.go b/pkg/frontend/test/types_mock.go index d6fb3d13afa497362112953866b443a93c91cbc9..db184e2f7372a7e49b412134cb7677f68feb6784 100644 --- a/pkg/frontend/test/types_mock.go +++ b/pkg/frontend/test/types_mock.go @@ -6,6 +6,7 @@ package mock_frontend import ( "context" + "github.com/google/uuid" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -73,6 +74,18 @@ func (m *MockComputationWrapper) EXPECT() *MockComputationWrapperMockRecorder { return m.recorder } +func (m *MockComputationWrapper) GetUUID() []byte { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUUID") + ret0, _ := ret[0].(uuid.UUID) + return ret0[:] +} + +func (mr *MockComputationWrapperMockRecorder) GetUUID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUUID", reflect.TypeOf((*MockComputationWrapper)(nil).GetUUID)) +} + // Compile mocks base method. func (m *MockComputationWrapper) Compile(requestCtx context.Context, u interface{}, fill func(interface{}, *batch.Batch) error) (interface{}, error) { m.ctrl.T.Helper() diff --git a/pkg/frontend/types.go b/pkg/frontend/types.go index 8e6354a0fd01a87deb8635b3560daf39f20fcccc..f122f7ff230aada6dce9aa7a1da25c9db5f74204 100644 --- a/pkg/frontend/types.go +++ b/pkg/frontend/types.go @@ -48,6 +48,8 @@ type ComputationWrapper interface { GetAffectedRows() uint64 Compile(requestCtx context.Context, u interface{}, fill func(interface{}, *batch.Batch) error) (interface{}, error) + + GetUUID() []byte } type ColumnInfo interface { diff --git a/pkg/logutil/report.go b/pkg/logutil/report.go index 64872146558c42a8915b17617c7ca919220a7c17..c2590591036fe39a4e01843c369add7734549ed7 100644 --- a/pkg/logutil/report.go +++ b/pkg/logutil/report.go @@ -132,9 +132,20 @@ func (e *TraceLogEncoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Fiel if e.spanContextField.Key == SpanFieldKey.Load().(string) { fields = append(fields, e.spanContextField) } + for _, v := range fields { + if v.Type == zapcore.BoolType && v.Key == MOInternalFiledKeyNoopReport { + return e.Encoder.EncodeEntry(entry, fields[:0]) + } + } return GetReportZapFunc()(e.Encoder, entry, fields) } +const MOInternalFiledKeyNoopReport = "MOInternalFiledKeyNoopReport" + +func NoReportFiled() zap.Field { + return zap.Bool(MOInternalFiledKeyNoopReport, false) +} + func newTraceLogEncoder() *TraceLogEncoder { // default like zap.NewProductionEncoderConfig(), but clean core-elems ENCODE e := &TraceLogEncoder{ diff --git a/pkg/util/export/batch_processor.go b/pkg/util/export/batch_processor.go index 7d9c15235209f76096ec6822e94f1150117ec5dc..23a782dd210359cb35df97732e9791b9cfc3710b 100644 --- a/pkg/util/export/batch_processor.go +++ b/pkg/util/export/batch_processor.go @@ -27,7 +27,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/util/batchpipe" ) -const defaultQueueSize = 262144 // queue mem cost = 2MB +const defaultQueueSize = 1310720 // queue mem cost = 10MB // bufferHolder hold ItemBuffer content, handle buffer's new/flush/reset/reminder(base on timer) operations. // work like: @@ -321,7 +321,7 @@ func (c *MOCollector) Stop(graceful bool) error { var buf = new(bytes.Buffer) c.stopOnce.Do(func() { for len(c.awakeCollect) > 0 { - logutil.Debugf("doCollect left %d job", len(c.awakeCollect)) + logutil.Debugf("doCollect left %d job", len(c.awakeCollect), logutil.NoReportFiled()) time.Sleep(250 * time.Second) } for _, buffer := range c.buffers { diff --git a/pkg/util/trace/buffer_pipe_sql.go b/pkg/util/trace/buffer_pipe_sql.go index 4479fc8103ba85507bd372a0774525102738234d..7b6b2ee91f40b2caf8f4ae86ad521625b54a47cc 100644 --- a/pkg/util/trace/buffer_pipe_sql.go +++ b/pkg/util/trace/buffer_pipe_sql.go @@ -29,6 +29,8 @@ import ( bp "github.com/matrixorigin/matrixone/pkg/util/batchpipe" "github.com/matrixorigin/matrixone/pkg/util/errors" ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor" + + "github.com/google/uuid" ) var errorFormatter atomic.Value @@ -113,8 +115,8 @@ func (t batchSqlHandler) NewItemBatchHandler(ctx context.Context) func(batch any if err := exec.Exec(ctx, batch, ie.NewOptsBuilder().Finish()); err != nil { // fixme: error -> log -> exec.Exec -> ... cycle // fixme: handle error situation re-try - logutil.Error(fmt.Sprintf("[Trace] faield to insert. sql: %s", batch), NoReportFiled()) - logutil.Error(fmt.Sprintf("[Trace] faield to insert. err: %v", err), NoReportFiled()) + logutil.Error(fmt.Sprintf("[Trace] faield to insert. sql: %s", batch), logutil.NoReportFiled()) + logutil.Error(fmt.Sprintf("[Trace] faield to insert. err: %v", err), logutil.NoReportFiled()) } } return f @@ -149,7 +151,7 @@ func genSpanBatchSql(in []IBuffer2SqlItem, buf *bytes.Buffer) any { buf.WriteString("`span_id`") buf.WriteString(", `statement_id`") buf.WriteString(", `parent_span_id`") - buf.WriteString(", `node_id`") + buf.WriteString(", `node_uuid`") buf.WriteString(", `node_type`") buf.WriteString(", `resource`") buf.WriteString(", `name`") @@ -166,10 +168,10 @@ func genSpanBatchSql(in []IBuffer2SqlItem, buf *bytes.Buffer) any { panic("Not MOSpan") } buf.WriteString("(") - buf.WriteString(fmt.Sprintf("%d", s.SpanID)) - buf.WriteString(fmt.Sprintf(", %d", s.TraceID)) - buf.WriteString(fmt.Sprintf(", %d", s.parent.SpanContext().SpanID)) - buf.WriteString(fmt.Sprintf(", %d", moNode.NodeID)) //node_d + buf.WriteString(fmt.Sprintf(`"%s"`, s.SpanID.String())) + buf.WriteString(fmt.Sprintf(`, "%s"`, s.TraceID.String())) + buf.WriteString(fmt.Sprintf(`, "%s"`, s.parent.SpanContext().SpanID.String())) + buf.WriteString(fmt.Sprintf(`, "%s"`, moNode.NodeUuid)) // node_uuid buf.WriteString(fmt.Sprintf(`, "%s"`, moNode.NodeType.String())) // node_type buf.WriteString(fmt.Sprintf(`, "%s"`, quote(s.tracer.provider.resource.String()))) // resource buf.WriteString(fmt.Sprintf(`, "%s"`, quote(s.Name.String()))) // Name @@ -194,7 +196,7 @@ func genLogBatchSql(in []IBuffer2SqlItem, buf *bytes.Buffer) any { buf.WriteString("(") buf.WriteString("`span_id`") buf.WriteString(", `statement_id`") - buf.WriteString(", `node_id`") + buf.WriteString(", `node_uuid`") buf.WriteString(", `node_type`") buf.WriteString(", `timestamp`") buf.WriteString(", `name`") @@ -212,9 +214,9 @@ func genLogBatchSql(in []IBuffer2SqlItem, buf *bytes.Buffer) any { panic("Not MOLog") } buf.WriteString("(") - buf.WriteString(fmt.Sprintf("%d", s.SpanId)) - buf.WriteString(fmt.Sprintf(", %d", s.StatementId)) - buf.WriteString(fmt.Sprintf(", %d", moNode.NodeID)) // node_id + buf.WriteString(fmt.Sprintf(`"%s"`, s.SpanID.String())) + buf.WriteString(fmt.Sprintf(`, "%s"`, s.TraceID.String())) + buf.WriteString(fmt.Sprintf(`, "%s"`, moNode.NodeUuid)) // node_uuid buf.WriteString(fmt.Sprintf(`, "%s"`, moNode.NodeType.String())) // node_type buf.WriteString(fmt.Sprintf(`, "%s"`, nanoSec2DatetimeString(s.Timestamp))) // timestamp buf.WriteString(fmt.Sprintf(`, "%s"`, quote(s.Name))) // log level @@ -238,7 +240,7 @@ func genZapLogBatchSql(in []IBuffer2SqlItem, buf *bytes.Buffer) any { buf.WriteString("(") buf.WriteString("`span_id`") buf.WriteString(", `statement_id`") - buf.WriteString(", `node_id`") + buf.WriteString(", `node_uuid`") buf.WriteString(", `node_type`") buf.WriteString(", `timestamp`") buf.WriteString(", `name`") @@ -257,9 +259,9 @@ func genZapLogBatchSql(in []IBuffer2SqlItem, buf *bytes.Buffer) any { } buf.WriteString("(") - buf.WriteString(fmt.Sprintf("%d", s.SpanContext.SpanID)) - buf.WriteString(fmt.Sprintf(", %d", s.SpanContext.TraceID)) - buf.WriteString(fmt.Sprintf(", %d", moNode.NodeID)) // node_id + buf.WriteString(fmt.Sprintf(`"%s"`, s.SpanContext.SpanID.String())) + buf.WriteString(fmt.Sprintf(`, "%s"`, s.SpanContext.TraceID.String())) + buf.WriteString(fmt.Sprintf(`, "%s"`, moNode.NodeUuid)) // node_uuid buf.WriteString(fmt.Sprintf(`, "%s"`, moNode.NodeType.String())) // node_type buf.WriteString(fmt.Sprintf(`, "%s"`, s.Timestamp.Format("2006-01-02 15:04:05.000000"))) // timestamp buf.WriteString(fmt.Sprintf(`, "%s"`, s.LoggerName)) // name @@ -291,7 +293,7 @@ func genStatementBatchSql(in []IBuffer2SqlItem, buf *bytes.Buffer) any { buf.WriteString(", `statement`") buf.WriteString(", `statement_tag`") buf.WriteString(", `statement_fingerprint`") - buf.WriteString(", `node_id`") + buf.WriteString(", `node_uuid`") buf.WriteString(", `node_type`") buf.WriteString(", `request_at`") buf.WriteString(", `status`") @@ -306,9 +308,9 @@ func genStatementBatchSql(in []IBuffer2SqlItem, buf *bytes.Buffer) any { panic("Not StatementInfo") } buf.WriteString("(") - buf.WriteString(fmt.Sprintf("%d", s.StatementID)) - buf.WriteString(fmt.Sprintf(", %d", s.TransactionID)) - buf.WriteString(fmt.Sprintf(", %d", s.SessionID)) + buf.WriteString(fmt.Sprintf(`"%s"`, uuid.UUID(s.StatementID).String())) + buf.WriteString(fmt.Sprintf(`, "%s"`, uuid.UUID(s.TransactionID).String())) + buf.WriteString(fmt.Sprintf(`, "%s"`, uuid.UUID(s.SessionID).String())) buf.WriteString(fmt.Sprintf(`, "%s"`, quote(s.Account))) buf.WriteString(fmt.Sprintf(`, "%s"`, quote(s.User))) buf.WriteString(fmt.Sprintf(`, "%s"`, quote(s.Host))) @@ -316,7 +318,7 @@ func genStatementBatchSql(in []IBuffer2SqlItem, buf *bytes.Buffer) any { buf.WriteString(fmt.Sprintf(`, "%s"`, quote(s.Statement))) buf.WriteString(fmt.Sprintf(`, "%s"`, quote(s.StatementFingerprint))) buf.WriteString(fmt.Sprintf(`, "%s"`, quote(s.StatementTag))) - buf.WriteString(fmt.Sprintf(", %d", moNode.NodeID)) + buf.WriteString(fmt.Sprintf(`, "%s"`, moNode.NodeUuid)) buf.WriteString(fmt.Sprintf(`, "%s"`, moNode.NodeType.String())) buf.WriteString(fmt.Sprintf(`, "%s"`, nanoSec2DatetimeString(s.RequestAt))) buf.WriteString(fmt.Sprintf(`, "%s"`, quote(s.Status.String()))) @@ -337,7 +339,7 @@ func genErrorBatchSql(in []IBuffer2SqlItem, buf *bytes.Buffer) any { buf.WriteString("(") buf.WriteString("`statement_id`") buf.WriteString(", `span_id`") - buf.WriteString(", `node_id`") + buf.WriteString(", `node_uuid`") buf.WriteString(", `node_type`") buf.WriteString(", `err_code`") buf.WriteString(", `stack`") @@ -358,9 +360,9 @@ func genErrorBatchSql(in []IBuffer2SqlItem, buf *bytes.Buffer) any { span = SpanFromContext(DefaultContext()) } buf.WriteString("(") - buf.WriteString(fmt.Sprintf("%d", span.SpanContext().TraceID)) - buf.WriteString(fmt.Sprintf(", %d", span.SpanContext().SpanID)) - buf.WriteString(fmt.Sprintf(", %d", moNode.NodeID)) + buf.WriteString(fmt.Sprintf(`"%s"`, span.SpanContext().TraceID.String())) + buf.WriteString(fmt.Sprintf(`, "%s"`, span.SpanContext().SpanID.String())) + buf.WriteString(fmt.Sprintf(`, "%s"`, moNode.NodeUuid)) buf.WriteString(fmt.Sprintf(`, "%s"`, moNode.NodeType.String())) buf.WriteString(fmt.Sprintf(`, "%s"`, quote(s.Error.Error()))) buf.WriteString(fmt.Sprintf(`, "%s"`, quote(fmt.Sprintf(errorFormatter.Load().(string), s.Error)))) diff --git a/pkg/util/trace/buffer_pipe_sql_test.go b/pkg/util/trace/buffer_pipe_sql_test.go index 079f2fec6bc9028c82438256994ed46aecc5b0d9..da0865545769d30209a73985a168014dff01f4f7 100644 --- a/pkg/util/trace/buffer_pipe_sql_test.go +++ b/pkg/util/trace/buffer_pipe_sql_test.go @@ -54,7 +54,7 @@ func setup() { context.Background(), EnableTracer(true), WithMOVersion("v0.test.0"), - WithNode(0, NodeTypeNode), + WithNode("node_uuid", NodeTypeNode), WithSQLExecutor(func() internalExecutor.InternalExecutor { return nil }), @@ -66,7 +66,7 @@ func setup() { errors.SetErrorReporter(noopReportError) sc := SpanFromContext(DefaultContext()).SpanContext() - nodeStateSpanIdStr = fmt.Sprintf("%d, %d", sc.TraceID, sc.SpanID) + nodeStateSpanIdStr = fmt.Sprintf(`"%s", "%s"`, sc.TraceID.String(), sc.SpanID.String()) if err := agent.Listen(agent.Options{}); err != nil { _ = fmt.Errorf("listen gops agent failed: %s", err) @@ -145,374 +145,13 @@ func Test_batchSqlHandler_NewItemBuffer_Check_genBatchFunc(t1 *testing.T) { } } -func Test_batchSqlHandler_genErrorBatchSql(t1 *testing.T) { - time.Local = time.FixedZone("CST", 0) // set time-zone +0000 - type args struct { - in []IBuffer2SqlItem - buf *bytes.Buffer - } - buf := new(bytes.Buffer) - newCtx, span := Start(DefaultContext(), "Test_batchSqlHandler_genErrorBatchSql") - defer span.End() - sc := SpanFromContext(newCtx).SpanContext() - newStatementSpanIdStr := fmt.Sprintf("%d, %d", sc.TraceID, sc.SpanID) - tests := []struct { - name string - args args - want string - }{ - { - name: "single_error", - args: args{ - in: []IBuffer2SqlItem{ - &MOErrorHolder{Error: err1, Timestamp: uint64(0)}, - }, - buf: buf, - }, - want: `insert into system.error_info (` + - "`statement_id`, `span_id`, `node_id`, `node_type`, `err_code`, `stack`, `timestamp`" + - `) values (` + nodeStateSpanIdStr + `, 0, "Node", "test1", "test1", "1970-01-01 00:00:00.000000")`, - }, - { - name: "multi_error", - args: args{ - in: []IBuffer2SqlItem{ - &MOErrorHolder{Error: err1, Timestamp: uint64(0)}, - &MOErrorHolder{Error: err2, Timestamp: uint64(time.Millisecond + time.Microsecond)}, - &MOErrorHolder{Error: errors.WithContext(newCtx, err2), - Timestamp: uint64(time.Millisecond + time.Microsecond)}, - }, - buf: buf, - }, - want: `insert into system.error_info (` + - "`statement_id`, `span_id`, `node_id`, `node_type`, `err_code`, `stack`, `timestamp`" + - `) values (` + nodeStateSpanIdStr + `, 0, "Node", "test1", "test1", "1970-01-01 00:00:00.000000")` + - `,(` + nodeStateSpanIdStr + `, 0, "Node", "test2: test1", "test2: test1", "1970-01-01 00:00:00.001001")` + - `,(` + newStatementSpanIdStr + `, 0, "Node", "test2: test1", "test2: test1", "1970-01-01 00:00:00.001001")`, - }, - } - errorFormatter.Store("%v") - for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { - if got := genErrorBatchSql(tt.args.in, tt.args.buf); got != tt.want { - t1.Errorf("genErrorBatchSql() = %v,\n want %v", got, tt.want) - } else { - t1.Logf("SQL: %s", got) - } - }) - } -} - -func Test_batchSqlHandler_genZapBatchSql(t1 *testing.T) { - time.Local = time.FixedZone("CST", 0) // set time-zone +0000 - type args struct { - in []IBuffer2SqlItem - buf *bytes.Buffer - } - buf := new(bytes.Buffer) - sc := SpanContextWithIDs(1, 1) - tests := []struct { - name string - args args - want string - }{ - { - name: "single", - args: args{ - in: []IBuffer2SqlItem{ - &MOZap{ - Level: zapcore.InfoLevel, - SpanContext: &sc, - Timestamp: time.Unix(0, 0), - Caller: "trace/buffer_pipe_sql_test.go:100", - Message: "info message", - Extra: "{}", - }, - }, - buf: buf, - }, - want: `insert into system.log_info (` + - "`span_id`, `statement_id`, `node_id`, `node_type`, `timestamp`, `name`, `level`, `caller`, `message`, `extra`" + - `) values (1, 1, 0, "Node", "1970-01-01 00:00:00.000000", "", "info", "trace/buffer_pipe_sql_test.go:100", "info message", "{}")`, - }, - { - name: "multi", - args: args{ - in: []IBuffer2SqlItem{ - &MOZap{ - Level: zapcore.InfoLevel, - SpanContext: &sc, - Timestamp: time.Unix(0, 0), - Caller: "trace/buffer_pipe_sql_test.go:100", - Message: "info message", - Extra: "{}", - }, - &MOZap{ - Level: zapcore.DebugLevel, - SpanContext: &sc, - Timestamp: time.Unix(0, int64(time.Microsecond+time.Millisecond)), - Caller: "trace/buffer_pipe_sql_test.go:100", - Message: "debug message", - Extra: "{}", - }, - }, - buf: buf, - }, - want: `insert into system.log_info (` + - "`span_id`, `statement_id`, `node_id`, `node_type`, `timestamp`, `name`, `level`, `caller`, `message`, `extra`" + - `) values (1, 1, 0, "Node", "1970-01-01 00:00:00.000000", "", "info", "trace/buffer_pipe_sql_test.go:100", "info message", "{}")` + - `,(1, 1, 0, "Node", "1970-01-01 00:00:00.001001", "", "debug", "trace/buffer_pipe_sql_test.go:100", "debug message", "{}")`, - }, - } - for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { - if got := genZapLogBatchSql(tt.args.in, tt.args.buf); got != tt.want { - t1.Errorf("genZapLogBatchSql() = %v,\n want %v", got, tt.want) - } else { - t1.Logf("SQL: %s", got) - } - }) - } - time.Local = time.FixedZone("CST", 0) // set time-zone +0000 -} - -func Test_batchSqlHandler_genLogBatchSql(t1 *testing.T) { - time.Local = time.FixedZone("CST", 0) // set time-zone +0000 - type args struct { - in []IBuffer2SqlItem - buf *bytes.Buffer - } - buf := new(bytes.Buffer) - tests := []struct { - name string - args args - want string - }{ - { - name: "single", - args: args{ - in: []IBuffer2SqlItem{ - &MOLog{ - StatementId: 1, - SpanId: 1, - Timestamp: uint64(0), - Level: zapcore.InfoLevel, - Caller: util.Caller(0), - Message: "info message", - Extra: "{}", - }, - }, - buf: buf, - }, - want: `insert into system.log_info (` + - "`span_id`, `statement_id`, `node_id`, `node_type`, `timestamp`, `name`, `level`, `caller`, `message`, `extra`" + - `) values (1, 1, 0, "Node", "1970-01-01 00:00:00.000000", "", "info", "Test_batchSqlHandler_genLogBatchSql", "info message", "{}")`, - }, - { - name: "multi", - args: args{ - in: []IBuffer2SqlItem{ - &MOLog{ - StatementId: 1, - SpanId: 1, - Timestamp: uint64(0), - Level: zapcore.InfoLevel, - Caller: util.Caller(0), - Message: "info message", - Extra: "{}", - }, - &MOLog{ - StatementId: 1, - SpanId: 1, - Timestamp: uint64(time.Millisecond + time.Microsecond), - Level: zapcore.DebugLevel, - Caller: util.Caller(0), - Message: "debug message", - Extra: "{}", - }, - }, - buf: buf, - }, - want: `insert into system.log_info (` + - "`span_id`, `statement_id`, `node_id`, `node_type`, `timestamp`, `name`, `level`, `caller`, `message`, `extra`" + - `) values (1, 1, 0, "Node", "1970-01-01 00:00:00.000000", "", "info", "Test_batchSqlHandler_genLogBatchSql", "info message", "{}")` + - `,(1, 1, 0, "Node", "1970-01-01 00:00:00.001001", "", "debug", "Test_batchSqlHandler_genLogBatchSql", "debug message", "{}")`, - }, - } - logStackFormatter.Store("%n") - t1.Logf("%%n : %n", tests[0].args.in[0].(*MOLog).Caller) - t1.Logf("%%s : %s", tests[0].args.in[0].(*MOLog).Caller) - t1.Logf("%%+s: %+s", tests[0].args.in[0].(*MOLog).Caller) - t1.Logf("%%v : %v", tests[0].args.in[0].(*MOLog).Caller) - t1.Logf("%%+v: %+v", tests[0].args.in[0].(*MOLog).Caller) - for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { - if got := genLogBatchSql(tt.args.in, tt.args.buf); got != tt.want { - t1.Errorf("genLogBatchSql() = %v, want %v", got, tt.want) - } else { - t1.Logf("SQL: %s", got) - } - }) - } -} - -func Test_batchSqlHandler_genSpanBatchSql(t1 *testing.T) { - time.Local = time.FixedZone("CST", 0) // set time-zone +0000 - type args struct { - in []IBuffer2SqlItem - buf *bytes.Buffer - } - tests := []struct { - name string - args args - want string - }{ - { - name: "single_span", - args: args{ - in: []IBuffer2SqlItem{ - &MOSpan{ - SpanConfig: SpanConfig{SpanContext: SpanContext{TraceID: 1, SpanID: 1}, parent: noopSpan{}}, - Name: *bytes.NewBuffer([]byte("span1")), - StartTimeNS: util.TimeNano(0), - EndTimeNS: util.TimeNano(time.Microsecond), - Duration: util.TimeNano(time.Microsecond), - tracer: gTracer.(*MOTracer), - }, - }, - buf: buf, - }, - want: `insert into system.span_info (` + - "`span_id`, `statement_id`, `parent_span_id`, `node_id`, `node_type`, `resource`, `name`, `start_time`, `end_time`, `duration`" + - `) values (1, 1, 0, 0, "Node", "{\"Node\":{\"node_id\":0,\"node_type\":0},\"version\":\"v0.test.0\"}", "span1", "1970-01-01 00:00:00.000000", "1970-01-01 00:00:00.000001", 1000)`, - }, - { - name: "multi_span", - args: args{ - in: []IBuffer2SqlItem{ - &MOSpan{ - SpanConfig: SpanConfig{SpanContext: SpanContext{TraceID: 1, SpanID: 1}, parent: noopSpan{}}, - Name: *bytes.NewBuffer([]byte("span1")), - StartTimeNS: util.TimeNano(0), - EndTimeNS: util.TimeNano(time.Microsecond), - Duration: util.TimeNano(time.Microsecond), - tracer: gTracer.(*MOTracer), - }, - &MOSpan{ - SpanConfig: SpanConfig{SpanContext: SpanContext{TraceID: 1, SpanID: 2}, parent: noopSpan{}}, - Name: *bytes.NewBuffer([]byte("span2")), - StartTimeNS: util.TimeNano(time.Microsecond), - EndTimeNS: util.TimeNano(time.Millisecond), - Duration: util.TimeNano(time.Millisecond - time.Microsecond), - tracer: gTracer.(*MOTracer), - }, - }, - buf: buf, - }, - want: `insert into system.span_info (` + - "`span_id`, `statement_id`, `parent_span_id`, `node_id`, `node_type`, `resource`, `name`, `start_time`, `end_time`, `duration`" + - `) values (1, 1, 0, 0, "Node", "{\"Node\":{\"node_id\":0,\"node_type\":0},\"version\":\"v0.test.0\"}", "span1", "1970-01-01 00:00:00.000000", "1970-01-01 00:00:00.000001", 1000)` + - `,(2, 1, 0, 0, "Node", "{\"Node\":{\"node_id\":0,\"node_type\":0},\"version\":\"v0.test.0\"}", "span2", "1970-01-01 00:00:00.000001", "1970-01-01 00:00:00.001000", 999000)`, - }, - } - for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { - got := genSpanBatchSql(tt.args.in, tt.args.buf) - require.Equal(t1, tt.want, got) - }) - } -} - -func Test_batchSqlHandler_genStatementBatchSql(t1 *testing.T) { - type args struct { - in []IBuffer2SqlItem - buf *bytes.Buffer - } - tests := []struct { - name string - args args - want string - }{ - { - name: "single_statement", - args: args{ - in: []IBuffer2SqlItem{ - &StatementInfo{ - StatementID: 1, - TransactionID: 1, - SessionID: 1, - Account: "MO", - User: "moroot", - Database: "system", - Statement: "show tables", - StatementFingerprint: "show tables", - StatementTag: "", - RequestAt: util.TimeNano(0), - Status: StatementStatusRunning, - ExecPlan: "", - }, - }, - buf: buf, - }, - want: `insert into system.statement_info (` + - "`statement_id`, `transaction_id`, `session_id`, `account`, `user`, `host`, `database`, `statement`, `statement_tag`, `statement_fingerprint`, `node_id`, `node_type`, `request_at`, `status`, `exec_plan`" + - `) values (1, 1, 1, "MO", "moroot", "", "system", "show tables", "show tables", "", 0, "Node", "1970-01-01 00:00:00.000000", "Running", "")`, - }, - { - name: "multi_statement", - args: args{ - in: []IBuffer2SqlItem{ - &StatementInfo{ - StatementID: 1, - TransactionID: 1, - SessionID: 1, - Account: "MO", - User: "moroot", - Database: "system", - Statement: "show tables", - StatementFingerprint: "show tables", - StatementTag: "", - RequestAt: util.TimeNano(0), - Status: StatementStatusRunning, - ExecPlan: "", - }, - &StatementInfo{ - StatementID: 2, - TransactionID: 1, - SessionID: 1, - Account: "MO", - User: "moroot", - Database: "system", - Statement: "show databases", - StatementFingerprint: "show databases", - StatementTag: "dcl", - RequestAt: util.TimeNano(time.Microsecond), - Status: StatementStatusFailed, - ExecPlan: "", - }, - }, - buf: buf, - }, - want: `insert into system.statement_info (` + - "`statement_id`, `transaction_id`, `session_id`, `account`, `user`, `host`, `database`, `statement`, `statement_tag`, `statement_fingerprint`, `node_id`, `node_type`, `request_at`, `status`, `exec_plan`" + - `) values (1, 1, 1, "MO", "moroot", "", "system", "show tables", "show tables", "", 0, "Node", "1970-01-01 00:00:00.000000", "Running", "")` + - `,(2, 1, 1, "MO", "moroot", "", "system", "show databases", "show databases", "dcl", 0, "Node", "1970-01-01 00:00:00.000001", "Failed", "")`, - }, - } - for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { - if got := genStatementBatchSql(tt.args.in, tt.args.buf); got != tt.want { - t1.Errorf("genStatementBatchSql() = %v, want %v", got, tt.want) - } - }) - } -} - func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { + time.Local = time.FixedZone("CST", 0) // set time-zone +0000 type fields struct { Reminder batchpipe.Reminder sizeThreshold int64 } + sc := SpanContextWithIDs(_1TraceID, _1SpanID) defaultFields := fields{ Reminder: batchpipe.NewConstantClock(15 * time.Second), sizeThreshold: MB, @@ -539,8 +178,8 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { }, wantFunc: genErrorBatchSql, want: `insert into system.error_info (` + - "`statement_id`, `span_id`, `node_id`, `node_type`, `err_code`, `stack`, `timestamp`" + - `) values (` + nodeStateSpanIdStr + `, 0, "Node", "test1", "test1", "1970-01-01 00:00:00.000000")`, + "`statement_id`, `span_id`, `node_uuid`, `node_type`, `err_code`, `stack`, `timestamp`" + + `) values (` + nodeStateSpanIdStr + `, "node_uuid", "Node", "test1", "test1", "1970-01-01 00:00:00.000000")`, }, { name: "multi_error", @@ -554,9 +193,9 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { }, wantFunc: genErrorBatchSql, want: `insert into system.error_info (` + - "`statement_id`, `span_id`, `node_id`, `node_type`, `err_code`, `stack`, `timestamp`" + - `) values (` + nodeStateSpanIdStr + `, 0, "Node", "test1", "test1", "1970-01-01 00:00:00.000000")` + - `,(` + nodeStateSpanIdStr + `, 0, "Node", "test2: test1", "test2: test1", "1970-01-01 00:00:00.001001")`, + "`statement_id`, `span_id`, `node_uuid`, `node_type`, `err_code`, `stack`, `timestamp`" + + `) values (` + nodeStateSpanIdStr + `, "node_uuid", "Node", "test1", "test1", "1970-01-01 00:00:00.000000")` + + `,(` + nodeStateSpanIdStr + `, "node_uuid", "Node", "test2: test1", "test2: test1", "1970-01-01 00:00:00.001001")`, }, { name: "single_log", @@ -564,21 +203,21 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { args: args{ in: []IBuffer2SqlItem{ &MOLog{ - StatementId: 1, - SpanId: 1, - Timestamp: uint64(0), - Level: zapcore.InfoLevel, - Caller: util.Caller(0), - Message: "info message", - Extra: "{}", + TraceID: _1TraceID, + SpanID: _1SpanID, + Timestamp: uint64(0), + Level: zapcore.InfoLevel, + Caller: util.Caller(0), + Message: "info message", + Extra: "{}", }, }, buf: buf, }, wantFunc: genLogBatchSql, want: `insert into system.log_info (` + - "`span_id`, `statement_id`, `node_id`, `node_type`, `timestamp`, `name`, `level`, `caller`, `message`, `extra`" + - `) values (1, 1, 0, "Node", "1970-01-01 00:00:00.000000", "", "info", "Test_buffer2Sql_GetBatch_AllType", "info message", "{}")`, + "`span_id`, `statement_id`, `node_uuid`, `node_type`, `timestamp`, `name`, `level`, `caller`, `message`, `extra`" + + `) values ("0000000000000001", "00000000-0000-0000-0000-000000000001", "node_uuid", "Node", "1970-01-01 00:00:00.000000", "", "info", "Test_buffer2Sql_GetBatch_AllType", "info message", "{}")`, }, { name: "multi_log", @@ -586,31 +225,31 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { args: args{ in: []IBuffer2SqlItem{ &MOLog{ - StatementId: 1, - SpanId: 1, - Timestamp: uint64(0), - Level: zapcore.InfoLevel, - Caller: util.Caller(0), - Message: "info message", - Extra: "{}", + TraceID: _1TraceID, + SpanID: _1SpanID, + Timestamp: uint64(0), + Level: zapcore.InfoLevel, + Caller: util.Caller(0), + Message: "info message", + Extra: "{}", }, &MOLog{ - StatementId: 1, - SpanId: 1, - Timestamp: uint64(time.Millisecond + time.Microsecond), - Level: zapcore.DebugLevel, - Caller: util.Caller(0), - Message: "debug message", - Extra: "{}", + TraceID: _1TraceID, + SpanID: _1SpanID, + Timestamp: uint64(time.Millisecond + time.Microsecond), + Level: zapcore.DebugLevel, + Caller: util.Caller(0), + Message: "debug message", + Extra: "{}", }, }, buf: buf, }, wantFunc: genLogBatchSql, want: `insert into system.log_info (` + - "`span_id`, `statement_id`, `node_id`, `node_type`, `timestamp`, `name`, `level`, `caller`, `message`, `extra`" + - `) values (1, 1, 0, "Node", "1970-01-01 00:00:00.000000", "", "info", "Test_buffer2Sql_GetBatch_AllType", "info message", "{}")` + - `,(1, 1, 0, "Node", "1970-01-01 00:00:00.001001", "", "debug", "Test_buffer2Sql_GetBatch_AllType", "debug message", "{}")`, + "`span_id`, `statement_id`, `node_uuid`, `node_type`, `timestamp`, `name`, `level`, `caller`, `message`, `extra`" + + `) values ("0000000000000001", "00000000-0000-0000-0000-000000000001", "node_uuid", "Node", "1970-01-01 00:00:00.000000", "", "info", "Test_buffer2Sql_GetBatch_AllType", "info message", "{}")` + + `,("0000000000000001", "00000000-0000-0000-0000-000000000001", "node_uuid", "Node", "1970-01-01 00:00:00.001001", "", "debug", "Test_buffer2Sql_GetBatch_AllType", "debug message", "{}")`, }, { name: "single_span", @@ -618,7 +257,7 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { args: args{ in: []IBuffer2SqlItem{ &MOSpan{ - SpanConfig: SpanConfig{SpanContext: SpanContext{TraceID: 1, SpanID: 1}, parent: noopSpan{}}, + SpanConfig: SpanConfig{SpanContext: SpanContext{TraceID: _1TraceID, SpanID: _1SpanID}, parent: noopSpan{}}, Name: *bytes.NewBuffer([]byte("span1")), StartTimeNS: util.TimeNano(0), EndTimeNS: util.TimeNano(time.Microsecond), @@ -630,8 +269,8 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { }, wantFunc: genSpanBatchSql, want: `insert into system.span_info (` + - "`span_id`, `statement_id`, `parent_span_id`, `node_id`, `node_type`, `resource`, `name`, `start_time`, `end_time`, `duration`" + - `) values (1, 1, 0, 0, "Node", "{\"Node\":{\"node_id\":0,\"node_type\":0},\"version\":\"v0.test.0\"}", "span1", "1970-01-01 00:00:00.000000", "1970-01-01 00:00:00.000001", 1000)`, + "`span_id`, `statement_id`, `parent_span_id`, `node_uuid`, `node_type`, `resource`, `name`, `start_time`, `end_time`, `duration`" + + `) values ("0000000000000001", "00000000-0000-0000-0000-000000000001", "0000000000000000", "node_uuid", "Node", "{\"Node\":{\"node_uuid\":\"node_uuid\",\"node_type\":0},\"version\":\"v0.test.0\"}", "span1", "1970-01-01 00:00:00.000000", "1970-01-01 00:00:00.000001", 1000)`, }, { name: "multi_span", @@ -639,7 +278,7 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { args: args{ in: []IBuffer2SqlItem{ &MOSpan{ - SpanConfig: SpanConfig{SpanContext: SpanContext{TraceID: 1, SpanID: 1}, parent: noopSpan{}}, + SpanConfig: SpanConfig{SpanContext: SpanContext{TraceID: _1TraceID, SpanID: _1SpanID}, parent: noopSpan{}}, Name: *bytes.NewBuffer([]byte("span1")), StartTimeNS: util.TimeNano(0), EndTimeNS: util.TimeNano(time.Microsecond), @@ -647,7 +286,7 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { tracer: gTracer.(*MOTracer), }, &MOSpan{ - SpanConfig: SpanConfig{SpanContext: SpanContext{TraceID: 1, SpanID: 2}, parent: noopSpan{}}, + SpanConfig: SpanConfig{SpanContext: SpanContext{TraceID: _1TraceID, SpanID: _2SpanID}, parent: noopSpan{}}, Name: *bytes.NewBuffer([]byte("span2")), StartTimeNS: util.TimeNano(time.Microsecond), EndTimeNS: util.TimeNano(time.Millisecond), @@ -659,9 +298,9 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { }, wantFunc: genSpanBatchSql, want: `insert into system.span_info (` + - "`span_id`, `statement_id`, `parent_span_id`, `node_id`, `node_type`, `resource`, `name`, `start_time`, `end_time`, `duration`" + - `) values (1, 1, 0, 0, "Node", "{\"Node\":{\"node_id\":0,\"node_type\":0},\"version\":\"v0.test.0\"}", "span1", "1970-01-01 00:00:00.000000", "1970-01-01 00:00:00.000001", 1000)` + - `,(2, 1, 0, 0, "Node", "{\"Node\":{\"node_id\":0,\"node_type\":0},\"version\":\"v0.test.0\"}", "span2", "1970-01-01 00:00:00.000001", "1970-01-01 00:00:00.001000", 999000)`, + "`span_id`, `statement_id`, `parent_span_id`, `node_uuid`, `node_type`, `resource`, `name`, `start_time`, `end_time`, `duration`" + + `) values ("0000000000000001", "00000000-0000-0000-0000-000000000001", "0000000000000000", "node_uuid", "Node", "{\"Node\":{\"node_uuid\":\"node_uuid\",\"node_type\":0},\"version\":\"v0.test.0\"}", "span1", "1970-01-01 00:00:00.000000", "1970-01-01 00:00:00.000001", 1000)` + + `,("0000000000000002", "00000000-0000-0000-0000-000000000001", "0000000000000000", "node_uuid", "Node", "{\"Node\":{\"node_uuid\":\"node_uuid\",\"node_type\":0},\"version\":\"v0.test.0\"}", "span2", "1970-01-01 00:00:00.000001", "1970-01-01 00:00:00.001000", 999000)`, }, { name: "single_statement", @@ -669,9 +308,9 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { args: args{ in: []IBuffer2SqlItem{ &StatementInfo{ - StatementID: 1, - TransactionID: 1, - SessionID: 1, + StatementID: _1TraceID, + TransactionID: _1TxnID, + SessionID: _1SesID, Account: "MO", User: "moroot", Database: "system", @@ -687,8 +326,8 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { }, wantFunc: genStatementBatchSql, want: `insert into system.statement_info (` + - "`statement_id`, `transaction_id`, `session_id`, `account`, `user`, `host`, `database`, `statement`, `statement_tag`, `statement_fingerprint`, `node_id`, `node_type`, `request_at`, `status`, `exec_plan`" + - `) values (1, 1, 1, "MO", "moroot", "", "system", "show tables", "show tables", "", 0, "Node", "1970-01-01 00:00:00.000000", "Running", "")`, + "`statement_id`, `transaction_id`, `session_id`, `account`, `user`, `host`, `database`, `statement`, `statement_tag`, `statement_fingerprint`, `node_uuid`, `node_type`, `request_at`, `status`, `exec_plan`" + + `) values ("00000000-0000-0000-0000-000000000001", "00000000-0000-0000-0000-000000000001", "00000000-0000-0000-0000-000000000001", "MO", "moroot", "", "system", "show tables", "show tables", "", "node_uuid", "Node", "1970-01-01 00:00:00.000000", "Running", "")`, }, { name: "multi_statement", @@ -696,9 +335,9 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { args: args{ in: []IBuffer2SqlItem{ &StatementInfo{ - StatementID: 1, - TransactionID: 1, - SessionID: 1, + StatementID: _1TraceID, + TransactionID: _1TxnID, + SessionID: _1SesID, Account: "MO", User: "moroot", Database: "system", @@ -710,9 +349,9 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { ExecPlan: "", }, &StatementInfo{ - StatementID: 2, - TransactionID: 1, - SessionID: 1, + StatementID: _2TraceID, + TransactionID: _1TxnID, + SessionID: _1SesID, Account: "MO", User: "moroot", Database: "system", @@ -728,9 +367,60 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { }, wantFunc: genStatementBatchSql, want: `insert into system.statement_info (` + - "`statement_id`, `transaction_id`, `session_id`, `account`, `user`, `host`, `database`, `statement`, `statement_tag`, `statement_fingerprint`, `node_id`, `node_type`, `request_at`, `status`, `exec_plan`" + - `) values (1, 1, 1, "MO", "moroot", "", "system", "show tables", "show tables", "", 0, "Node", "1970-01-01 00:00:00.000000", "Running", "")` + - `,(2, 1, 1, "MO", "moroot", "", "system", "show databases", "show databases", "dcl", 0, "Node", "1970-01-01 00:00:00.000001", "Failed", "")`, + "`statement_id`, `transaction_id`, `session_id`, `account`, `user`, `host`, `database`, `statement`, `statement_tag`, `statement_fingerprint`, `node_uuid`, `node_type`, `request_at`, `status`, `exec_plan`" + + `) values ("00000000-0000-0000-0000-000000000001", "00000000-0000-0000-0000-000000000001", "00000000-0000-0000-0000-000000000001", "MO", "moroot", "", "system", "show tables", "show tables", "", "node_uuid", "Node", "1970-01-01 00:00:00.000000", "Running", "")` + + `,("00000000-0000-0000-0000-000000000002", "00000000-0000-0000-0000-000000000001", "00000000-0000-0000-0000-000000000001", "MO", "moroot", "", "system", "show databases", "show databases", "dcl", "node_uuid", "Node", "1970-01-01 00:00:00.000001", "Failed", "")`, + }, + { + name: "single_zap", + fields: defaultFields, + args: args{ + in: []IBuffer2SqlItem{ + &MOZap{ + Level: zapcore.InfoLevel, + SpanContext: &sc, + Timestamp: time.Unix(0, 0), + Caller: "trace/buffer_pipe_sql_test.go:100", + Message: "info message", + Extra: "{}", + }, + }, + buf: buf, + }, + wantFunc: genZapLogBatchSql, + want: `insert into system.log_info (` + + "`span_id`, `statement_id`, `node_uuid`, `node_type`, `timestamp`, `name`, `level`, `caller`, `message`, `extra`" + + `) values ("0000000000000001", "00000000-0000-0000-0000-000000000001", "node_uuid", "Node", "1970-01-01 00:00:00.000000", "", "info", "trace/buffer_pipe_sql_test.go:100", "info message", "{}")`, + }, + { + name: "multi_zap", + fields: defaultFields, + args: args{ + in: []IBuffer2SqlItem{ + &MOZap{ + Level: zapcore.InfoLevel, + SpanContext: &sc, + Timestamp: time.Unix(0, 0), + Caller: "trace/buffer_pipe_sql_test.go:100", + Message: "info message", + Extra: "{}", + }, + &MOZap{ + Level: zapcore.DebugLevel, + SpanContext: &sc, + Timestamp: time.Unix(0, int64(time.Microsecond+time.Millisecond)), + Caller: "trace/buffer_pipe_sql_test.go:100", + Message: "debug message", + Extra: "{}", + }, + }, + buf: buf, + }, + wantFunc: genZapLogBatchSql, + want: `insert into system.log_info (` + + "`span_id`, `statement_id`, `node_uuid`, `node_type`, `timestamp`, `name`, `level`, `caller`, `message`, `extra`" + + `) values ("0000000000000001", "00000000-0000-0000-0000-000000000001", "node_uuid", "Node", "1970-01-01 00:00:00.000000", "", "info", "trace/buffer_pipe_sql_test.go:100", "info message", "{}")` + + `,("0000000000000001", "00000000-0000-0000-0000-000000000001", "node_uuid", "Node", "1970-01-01 00:00:00.001001", "", "debug", "trace/buffer_pipe_sql_test.go:100", "debug message", "{}")`, }, } @@ -750,9 +440,10 @@ func Test_buffer2Sql_GetBatch_AllType(t *testing.T) { if got := b.(*buffer2Sql).genBatchFunc; reflect.ValueOf(got).Pointer() != reflect.ValueOf(tt.wantFunc).Pointer() { t.Errorf("buffer2Sql's genBatchFunc = %v, want %v", got, tt.wantFunc) } - if got := b.GetBatch(tt.args.buf); got.(string) != tt.want { - t.Errorf("GetBatch() = %v,\n want %v", got, tt.want) - } + + got := b.GetBatch(tt.args.buf) + require.Equal(t, tt.want, got) + t.Logf("GetBatch() = %v", got) }) } } diff --git a/pkg/util/trace/config.go b/pkg/util/trace/config.go new file mode 100644 index 0000000000000000000000000000000000000000..8195edafe08ef4288be663fa10fde086b61a3058 --- /dev/null +++ b/pkg/util/trace/config.go @@ -0,0 +1,404 @@ +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace + +import ( + "bytes" + "encoding/binary" + "encoding/hex" + "encoding/json" + "github.com/matrixorigin/matrixone/pkg/util" + ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "io" + "sync" + "sync/atomic" +) + +const ( + InternalExecutor = "InternalExecutor" + FileService = "FileService" +) + +const ( + MOStatementType = "MOStatementType" + MOSpanType = "MOSpan" + MOLogType = "MOLog" + MOZapType = "MOZap" + MOErrorType = "MOError" +) + +const ( + B int64 = 1 << (iota * 10) + KB + MB + GB +) + +// tracerProviderConfig. +type tracerProviderConfig struct { + // spanProcessors contains collection of SpanProcessors that are processing pipeline + // for spans in the trace signal. + // SpanProcessors registered with a TracerProvider and are called at the start + // and end of a Span's lifecycle, and are called in the order they are + // registered. + spanProcessors []SpanProcessor + + enableTracer uint32 // see EnableTracer + + // idGenerator is used to generate all Span and Trace IDs when needed. + idGenerator IDGenerator + + // resource contains attributes representing an entity that produces telemetry. + resource *Resource // see WithMOVersion, WithNode, + + // TODO: can check span's END + debugMode bool // see DebugMode + + batchProcessMode string // see WithBatchProcessMode + + sqlExecutor func() ie.InternalExecutor // see WithSQLExecutor + + mux sync.RWMutex +} + +func (cfg *tracerProviderConfig) getNodeResource() *MONodeResource { + cfg.mux.RLock() + defer cfg.mux.RUnlock() + if val, has := cfg.resource.Get("Node"); !has { + return &MONodeResource{} + } else { + return val.(*MONodeResource) + } +} + +func (cfg *tracerProviderConfig) IsEnable() bool { + cfg.mux.RLock() + defer cfg.mux.RUnlock() + return atomic.LoadUint32(&cfg.enableTracer) == 1 +} + +func (cfg *tracerProviderConfig) EnableTracer(enable bool) { + cfg.mux.Lock() + defer cfg.mux.Unlock() + if enable { + atomic.StoreUint32(&cfg.enableTracer, 1) + } else { + atomic.StoreUint32(&cfg.enableTracer, 0) + } +} + +// TracerProviderOption configures a TracerProvider. +type TracerProviderOption interface { + apply(*tracerProviderConfig) +} + +type tracerProviderOptionFunc func(config *tracerProviderConfig) + +func (f tracerProviderOptionFunc) apply(config *tracerProviderConfig) { + f(config) +} + +func WithMOVersion(v string) tracerProviderOptionFunc { + return func(config *tracerProviderConfig) { + config.resource.Put("version", v) + } +} + +// WithNode give id as NodeId, t as NodeType +func WithNode(uuid string, t NodeType) tracerProviderOptionFunc { + return func(cfg *tracerProviderConfig) { + cfg.resource.Put("Node", &MONodeResource{ + NodeUuid: uuid, + NodeType: t, + }) + } +} + +func EnableTracer(enable bool) tracerProviderOptionFunc { + return func(cfg *tracerProviderConfig) { + cfg.EnableTracer(enable) + } +} + +func DebugMode(debug bool) tracerProviderOptionFunc { + return func(cfg *tracerProviderConfig) { + cfg.debugMode = debug + } +} + +func WithBatchProcessMode(mode string) tracerProviderOptionFunc { + return func(cfg *tracerProviderConfig) { + cfg.batchProcessMode = mode + } +} + +func WithSQLExecutor(f func() ie.InternalExecutor) tracerProviderOptionFunc { + return func(cfg *tracerProviderConfig) { + cfg.sqlExecutor = f + } +} + +type Uint64IdGenerator struct{} + +func (M Uint64IdGenerator) NewIDs() (uint64, uint64) { + return util.Fastrand64(), util.Fastrand64() +} + +func (M Uint64IdGenerator) NewSpanID() uint64 { + return util.Fastrand64() +} + +var _ IDGenerator = &moIDGenerator{} + +type moIDGenerator struct{} + +func (M moIDGenerator) NewIDs() (TraceID, SpanID) { + tid := TraceID{} + binary.BigEndian.PutUint64(tid[:], util.Fastrand64()) + binary.BigEndian.PutUint64(tid[8:], util.Fastrand64()) + sid := SpanID{} + binary.BigEndian.PutUint64(sid[:], util.Fastrand64()) + return tid, sid +} + +func (M moIDGenerator) NewSpanID() SpanID { + sid := SpanID{} + binary.BigEndian.PutUint64(sid[:], util.Fastrand64()) + return sid +} + +type TraceID [16]byte + +var nilTraceID TraceID + +// IsZero checks whether the trace TraceID is 0 value. +func (t TraceID) IsZero() bool { + return !bytes.Equal(t[:], nilTraceID[:]) +} + +func (t TraceID) String() string { + var dst [36]byte + bytes2Uuid(dst[:], t) + return string(dst[:]) +} + +type SpanID [8]byte + +var nilSpanID SpanID + +// SetByUUID use prefix of uuid as value +func (s *SpanID) SetByUUID(uuid string) { + var dst [16]byte + uuid2Bytes(dst[:], uuid) + copy(s[:], dst[0:8]) +} + +func (s SpanID) String() string { + return hex.EncodeToString(s[:]) +} + +func uuid2Bytes(dst []byte, uuid string) { + _ = dst[15] + l := len(uuid) + if l != 36 || uuid[8] != '-' || uuid[13] != '-' || uuid[18] != '-' || uuid[23] != '-' { + return + } + hex.Decode(dst[0:4], []byte(uuid[0:8])) + hex.Decode(dst[4:6], []byte(uuid[9:13])) + hex.Decode(dst[6:8], []byte(uuid[14:18])) + hex.Decode(dst[8:10], []byte(uuid[19:23])) + hex.Decode(dst[10:], []byte(uuid[24:])) +} + +func bytes2Uuid(dst []byte, src [16]byte) { + _, _ = dst[35], src[15] + hex.Encode(dst[0:8], src[0:4]) + hex.Encode(dst[9:13], src[4:6]) + hex.Encode(dst[14:18], src[6:8]) + hex.Encode(dst[19:23], src[8:10]) + hex.Encode(dst[24:], src[10:]) + dst[8] = '-' + dst[13] = '-' + dst[18] = '-' + dst[23] = '-' +} + +var _ zapcore.ObjectMarshaler = (*SpanContext)(nil) + +const SpanFieldKey = "span" + +func SpanField(sc SpanContext) zap.Field { + return zap.Object(SpanFieldKey, &sc) +} + +func IsSpanField(field zapcore.Field) bool { + return field.Key == SpanFieldKey +} + +// SpanContext contains identifying trace information about a Span. +type SpanContext struct { + TraceID TraceID `json:"trace_id"` + SpanID SpanID `json:"span_id"` +} + +func (c *SpanContext) Size() (n int) { + return 24 +} + +func (c *SpanContext) MarshalTo(dAtA []byte) (int, error) { + l := cap(dAtA) + if l < c.Size() { + return -1, io.ErrUnexpectedEOF + } + copy(dAtA, c.TraceID[:]) + copy(dAtA[16:], c.SpanID[:]) + return c.Size(), nil +} + +func (c *SpanContext) Unmarshal(dAtA []byte) error { + l := cap(dAtA) + if l < c.Size() { + return io.ErrUnexpectedEOF + } + copy(c.TraceID[:], dAtA[0:16]) + copy(c.SpanID[:], dAtA[16:24]) + return nil +} + +func (c SpanContext) GetIDs() (TraceID, SpanID) { + return c.TraceID, c.SpanID +} + +func (c *SpanContext) Reset() { + c.TraceID = TraceID{} + c.SpanID = SpanID{} +} + +func (c *SpanContext) IsEmpty() bool { + return c.TraceID.IsZero() +} + +func (c *SpanContext) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("TraceId", c.TraceID.String()) + enc.AddString("SpanId", c.SpanID.String()) + return nil +} + +func SpanContextWithID(id TraceID) SpanContext { + return SpanContext{TraceID: id} +} + +func SpanContextWithIDs(tid TraceID, sid SpanID) SpanContext { + return SpanContext{TraceID: tid, SpanID: sid} +} + +// SpanConfig is a group of options for a Span. +type SpanConfig struct { + SpanContext + + // NewRoot identifies a Span as the root Span for a new trace. This is + // commonly used when an existing trace crosses trust boundaries and the + // remote parent span context should be ignored for security. + NewRoot bool `json:"NewRoot"` // see WithNewRoot + parent Span `json:"-"` +} + +// SpanStartOption applies an option to a SpanConfig. These options are applicable +// only when the span is created. +type SpanStartOption interface { + applySpanStart(*SpanConfig) +} + +type SpanEndOption interface { + applySpanEnd(*SpanConfig) +} + +// SpanOption applies an option to a SpanConfig. +type SpanOption interface { + SpanStartOption + SpanEndOption +} + +type spanOptionFunc func(*SpanConfig) + +func (f spanOptionFunc) applySpanEnd(cfg *SpanConfig) { + f(cfg) +} + +func (f spanOptionFunc) applySpanStart(cfg *SpanConfig) { + f(cfg) +} + +func WithNewRoot(newRoot bool) spanOptionFunc { + return spanOptionFunc(func(cfg *SpanConfig) { + cfg.NewRoot = newRoot + }) +} + +type Resource struct { + m map[string]any +} + +func newResource() *Resource { + return &Resource{m: make(map[string]any)} + +} + +func (r *Resource) Put(key string, val any) { + r.m[key] = val +} + +func (r *Resource) Get(key string) (any, bool) { + val, has := r.m[key] + return val, has +} + +// String need to improve +func (r *Resource) String() string { + buf, _ := json.Marshal(r.m) + return string(buf) + +} + +type NodeType int + +const ( + NodeTypeNode NodeType = iota + NodeTypeCN + NodeTypeDN + NodeTypeLogService +) + +func (t NodeType) String() string { + switch t { + case NodeTypeNode: + return "Node" + case NodeTypeCN: + return "CN" + case NodeTypeDN: + return "DN" + case NodeTypeLogService: + return "LogService" + default: + return "Unknown" + } +} + +type MONodeResource struct { + NodeUuid string `json:"node_uuid"` + NodeType NodeType `json:"node_type"` +} diff --git a/pkg/util/trace/example/main.go b/pkg/util/trace/example/main.go index 0ea34f56c0dcf68e16b382ac4210d8d9fb072392..29e6783f3a0e8224fb1cc02a63c8284a812824ca 100644 --- a/pkg/util/trace/example/main.go +++ b/pkg/util/trace/example/main.go @@ -47,7 +47,7 @@ func bootstrap(ctx context.Context) (context.Context, error) { return trace.Init(ctx, trace.WithMOVersion("v0.6.0"), // nodeType like CN/DN/LogService; id maybe in config. - trace.WithNode(0, trace.NodeTypeNode), + trace.WithNode("node_uuid", trace.NodeTypeNode), // config[enableTrace], default: true trace.EnableTracer(true), // config[traceBatchProcessor], distributed node should use "FileService" in system_vars_config.toml diff --git a/pkg/util/trace/mo_trace.go b/pkg/util/trace/mo_trace.go index c9996792fd3b440872b38cb14e67f6e523190073..e589cdf3e4628c886e342bf4e0fc8d1c1b6057ed 100644 --- a/pkg/util/trace/mo_trace.go +++ b/pkg/util/trace/mo_trace.go @@ -17,11 +17,7 @@ package trace import ( "bytes" "context" - "encoding/binary" "encoding/hex" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "io" "sync" "unsafe" @@ -100,118 +96,6 @@ func (t *MOTracer) Start(ctx context.Context, name string, opts ...SpanOption) ( return ContextWithSpan(ctx, span), span } -var _ zapcore.ObjectMarshaler = (*SpanContext)(nil) - -const SpanFieldKey = "span" - -func SpanField(sc SpanContext) zap.Field { - return zap.Object(SpanFieldKey, &sc) -} - -func IsSpanField(field zapcore.Field) bool { - return field.Key == SpanFieldKey -} - -// SpanContext contains identifying trace information about a Span. -type SpanContext struct { - TraceID TraceID `json:"trace_id"` - SpanID SpanID `json:"span_id"` -} - -func (c SpanContext) GetIDs() (TraceID, SpanID) { - return c.TraceID, c.SpanID -} - -func (c *SpanContext) Reset() { - c.TraceID = 0 - c.SpanID = 0 -} - -func (c *SpanContext) IsEmpty() bool { - return c.TraceID == 0 -} - -func (c *SpanContext) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddUint64("TraceId", uint64(c.TraceID)) - enc.AddUint64("SpanId", uint64(c.SpanID)) - return nil -} - -func (c *SpanContext) Size() (n int) { - return 16 -} - -func (c *SpanContext) MarshalTo(dAtA []byte) (int, error) { - l := cap(dAtA) - if l < c.Size() { - return -1, io.ErrUnexpectedEOF - } - binary.BigEndian.PutUint64(dAtA, uint64(c.TraceID)) - binary.BigEndian.PutUint64(dAtA[8:], uint64(c.SpanID)) - return c.Size(), nil -} - -func (c *SpanContext) Unmarshal(dAtA []byte) error { - l := cap(dAtA) - if l < c.Size() { - return io.ErrUnexpectedEOF - } - c.TraceID = TraceID(binary.BigEndian.Uint64(dAtA)) - c.SpanID = SpanID(binary.BigEndian.Uint64(dAtA[8:])) - return nil -} - -func SpanContextWithID(id TraceID) SpanContext { - return SpanContext{TraceID: id} -} - -func SpanContextWithIDs(tid TraceID, sid SpanID) SpanContext { - return SpanContext{TraceID: tid, SpanID: sid} -} - -// SpanConfig is a group of options for a Span. -type SpanConfig struct { - SpanContext - - // NewRoot identifies a Span as the root Span for a new trace. This is - // commonly used when an existing trace crosses trust boundaries and the - // remote parent span context should be ignored for security. - NewRoot bool `json:"NewRoot"` // see WithNewRoot - parent Span `json:"-"` -} - -// SpanStartOption applies an option to a SpanConfig. These options are applicable -// only when the span is created. -type SpanStartOption interface { - applySpanStart(*SpanConfig) -} - -type SpanEndOption interface { - applySpanEnd(*SpanConfig) -} - -// SpanOption applies an option to a SpanConfig. -type SpanOption interface { - SpanStartOption - SpanEndOption -} - -type spanOptionFunc func(*SpanConfig) - -func (f spanOptionFunc) applySpanEnd(cfg *SpanConfig) { - f(cfg) -} - -func (f spanOptionFunc) applySpanStart(cfg *SpanConfig) { - f(cfg) -} - -func WithNewRoot(newRoot bool) spanOptionFunc { - return spanOptionFunc(func(cfg *SpanConfig) { - cfg.NewRoot = newRoot - }) -} - var _ Span = &MOSpan{} var _ IBuffer2SqlItem = &MOSpan{} diff --git a/pkg/util/trace/mo_trace_test.go b/pkg/util/trace/mo_trace_test.go index 3c4cd82e1ee157a5aa4caaa85b27efbcdd226c45..90fbf960eb82a533162dc64b9b7263aacc152cde 100644 --- a/pkg/util/trace/mo_trace_test.go +++ b/pkg/util/trace/mo_trace_test.go @@ -20,6 +20,15 @@ import ( "testing" ) +var _1TxnID = [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x1} +var _1SesID = [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x1} +var _1TraceID TraceID = [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x1} +var _2TraceID TraceID = [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x2} +var _10F0TraceID TraceID = [16]byte{0x09, 0x87, 0x65, 0x43, 0x21, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0, 0, 0, 0, 0} +var _1SpanID SpanID = [8]byte{0, 0, 0, 0, 0, 0, 0, 1} +var _2SpanID SpanID = [8]byte{0, 0, 0, 0, 0, 0, 0, 2} +var _16SpanID SpanID = [8]byte{0, 0, 0, 0, 0, 0x12, 0x34, 0x56} + func TestMOTracer_Start(t1 *testing.T) { type fields struct { TracerConfig TracerConfig @@ -29,7 +38,7 @@ func TestMOTracer_Start(t1 *testing.T) { name string opts []SpanOption } - rootCtx := ContextWithSpanContext(context.Background(), SpanContextWithIDs(1, 1)) + rootCtx := ContextWithSpanContext(context.Background(), SpanContextWithIDs(_1TraceID, _1SpanID)) tests := []struct { name string fields fields @@ -49,8 +58,8 @@ func TestMOTracer_Start(t1 *testing.T) { opts: []SpanOption{}, }, wantNewRoot: false, - wantTraceId: 1, - wantParentSpanId: 1, + wantTraceId: _1TraceID, + wantParentSpanId: _1SpanID, }, { name: "newRoot", @@ -63,8 +72,8 @@ func TestMOTracer_Start(t1 *testing.T) { opts: []SpanOption{WithNewRoot(true)}, }, wantNewRoot: true, - wantTraceId: 1, - wantParentSpanId: 1, + wantTraceId: _1TraceID, + wantParentSpanId: _1SpanID, }, } for _, tt := range tests { @@ -105,24 +114,24 @@ func TestSpanContext_MarshalTo(t *testing.T) { { name: "normal", fields: fields{ - TraceID: 0, - SpanID: 0x123456, + TraceID: nilTraceID, + SpanID: _16SpanID, }, - args: args{dAtA: make([]byte, 16)}, - want: 16, - // 1 2 3 4 5 6 7 8--1 2 3 4 5 6 7 8 - wantBytes: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x12, 0x34, 0x56}, + args: args{dAtA: make([]byte, 24)}, + want: 24, + // 1 2 3 4 5 6 7 8, 1 2 3 4 5 6 7 8--1 2 3 4 5 6 7 8 + wantBytes: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x12, 0x34, 0x56}, }, { name: "not-zero", fields: fields{ - TraceID: 0x0987654321FFFFFF, - SpanID: 0x123456, + TraceID: _10F0TraceID, + SpanID: _16SpanID, }, - args: args{dAtA: make([]byte, 16)}, - want: 16, - // 1 2 3 4 5 6 7 8--1 2 3 4 5 6 7 8 - wantBytes: []byte{0x09, 0x87, 0x65, 0x43, 0x21, 0xff, 0xff, 0xff, 0, 0, 0, 0, 0, 0x12, 0x34, 0x56}, + args: args{dAtA: make([]byte, 24)}, + want: 24, + // 1 2 3 4 5 6 7 8, 1 2 3 4 5 6 7 8--1 2 3 4 5 6 7 8 + wantBytes: []byte{0x09, 0x87, 0x65, 0x43, 0x21, 0xff, 0xff, 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x12, 0x34, 0x56}, }, } for _, tt := range tests { diff --git a/pkg/util/trace/ptrace.go b/pkg/util/trace/ptrace.go index 379820b38973703e5ebdfa5618979c49addfd50e..e974b82e1acfbcf4bbc33176666329a10344dab3 100644 --- a/pkg/util/trace/ptrace.go +++ b/pkg/util/trace/ptrace.go @@ -14,145 +14,28 @@ package trace -import ( - "sync" - "sync/atomic" - - "github.com/matrixorigin/matrixone/pkg/util" - ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor" -) - -// tracerProviderConfig. -type tracerProviderConfig struct { - // spanProcessors contains collection of SpanProcessors that are processing pipeline - // for spans in the trace signal. - // SpanProcessors registered with a TracerProvider and are called at the start - // and end of a Span's lifecycle, and are called in the order they are - // registered. - spanProcessors []SpanProcessor - - enableTracer uint32 // see EnableTracer - - // idGenerator is used to generate all Span and Trace IDs when needed. - idGenerator IDGenerator - - // resource contains attributes representing an entity that produces telemetry. - resource *Resource // see WithMOVersion, WithNode, - - // TODO: can check span's END - debugMode bool // see DebugMode - - batchProcessMode string // see WithBatchProcessMode - - sqlExecutor func() ie.InternalExecutor // see WithSQLExecutor - - mux sync.RWMutex -} - -func (cfg *tracerProviderConfig) getNodeResource() *MONodeResource { - cfg.mux.RLock() - defer cfg.mux.RUnlock() - if val, has := cfg.resource.Get("Node"); !has { - return &MONodeResource{} - } else { - return val.(*MONodeResource) - } -} - -func (cfg *tracerProviderConfig) IsEnable() bool { - cfg.mux.RLock() - defer cfg.mux.RUnlock() - return atomic.LoadUint32(&cfg.enableTracer) == 1 -} - -func (cfg *tracerProviderConfig) EnableTracer(enable bool) { - cfg.mux.Lock() - defer cfg.mux.Unlock() - if enable { - atomic.StoreUint32(&cfg.enableTracer, 1) - } else { - atomic.StoreUint32(&cfg.enableTracer, 0) - } -} - -// TracerProviderOption configures a TracerProvider. -type TracerProviderOption interface { - apply(*tracerProviderConfig) -} - -type tracerProviderOptionFunc func(config *tracerProviderConfig) - -func (f tracerProviderOptionFunc) apply(config *tracerProviderConfig) { - f(config) -} - -func WithMOVersion(v string) tracerProviderOptionFunc { - return func(config *tracerProviderConfig) { - config.resource.Put("version", v) - } -} - -// WithNode give id as NodeId, t as NodeType -func WithNode(id int64, t NodeType) tracerProviderOptionFunc { - return func(cfg *tracerProviderConfig) { - cfg.resource.Put("Node", &MONodeResource{ - NodeID: id, - NodeType: t, - }) - } -} - -func EnableTracer(enable bool) tracerProviderOptionFunc { - return func(cfg *tracerProviderConfig) { - cfg.EnableTracer(enable) - } -} - -func DebugMode(debug bool) tracerProviderOptionFunc { - return func(cfg *tracerProviderConfig) { - cfg.debugMode = debug - } -} - -func WithBatchProcessMode(mode string) tracerProviderOptionFunc { - return func(cfg *tracerProviderConfig) { - cfg.batchProcessMode = mode - } -} - -func WithSQLExecutor(f func() ie.InternalExecutor) tracerProviderOptionFunc { - return func(cfg *tracerProviderConfig) { - cfg.sqlExecutor = f - } -} - -var _ IDGenerator = &MOTraceIdGenerator{} - -type MOTraceIdGenerator struct{} - -func (M MOTraceIdGenerator) NewIDs() (TraceID, SpanID) { - return TraceID(util.Fastrand64()), SpanID(util.Fastrand64()) -} - -func (M MOTraceIdGenerator) NewSpanID() SpanID { - return SpanID(util.Fastrand64()) -} - var _ TracerProvider = &MOTracerProvider{} type MOTracerProvider struct { tracerProviderConfig } -func newMOTracerProvider(opts ...TracerProviderOption) *MOTracerProvider { +func defaultMOTracerProvider() *MOTracerProvider { pTracer := &MOTracerProvider{ tracerProviderConfig{ enableTracer: 0, resource: newResource(), - idGenerator: &MOTraceIdGenerator{}, + idGenerator: &moIDGenerator{}, batchProcessMode: InternalExecutor, }, } + WithNode("node_uuid", NodeTypeNode).apply(&pTracer.tracerProviderConfig) + WithMOVersion("MatrixOne").apply(&pTracer.tracerProviderConfig) + return pTracer +} + +func newMOTracerProvider(opts ...TracerProviderOption) *MOTracerProvider { + pTracer := defaultMOTracerProvider() for _, opt := range opts { opt.apply(&pTracer.tracerProviderConfig) } diff --git a/pkg/util/trace/report_log.go b/pkg/util/trace/report_log.go index e1a901565c561018babb284a91b8e96722e001c0..3a06bee79d1017ff6a109c0cca90196d986cc66f 100644 --- a/pkg/util/trace/report_log.go +++ b/pkg/util/trace/report_log.go @@ -38,14 +38,14 @@ var _ batchpipe.HasName = &MOLog{} var _ IBuffer2SqlItem = &MOLog{} type MOLog struct { - StatementId TraceID `json:"statement_id"` - SpanId SpanID `json:"span_id"` - Timestamp util.TimeNano `json:"timestamp"` - Level zapcore.Level `json:"level"` - Caller util.Frame `json:"caller"` // like "util/trace/trace.go:666" - Name string `json:"name"` - Message string `json:"Message"` - Extra string `json:"extra"` // like json text + TraceID TraceID `json:"trace_id"` + SpanID SpanID `json:"span_id"` + Timestamp util.TimeNano `json:"timestamp"` + Level zapcore.Level `json:"level"` + Caller util.Frame `json:"caller"` // like "util/trace/trace.go:666" + Name string `json:"name"` + Message string `json:"Message"` + Extra string `json:"extra"` // like json text } func newMOLog() *MOLog { @@ -85,8 +85,8 @@ func ReportLog(ctx context.Context, level zapcore.Level, depth int, formatter st sc = span.SpanContext() } log := newMOLog() - log.StatementId = sc.TraceID - log.SpanId = sc.SpanID + log.TraceID = sc.TraceID + log.SpanID = sc.SpanID log.Timestamp = util.NowNS() log.Level = level log.Caller = util.Caller(depth + 1) @@ -127,12 +127,6 @@ func (m MOZap) Size() int64 { func (m MOZap) Free() {} -const moInternalFiledKeyNoopReport = "moInternalFiledKeyNoopReport" - -func NoReportFiled() zap.Field { - return zap.Bool(moInternalFiledKeyNoopReport, false) -} - func ReportZap(jsonEncoder zapcore.Encoder, entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error) { var needReport = true if !gTracerProvider.IsEnable() { @@ -150,10 +144,6 @@ func ReportZap(jsonEncoder zapcore.Encoder, entry zapcore.Entry, fields []zapcor log.SpanContext = v.Interface.(*SpanContext) break } - if v.Type == zapcore.BoolType && v.Key == moInternalFiledKeyNoopReport { - needReport = false - break - } } if !needReport { log.Free() diff --git a/pkg/util/trace/report_log_test.go b/pkg/util/trace/report_log_test.go index 9236c097fbe89e587dd0252a843e2bcc5fe00d12..f0044c3f844be2deb422c528166ab28af577be3f 100644 --- a/pkg/util/trace/report_log_test.go +++ b/pkg/util/trace/report_log_test.go @@ -41,7 +41,7 @@ func TestReportLog(t *testing.T) { name: "close", enableTracer: false, args: args{ - ctx: ContextWithSpanContext(context.Background(), SpanContextWithIDs(0, 0)), + ctx: ContextWithSpanContext(context.Background(), SpanContextWithIDs(nilTraceID, nilSpanID)), level: zapcore.InfoLevel, depth: 3, formatter: "info message", @@ -52,7 +52,7 @@ func TestReportLog(t *testing.T) { name: "collect", enableTracer: true, args: args{ - ctx: ContextWithSpanContext(context.Background(), SpanContextWithIDs(0, 0)), + ctx: ContextWithSpanContext(context.Background(), SpanContextWithIDs(nilTraceID, nilSpanID)), level: zapcore.InfoLevel, depth: 3, formatter: "info message", diff --git a/pkg/util/trace/report_statement.go b/pkg/util/trace/report_statement.go index fc35d788e8927f4cf30eeabd7d50b5a1d7294476..bea23b384e7adb9f60f9e709b203486a1117f960 100644 --- a/pkg/util/trace/report_statement.go +++ b/pkg/util/trace/report_statement.go @@ -25,9 +25,9 @@ import ( var _ IBuffer2SqlItem = &StatementInfo{} type StatementInfo struct { - StatementID uint64 `json:"statement_id"` - TransactionID uint64 `json:"transaction_id"` - SessionID uint64 `jons:"session_id"` + StatementID [16]byte `json:"statement_id"` + TransactionID [16]byte `json:"transaction_id"` + SessionID [16]byte `jons:"session_id"` Account string `json:"account"` User string `json:"user"` Host string `json:"host"` diff --git a/pkg/util/trace/resource.go b/pkg/util/trace/resource.go deleted file mode 100644 index d6ef27893ce9889343bd1174085ff86056ce3995..0000000000000000000000000000000000000000 --- a/pkg/util/trace/resource.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2022 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package trace - -import "encoding/json" - -const ( - MOStatementType = "MOStatementType" - MOSpanType = "MOSpan" - MOLogType = "MOLog" - MOZapType = "MOZap" - MOErrorType = "MOError" -) - -const ( - B int64 = 1 << (iota * 10) - KB - MB - GB -) - -type Resource struct { - m map[string]any -} - -func newResource() *Resource { - return &Resource{m: make(map[string]any)} - -} - -func (r *Resource) Put(key string, val any) { - r.m[key] = val -} - -func (r *Resource) Get(key string) (any, bool) { - val, has := r.m[key] - return val, has -} - -// String need to improve -func (r *Resource) String() string { - buf, _ := json.Marshal(r.m) - return string(buf) - -} - -type NodeType int - -const ( - NodeTypeNode NodeType = iota - NodeTypeCN - NodeTypeDN - NodeTypeLogService -) - -func (t NodeType) String() string { - switch t { - case NodeTypeNode: - return "Node" - case NodeTypeCN: - return "CN" - case NodeTypeDN: - return "DN" - case NodeTypeLogService: - return "LogService" - default: - return "Unknown" - } -} - -type MONodeResource struct { - NodeID int64 `json:"node_id"` - NodeType NodeType `json:"node_type"` -} diff --git a/pkg/util/trace/schema.go b/pkg/util/trace/schema.go index 6a27281dbc4a5c7c0a62d77542b25272910fa8e2..c479984057e1e0398c55b233ca43c16b5663daa2 100644 --- a/pkg/util/trace/schema.go +++ b/pkg/util/trace/schema.go @@ -36,10 +36,10 @@ const ( sqlCreateDBConst = `create database if not exists ` + statsDatabase sqlCreateSpanInfoTable = `CREATE TABLE IF NOT EXISTS span_info( - span_id BIGINT UNSIGNED, - statement_id BIGINT UNSIGNED, - parent_span_id BIGINT UNSIGNED, - node_id BIGINT COMMENT "node uuid in MO", + span_id varchar(16), + statement_id varchar(36), + parent_span_id varchar(16), + node_uuid varchar(36) COMMENT "node uuid in MO, which node accept this request", node_type varchar(64) COMMENT "node type in MO, enum: DN, CN, LogService;", name varchar(1024) COMMENT "span name, for example: step name of execution plan, function name in code, ...", start_time datetime, @@ -48,9 +48,9 @@ const ( resource varchar(4096) COMMENT "json, static resource informations /*should by json type*/" )` sqlCreateLogInfoTable = `CREATE TABLE IF NOT EXISTS log_info( - statement_id BIGINT UNSIGNED, - span_id BIGINT UNSIGNED, - node_id BIGINT COMMENT "node uuid in MO", + statement_id varchar(36), + span_id varchar(16), + node_uuid varchar(36) COMMENT "node uuid in MO, which node accept this request", node_type varchar(64) COMMENT "node type in MO, enum: DN, CN, LogService;", timestamp datetime COMMENT "log timestamp", name varchar(1024) COMMENT "logger name", @@ -60,9 +60,9 @@ const ( extra varchar(4096) COMMENT "log extra fields, json" )` sqlCreateStatementInfoTable = `CREATE TABLE IF NOT EXISTS statement_info( - statement_id BIGINT UNSIGNED, - transaction_id BIGINT UNSIGNED, - session_id BIGINT UNSIGNED, + statement_id varchar(36), + transaction_id varchar(36), + session_id varchar(36), ` + "`account`" + ` varchar(1024) COMMENT 'account name', user varchar(1024) COMMENT 'user name', host varchar(1024) COMMENT 'user client ip', @@ -70,16 +70,16 @@ const ( statement varchar(10240) COMMENT 'sql statement/*TODO: should by TEXT, or BLOB */', statement_tag varchar(1024), statement_fingerprint varchar(40960) COMMENT 'sql statement fingerprint/*TYPE should by TEXT, longer*/', - node_id BIGINT COMMENT "node uuid in MO, which node accept this request", + node_uuid varchar(36) COMMENT "node uuid in MO, which node accept this request", node_type varchar(64) COMMENT "node type in MO, enum: DN, CN, LogService;", request_at datetime, status varchar(1024) COMMENT 'sql statement running status, enum: Running, Success, Failed', exec_plan varchar(4096) COMMENT "sql execution plan; /*TODO: 应为JSON 类型*/" )` sqlCreateErrorInfoTable = `CREATE TABLE IF NOT EXISTS error_info( - statement_id BIGINT UNSIGNED, - span_id BIGINT UNSIGNED, - node_id BIGINT COMMENT "node uuid in MO, which node accept this request", + statement_id varchar(36), + span_id varchar(16), + node_uuid varchar(36) COMMENT "node uuid in MO, which node accept this request", node_type varchar(64) COMMENT "node type in MO, enum: DN, CN, LogService;", err_code varchar(1024), stack varchar(4096), @@ -97,7 +97,7 @@ func InitSchemaByInnerExecutor(ctx context.Context, ieFactory func() ie.Internal exec.ApplySessionOverride(ie.NewOptsBuilder().Database(statsDatabase).Internal(true).Finish()) mustExec := func(sql string) { if err := exec.Exec(ctx, sql, ie.NewOptsBuilder().Finish()); err != nil { - panic(fmt.Sprintf("[Metric] init metric tables error: %v, sql: %s", err, sql)) + panic(fmt.Errorf("[Metric] init metric tables error: %v, sql: %s", err, sql)) } } diff --git a/pkg/util/trace/trace.go b/pkg/util/trace/trace.go index 34dc4603938ef999371897c42eeadcdcb8104ebb..3a06e70f4d4aebad650b4f60e34f4975ad0311c7 100644 --- a/pkg/util/trace/trace.go +++ b/pkg/util/trace/trace.go @@ -28,17 +28,9 @@ import ( "go.uber.org/zap" ) -const ( - InternalExecutor = "InternalExecutor" - FileService = "FileService" -) - -type TraceID uint64 -type SpanID uint64 - var gTracerProvider *MOTracerProvider var gTracer Tracer -var gTraceContext context.Context = context.Background() +var gTraceContext = context.Background() var gSpanContext atomic.Value func Init(ctx context.Context, opts ...TracerProviderOption) (context.Context, error) { @@ -57,7 +49,9 @@ func Init(ctx context.Context, opts ...TracerProviderOption) (context.Context, e gTracer = gTracerProvider.Tracer("MatrixOrigin") // init Node DefaultContext - sc := SpanContextWithIDs(TraceID(0), SpanID(config.getNodeResource().NodeID)) + var spanId SpanID + spanId.SetByUUID(config.getNodeResource().NodeUuid) + sc := SpanContextWithIDs(nilTraceID, spanId) gSpanContext.Store(&sc) gTraceContext = ContextWithSpanContext(ctx, sc) @@ -70,7 +64,7 @@ func Init(ctx context.Context, opts ...TracerProviderOption) (context.Context, e func initExport(ctx context.Context, config *tracerProviderConfig) { if !config.IsEnable() { - logutil2.Infof(context.TODO(), "initExport pass.") + logutil.Info("initExport pass.") return } var p export.BatchProcessor @@ -97,8 +91,8 @@ func initExport(ctx context.Context, config *tracerProviderConfig) { } if p != nil { config.spanProcessors = append(config.spanProcessors, NewBatchSpanProcessor(p)) - logutil2.Infof(context.TODO(), "trace span processor") - logutil2.Info(context.TODO(), "[Debug]", zap.String("operation", "value1"), zap.String("operation_1", "value2")) + logutil.Info("trace span processor") + logutil.Info("[Debug]", zap.String("operation", "value1"), zap.String("operation_1", "value2")) } } diff --git a/pkg/util/trace/trace_test.go b/pkg/util/trace/trace_test.go index f3aad1fc253c53659a251abcac3f40e8f83eb07e..04097030af6cd08ec4e211e29d8b5c5e6803cfcf 100644 --- a/pkg/util/trace/trace_test.go +++ b/pkg/util/trace/trace_test.go @@ -74,7 +74,7 @@ func TestDefaultContext(t *testing.T) { }{ { name: "normal", - want: ContextWithSpanContext(context.Background(), SpanContextWithIDs(0, 0)), + want: ContextWithSpanContext(context.Background(), SpanContextWithIDs(nilTraceID, nilSpanID)), }, } for _, tt := range tests { @@ -85,7 +85,7 @@ func TestDefaultContext(t *testing.T) { } func TestDefaultSpanContext(t *testing.T) { - sc := SpanContextWithIDs(0, 0) + sc := SpanContextWithIDs(nilTraceID, nilSpanID) tests := []struct { name string want *SpanContext @@ -109,7 +109,7 @@ func TestGetNodeResource(t *testing.T) { }{ { name: "normal", - want: &MONodeResource{0, NodeTypeNode}, + want: &MONodeResource{"node_uuid", NodeTypeNode}, }, } for _, tt := range tests { diff --git a/pkg/util/uuidutil.go b/pkg/util/uuidutil.go new file mode 100644 index 0000000000000000000000000000000000000000..0cd65f8776f80b6e6f6fc3ff5cd768f801125671 --- /dev/null +++ b/pkg/util/uuidutil.go @@ -0,0 +1,78 @@ +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "bytes" + "encoding/binary" + "errors" + "github.com/google/uuid" + "hash/fnv" + "net" +) + +var ( + ErrNoHardwareAddr = errors.New("uuid: no Hardware address found") + ErrUUIDNodeIDTooShort = errors.New("uuid: nodeID too short") +) + +// getDefaultHardwareAddr returns hardware address(like mac addr), like the first valid val. +func getDefaultHardwareAddr() (net.HardwareAddr, error) { + ifaces, err := net.Interfaces() + if err != nil { + return []byte{}, err + } + for _, iface := range ifaces { + if len(iface.HardwareAddr) >= 6 { + return iface.HardwareAddr, nil + } + } + return []byte{}, ErrNoHardwareAddr +} + +// docker mac addr range: [02:42:ac:11:00:00, 02:42:ac:11:ff:ff] +var dockerMacPrefix = []byte{0x02, 0x42, 0xac} + +// SetUUIDNodeID set all uuid generator's node_id +func SetUUIDNodeID(nodeUuid []byte) error { + var err error + var hwAddr []byte + // situation 1: set by mac addr + hwAddr, err = getDefaultHardwareAddr() + if err == nil && !bytes.Equal(hwAddr[:len(dockerMacPrefix)], dockerMacPrefix) && uuid.SetNodeID(hwAddr) { + return nil + } + // case 2: set by nodeUuid prefix + if len(nodeUuid) > 6 && uuid.SetNodeID(nodeUuid) { + return nil + } + // case 3: set by md5sum{hwAddr + nodeUuid, randomUint64} + var num [8]byte + binary.BigEndian.PutUint64(num[:], Fastrand64()) + hasher := fnv.New128() + if _, err = hasher.Write(hwAddr); err != nil { + return err + } else if _, err := hasher.Write(nodeUuid); err != nil { + return err + } else if _, err := hasher.Write(num[:]); err != nil { + return err + } + var hash = make([]byte, 0, 16) + hash = hasher.Sum(hash[:]) + if !uuid.SetNodeID(hash[:]) { + return ErrUUIDNodeIDTooShort + } + return nil +}