Skip to content
Snippets Groups Projects
Unverified Commit 0008d897 authored by fagongzi's avatar fagongzi Committed by GitHub
Browse files

morpc: add trace support (#4602)

Support trace context transfer via morpc framework

Approved by: @reusee
parent 133b4b3c
No related branches found
No related tags found
No related merge requests found
......@@ -56,6 +56,7 @@ func newMessageCodec(messageFactory func() Message, payloadCopyBufSize int, enab
}
c := &messageCodec{codec: length.New(bc), bc: bc}
c.AddHeaderCodec(&deadlineContextCodec{})
c.AddHeaderCodec(&traceCodec{})
return c
}
......
......@@ -20,6 +20,7 @@ import (
"time"
"github.com/fagongzi/goetty/v2/buf"
"github.com/matrixorigin/matrixone/pkg/util/trace"
)
type deadlineContextCodec struct {
......@@ -50,3 +51,43 @@ func (hc *deadlineContextCodec) Decode(msg *RPCMessage, data []byte) (int, error
msg.Ctx, msg.cancel = context.WithTimeout(msg.Ctx, time.Duration(buf.Byte2Int64(data)))
return 8, nil
}
type traceCodec struct {
}
func (hc *traceCodec) Encode(msg *RPCMessage, out *buf.ByteBuf) (int, error) {
if msg.Ctx == nil {
return 0, nil
}
span := trace.SpanFromContext(msg.Ctx)
c := span.SpanContext()
n := c.Size()
out.MustWriteByte(byte(n))
out.Grow(n)
idx := out.GetWriteIndex()
out.SetWriteIndex(idx + n)
c.MarshalTo(out.RawSlice(idx, idx+n))
return 1 + n, nil
}
func (hc *traceCodec) Decode(msg *RPCMessage, data []byte) (int, error) {
if len(data) < 1 {
return 0, io.ErrShortBuffer
}
if len(data) < int(data[0]) {
return 0, io.ErrShortBuffer
}
c := &trace.SpanContext{}
if err := c.Unmarshal(data[1 : 1+data[0]]); err != nil {
return 0, err
}
if msg.Ctx == nil {
msg.Ctx = context.Background()
}
msg.Ctx = trace.ContextWithSpanContext(msg.Ctx, *c)
return int(1 + data[0]), nil
}
......@@ -20,6 +20,7 @@ import (
"time"
"github.com/fagongzi/goetty/v2/buf"
"github.com/matrixorigin/matrixone/pkg/util/trace"
"github.com/stretchr/testify/assert"
)
......@@ -47,3 +48,29 @@ func TestDecodeContext(t *testing.T) {
assert.True(t, ok)
assert.True(t, !ts.IsZero())
}
func TestEncodeAndDecodeTrace(t *testing.T) {
hc := &traceCodec{}
out := buf.NewByteBuf(8)
span := trace.SpanContextWithID(trace.TraceID(1))
n, err := hc.Encode(&RPCMessage{Ctx: trace.ContextWithSpanContext(context.Background(), span)}, out)
assert.Equal(t, 1+span.Size(), n)
assert.NoError(t, err)
msg := &RPCMessage{}
_, data := out.ReadBytes(1 + span.Size())
n, err = hc.Decode(msg, nil)
assert.Equal(t, 0, n)
assert.Error(t, err)
n, err = hc.Decode(msg, data[:1])
assert.Equal(t, 0, n)
assert.Error(t, err)
n, err = hc.Decode(msg, data)
assert.Equal(t, 1+span.Size(), n)
assert.NoError(t, err)
assert.Equal(t, span, trace.SpanFromContext(msg.Ctx).SpanContext())
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment