From a5ca47dab0a78bb0812cc07caf6913c7c311e404 Mon Sep 17 00:00:00 2001 From: Jackson <xzxiong@yeah.net> Date: Wed, 17 Aug 2022 15:36:49 +0800 Subject: [PATCH] SpanContext add MarshalTo/Unmarshal support morpc (de)serialize (#4539) - SpanContext support MarshalTo/Unmarshal - how-to-use is in [pkg/util/trace/example/main.go](https://github.com/matrixorigin/matrixone/compare/main...xzxiong:trace_morpc) Approved by: @fengttt --- pkg/util/trace/example/main.go | 48 ++++++++--- pkg/util/trace/mo_trace.go | 39 +++++++-- pkg/util/trace/mo_trace_test.go | 146 ++++++++++++++++++++++++++++++++ pkg/util/trace/trace.go | 6 +- 4 files changed, 213 insertions(+), 26 deletions(-) create mode 100644 pkg/util/trace/mo_trace_test.go diff --git a/pkg/util/trace/example/main.go b/pkg/util/trace/example/main.go index 9b478d8e2..0ea34f56c 100644 --- a/pkg/util/trace/example/main.go +++ b/pkg/util/trace/example/main.go @@ -153,38 +153,62 @@ func errorUsage(ctx context.Context) { } +type FunctionRequest struct { + trace.SpanContext +} + type rpcRequest struct { - TraceId trace.TraceID - SpanId trace.SpanID + message []byte } type rpcResponse struct { message string } type rpcServer struct { } +type rpcClient struct { +} + +var gServer = &rpcServer{} func rpcUsage(ctx context.Context) { - traceId, spanId := trace.SpanFromContext(ctx).SpanContext().GetIDs() - req := &rpcRequest{ - TraceId: traceId, - SpanId: spanId, + newCtx, span := trace.Start(ctx, "rpcUsage", trace.WithNewRoot(true)) + defer span.End() + req := &FunctionRequest{ + SpanContext: trace.SpanFromContext(newCtx).SpanContext(), } - _ = remoteCallFunction(ctx, req) + logutil2.Info(newCtx, "client call Function") + _ = callFunction(newCtx, req) } -func remoteCallFunction(ctx context.Context, req *rpcRequest) error { - s := &rpcServer{} - resp, err := s.Function(ctx, req) +func callFunction(ctx context.Context, req *FunctionRequest) error { + rpcReq := &rpcRequest{message: make([]byte, 16)} + if _, err := req.SpanContext.MarshalTo(rpcReq.message); err != nil { + logutil.Errorf("callFunction: %v", err) + return err + } + + client := &rpcClient{} + resp, err := client.Function(ctx, rpcReq) logutil2.Infof(ctx, "resp: %s", resp.message) return err } +func (s *rpcClient) Function(ctx context.Context, req *rpcRequest) (*rpcResponse, error) { + return gServer.Function(ctx, req) +} + func (s *rpcServer) Function(ctx context.Context, req *rpcRequest) (*rpcResponse, error) { - rootCtx := trace.ContextWithSpanContext(ctx, trace.SpanContextWithIDs(req.TraceId, req.SpanId)) + + var sc trace.SpanContext + if err := sc.Unmarshal(req.message); err != nil { + return &rpcResponse{message: "error"}, err + } + rootCtx := trace.ContextWithSpanContext(ctx, sc) + logutil2.Info(rootCtx, "server accept request") newCtx, span := trace.Start(rootCtx, "Function") defer span.End() - logutil2.Info(newCtx, "do Function") + logutil2.Info(newCtx, "server do Function, have same TraceId from client.") return &rpcResponse{message: "success"}, nil } diff --git a/pkg/util/trace/mo_trace.go b/pkg/util/trace/mo_trace.go index 9e72a678f..c9996792f 100644 --- a/pkg/util/trace/mo_trace.go +++ b/pkg/util/trace/mo_trace.go @@ -17,20 +17,20 @@ package trace import ( "bytes" "context" + "encoding/binary" "encoding/hex" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "io" "sync" "unsafe" "github.com/matrixorigin/matrixone/pkg/util" - "github.com/matrixorigin/matrixone/pkg/util/batchpipe" ) // TracerConfig is a group of options for a Tracer. type TracerConfig struct { - Name string - reminder batchpipe.Reminder // WithReminder + Name string } // TracerOption applies an option to a TracerConfig. @@ -38,18 +38,14 @@ type TracerOption interface { apply(*TracerConfig) } +var _ TracerOption = tracerOptionFunc(nil) + type tracerOptionFunc func(*TracerConfig) func (f tracerOptionFunc) apply(cfg *TracerConfig) { f(cfg) } -func WithReminder(r batchpipe.Reminder) tracerOptionFunc { - return tracerOptionFunc(func(cfg *TracerConfig) { - cfg.reminder = r - }) -} - const ( // FlagsSampled is a bitmask with the sampled bit set. A SpanContext // with the sampling bit set means the span is sampled. @@ -95,6 +91,7 @@ func (t *MOTracer) Start(ctx context.Context, name string, opts ...SpanOption) ( if span.NewRoot { span.TraceID, span.SpanID = t.provider.idGenerator.NewIDs() + span.parent = noopSpan{} } else { span.TraceID, span.SpanID = parent.SpanContext().TraceID, t.provider.idGenerator.NewSpanID() span.parent = parent @@ -140,6 +137,30 @@ func (c *SpanContext) MarshalLogObject(enc zapcore.ObjectEncoder) error { return nil } +func (c *SpanContext) Size() (n int) { + return 16 +} + +func (c *SpanContext) MarshalTo(dAtA []byte) (int, error) { + l := cap(dAtA) + if l < c.Size() { + return -1, io.ErrUnexpectedEOF + } + binary.BigEndian.PutUint64(dAtA, uint64(c.TraceID)) + binary.BigEndian.PutUint64(dAtA[8:], uint64(c.SpanID)) + return c.Size(), nil +} + +func (c *SpanContext) Unmarshal(dAtA []byte) error { + l := cap(dAtA) + if l < c.Size() { + return io.ErrUnexpectedEOF + } + c.TraceID = TraceID(binary.BigEndian.Uint64(dAtA)) + c.SpanID = SpanID(binary.BigEndian.Uint64(dAtA[8:])) + return nil +} + func SpanContextWithID(id TraceID) SpanContext { return SpanContext{TraceID: id} } diff --git a/pkg/util/trace/mo_trace_test.go b/pkg/util/trace/mo_trace_test.go new file mode 100644 index 000000000..3c4cd82e1 --- /dev/null +++ b/pkg/util/trace/mo_trace_test.go @@ -0,0 +1,146 @@ +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace + +import ( + "context" + "github.com/stretchr/testify/require" + "testing" +) + +func TestMOTracer_Start(t1 *testing.T) { + type fields struct { + TracerConfig TracerConfig + } + type args struct { + ctx context.Context + name string + opts []SpanOption + } + rootCtx := ContextWithSpanContext(context.Background(), SpanContextWithIDs(1, 1)) + tests := []struct { + name string + fields fields + args args + wantNewRoot bool + wantTraceId TraceID + wantParentSpanId SpanID + }{ + { + name: "normal", + fields: fields{ + TracerConfig: TracerConfig{Name: "normal"}, + }, + args: args{ + ctx: rootCtx, + name: "normal", + opts: []SpanOption{}, + }, + wantNewRoot: false, + wantTraceId: 1, + wantParentSpanId: 1, + }, + { + name: "newRoot", + fields: fields{ + TracerConfig: TracerConfig{Name: "newRoot"}, + }, + args: args{ + ctx: rootCtx, + name: "newRoot", + opts: []SpanOption{WithNewRoot(true)}, + }, + wantNewRoot: true, + wantTraceId: 1, + wantParentSpanId: 1, + }, + } + for _, tt := range tests { + t1.Run(tt.name, func(t1 *testing.T) { + t := &MOTracer{ + TracerConfig: tt.fields.TracerConfig, + provider: gTracerProvider, + } + newCtx, span := t.Start(tt.args.ctx, tt.args.name, tt.args.opts...) + if !tt.wantNewRoot { + require.Equal(t1, tt.wantTraceId, span.SpanContext().TraceID) + require.Equal(t1, tt.wantParentSpanId, span.ParentSpanContext().SpanID) + require.Equal(t1, tt.wantParentSpanId, SpanFromContext(newCtx).ParentSpanContext().SpanID) + } else { + require.NotEqual(t1, tt.wantTraceId, span.SpanContext().TraceID) + require.NotEqual(t1, tt.wantParentSpanId, span.ParentSpanContext().SpanID) + require.NotEqual(t1, tt.wantParentSpanId, SpanFromContext(newCtx).ParentSpanContext().SpanID) + } + }) + } +} + +func TestSpanContext_MarshalTo(t *testing.T) { + type fields struct { + TraceID TraceID + SpanID SpanID + } + type args struct { + dAtA []byte + } + tests := []struct { + name string + fields fields + args args + want int + wantBytes []byte + }{ + { + name: "normal", + fields: fields{ + TraceID: 0, + SpanID: 0x123456, + }, + args: args{dAtA: make([]byte, 16)}, + want: 16, + // 1 2 3 4 5 6 7 8--1 2 3 4 5 6 7 8 + wantBytes: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x12, 0x34, 0x56}, + }, + { + name: "not-zero", + fields: fields{ + TraceID: 0x0987654321FFFFFF, + SpanID: 0x123456, + }, + args: args{dAtA: make([]byte, 16)}, + want: 16, + // 1 2 3 4 5 6 7 8--1 2 3 4 5 6 7 8 + wantBytes: []byte{0x09, 0x87, 0x65, 0x43, 0x21, 0xff, 0xff, 0xff, 0, 0, 0, 0, 0, 0x12, 0x34, 0x56}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &SpanContext{ + TraceID: tt.fields.TraceID, + SpanID: tt.fields.SpanID, + } + got, err := c.MarshalTo(tt.args.dAtA) + require.Equal(t, nil, err) + require.Equal(t, got, tt.want) + require.Equal(t, tt.wantBytes, tt.args.dAtA) + newC := &SpanContext{} + err = newC.Unmarshal(tt.args.dAtA) + require.Equal(t, nil, err) + require.Equal(t, c, newC) + require.Equal(t, tt.fields.TraceID, newC.TraceID) + require.Equal(t, tt.fields.SpanID, newC.SpanID) + }) + } +} diff --git a/pkg/util/trace/trace.go b/pkg/util/trace/trace.go index c7f38723c..34dc46039 100644 --- a/pkg/util/trace/trace.go +++ b/pkg/util/trace/trace.go @@ -18,12 +18,10 @@ import ( "context" goErrors "errors" "sync/atomic" - "time" "unsafe" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/logutil/logutil2" - "github.com/matrixorigin/matrixone/pkg/util/batchpipe" "github.com/matrixorigin/matrixone/pkg/util/errors" "github.com/matrixorigin/matrixone/pkg/util/export" @@ -56,9 +54,7 @@ func Init(ctx context.Context, opts ...TracerProviderOption) (context.Context, e config := &gTracerProvider.tracerProviderConfig // init Tracer - gTracer = gTracerProvider.Tracer("MatrixOrigin", - WithReminder(batchpipe.NewConstantClock(5*time.Second)), - ) + gTracer = gTracerProvider.Tracer("MatrixOrigin") // init Node DefaultContext sc := SpanContextWithIDs(TraceID(0), SpanID(config.getNodeResource().NodeID)) -- GitLab