diff --git a/go.mod b/go.mod index 1cd6d693d8c7db06158f57d4450fabff35375ed9..9216a645cecb9ad478b9b486397453e2bfad61f7 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 github.com/cockroachdb/pebble v0.0.0-20220407171941-2120d145e292 github.com/fagongzi/goetty v1.13.0 - github.com/fagongzi/goetty/v2 v2.0.3-0.20220624135603-5923140593a7 + github.com/fagongzi/goetty/v2 v2.0.3-0.20220629100032-f8e8c3c38772 github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d github.com/go-sql-driver/mysql v1.6.0 github.com/gogo/protobuf v1.3.2 diff --git a/go.sum b/go.sum index 41569ceecdae88a68eb7adb0cd8aedf28dc57fcd..7d1802ac830ec04b4c3aa69bdefb1aa494211144 100644 --- a/go.sum +++ b/go.sum @@ -213,8 +213,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/fagongzi/goetty v1.13.0 h1:k1RT+32/O97zu370BhHk8sL/kSAqF2ebSY1Nj+x/dBg= github.com/fagongzi/goetty v1.13.0/go.mod h1:M1HA6OyQA3/4bV70W588Uek6sF7NxgxT/3gGhlf0yj0= -github.com/fagongzi/goetty/v2 v2.0.3-0.20220624135603-5923140593a7 h1:BjZR38MOC8566qCTsU6Hign3a/jNkkx4kje9I5jBrrA= -github.com/fagongzi/goetty/v2 v2.0.3-0.20220624135603-5923140593a7/go.mod h1:OwIBpVwRW1HjF/Jhc2Av3UvG2NygMg+bdqGxZaqwhU0= +github.com/fagongzi/goetty/v2 v2.0.3-0.20220629100032-f8e8c3c38772 h1:J0QvYEFfz8rIwqvJTNphrHj+bdCPMdoPRj9SNAWFxjM= +github.com/fagongzi/goetty/v2 v2.0.3-0.20220629100032-f8e8c3c38772/go.mod h1:OwIBpVwRW1HjF/Jhc2Av3UvG2NygMg+bdqGxZaqwhU0= github.com/fagongzi/util v0.0.0-20201116094402-221cc40c4593/go.mod h1:jYDIbpaqHXCCQ7QIDXRVfsQYAGKSNNb6N8BPTgdpcdE= github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d h1:1pILVCatHj3eVo9i52dZyY4BwjTmSIeN+/hoJh8rD0Y= github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d/go.mod h1:5cqSns2zMRcJeVGvAqeTrbXFqh5AqBFr5uVKP9T2kiE= diff --git a/pkg/common/morpc/backend.go b/pkg/common/morpc/backend.go index b69110110655af1d8303670ec5725311405f0e71..e95b542efc5f13d43d6859b3cdf8f0416f8629fd 100644 --- a/pkg/common/morpc/backend.go +++ b/pkg/common/morpc/backend.go @@ -15,16 +15,13 @@ package morpc import ( - "bytes" "context" - "encoding/hex" "fmt" "sync" + "sync/atomic" "time" "github.com/fagongzi/goetty/v2" - "github.com/fagongzi/util/hack" - "github.com/google/uuid" "github.com/matrixorigin/matrixone/pkg/common/stop" "github.com/matrixorigin/matrixone/pkg/logutil" "go.uber.org/zap" @@ -126,8 +123,12 @@ type remoteBackend struct { mu struct { sync.RWMutex - futures map[string]*Future - streams map[string]*stream + futures map[uint64]*Future + streams map[uint64]*stream + } + + atomic struct { + id uint64 } } @@ -157,8 +158,8 @@ func NewRemoteBackend( }, } rb.writeC = make(chan *Future, rb.options.bufferSize) - rb.mu.futures = make(map[string]*Future, rb.options.bufferSize) - rb.mu.streams = make(map[string]*stream, rb.options.bufferSize) + rb.mu.futures = make(map[uint64]*Future, rb.options.bufferSize) + rb.mu.streams = make(map[uint64]*stream, rb.options.bufferSize) rb.conn = goetty.NewIOSession(rb.options.goettyOptions...) if rb.options.connect { @@ -211,6 +212,8 @@ func (rb *remoteBackend) adjust() { } func (rb *remoteBackend) Send(ctx context.Context, request Message, opts SendOptions) (*Future, error) { + request.SetID(rb.nextID()) + f := rb.newFuture() f.init(ctx, request, opts, false) if err := rb.doSend(f); err != nil { @@ -229,8 +232,8 @@ func (rb *remoteBackend) NewStream(receiveChanBuffer int) (Stream, error) { rb.mu.Lock() defer rb.mu.Unlock() - st := newStream(make(chan Message, receiveChanBuffer), rb.doSend) - rb.mu.streams[hack.SliceToString(st.ID())] = st + st := newStream(rb.nextID(), make(chan Message, receiveChanBuffer), rb.doSend) + rb.mu.streams[st.ID()] = st return st, nil } @@ -371,7 +374,7 @@ func (rb *remoteBackend) writeLoop(ctx context.Context) { writeTimeout += f.opts.Timeout if err := rb.conn.Write(f.request, goetty.WriteOptions{}); err != nil { rb.logger.Error("write request failed", - zap.String("request-id", hex.EncodeToString(f.request.ID())), + zap.Uint64("request-id", f.request.GetID()), zap.Error(err)) retry = true written = 0 @@ -385,7 +388,7 @@ func (rb *remoteBackend) writeLoop(ctx context.Context) { if err := rb.conn.Flush(writeTimeout); err != nil { for _, f := range sendFutures { rb.logger.Error("write request failed", - zap.String("request-id", hex.EncodeToString(f.request.ID())), + zap.Uint64("request-id", f.request.GetID()), zap.Error(err)) } retry = true @@ -498,7 +501,7 @@ func (rb *remoteBackend) requestDone(response Message) { rb.mu.Lock() defer rb.mu.Unlock() - id := hack.SliceToString(response.ID()) + id := response.GetID() if f, ok := rb.mu.futures[id]; ok { if !f.stream { delete(rb.mu.futures, id) @@ -508,18 +511,17 @@ func (rb *remoteBackend) requestDone(response Message) { } func (rb *remoteBackend) addFuture(f *Future) { - id := hack.SliceToString(f.request.ID()) rb.mu.Lock() defer rb.mu.Unlock() - rb.mu.futures[id] = f + rb.mu.futures[f.request.GetID()] = f } func (rb *remoteBackend) releaseFuture(f *Future) { rb.mu.Lock() defer rb.mu.Unlock() - delete(rb.mu.futures, hack.SliceToString(f.id)) + delete(rb.mu.futures, f.id) if !f.stream { f.reset() rb.futurePool.Put(f) @@ -593,6 +595,10 @@ func (rb *remoteBackend) newFuture() *Future { return rb.futurePool.Get().(*Future) } +func (rb *remoteBackend) nextID() uint64 { + return atomic.AddUint64(&rb.atomic.id, 1) +} + type goettyBasedBackendFactory struct { codec Codec options []BackendOption @@ -610,7 +616,7 @@ func (bf *goettyBasedBackendFactory) Create(remote string) (Backend, error) { } type stream struct { - id []byte + id uint64 ctx context.Context cancel context.CancelFunc c chan Message @@ -622,11 +628,10 @@ type stream struct { } } -func newStream(c chan Message, sendFunc func(f *Future) error) *stream { +func newStream(id uint64, c chan Message, sendFunc func(f *Future) error) *stream { ctx, cancel := context.WithCancel(context.Background()) - id := uuid.New() return &stream{ - id: id[:], + id: id, c: c, sendFunc: sendFunc, ctx: ctx, @@ -635,7 +640,7 @@ func newStream(c chan Message, sendFunc func(f *Future) error) *stream { } func (s *stream) Send(request Message, opts SendOptions) error { - if !bytes.Equal(s.id, request.ID()) { + if s.id != request.GetID() { panic("request.id != stream.id") } @@ -682,6 +687,6 @@ func (s *stream) Close() error { return nil } -func (s *stream) ID() []byte { +func (s *stream) ID() uint64 { return s.id } diff --git a/pkg/common/morpc/backend_test.go b/pkg/common/morpc/backend_test.go index 416066eb8b701fa8d4cbae06447e0302652a0c3b..ac44cd90a45852fd76d9cc13d97469af72ac0783 100644 --- a/pkg/common/morpc/backend_test.go +++ b/pkg/common/morpc/backend_test.go @@ -16,12 +16,14 @@ package morpc import ( "context" + "fmt" "os" "sync" "testing" "time" "github.com/fagongzi/goetty/v2" + "github.com/fagongzi/goetty/v2/buf" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/stretchr/testify/assert" "go.uber.org/zap" @@ -40,7 +42,7 @@ func TestSend(t *testing.T) { func(b *remoteBackend) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - req := &testMessage{id: []byte("id1")} + req := newTestMessage(1) f, err := b.Send(ctx, req, SendOptions{}) assert.NoError(t, err) defer f.Close() @@ -61,7 +63,7 @@ func TestSendWithResetConnAndRetry(t *testing.T) { func(b *remoteBackend) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - req := &testMessage{id: []byte("id1")} + req := &testMessage{id: 1} f, err := b.Send(ctx, req, SendOptions{}) assert.NoError(t, err) defer f.Close() @@ -85,7 +87,7 @@ func TestSendWithTimeout(t *testing.T) { func(b *remoteBackend) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200) defer cancel() - req := &testMessage{id: []byte("id1")} + req := &testMessage{id: 1} f, err := b.Send(ctx, req, SendOptions{}) assert.NoError(t, err) defer f.Close() @@ -191,7 +193,7 @@ func TestBusy(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200) defer cancel() - req := &testMessage{id: []byte("id1")} + req := &testMessage{id: 1} f1, err := b.Send(ctx, req, SendOptions{}) assert.NoError(t, err) defer f1.Close() @@ -219,7 +221,7 @@ func TestBusy(t *testing.T) { func TestDoneWithClosedStreamCannotPanic(t *testing.T) { var f *Future c := make(chan Message, 1) - s := newStream(c, func(v *Future) error { + s := newStream(1, c, func(v *Future) error { f = v return nil }) @@ -297,7 +299,7 @@ func (b *testBackend) Send(ctx context.Context, request Message, opts SendOption } func (b *testBackend) NewStream(bufferSize int) (Stream, error) { - return newStream(make(chan Message, bufferSize), func(f *Future) error { + return newStream(1, make(chan Message, bufferSize), func(f *Future) error { return nil }), nil } @@ -306,33 +308,37 @@ func (b *testBackend) Close() {} func (b *testBackend) Busy() bool { return b.busy } type testMessage struct { - id []byte + id uint64 payload []byte } -func newTestMessage(id []byte) *testMessage { +func newTestMessage(id uint64) *testMessage { return &testMessage{id: id} } -func (tm *testMessage) ID() []byte { +func (tm *testMessage) SetID(id uint64) { + tm.id = id +} + +func (tm *testMessage) GetID() uint64 { return tm.id } func (tm *testMessage) DebugString() string { - return string(tm.id) + return fmt.Sprintf("%d", tm.id) } func (tm *testMessage) Size() int { - return len(tm.id) + return 8 } func (tm *testMessage) MarshalTo(data []byte) (int, error) { - return copy(data, tm.id), nil + buf.Uint64ToBytesTo(tm.id, data) + return 8, nil } func (tm *testMessage) Unmarshal(data []byte) error { - tm.id = make([]byte, len(data)) - copy(tm.id, data) + tm.id = buf.Byte2Uint64(data) return nil } diff --git a/pkg/common/morpc/codec_test.go b/pkg/common/morpc/codec_test.go index f587753e026c832436e054103bbdac73d0d900ae..f23d9bd397995833dea0eeca7d401412c8935a95 100644 --- a/pkg/common/morpc/codec_test.go +++ b/pkg/common/morpc/codec_test.go @@ -25,7 +25,7 @@ func TestEncodeAndDecode(t *testing.T) { codec := newTestCodec() buf := buf.NewByteBuf(32) - msg := newTestMessage([]byte("1")) + msg := newTestMessage(1) err := codec.Encode(msg, buf) assert.NoError(t, err) @@ -41,7 +41,7 @@ func TestEncodeAndDecodeWithPayload(t *testing.T) { buf2 := buf.NewByteBuf(32) buf1.SetSinkTo(buf2) - msg := newTestMessage([]byte("1")) + msg := newTestMessage(1) msg.payload = []byte("payload") err := codec.Encode(msg, buf1) assert.NoError(t, err) diff --git a/pkg/common/morpc/examples/message/message.go b/pkg/common/morpc/examples/message/message.go index e52d9fe50ed3367be33a211e70ca70d070cd2fad..c2de167968da94f3abd60ebe0fa6f672e32fdca4 100644 --- a/pkg/common/morpc/examples/message/message.go +++ b/pkg/common/morpc/examples/message/message.go @@ -15,39 +15,36 @@ package message import ( - "encoding/hex" - "github.com/fagongzi/goetty/v2/buf" ) // ExampleMessage example message type ExampleMessage struct { - MsgID []byte + MsgID uint64 Content string } -func (tm *ExampleMessage) ID() []byte { +func (tm *ExampleMessage) GetID() uint64 { return tm.MsgID } +func (tm *ExampleMessage) SetID(id uint64) { + tm.MsgID = id +} + func (tm *ExampleMessage) DebugString() string { - return hex.EncodeToString(tm.MsgID) + "," + string(tm.Content) + return "" } func (tm *ExampleMessage) Size() int { - return 8 + len(tm.MsgID) + len(tm.Content) + return 4 + 8 + len(tm.Content) } func (tm *ExampleMessage) MarshalTo(data []byte) (int, error) { offset := 0 - buf.Int2BytesTo(len(tm.MsgID), data[offset:]) - offset += 4 - - if len(tm.MsgID) > 0 { - copy(data[offset:], tm.MsgID) - offset += len(tm.MsgID) - } + buf.Uint64ToBytesTo(tm.MsgID, data[offset:]) + offset += 8 buf.Int2BytesTo(len(tm.Content), data[offset:]) offset += 4 @@ -63,16 +60,10 @@ func (tm *ExampleMessage) MarshalTo(data []byte) (int, error) { func (tm *ExampleMessage) Unmarshal(data []byte) error { offset := 0 - n := buf.Byte2Int(data) - offset += 4 - - tm.MsgID = make([]byte, n) - if n > 0 { - copy(tm.MsgID, data[offset:offset+n]) - offset += n - } + tm.MsgID = buf.Byte2Uint64(data) + offset += 8 - n = buf.Byte2Int(data[offset:]) + n := buf.Byte2Int(data[offset:]) offset += 4 content := make([]byte, n) diff --git a/pkg/common/morpc/examples/message/message_test.go b/pkg/common/morpc/examples/message/message_test.go index d903783d9ae28b3a4fd5179b530018d226cea466..bd4d77d0baeb19ab2595a781f921320ae8706282 100644 --- a/pkg/common/morpc/examples/message/message_test.go +++ b/pkg/common/morpc/examples/message/message_test.go @@ -21,7 +21,7 @@ import ( ) func TestMarshal(t *testing.T) { - m := &ExampleMessage{MsgID: []byte{1}, Content: "hello"} + m := &ExampleMessage{MsgID: 1, Content: "hello"} data := make([]byte, m.Size()) _, err := m.MarshalTo(data) diff --git a/pkg/common/morpc/examples/pingpong/main.go b/pkg/common/morpc/examples/pingpong/main.go index 39f092f0626fd0795226df761bf31a57694c595a..f6b4430a87f9be9784308cc3684b6098893d8ef2 100644 --- a/pkg/common/morpc/examples/pingpong/main.go +++ b/pkg/common/morpc/examples/pingpong/main.go @@ -47,7 +47,7 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - f, err := cli.Send(ctx, addr, &message.ExampleMessage{MsgID: []byte{1}, Content: "hello"}, morpc.SendOptions{}) + f, err := cli.Send(ctx, addr, &message.ExampleMessage{MsgID: 1, Content: "hello"}, morpc.SendOptions{}) if err != nil { panic(err) } diff --git a/pkg/common/morpc/examples/stream/main.go b/pkg/common/morpc/examples/stream/main.go index 695d453c1430d8477a7dc650c073800cecd80044..dab911eb3a74091ef3e5980ff7810227f15518f7 100644 --- a/pkg/common/morpc/examples/stream/main.go +++ b/pkg/common/morpc/examples/stream/main.go @@ -77,7 +77,7 @@ func startServer() error { // send more message back go func() { for i := 0; i < 10; i++ { - if err := cs.Write(&message.ExampleMessage{MsgID: request.ID(), Content: fmt.Sprintf("stream-%d", i)}, + if err := cs.Write(&message.ExampleMessage{MsgID: request.GetID(), Content: fmt.Sprintf("stream-%d", i)}, morpc.SendOptions{}); err != nil { panic(err) } diff --git a/pkg/common/morpc/future.go b/pkg/common/morpc/future.go index b713d79854506c4ee5ca7fe11de0c86d1c5f8097..b4ae0f3ecc10bc7676b005832a33c4a0fedc3684 100644 --- a/pkg/common/morpc/future.go +++ b/pkg/common/morpc/future.go @@ -15,7 +15,6 @@ package morpc import ( - "bytes" "context" "runtime" "sync" @@ -36,7 +35,7 @@ func newFutureWithChan(c chan Message) *Future { // Future is used to obtain response data synchronously. type Future struct { - id []byte + id uint64 c chan Message releaseFunc func(*Future) stream bool @@ -60,7 +59,7 @@ func (f *Future) init(ctx context.Context, request Message, opts SendOptions, st } } - f.id = request.ID() + f.id = request.GetID() f.ctx = ctx f.request = request f.opts = opts @@ -103,7 +102,7 @@ func (f *Future) done(response Message) { defer f.mu.Unlock() if !f.mu.closed && !f.timeout() { - if !bytes.Equal(response.ID(), f.id) { + if response.GetID() != f.id { return } @@ -131,7 +130,7 @@ func (f *Future) unRef() { func (f *Future) reset() { f.request = nil - f.id = nil + f.id = 0 f.ctx = nil f.opts = SendOptions{} f.stream = false diff --git a/pkg/common/morpc/future_test.go b/pkg/common/morpc/future_test.go index a394429b73db69fc1a0297a90566ca8d098a3ed5..ee55eb58b518945579666bb74d9377e370d87600 100644 --- a/pkg/common/morpc/future_test.go +++ b/pkg/common/morpc/future_test.go @@ -55,7 +55,7 @@ func TestNewFuture(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - req := newTestMessage([]byte("id")) + req := newTestMessage(1) f := newFuture(nil) f.init(ctx, req, SendOptions{}, false) defer f.Close() @@ -72,7 +72,7 @@ func TestReleaseFuture(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - req := newTestMessage([]byte("id")) + req := newTestMessage(1) f := newFuture(func(f *Future) { f.reset() }) f.init(ctx, req, SendOptions{}, false) f.c <- req @@ -87,7 +87,7 @@ func TestGet(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - req := newTestMessage([]byte("id")) + req := newTestMessage(1) f := newFuture(func(f *Future) { f.reset() }) f.init(ctx, req, SendOptions{}, false) defer f.Close() @@ -102,7 +102,7 @@ func TestGetWithTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1) defer cancel() - req := newTestMessage([]byte("id")) + req := newTestMessage(1) f := newFuture(func(f *Future) { f.reset() }) f.init(ctx, req, SendOptions{}, false) defer f.Close() @@ -117,18 +117,18 @@ func TestGetWithInvalidResponse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - req := newTestMessage([]byte("id")) + req := newTestMessage(1) f := newFuture(func(f *Future) { f.reset() }) f.init(ctx, req, SendOptions{}, false) defer f.Close() - f.done(newTestMessage([]byte("id2"))) + f.done(newTestMessage(2)) assert.Equal(t, 0, len(f.c)) } func TestTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Hour) - req := newTestMessage([]byte("id")) + req := newTestMessage(1) f := newFuture(func(f *Future) { f.reset() }) f.init(ctx, req, SendOptions{}, false) defer f.Close() diff --git a/pkg/common/morpc/server.go b/pkg/common/morpc/server.go index ed1e4b828958d106e86b173dbc4b9d36521a3316..9789ca1d9c304ee3ec69855f204f4c312b9973a8 100644 --- a/pkg/common/morpc/server.go +++ b/pkg/common/morpc/server.go @@ -16,7 +16,6 @@ package morpc import ( "context" - "encoding/hex" "fmt" "sync" "time" @@ -98,6 +97,7 @@ func NewRPCServer(name, address string, codec Codec, options ...ServerOption) (R goetty.WithAppSessionOptions( goetty.WithCodec(codec, codec), goetty.WithLogger(s.logger), + goetty.WithDisableReleaseOutBuf(), // release out buf when write loop reutrned ), ) if err != nil { @@ -165,7 +165,7 @@ func (s *server) onMessage(rs goetty.IOSession, value interface{}, sequence uint if ce := s.logger.Check(zap.DebugLevel, "received request"); ce != nil { ce.Write(zap.Uint64("sequence", sequence), zap.String("client", rs.RemoteAddr()), - zap.String("request-id", hex.EncodeToString(request.ID())), + zap.Uint64("request-id", request.GetID()), zap.String("request", request.DebugString())) } @@ -181,14 +181,17 @@ func (s *server) onMessage(rs goetty.IOSession, value interface{}, sequence uint if ce := s.logger.Check(zap.DebugLevel, "handle request completed"); ce != nil { ce.Write(zap.Uint64("sequence", sequence), zap.String("client", rs.RemoteAddr()), - zap.String("request-id", hex.EncodeToString(request.ID()))) + zap.Uint64("request-id", request.GetID())) } return nil } func (s *server) startWriteLoop(rs goetty.IOSession, cs *clientSession) error { return s.stopper.RunTask(func(ctx context.Context) { - defer cs.close() + defer func() { + cs.close() + rs.OutBuf().Release() + }() responses := make([]sendMessage, 0, s.options.batchSendSize) fetch := func() { @@ -240,7 +243,7 @@ func (s *server) startWriteLoop(rs goetty.IOSession, cs *clientSession) error { timeout += sendResponses[idx].opts.Timeout if err := rs.Write(sendResponses[idx].message, goetty.WriteOptions{}); err != nil { s.logger.Error("write response failed", - zap.String("request-id", hex.EncodeToString(sendResponses[idx].message.ID())), + zap.Uint64("request-id", sendResponses[idx].message.GetID()), zap.Error(err)) return } @@ -251,7 +254,7 @@ func (s *server) startWriteLoop(rs goetty.IOSession, cs *clientSession) error { if err := rs.Flush(timeout); err != nil { for idx := range sendResponses { s.logger.Error("write response failed", - zap.String("request-id", hex.EncodeToString(sendResponses[idx].message.ID())), + zap.Uint64("request-id", sendResponses[idx].message.GetID()), zap.Error(err)) } return @@ -260,8 +263,8 @@ func (s *server) startWriteLoop(rs goetty.IOSession, cs *clientSession) error { var fields []zap.Field fields = append(fields, zap.String("client", rs.RemoteAddr())) for idx := range sendResponses { - fields = append(fields, zap.String("request-id", - hex.EncodeToString(sendResponses[idx].message.ID()))) + fields = append(fields, zap.Uint64("request-id", + sendResponses[idx].message.GetID())) fields = append(fields, zap.String("response", sendResponses[idx].message.DebugString())) } diff --git a/pkg/common/morpc/server_test.go b/pkg/common/morpc/server_test.go index b91af41bbf6d6dd879b08217e348ae353550ce08..cbaa1293bd586ef5875e59b5cea5264859ad8441 100644 --- a/pkg/common/morpc/server_test.go +++ b/pkg/common/morpc/server_test.go @@ -40,7 +40,7 @@ func TestHandleServer(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - req := &testMessage{id: []byte{1}} + req := newTestMessage(1) f, err := c.Send(ctx, testAddr, req, SendOptions{}) assert.NoError(t, err) @@ -65,7 +65,7 @@ func TestHandleServerWithPayloadMessage(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - req := &testMessage{id: []byte{1}, payload: []byte("payload")} + req := &testMessage{id: 1, payload: []byte("payload")} f, err := c.Send(ctx, testAddr, req, SendOptions{}) assert.NoError(t, err) @@ -91,7 +91,7 @@ func TestHandleServerWriteWithClosedSession(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - req := &testMessage{id: []byte{1}} + req := newTestMessage(1) f, err := c.Send(ctx, testAddr, req, SendOptions{}) assert.NoError(t, err) diff --git a/pkg/common/morpc/types.go b/pkg/common/morpc/types.go index a8678840c6273b7be1c35a81fe22a8b62fb148d2..de19406808d3c01daf5205b9b8547ff061f55230 100644 --- a/pkg/common/morpc/types.go +++ b/pkg/common/morpc/types.go @@ -32,9 +32,11 @@ var ( // Message morpc is not a normal remote method call, rather it is a message-based asynchronous // driven framework. type Message interface { - // ID each message has a unique ID. If it is a message transmitted in stream, the ID must - // be set to Stream.ID - ID() []byte + // SetID each message has a unique ID in a RPCClient Backend. If it is a message transmitted + // in stream, the ID must be set to Stream.ID. + SetID(uint64) + // GetID returns ID of the message + GetID() uint64 // DebugString return debug string DebugString() string // Size size of message after marshal @@ -140,7 +142,7 @@ type Backend interface { type Stream interface { // ID returns the stream ID. All messages transmitted on the current stream need to use the // stream ID as the message ID - ID() []byte + ID() uint64 // Send send message to stream Send(request Message, opts SendOptions) error // Receive returns a channel to read message from server. The channel will be closed after stream diff --git a/pkg/logservice/service.go b/pkg/logservice/service.go index 82f3aff4b8749ab414f4614e368ba844114ecc60..d683c10a7178516f6a6f812ecb5de3edeaf2f229 100644 --- a/pkg/logservice/service.go +++ b/pkg/logservice/service.go @@ -52,8 +52,12 @@ type RPCRequest struct { var _ morpc.PayloadMessage = (*RPCRequest)(nil) -func (r *RPCRequest) ID() []byte { - return nil +func (r *RPCRequest) SetID(id uint64) { + r.RequestID = id +} + +func (r *RPCRequest) GetID() uint64 { + return r.RequestID } func (r *RPCRequest) DebugString() string { @@ -76,8 +80,12 @@ type RPCResponse struct { var _ morpc.PayloadMessage = (*RPCResponse)(nil) -func (r *RPCResponse) ID() []byte { - return nil +func (r *RPCResponse) SetID(id uint64) { + r.RequestID = id +} + +func (r *RPCResponse) GetID() uint64 { + return r.RequestID } func (r *RPCResponse) DebugString() string { @@ -167,6 +175,7 @@ func (s *Service) handleRPCRequest(req morpc.Message, recs = MustMarshal(&records) } // FIXME: set timeout value + resp.RequestID = rr.RequestID return cs.Write(&RPCResponse{ Response: resp, payload: recs, diff --git a/pkg/pb/logservice/logservice.pb.go b/pkg/pb/logservice/logservice.pb.go index f3c159cf73ba8a9b3c85937c62967f4e91b3b167..a68618c03f4ec063344e21663fadfd1d9347bb20 100644 --- a/pkg/pb/logservice/logservice.pb.go +++ b/pkg/pb/logservice/logservice.pb.go @@ -5,11 +5,12 @@ package logservice import ( fmt "fmt" - _ "github.com/gogo/protobuf/gogoproto" - proto "github.com/gogo/protobuf/proto" io "io" math "math" math_bits "math/bits" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. @@ -637,14 +638,15 @@ func (m *LogRecord) GetData() []byte { } type Request struct { - Method MethodType `protobuf:"varint,1,opt,name=Method,proto3,enum=logservice.MethodType" json:"Method,omitempty"` - Name string `protobuf:"bytes,2,opt,name=Name,proto3" json:"Name,omitempty"` - ShardID uint64 `protobuf:"varint,3,opt,name=ShardID,proto3" json:"ShardID,omitempty"` - Index uint64 `protobuf:"varint,4,opt,name=Index,proto3" json:"Index,omitempty"` - MaxSize uint64 `protobuf:"varint,5,opt,name=MaxSize,proto3" json:"MaxSize,omitempty"` - Timeout int64 `protobuf:"varint,6,opt,name=Timeout,proto3" json:"Timeout,omitempty"` - DNShardID uint64 `protobuf:"varint,7,opt,name=DNShardID,proto3" json:"DNShardID,omitempty"` - DNID uint64 `protobuf:"varint,8,opt,name=DNID,proto3" json:"DNID,omitempty"` + RequestID uint64 `protobuf:"varint,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"` + Method MethodType `protobuf:"varint,2,opt,name=Method,proto3,enum=logservice.MethodType" json:"Method,omitempty"` + Name string `protobuf:"bytes,3,opt,name=Name,proto3" json:"Name,omitempty"` + ShardID uint64 `protobuf:"varint,4,opt,name=ShardID,proto3" json:"ShardID,omitempty"` + Index uint64 `protobuf:"varint,5,opt,name=Index,proto3" json:"Index,omitempty"` + MaxSize uint64 `protobuf:"varint,6,opt,name=MaxSize,proto3" json:"MaxSize,omitempty"` + Timeout int64 `protobuf:"varint,7,opt,name=Timeout,proto3" json:"Timeout,omitempty"` + DNShardID uint64 `protobuf:"varint,8,opt,name=DNShardID,proto3" json:"DNShardID,omitempty"` + DNID uint64 `protobuf:"varint,9,opt,name=DNID,proto3" json:"DNID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -683,6 +685,13 @@ func (m *Request) XXX_DiscardUnknown() { var xxx_messageInfo_Request proto.InternalMessageInfo +func (m *Request) GetRequestID() uint64 { + if m != nil { + return m.RequestID + } + return 0 +} + func (m *Request) GetMethod() MethodType { if m != nil { return m.Method @@ -740,12 +749,13 @@ func (m *Request) GetDNID() uint64 { } type Response struct { - Method MethodType `protobuf:"varint,1,opt,name=Method,proto3,enum=logservice.MethodType" json:"Method,omitempty"` - ErrorCode ErrorCode `protobuf:"varint,2,opt,name=ErrorCode,proto3,enum=logservice.ErrorCode" json:"ErrorCode,omitempty"` - ErrorMessage string `protobuf:"bytes,3,opt,name=ErrorMessage,proto3" json:"ErrorMessage,omitempty"` - ShardID uint64 `protobuf:"varint,4,opt,name=ShardID,proto3" json:"ShardID,omitempty"` - Index uint64 `protobuf:"varint,5,opt,name=Index,proto3" json:"Index,omitempty"` - LastIndex uint64 `protobuf:"varint,6,opt,name=LastIndex,proto3" json:"LastIndex,omitempty"` + RequestID uint64 `protobuf:"varint,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"` + Method MethodType `protobuf:"varint,2,opt,name=Method,proto3,enum=logservice.MethodType" json:"Method,omitempty"` + ErrorCode ErrorCode `protobuf:"varint,3,opt,name=ErrorCode,proto3,enum=logservice.ErrorCode" json:"ErrorCode,omitempty"` + ErrorMessage string `protobuf:"bytes,4,opt,name=ErrorMessage,proto3" json:"ErrorMessage,omitempty"` + ShardID uint64 `protobuf:"varint,5,opt,name=ShardID,proto3" json:"ShardID,omitempty"` + Index uint64 `protobuf:"varint,6,opt,name=Index,proto3" json:"Index,omitempty"` + LastIndex uint64 `protobuf:"varint,7,opt,name=LastIndex,proto3" json:"LastIndex,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -784,6 +794,13 @@ func (m *Response) XXX_DiscardUnknown() { var xxx_messageInfo_Response proto.InternalMessageInfo +func (m *Response) GetRequestID() uint64 { + if m != nil { + return m.RequestID + } + return 0 +} + func (m *Response) GetMethod() MethodType { if m != nil { return m.Method @@ -894,72 +911,73 @@ func init() { func init() { proto.RegisterFile("logservice.proto", fileDescriptor_fd1040c5381ab5a7) } var fileDescriptor_fd1040c5381ab5a7 = []byte{ - // 1031 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0x4f, 0x6b, 0xe3, 0x46, - 0x14, 0x8f, 0x6c, 0xf9, 0xdf, 0xb3, 0xd7, 0x9d, 0x0c, 0xcd, 0x56, 0x0d, 0x4b, 0x36, 0x98, 0xb2, - 0x84, 0x40, 0x1d, 0x48, 0x58, 0x28, 0x6d, 0xa1, 0x24, 0x96, 0xba, 0xeb, 0x25, 0x91, 0xc3, 0xd8, - 0x39, 0xb4, 0x50, 0x96, 0x89, 0x35, 0x71, 0xd4, 0xb5, 0x35, 0xee, 0x68, 0x1c, 0xe2, 0x7e, 0x88, - 0x7e, 0xae, 0x3d, 0xe6, 0xd2, 0xeb, 0xd2, 0x4d, 0x2f, 0xbd, 0xf5, 0x03, 0xf4, 0x52, 0x66, 0x46, - 0x92, 0xa5, 0x84, 0xb4, 0xbb, 0xb7, 0x79, 0xbf, 0xf7, 0x47, 0xbf, 0xf7, 0xde, 0x6f, 0x06, 0x01, - 0x9a, 0xf2, 0x49, 0xcc, 0xc4, 0x55, 0x38, 0x66, 0xdd, 0xb9, 0xe0, 0x92, 0x63, 0x58, 0x21, 0x9b, - 0x5f, 0x4e, 0x42, 0x79, 0xb9, 0x38, 0xef, 0x8e, 0xf9, 0x6c, 0x6f, 0xc2, 0x27, 0x7c, 0x4f, 0x87, - 0x9c, 0x2f, 0x2e, 0xb4, 0xa5, 0x0d, 0x7d, 0x32, 0xa9, 0x9d, 0xbf, 0x2d, 0x68, 0x1d, 0xf3, 0xc9, - 0xf0, 0x92, 0x8a, 0xa0, 0x1f, 0x5d, 0x70, 0xec, 0x40, 0xcd, 0x18, 0xae, 0x63, 0x6d, 0x5b, 0x3b, - 0x36, 0x49, 0x4d, 0x7c, 0x04, 0x75, 0xc2, 0xe6, 0xd3, 0x70, 0x4c, 0x63, 0xa7, 0xb4, 0x5d, 0xde, - 0x69, 0xee, 0x3f, 0xeb, 0xe6, 0xa8, 0xe4, 0xab, 0x74, 0xd3, 0x40, 0x2f, 0x92, 0x62, 0x49, 0xb2, - 0x3c, 0xfc, 0x29, 0x54, 0xbc, 0x39, 0x1f, 0x5f, 0x3a, 0x65, 0x5d, 0xdb, 0x18, 0x78, 0x13, 0xea, - 0xc7, 0x8c, 0x06, 0x4c, 0xf4, 0x5d, 0xc7, 0xd6, 0x8e, 0xcc, 0xc6, 0x18, 0xec, 0x11, 0x13, 0x33, - 0xa7, 0xa2, 0x71, 0x7d, 0xde, 0xfc, 0x06, 0x1e, 0x15, 0x3e, 0x80, 0x11, 0x94, 0xdf, 0xb0, 0x65, - 0x42, 0x58, 0x1d, 0xd5, 0x87, 0xae, 0xe8, 0x74, 0xc1, 0x9c, 0xd2, 0xb6, 0xb5, 0xd3, 0x20, 0xc6, - 0xf8, 0xba, 0xf4, 0x95, 0xd5, 0xb9, 0x82, 0xf6, 0x31, 0x9f, 0x24, 0xf9, 0xba, 0xe5, 0xef, 0x8b, - 0x23, 0xd0, 0x65, 0x9a, 0xfb, 0xce, 0x43, 0xcd, 0x1d, 0xd5, 0xdf, 0xbe, 0x7b, 0xba, 0x76, 0xf3, - 0xee, 0xa9, 0x45, 0x8a, 0xa3, 0x7b, 0x02, 0x8d, 0xb4, 0xac, 0xab, 0xbf, 0x6b, 0x93, 0x15, 0xd0, - 0xf9, 0xdd, 0x82, 0x75, 0x15, 0x2e, 0xb9, 0x60, 0x2f, 0x19, 0x15, 0xf2, 0x9c, 0x51, 0xa9, 0xda, - 0x3b, 0x3b, 0x4b, 0x66, 0xdd, 0x20, 0xfa, 0x8c, 0xb7, 0xa1, 0x49, 0xe8, 0x85, 0x3c, 0x0c, 0x02, - 0xc1, 0xe2, 0x38, 0xe9, 0x20, 0x0f, 0xe1, 0x67, 0xd0, 0x1e, 0x1a, 0x66, 0x69, 0x50, 0x59, 0x07, - 0xdd, 0x41, 0xf1, 0x17, 0xf0, 0xe8, 0x05, 0x8f, 0xe3, 0x70, 0x9e, 0x86, 0xd9, 0x3a, 0xac, 0x08, - 0xe2, 0x6f, 0x73, 0x8b, 0xad, 0xe8, 0xc5, 0x6e, 0xde, 0xe9, 0x3d, 0x37, 0xad, 0x23, 0x5b, 0x75, - 0xbf, 0x5a, 0x69, 0xc7, 0x83, 0xa6, 0xeb, 0x7f, 0x88, 0x7e, 0xfe, 0x7b, 0x3c, 0x3f, 0x01, 0x72, - 0xfd, 0x0f, 0x18, 0xce, 0x73, 0xa8, 0xea, 0x82, 0xa9, 0x06, 0x3f, 0xcb, 0x53, 0xcd, 0x11, 0x49, - 0x78, 0x26, 0xc1, 0x9d, 0x7f, 0x2c, 0xa8, 0x93, 0xe1, 0xc9, 0x50, 0x52, 0xc9, 0x94, 0x38, 0xfa, - 0x51, 0xc0, 0xae, 0x13, 0x86, 0xc6, 0x50, 0xc3, 0x3a, 0x66, 0x34, 0x66, 0x2f, 0xf9, 0xd4, 0x48, - 0xd1, 0x70, 0x2c, 0x82, 0x6a, 0xf4, 0x23, 0xb1, 0x88, 0xc6, 0x54, 0xb2, 0xc0, 0x14, 0x31, 0x52, - 0xbe, 0x83, 0xe2, 0x57, 0xd0, 0x32, 0x89, 0x61, 0x2c, 0xb9, 0x58, 0x3a, 0xf6, 0xfd, 0x1b, 0x93, - 0xf2, 0xe9, 0xe6, 0x03, 0xcd, 0x8d, 0x29, 0xe4, 0x6e, 0x7e, 0x07, 0xeb, 0xf7, 0x42, 0xfe, 0x4f, - 0xf3, 0x76, 0x5e, 0xf3, 0x14, 0x1a, 0x7a, 0x8b, 0x63, 0x2e, 0x82, 0x07, 0xba, 0xdf, 0x05, 0x7b, - 0xb4, 0x9c, 0x9b, 0xdc, 0xf6, 0xfe, 0xe3, 0x02, 0x4f, 0x9d, 0xa7, 0xbc, 0x44, 0xc7, 0xa8, 0xbd, - 0xb8, 0x54, 0x52, 0xdd, 0x79, 0x8b, 0xe8, 0x73, 0xe7, 0x4f, 0x0b, 0x6a, 0x84, 0xfd, 0xb2, 0x60, - 0xb1, 0xc4, 0x5d, 0xa8, 0x9e, 0x30, 0x79, 0xc9, 0x03, 0xfd, 0x89, 0x3b, 0xd5, 0x8c, 0x47, 0x57, - 0x4b, 0xa2, 0x54, 0x3d, 0x9f, 0xce, 0xd2, 0xbb, 0xaa, 0xcf, 0x79, 0x1d, 0x95, 0x8b, 0x3a, 0xca, - 0xf8, 0xdb, 0x79, 0xfe, 0x0e, 0xd4, 0x4e, 0xe8, 0xf5, 0x30, 0xfc, 0x95, 0x25, 0x4f, 0x45, 0x6a, - 0x2a, 0xcf, 0x28, 0x9c, 0x31, 0xbe, 0x90, 0x4e, 0x75, 0xdb, 0xda, 0x29, 0x93, 0xd4, 0x54, 0x8a, - 0x4c, 0x15, 0xe3, 0x3a, 0x35, 0xa3, 0xc8, 0x0c, 0xd0, 0x5d, 0xfa, 0x7d, 0xd7, 0xa9, 0x9b, 0x97, - 0x47, 0x9d, 0x3b, 0xef, 0x95, 0x8c, 0x58, 0x3c, 0xe7, 0x51, 0xcc, 0x3e, 0xba, 0xcd, 0x03, 0x68, - 0x78, 0x42, 0x70, 0xd1, 0xe3, 0x41, 0x3a, 0xe7, 0x8d, 0x7c, 0x4a, 0xe6, 0x24, 0xab, 0x38, 0xdc, - 0x81, 0x96, 0x36, 0x4e, 0x58, 0x1c, 0xd3, 0x09, 0x4b, 0x2e, 0x7a, 0x01, 0xcb, 0xcf, 0xca, 0x7e, - 0x60, 0x56, 0x95, 0xfc, 0xac, 0x9e, 0x40, 0xe3, 0x98, 0xc6, 0xd2, 0x78, 0xaa, 0xa6, 0xef, 0x0c, - 0xe8, 0xbc, 0xd2, 0xef, 0x94, 0x59, 0x7a, 0xd6, 0xeb, 0x73, 0xb5, 0x5d, 0x85, 0xc4, 0x8e, 0xa5, - 0x95, 0xbc, 0x71, 0xef, 0x89, 0x50, 0xde, 0xe4, 0xd6, 0xa5, 0xb1, 0xbb, 0xd7, 0x00, 0xab, 0x41, - 0x60, 0x80, 0x6a, 0x8f, 0x78, 0x87, 0x23, 0x0f, 0xad, 0xe1, 0x26, 0xd4, 0x5c, 0x6f, 0x38, 0x22, - 0x83, 0x1f, 0x90, 0xa5, 0x1c, 0x87, 0xa7, 0xa7, 0x9e, 0xef, 0xa2, 0x12, 0xae, 0x83, 0x4d, 0xbc, - 0x43, 0x17, 0x95, 0x71, 0x0b, 0xea, 0x23, 0x72, 0xe6, 0xf7, 0x54, 0x82, 0x8d, 0x11, 0xb4, 0x5e, - 0x78, 0xa3, 0xd7, 0x19, 0x52, 0x51, 0x25, 0x7a, 0x03, 0xdf, 0xf7, 0x7a, 0x23, 0x54, 0xc5, 0x6d, - 0x80, 0xc4, 0x78, 0x4d, 0x06, 0xa8, 0xb6, 0xeb, 0x03, 0xac, 0x74, 0xab, 0xbc, 0x67, 0x31, 0x13, - 0x06, 0x41, 0x6b, 0xaa, 0x74, 0x3f, 0x92, 0x4c, 0x44, 0x74, 0x8a, 0x2c, 0xfc, 0x09, 0x34, 0xf5, - 0xfd, 0x3a, 0x9b, 0x07, 0x54, 0x32, 0x54, 0xc2, 0xeb, 0xc9, 0x53, 0x40, 0xd8, 0xcf, 0x6c, 0x2c, - 0x59, 0x80, 0xca, 0xbb, 0xbf, 0x95, 0x72, 0xdb, 0x53, 0x9f, 0xf6, 0xb9, 0x36, 0x4d, 0x2b, 0x89, - 0xa2, 0x90, 0xa5, 0x2a, 0xf7, 0x68, 0x34, 0x66, 0x53, 0x16, 0xa0, 0x92, 0x22, 0xdd, 0x8f, 0xae, - 0xe8, 0x34, 0x0c, 0xf4, 0x46, 0x50, 0x19, 0x63, 0x68, 0x27, 0x48, 0x9a, 0x63, 0xe7, 0xb0, 0x53, - 0xba, 0x9c, 0x72, 0x1a, 0xa0, 0x0a, 0x7e, 0x0c, 0xb8, 0x88, 0x29, 0x2d, 0xa3, 0xaa, 0xaa, 0x9f, - 0xb1, 0xaa, 0x29, 0xa2, 0xba, 0xb0, 0xcf, 0x25, 0x61, 0x34, 0x58, 0xa2, 0xba, 0xfa, 0xe4, 0x70, - 0x19, 0x4b, 0x36, 0xeb, 0x4d, 0x79, 0xcc, 0x02, 0xd4, 0xc0, 0x9f, 0xc3, 0x86, 0xde, 0xec, 0xe1, - 0x54, 0xa8, 0x98, 0xec, 0xa1, 0x42, 0x81, 0x9a, 0xcb, 0x60, 0x21, 0x07, 0x17, 0x84, 0x46, 0x13, - 0x86, 0xd4, 0xcd, 0x6e, 0xfb, 0x5c, 0xe6, 0x5e, 0x3c, 0x74, 0x81, 0x37, 0x00, 0x0d, 0xe4, 0x25, - 0x13, 0xa6, 0xaa, 0x69, 0xfa, 0xaf, 0xda, 0x91, 0x77, 0xf3, 0x7e, 0x6b, 0xed, 0xed, 0xed, 0x96, - 0x75, 0x73, 0xbb, 0x65, 0xfd, 0x71, 0xbb, 0x65, 0xfd, 0x78, 0x90, 0xfb, 0xf5, 0x98, 0x51, 0x29, - 0xc2, 0x6b, 0x2e, 0xc2, 0x49, 0x18, 0xa5, 0x46, 0xc4, 0xf6, 0xe6, 0x6f, 0x26, 0x7b, 0xf3, 0xf3, - 0xbd, 0x95, 0x74, 0xce, 0xab, 0xfa, 0x3f, 0xe4, 0xe0, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xb2, - 0xc8, 0xc0, 0x42, 0xd6, 0x08, 0x00, 0x00, + // 1042 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x4f, 0x6f, 0xe3, 0x44, + 0x14, 0xaf, 0x13, 0xe7, 0xdf, 0x4b, 0x36, 0x4c, 0x47, 0x74, 0x31, 0xd5, 0xaa, 0x5b, 0x45, 0x68, + 0x55, 0x55, 0x22, 0x95, 0x5a, 0xad, 0x84, 0x00, 0x09, 0xb5, 0xb1, 0xd9, 0xcd, 0xaa, 0x75, 0xaa, + 0x49, 0x7a, 0x00, 0x09, 0xad, 0xa6, 0xf1, 0x34, 0x35, 0x9b, 0x78, 0xc2, 0x78, 0x52, 0x35, 0xdc, + 0xf8, 0x02, 0x7c, 0xae, 0x3d, 0xf6, 0xc2, 0x75, 0x05, 0x3d, 0x71, 0xe3, 0x03, 0x70, 0x00, 0xcd, + 0x8c, 0xed, 0xd8, 0x2d, 0x85, 0x3d, 0x70, 0x9b, 0xf7, 0x7b, 0x7f, 0xe6, 0xbd, 0xdf, 0xfb, 0x79, + 0x64, 0x40, 0x53, 0x3e, 0x89, 0x99, 0xb8, 0x0a, 0xc7, 0xac, 0x3b, 0x17, 0x5c, 0x72, 0x0c, 0x2b, + 0x64, 0xf3, 0xd3, 0x49, 0x28, 0x2f, 0x17, 0xe7, 0xdd, 0x31, 0x9f, 0xed, 0x4d, 0xf8, 0x84, 0xef, + 0xe9, 0x90, 0xf3, 0xc5, 0x85, 0xb6, 0xb4, 0xa1, 0x4f, 0x26, 0xb5, 0xf3, 0x87, 0x05, 0xad, 0x63, + 0x3e, 0x19, 0x5e, 0x52, 0x11, 0xf4, 0xa3, 0x0b, 0x8e, 0x1d, 0xa8, 0x19, 0xc3, 0x75, 0xac, 0x6d, + 0x6b, 0xc7, 0x26, 0xa9, 0x89, 0x8f, 0xa0, 0x4e, 0xd8, 0x7c, 0x1a, 0x8e, 0x69, 0xec, 0x94, 0xb6, + 0xcb, 0x3b, 0xcd, 0xfd, 0x67, 0xdd, 0x5c, 0x2b, 0xf9, 0x2a, 0xdd, 0x34, 0xd0, 0x8b, 0xa4, 0x58, + 0x92, 0x2c, 0x0f, 0x7f, 0x08, 0x15, 0x6f, 0xce, 0xc7, 0x97, 0x4e, 0x59, 0xd7, 0x36, 0x06, 0xde, + 0x84, 0xfa, 0x31, 0xa3, 0x01, 0x13, 0x7d, 0xd7, 0xb1, 0xb5, 0x23, 0xb3, 0x31, 0x06, 0x7b, 0xc4, + 0xc4, 0xcc, 0xa9, 0x68, 0x5c, 0x9f, 0x37, 0xbf, 0x80, 0x47, 0x85, 0x0b, 0x30, 0x82, 0xf2, 0x1b, + 0xb6, 0x4c, 0x1a, 0x56, 0x47, 0x75, 0xd1, 0x15, 0x9d, 0x2e, 0x98, 0x53, 0xda, 0xb6, 0x76, 0x1a, + 0xc4, 0x18, 0x9f, 0x97, 0x3e, 0xb3, 0x3a, 0x57, 0xd0, 0x3e, 0xe6, 0x93, 0x24, 0x5f, 0x8f, 0xfc, + 0x75, 0x91, 0x02, 0x5d, 0xa6, 0xb9, 0xef, 0x3c, 0x34, 0xdc, 0x51, 0xfd, 0xed, 0xbb, 0xa7, 0x6b, + 0x37, 0xef, 0x9e, 0x5a, 0xa4, 0x48, 0xdd, 0x13, 0x68, 0xa4, 0x65, 0x5d, 0x7d, 0xaf, 0x4d, 0x56, + 0x40, 0xe7, 0x17, 0x0b, 0xd6, 0x55, 0xb8, 0xe4, 0x82, 0xbd, 0x64, 0x54, 0xc8, 0x73, 0x46, 0xa5, + 0x1a, 0xef, 0xec, 0x2c, 0xe1, 0xba, 0x41, 0xf4, 0x19, 0x6f, 0x43, 0x93, 0xd0, 0x0b, 0x79, 0x18, + 0x04, 0x82, 0xc5, 0x71, 0x32, 0x41, 0x1e, 0xc2, 0xcf, 0xa0, 0x3d, 0x34, 0x9d, 0xa5, 0x41, 0x65, + 0x1d, 0x74, 0x07, 0xc5, 0x9f, 0xc0, 0xa3, 0x17, 0x3c, 0x8e, 0xc3, 0x79, 0x1a, 0x66, 0xeb, 0xb0, + 0x22, 0x88, 0xbf, 0xcc, 0x2d, 0xb6, 0xa2, 0x17, 0xbb, 0x79, 0x67, 0xf6, 0x1c, 0x5b, 0x47, 0xb6, + 0x9a, 0x7e, 0xb5, 0xd2, 0x8e, 0x07, 0x4d, 0xd7, 0x7f, 0x1f, 0xfd, 0xfc, 0x3b, 0x3d, 0xdf, 0x01, + 0x72, 0xfd, 0xf7, 0x20, 0xe7, 0x39, 0x54, 0x75, 0xc1, 0x54, 0x83, 0x1f, 0xe5, 0x5b, 0xcd, 0x35, + 0x92, 0xf4, 0x99, 0x04, 0x77, 0xfe, 0xb4, 0xa0, 0x4e, 0x86, 0x27, 0x43, 0x49, 0x25, 0x53, 0xe2, + 0xe8, 0x47, 0x01, 0xbb, 0x4e, 0x3a, 0x34, 0x86, 0x22, 0xeb, 0x98, 0xd1, 0x98, 0xbd, 0xe4, 0x53, + 0x23, 0x45, 0xd3, 0x63, 0x11, 0x54, 0xd4, 0x8f, 0xc4, 0x22, 0x1a, 0x53, 0xc9, 0x02, 0x53, 0xc4, + 0x48, 0xf9, 0x0e, 0x8a, 0x5f, 0x41, 0xcb, 0x24, 0x86, 0xb1, 0xe4, 0x62, 0xe9, 0xd8, 0xf7, 0xbf, + 0x98, 0xb4, 0x9f, 0x6e, 0x3e, 0xd0, 0x7c, 0x31, 0x85, 0xdc, 0xcd, 0xaf, 0x60, 0xfd, 0x5e, 0xc8, + 0x7f, 0x69, 0xde, 0xce, 0x6b, 0x9e, 0x42, 0x43, 0x6f, 0x71, 0xcc, 0x45, 0xf0, 0xc0, 0xf4, 0xbb, + 0x60, 0x8f, 0x96, 0x73, 0x93, 0xdb, 0xde, 0x7f, 0x5c, 0xe8, 0x53, 0xe7, 0x29, 0x2f, 0xd1, 0x31, + 0x6a, 0x2f, 0x2e, 0x95, 0x54, 0x4f, 0xde, 0x22, 0xfa, 0xdc, 0xf9, 0xa9, 0x04, 0x35, 0xc2, 0x7e, + 0x58, 0xb0, 0x58, 0x9a, 0x4d, 0xeb, 0x63, 0xa6, 0x82, 0x15, 0x80, 0xbb, 0x50, 0x3d, 0x61, 0xf2, + 0x92, 0x07, 0xff, 0x74, 0x97, 0xf1, 0xe8, 0xbb, 0x92, 0x28, 0x75, 0x9b, 0x4f, 0x67, 0x2c, 0x91, + 0xb8, 0x3e, 0xe7, 0x55, 0x66, 0x17, 0x55, 0x96, 0x4d, 0x57, 0xc9, 0x4f, 0xe7, 0x40, 0xed, 0x84, + 0x5e, 0x0f, 0xc3, 0x1f, 0x99, 0x53, 0x35, 0xf1, 0x89, 0xa9, 0x3c, 0xa3, 0x70, 0xc6, 0xf8, 0x42, + 0x3a, 0xb5, 0x6d, 0x6b, 0xa7, 0x4c, 0x52, 0x53, 0x4d, 0x91, 0xea, 0xc9, 0x75, 0xea, 0x66, 0x8a, + 0x0c, 0xd0, 0x1c, 0xf8, 0x7d, 0xd7, 0x69, 0x98, 0x77, 0x49, 0x9d, 0x3b, 0x7f, 0x29, 0x91, 0xb1, + 0x78, 0xce, 0xa3, 0x98, 0xfd, 0xcf, 0x24, 0x1c, 0x40, 0xc3, 0x13, 0x82, 0x8b, 0x1e, 0x0f, 0x0c, + 0x13, 0xed, 0xfd, 0x8d, 0x7c, 0x4a, 0xe6, 0x24, 0xab, 0x38, 0xdc, 0x81, 0x96, 0x36, 0x4e, 0x58, + 0x1c, 0xd3, 0x09, 0x4b, 0xbe, 0xfe, 0x02, 0x96, 0x67, 0xb2, 0xf2, 0x00, 0x93, 0xd5, 0x3c, 0x93, + 0x4f, 0xa0, 0x71, 0x4c, 0x63, 0x69, 0x3c, 0x35, 0x33, 0x56, 0x06, 0x74, 0x5e, 0xe9, 0x37, 0xce, + 0x08, 0x26, 0x63, 0xe2, 0xb9, 0x52, 0x86, 0x42, 0x62, 0xc7, 0xd2, 0x5f, 0xc1, 0xc6, 0xbd, 0xe7, + 0x45, 0x79, 0x93, 0x2f, 0x36, 0x8d, 0xdd, 0xbd, 0x06, 0x58, 0x11, 0x81, 0x01, 0xaa, 0x3d, 0xe2, + 0x1d, 0x8e, 0x3c, 0xb4, 0x86, 0x9b, 0x50, 0x73, 0xbd, 0xe1, 0x88, 0x0c, 0xbe, 0x41, 0x96, 0x72, + 0x1c, 0x9e, 0x9e, 0x7a, 0xbe, 0x8b, 0x4a, 0xb8, 0x0e, 0x36, 0xf1, 0x0e, 0x5d, 0x54, 0xc6, 0x2d, + 0xa8, 0x8f, 0xc8, 0x99, 0xdf, 0x53, 0x09, 0x36, 0x46, 0xd0, 0x7a, 0xe1, 0x8d, 0x5e, 0x67, 0x48, + 0x45, 0x95, 0xe8, 0x0d, 0x7c, 0xdf, 0xeb, 0x8d, 0x50, 0x15, 0xb7, 0x01, 0x12, 0xe3, 0x35, 0x19, + 0xa0, 0xda, 0xae, 0x0f, 0xb0, 0xd2, 0xbc, 0xf2, 0x9e, 0xc5, 0x4c, 0x18, 0x04, 0xad, 0xa9, 0xd2, + 0xfd, 0x48, 0x32, 0x11, 0xd1, 0x29, 0xb2, 0xf0, 0x07, 0xd0, 0xd4, 0xdf, 0xe6, 0xd9, 0x3c, 0xa0, + 0x92, 0xa1, 0x12, 0x5e, 0x4f, 0x9e, 0x11, 0xc2, 0xbe, 0x67, 0x63, 0xc9, 0x02, 0x54, 0xde, 0xfd, + 0xb9, 0x94, 0xdb, 0x9e, 0xba, 0xda, 0xe7, 0xda, 0x34, 0xa3, 0x24, 0x7a, 0x43, 0x96, 0xaa, 0xdc, + 0xa3, 0xd1, 0x98, 0x4d, 0x59, 0x80, 0x4a, 0xaa, 0xe9, 0x7e, 0x74, 0x45, 0xa7, 0x61, 0xa0, 0x37, + 0x82, 0xca, 0x18, 0x43, 0x3b, 0x41, 0xd2, 0x1c, 0x3b, 0x87, 0x9d, 0xd2, 0xe5, 0x94, 0xd3, 0x00, + 0x55, 0xf0, 0x63, 0xc0, 0x45, 0x4c, 0x29, 0x1d, 0x55, 0x55, 0xfd, 0xac, 0xab, 0x9a, 0x6a, 0x54, + 0x17, 0xf6, 0xb9, 0x24, 0x8c, 0x06, 0x4b, 0x54, 0x57, 0x57, 0x0e, 0x97, 0xb1, 0x64, 0xb3, 0xde, + 0x94, 0xc7, 0x2c, 0x40, 0x0d, 0xfc, 0x31, 0x6c, 0xe8, 0xcd, 0x1e, 0x4e, 0x85, 0x8a, 0xc9, 0x1e, + 0x39, 0x14, 0x28, 0x5e, 0x06, 0x0b, 0x39, 0xb8, 0x20, 0x34, 0x9a, 0x30, 0xa4, 0x5e, 0x85, 0xb6, + 0xcf, 0x65, 0xee, 0xb5, 0x44, 0x17, 0x78, 0x03, 0xd0, 0x40, 0x5e, 0x32, 0x61, 0xaa, 0x9a, 0xa1, + 0x7f, 0xaf, 0x1d, 0x79, 0x37, 0xbf, 0x6d, 0xad, 0xbd, 0xbd, 0xdd, 0xb2, 0x6e, 0x6e, 0xb7, 0xac, + 0x5f, 0x6f, 0xb7, 0xac, 0x6f, 0x0f, 0x72, 0xbf, 0x2d, 0x33, 0x2a, 0x45, 0x78, 0xcd, 0x45, 0x38, + 0x09, 0xa3, 0xd4, 0x88, 0xd8, 0xde, 0xfc, 0xcd, 0x64, 0x6f, 0x7e, 0xbe, 0xb7, 0x92, 0xce, 0x79, + 0x55, 0xff, 0xc3, 0x1c, 0xfc, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x8f, 0x96, 0x66, 0x53, 0x12, 0x09, + 0x00, 0x00, } func (m *LogShardInfo) Marshal() (dAtA []byte, err error) { @@ -1350,43 +1368,48 @@ func (m *Request) MarshalToSizedBuffer(dAtA []byte) (int, error) { if m.DNID != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.DNID)) i-- - dAtA[i] = 0x40 + dAtA[i] = 0x48 } if m.DNShardID != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.DNShardID)) i-- - dAtA[i] = 0x38 + dAtA[i] = 0x40 } if m.Timeout != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.Timeout)) i-- - dAtA[i] = 0x30 + dAtA[i] = 0x38 } if m.MaxSize != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.MaxSize)) i-- - dAtA[i] = 0x28 + dAtA[i] = 0x30 } if m.Index != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.Index)) i-- - dAtA[i] = 0x20 + dAtA[i] = 0x28 } if m.ShardID != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.ShardID)) i-- - dAtA[i] = 0x18 + dAtA[i] = 0x20 } if len(m.Name) > 0 { i -= len(m.Name) copy(dAtA[i:], m.Name) i = encodeVarintLogservice(dAtA, i, uint64(len(m.Name))) i-- - dAtA[i] = 0x12 + dAtA[i] = 0x1a } if m.Method != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.Method)) i-- + dAtA[i] = 0x10 + } + if m.RequestID != 0 { + i = encodeVarintLogservice(dAtA, i, uint64(m.RequestID)) + i-- dAtA[i] = 0x8 } return len(dAtA) - i, nil @@ -1419,33 +1442,38 @@ func (m *Response) MarshalToSizedBuffer(dAtA []byte) (int, error) { if m.LastIndex != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.LastIndex)) i-- - dAtA[i] = 0x30 + dAtA[i] = 0x38 } if m.Index != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.Index)) i-- - dAtA[i] = 0x28 + dAtA[i] = 0x30 } if m.ShardID != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.ShardID)) i-- - dAtA[i] = 0x20 + dAtA[i] = 0x28 } if len(m.ErrorMessage) > 0 { i -= len(m.ErrorMessage) copy(dAtA[i:], m.ErrorMessage) i = encodeVarintLogservice(dAtA, i, uint64(len(m.ErrorMessage))) i-- - dAtA[i] = 0x1a + dAtA[i] = 0x22 } if m.ErrorCode != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.ErrorCode)) i-- - dAtA[i] = 0x10 + dAtA[i] = 0x18 } if m.Method != 0 { i = encodeVarintLogservice(dAtA, i, uint64(m.Method)) i-- + dAtA[i] = 0x10 + } + if m.RequestID != 0 { + i = encodeVarintLogservice(dAtA, i, uint64(m.RequestID)) + i-- dAtA[i] = 0x8 } return len(dAtA) - i, nil @@ -1683,6 +1711,9 @@ func (m *Request) Size() (n int) { } var l int _ = l + if m.RequestID != 0 { + n += 1 + sovLogservice(uint64(m.RequestID)) + } if m.Method != 0 { n += 1 + sovLogservice(uint64(m.Method)) } @@ -1720,6 +1751,9 @@ func (m *Response) Size() (n int) { } var l int _ = l + if m.RequestID != 0 { + n += 1 + sovLogservice(uint64(m.RequestID)) + } if m.Method != 0 { n += 1 + sovLogservice(uint64(m.Method)) } @@ -2891,6 +2925,25 @@ func (m *Request) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestID", wireType) + } + m.RequestID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogservice + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.RequestID |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Method", wireType) } @@ -2909,7 +2962,7 @@ func (m *Request) Unmarshal(dAtA []byte) error { break } } - case 2: + case 3: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) } @@ -2941,7 +2994,7 @@ func (m *Request) Unmarshal(dAtA []byte) error { } m.Name = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - case 3: + case 4: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field ShardID", wireType) } @@ -2960,7 +3013,7 @@ func (m *Request) Unmarshal(dAtA []byte) error { break } } - case 4: + case 5: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType) } @@ -2979,7 +3032,7 @@ func (m *Request) Unmarshal(dAtA []byte) error { break } } - case 5: + case 6: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field MaxSize", wireType) } @@ -2998,7 +3051,7 @@ func (m *Request) Unmarshal(dAtA []byte) error { break } } - case 6: + case 7: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Timeout", wireType) } @@ -3017,7 +3070,7 @@ func (m *Request) Unmarshal(dAtA []byte) error { break } } - case 7: + case 8: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field DNShardID", wireType) } @@ -3036,7 +3089,7 @@ func (m *Request) Unmarshal(dAtA []byte) error { break } } - case 8: + case 9: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field DNID", wireType) } @@ -3107,6 +3160,25 @@ func (m *Response) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestID", wireType) + } + m.RequestID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogservice + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.RequestID |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Method", wireType) } @@ -3125,7 +3197,7 @@ func (m *Response) Unmarshal(dAtA []byte) error { break } } - case 2: + case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field ErrorCode", wireType) } @@ -3144,7 +3216,7 @@ func (m *Response) Unmarshal(dAtA []byte) error { break } } - case 3: + case 4: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field ErrorMessage", wireType) } @@ -3176,7 +3248,7 @@ func (m *Response) Unmarshal(dAtA []byte) error { } m.ErrorMessage = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - case 4: + case 5: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field ShardID", wireType) } @@ -3195,7 +3267,7 @@ func (m *Response) Unmarshal(dAtA []byte) error { break } } - case 5: + case 6: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType) } @@ -3214,7 +3286,7 @@ func (m *Response) Unmarshal(dAtA []byte) error { break } } - case 6: + case 7: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field LastIndex", wireType) } diff --git a/pkg/pb/metadata/metadata.pb.go b/pkg/pb/metadata/metadata.pb.go index cca8a473e1d71068317ab3419c818112d54b7c59..51843c03c1f06f0d33044eaff8d26609b265e733 100644 --- a/pkg/pb/metadata/metadata.pb.go +++ b/pkg/pb/metadata/metadata.pb.go @@ -5,11 +5,12 @@ package metadata import ( fmt "fmt" - _ "github.com/gogo/protobuf/gogoproto" - proto "github.com/gogo/protobuf/proto" io "io" math "math" math_bits "math/bits" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/pkg/pb/metric/metric.pb.go b/pkg/pb/metric/metric.pb.go index 70c4ea31c03890c7cb90d73b0841e96b20ce926e..5b8b208de50e70e4c747c39cb9bc8d1eb77e9b0d 100644 --- a/pkg/pb/metric/metric.pb.go +++ b/pkg/pb/metric/metric.pb.go @@ -6,10 +6,11 @@ package metric import ( encoding_binary "encoding/binary" fmt "fmt" - proto "github.com/gogo/protobuf/proto" io "io" math "math" math_bits "math/bits" + + proto "github.com/gogo/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/pkg/pb/plan/plan.pb.go b/pkg/pb/plan/plan.pb.go index 188b55808b101a5531b82e89f91a0bf7f1e3a180..7d38841e57f26b56b0f007e60948346ec6a12b94 100644 --- a/pkg/pb/plan/plan.pb.go +++ b/pkg/pb/plan/plan.pb.go @@ -6,11 +6,12 @@ package plan import ( encoding_binary "encoding/binary" fmt "fmt" - _ "github.com/gogo/protobuf/gogoproto" - proto "github.com/gogo/protobuf/proto" io "io" math "math" math_bits "math/bits" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/pkg/pb/timestamp/timestamp.pb.go b/pkg/pb/timestamp/timestamp.pb.go index 15548ce467ea932ce38a596ae983939a6c92dc00..5e13dcc4f21452f893164d80af3afe3f99e1ce0b 100644 --- a/pkg/pb/timestamp/timestamp.pb.go +++ b/pkg/pb/timestamp/timestamp.pb.go @@ -5,10 +5,11 @@ package timestamp import ( fmt "fmt" - proto "github.com/gogo/protobuf/proto" io "io" math "math" math_bits "math/bits" + + proto "github.com/gogo/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/pkg/pb/txn/txn.go b/pkg/pb/txn/txn.go index 65202882a1b922f061b2190f3afb24edf62f6ba7..f205d49772fb9369a50161f989a2b7eea0ad38be 100644 --- a/pkg/pb/txn/txn.go +++ b/pkg/pb/txn/txn.go @@ -183,7 +183,12 @@ func (m TxnRequest) GetTargetDN() metadata.DNShard { } // ID implement morpc Messgae -func (m TxnRequest) ID() []byte { +func (m *TxnRequest) SetID(id uint64) { + m.RequestID = id +} + +// ID implement morpc Messgae +func (m *TxnRequest) GetID() uint64 { return m.RequestID } @@ -203,6 +208,11 @@ func (m *TxnRequest) SetPayloadField(data []byte) { } // ID implement morpc Messgae -func (m TxnResponse) ID() []byte { +func (m *TxnResponse) SetID(id uint64) { + m.RequestID = id +} + +// ID implement morpc Messgae +func (m *TxnResponse) GetID() uint64 { return m.RequestID } diff --git a/pkg/pb/txn/txn.pb.go b/pkg/pb/txn/txn.pb.go index 8dbd3747517fa916cf6832cda7fcb247118a33a9..c0c5d9d1644f7c3f6dd6e8a38dd80a9deca211d3 100644 --- a/pkg/pb/txn/txn.pb.go +++ b/pkg/pb/txn/txn.pb.go @@ -5,13 +5,14 @@ package txn import ( fmt "fmt" + io "io" + math "math" + math_bits "math/bits" + _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" metadata "github.com/matrixorigin/matrixone/pkg/pb/metadata" timestamp "github.com/matrixorigin/matrixone/pkg/pb/timestamp" - io "io" - math "math" - math_bits "math/bits" ) // Reference imports to suppress errors if they are not otherwise used. @@ -466,7 +467,7 @@ func (m *CNOpResponse) GetPayload() []byte { // 2. DN -> DN (TxnMethod.Prepare, TxnMethod.GetStatus, TxnMethod.CommitDNShard, TxnMethod.RollbackDNShard) type TxnRequest struct { // RequestID request id - RequestID []byte `protobuf:"bytes,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"` + RequestID uint64 `protobuf:"varint,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"` // Txn transaction metadata Txn TxnMeta `protobuf:"bytes,2,opt,name=Txn,proto3" json:"Txn"` // TxnMethod TxnRequest opCode, select the Request defined below according to TxnMethod. @@ -525,11 +526,11 @@ func (m *TxnRequest) XXX_DiscardUnknown() { var xxx_messageInfo_TxnRequest proto.InternalMessageInfo -func (m *TxnRequest) GetRequestID() []byte { +func (m *TxnRequest) GetRequestID() uint64 { if m != nil { return m.RequestID } - return nil + return 0 } func (m *TxnRequest) GetTxn() TxnMeta { @@ -605,7 +606,7 @@ func (m *TxnRequest) GetRollbackDNShardRequest() *TxnRollbackDNShardRequest { // TxnResponse response of TxnRequest. type TxnResponse struct { // RequestID corresponding request id - RequestID []byte `protobuf:"bytes,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"` + RequestID uint64 `protobuf:"varint,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"` // Txn transaction metadata. TxnResponse.TxnMeta and TxnRequest.TxnMeta may differ // in that the node initiating the TxnRequest needs to process the returned TxnMeta, // e.g. to determine whether the transaction is Aborted by the status of the returned @@ -669,11 +670,11 @@ func (m *TxnResponse) XXX_DiscardUnknown() { var xxx_messageInfo_TxnResponse proto.InternalMessageInfo -func (m *TxnResponse) GetRequestID() []byte { +func (m *TxnResponse) GetRequestID() uint64 { if m != nil { return m.RequestID } - return nil + return 0 } func (m *TxnResponse) GetTxn() *TxnMeta { @@ -1379,7 +1380,7 @@ func init() { func init() { proto.RegisterFile("txn.proto", fileDescriptor_4f782e76b37adb9a) } var fileDescriptor_4f782e76b37adb9a = []byte{ - // 1032 bytes of a gzipped FileDescriptorProto + // 1033 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0xdd, 0x6e, 0xe3, 0x44, 0x14, 0xae, 0xf3, 0xeb, 0x9c, 0xfc, 0xac, 0x7b, 0xfa, 0x97, 0xad, 0x4a, 0xb6, 0xb2, 0x56, 0xab, 0x52, 0x41, 0xc2, 0xb6, 0x70, 0x03, 0x52, 0xa5, 0x6e, 0xb2, 0x2c, 0x95, 0xd8, 0xb4, 0x9a, 0x44, @@ -1408,43 +1409,43 @@ var fileDescriptor_4f782e76b37adb9a = []byte{ 0x98, 0xdc, 0x5c, 0x95, 0xa8, 0x15, 0xd6, 0xa1, 0x78, 0x45, 0x6f, 0xc7, 0x9c, 0x3a, 0x52, 0x05, 0x15, 0xa2, 0x97, 0xd8, 0x82, 0x42, 0x9f, 0x06, 0x43, 0x26, 0xd4, 0x2f, 0x4f, 0xdd, 0xa3, 0x0a, 0xb3, 0x8f, 0xa0, 0x12, 0x65, 0x9c, 0xfa, 0xdc, 0x9b, 0xc6, 0xa8, 0x8d, 0x18, 0xb5, 0xfd, 0x4f, - 0x0e, 0xa0, 0x3f, 0xf3, 0x74, 0x6d, 0xb2, 0x35, 0xf2, 0x73, 0xae, 0xcf, 0x85, 0x41, 0xff, 0x93, - 0xcc, 0xfd, 0xff, 0xe4, 0x19, 0x14, 0x5e, 0x33, 0x31, 0xe2, 0x8e, 0xac, 0x76, 0x49, 0xcc, 0x91, - 0x95, 0x28, 0x2f, 0x22, 0xe4, 0xbe, 0x1c, 0xd3, 0xa1, 0x6c, 0x6e, 0x95, 0xc8, 0x6f, 0x6c, 0x42, - 0xa9, 0xdd, 0x55, 0x09, 0x95, 0xde, 0x2c, 0x09, 0x5f, 0x6a, 0x20, 0x59, 0x84, 0xe0, 0x17, 0x50, - 0x8d, 0x84, 0xa7, 0x31, 0x91, 0xdc, 0x76, 0x74, 0xca, 0x98, 0x93, 0xc4, 0x63, 0xf1, 0x1c, 0x1e, - 0x11, 0x3e, 0x1e, 0x0f, 0xe8, 0xf5, 0x1b, 0x0d, 0x2f, 0x4a, 0xf8, 0x9e, 0x86, 0x27, 0xdc, 0x24, - 0x19, 0x8f, 0x67, 0x50, 0x53, 0x87, 0x45, 0x33, 0x98, 0x92, 0x61, 0x57, 0x33, 0xc4, 0xbd, 0x24, - 0x11, 0x8d, 0x1d, 0xb0, 0x5e, 0x31, 0xa1, 0x4e, 0xb9, 0x62, 0x28, 0x49, 0x86, 0xba, 0x66, 0x48, - 0xfa, 0xc9, 0x0a, 0x02, 0xaf, 0x60, 0x3b, 0xda, 0x99, 0x3e, 0x5b, 0x8a, 0x09, 0x24, 0xd3, 0x41, - 0xbc, 0x19, 0xf1, 0x18, 0xb2, 0x16, 0x89, 0xdf, 0xc0, 0xae, 0xde, 0x6a, 0x82, 0xb3, 0x2c, 0x39, - 0x1b, 0xc9, 0x0e, 0x25, 0x58, 0x53, 0xd0, 0xf6, 0xef, 0x79, 0x28, 0x4b, 0xb9, 0x29, 0x61, 0xde, - 0xaf, 0xb7, 0x46, 0xaa, 0xde, 0xde, 0x5f, 0x69, 0x1f, 0x82, 0xd9, 0x9f, 0x79, 0x2f, 0x83, 0x80, - 0x07, 0x4a, 0x68, 0x55, 0x8d, 0x96, 0x46, 0x32, 0x77, 0xe3, 0x67, 0xf1, 0xd3, 0x34, 0x1f, 0x69, - 0x0b, 0x5d, 0x46, 0x0e, 0x12, 0x3f, 0x74, 0x67, 0x50, 0xd3, 0x7a, 0x53, 0xc0, 0x62, 0x5c, 0x1b, - 0x71, 0x2f, 0x49, 0x44, 0x87, 0xda, 0x58, 0xc8, 0x4d, 0x31, 0x98, 0x71, 0x6d, 0x24, 0xfd, 0x64, - 0x05, 0x11, 0x8a, 0x7c, 0xae, 0x39, 0x45, 0x52, 0x8a, 0x8b, 0x3c, 0xe1, 0x26, 0xc9, 0x78, 0x7c, - 0x05, 0x9b, 0x4b, 0x92, 0x53, 0x24, 0x91, 0xb6, 0x1e, 0xaf, 0x51, 0xa9, 0xa2, 0x59, 0xc5, 0x60, - 0x0f, 0x76, 0x12, 0x6a, 0x53, 0x64, 0x91, 0xa8, 0x3e, 0x48, 0x11, 0xaa, 0x22, 0x5c, 0x8f, 0xc5, - 0xef, 0x60, 0x6f, 0x45, 0x6c, 0x8a, 0xb6, 0x22, 0x69, 0x9f, 0xa4, 0x6a, 0x55, 0x11, 0xa7, 0xe1, - 0xed, 0x09, 0x58, 0xc9, 0x19, 0x12, 0xbb, 0x71, 0x8c, 0xff, 0x7a, 0xe3, 0xac, 0xdc, 0x13, 0x99, - 0x75, 0xf7, 0xc4, 0x16, 0x6c, 0xae, 0xa8, 0xc2, 0xbe, 0x00, 0x5c, 0x1d, 0x44, 0x0f, 0xaa, 0xc2, - 0xde, 0x81, 0xad, 0x35, 0x9a, 0xb1, 0x7f, 0x91, 0x69, 0x13, 0x83, 0xe9, 0x39, 0x14, 0x15, 0x4e, - 0x5d, 0xc1, 0xa9, 0xfc, 0x3a, 0xee, 0x41, 0x77, 0xb1, 0xbd, 0x2d, 0xb7, 0x97, 0x50, 0x9c, 0xfd, - 0x95, 0xac, 0x74, 0x65, 0xce, 0xfd, 0xff, 0xa2, 0xec, 0x5d, 0xd8, 0x5e, 0xa7, 0x4e, 0xfb, 0x6b, - 0xd8, 0x4b, 0x99, 0x88, 0x0f, 0xc9, 0xb2, 0x0f, 0xf5, 0x34, 0xd9, 0xda, 0x5d, 0x78, 0x9c, 0x3a, - 0x27, 0x1f, 0x92, 0xeb, 0x00, 0xf6, 0xd3, 0xb5, 0x6c, 0xc3, 0x62, 0xac, 0x1d, 0xff, 0x00, 0xa5, - 0xf9, 0x13, 0x12, 0x01, 0x0a, 0xe7, 0xd7, 0xc2, 0xfd, 0x99, 0x59, 0x1b, 0x58, 0x01, 0x53, 0x3f, - 0xf1, 0x2c, 0x03, 0x6b, 0x00, 0x51, 0xe5, 0xc2, 0xf5, 0x86, 0x56, 0x06, 0xab, 0x50, 0x52, 0x6b, - 0xe6, 0x58, 0xd9, 0x30, 0xf8, 0x7c, 0xc0, 0x03, 0xe9, 0xcc, 0x61, 0x19, 0x8a, 0x72, 0xc5, 0x1c, - 0x2b, 0x7f, 0xfc, 0xab, 0x4c, 0xa0, 0x86, 0xac, 0x09, 0xb9, 0xf0, 0x11, 0x65, 0x6d, 0x60, 0x09, - 0xf2, 0xf2, 0x79, 0x64, 0x19, 0x61, 0xd6, 0x88, 0xcb, 0xca, 0x84, 0x44, 0xba, 0x6a, 0x2b, 0x1b, - 0x12, 0xa9, 0x1a, 0xac, 0x5c, 0x98, 0x72, 0xfe, 0x8b, 0xac, 0x3c, 0x6e, 0xea, 0x5b, 0x5d, 0xed, - 0xce, 0x2a, 0xe0, 0xd6, 0xe2, 0xae, 0xd6, 0xc6, 0xe2, 0x8b, 0xb3, 0x77, 0x7f, 0x37, 0x8c, 0xb7, - 0x77, 0x0d, 0xe3, 0xdd, 0x5d, 0xc3, 0xf8, 0xeb, 0xae, 0x61, 0x7c, 0xff, 0xd1, 0xd2, 0xa3, 0x7d, - 0x42, 0x45, 0xe0, 0xce, 0x78, 0xe0, 0x0e, 0x5d, 0x4f, 0x2f, 0x3c, 0xd6, 0xf2, 0xdf, 0x0c, 0x5b, - 0xfe, 0xa0, 0x25, 0x66, 0xde, 0xa0, 0x20, 0x5f, 0xe6, 0xa7, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, - 0x77, 0x9c, 0x72, 0xbc, 0xfb, 0x0b, 0x00, 0x00, + 0x0e, 0xa0, 0x3f, 0xf3, 0x74, 0x6d, 0xb2, 0x35, 0xf2, 0x53, 0xe9, 0x33, 0x47, 0x16, 0x06, 0xfd, + 0x4f, 0x32, 0xf7, 0xff, 0x93, 0x67, 0x50, 0x78, 0xcd, 0xc4, 0x88, 0x3b, 0xb2, 0xda, 0x25, 0x31, + 0x47, 0x56, 0xa2, 0xbc, 0x88, 0x90, 0xfb, 0x72, 0x4c, 0x87, 0xb2, 0xb9, 0x55, 0x22, 0xbf, 0xb1, + 0x09, 0xa5, 0x76, 0x57, 0x25, 0x54, 0x7a, 0xb3, 0x24, 0x7c, 0xa9, 0x81, 0x64, 0x11, 0x82, 0x5f, + 0x40, 0x35, 0x12, 0x9e, 0xc6, 0x44, 0x72, 0xdb, 0xd1, 0x29, 0x63, 0x4e, 0x12, 0x8f, 0xc5, 0x73, + 0x78, 0x44, 0xf8, 0x78, 0x3c, 0xa0, 0xd7, 0x6f, 0x34, 0xbc, 0x28, 0xe1, 0x7b, 0x1a, 0x9e, 0x70, + 0x93, 0x64, 0x3c, 0x9e, 0x41, 0x4d, 0x1d, 0x16, 0xcd, 0x60, 0x4a, 0x86, 0x5d, 0xcd, 0x10, 0xf7, + 0x92, 0x44, 0x34, 0x76, 0xc0, 0x7a, 0xc5, 0x84, 0x3a, 0xe5, 0x8a, 0xa1, 0x24, 0x19, 0xea, 0x9a, + 0x21, 0xe9, 0x27, 0x2b, 0x08, 0xbc, 0x82, 0xed, 0x68, 0x67, 0xfa, 0x6c, 0x29, 0x26, 0x90, 0x4c, + 0x07, 0xf1, 0x66, 0xc4, 0x63, 0xc8, 0x5a, 0x24, 0x7e, 0x03, 0xbb, 0x7a, 0xab, 0x09, 0xce, 0xb2, + 0xe4, 0x6c, 0x24, 0x3b, 0x94, 0x60, 0x4d, 0x41, 0xdb, 0xbf, 0xe7, 0xa1, 0x2c, 0xe5, 0xa6, 0x84, + 0x79, 0xbf, 0xde, 0x1a, 0xa9, 0x7a, 0x7b, 0x7f, 0xa5, 0x7d, 0x08, 0x66, 0x7f, 0xe6, 0xbd, 0x0c, + 0x02, 0x1e, 0x28, 0xa1, 0x55, 0x35, 0x5a, 0x1a, 0xc9, 0xdc, 0x8d, 0x9f, 0xc5, 0x4f, 0xd3, 0x7c, + 0xa4, 0x2d, 0x74, 0x19, 0x39, 0x48, 0xfc, 0xd0, 0x9d, 0x41, 0x4d, 0xeb, 0x4d, 0x01, 0x8b, 0x71, + 0x6d, 0xc4, 0xbd, 0x24, 0x11, 0x1d, 0x6a, 0x63, 0x21, 0x37, 0xc5, 0x60, 0xc6, 0xb5, 0x91, 0xf4, + 0x93, 0x15, 0x44, 0x28, 0xf2, 0xb9, 0xe6, 0x14, 0x49, 0x29, 0x2e, 0xf2, 0x84, 0x9b, 0x24, 0xe3, + 0xf1, 0x15, 0x6c, 0x2e, 0x49, 0x4e, 0x91, 0x44, 0xda, 0x7a, 0xbc, 0x46, 0xa5, 0x8a, 0x66, 0x15, + 0x83, 0x3d, 0xd8, 0x49, 0xa8, 0x4d, 0x91, 0x45, 0xa2, 0xfa, 0x20, 0x45, 0xa8, 0x8a, 0x70, 0x3d, + 0x16, 0xbf, 0x83, 0xbd, 0x15, 0xb1, 0x29, 0xda, 0x8a, 0xa4, 0x7d, 0x92, 0xaa, 0x55, 0x45, 0x9c, + 0x86, 0xb7, 0x27, 0x60, 0x25, 0x67, 0x48, 0xec, 0xc6, 0x31, 0xfe, 0xeb, 0x8d, 0xb3, 0x72, 0x4f, + 0x64, 0xd6, 0xdd, 0x13, 0x5b, 0xb0, 0xb9, 0xa2, 0x0a, 0xfb, 0x02, 0x70, 0x75, 0x10, 0x3d, 0xa8, + 0x0a, 0x7b, 0x07, 0xb6, 0xd6, 0x68, 0xc6, 0xfe, 0x45, 0xa6, 0x4d, 0x0c, 0xa6, 0xe7, 0x50, 0x54, + 0x38, 0x75, 0x05, 0xa7, 0xf2, 0xeb, 0xb8, 0x07, 0xdd, 0xc5, 0xf6, 0xb6, 0xdc, 0x5e, 0x42, 0x71, + 0xf6, 0x57, 0xb2, 0xd2, 0x95, 0x39, 0xf7, 0xff, 0x8b, 0xb2, 0x77, 0x61, 0x7b, 0x9d, 0x3a, 0xed, + 0xaf, 0x61, 0x2f, 0x65, 0x22, 0x3e, 0x24, 0xcb, 0x3e, 0xd4, 0xd3, 0x64, 0x6b, 0x77, 0xe1, 0x71, + 0xea, 0x9c, 0x7c, 0x48, 0xae, 0x03, 0xd8, 0x4f, 0xd7, 0xb2, 0x0d, 0x8b, 0xb1, 0x76, 0xfc, 0x03, + 0x94, 0xe6, 0x4f, 0x48, 0x04, 0x28, 0x9c, 0x5f, 0x0b, 0xf7, 0x67, 0x66, 0x6d, 0x60, 0x05, 0x4c, + 0xfd, 0xc4, 0xb3, 0x0c, 0xac, 0x01, 0x44, 0x95, 0x0b, 0xd7, 0x1b, 0x5a, 0x19, 0xac, 0x42, 0x49, + 0xad, 0x99, 0x63, 0x65, 0xc3, 0xe0, 0xf3, 0x01, 0x0f, 0xa4, 0x33, 0x87, 0x65, 0x28, 0xca, 0x15, + 0x73, 0xac, 0xfc, 0xf1, 0xaf, 0x32, 0x81, 0x1a, 0xb2, 0x26, 0xe4, 0xc2, 0x47, 0x94, 0xb5, 0x81, + 0x25, 0xc8, 0xcb, 0xe7, 0x91, 0x65, 0x84, 0x59, 0x23, 0x2e, 0x2b, 0x13, 0x12, 0xe9, 0xaa, 0xad, + 0x6c, 0x48, 0xa4, 0x6a, 0xb0, 0x72, 0x61, 0xca, 0xf9, 0x2f, 0xb2, 0xf2, 0xb8, 0xa9, 0x6f, 0x75, + 0xb5, 0x3b, 0xab, 0x80, 0x5b, 0x8b, 0xbb, 0x5a, 0x1b, 0x8b, 0x2f, 0xce, 0xde, 0xfd, 0xdd, 0x30, + 0xde, 0xde, 0x35, 0x8c, 0x77, 0x77, 0x0d, 0xe3, 0xaf, 0xbb, 0x86, 0xf1, 0xfd, 0x47, 0x4b, 0x8f, + 0xf6, 0x09, 0x15, 0x81, 0x3b, 0xe3, 0x81, 0x3b, 0x74, 0x3d, 0xbd, 0xf0, 0x58, 0xcb, 0x7f, 0x33, + 0x6c, 0xf9, 0x83, 0x96, 0x98, 0x79, 0x83, 0x82, 0x7c, 0x99, 0x9f, 0xfe, 0x1b, 0x00, 0x00, 0xff, + 0xff, 0x8d, 0x41, 0xe2, 0x5d, 0xfb, 0x0b, 0x00, 0x00, } func (m *TxnMeta) Marshal() (dAtA []byte, err error) { @@ -1824,12 +1825,10 @@ func (m *TxnRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { } i-- dAtA[i] = 0x12 - if len(m.RequestID) > 0 { - i -= len(m.RequestID) - copy(dAtA[i:], m.RequestID) - i = encodeVarintTxn(dAtA, i, uint64(len(m.RequestID))) + if m.RequestID != 0 { + i = encodeVarintTxn(dAtA, i, uint64(m.RequestID)) i-- - dAtA[i] = 0xa + dAtA[i] = 0x8 } return len(dAtA) - i, nil } @@ -1976,12 +1975,10 @@ func (m *TxnResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x12 } - if len(m.RequestID) > 0 { - i -= len(m.RequestID) - copy(dAtA[i:], m.RequestID) - i = encodeVarintTxn(dAtA, i, uint64(len(m.RequestID))) + if m.RequestID != 0 { + i = encodeVarintTxn(dAtA, i, uint64(m.RequestID)) i-- - dAtA[i] = 0xa + dAtA[i] = 0x8 } return len(dAtA) - i, nil } @@ -2545,9 +2542,8 @@ func (m *TxnRequest) Size() (n int) { } var l int _ = l - l = len(m.RequestID) - if l > 0 { - n += 1 + l + sovTxn(uint64(l)) + if m.RequestID != 0 { + n += 1 + sovTxn(uint64(m.RequestID)) } l = m.Txn.Size() n += 1 + l + sovTxn(uint64(l)) @@ -2597,9 +2593,8 @@ func (m *TxnResponse) Size() (n int) { } var l int _ = l - l = len(m.RequestID) - if l > 0 { - n += 1 + l + sovTxn(uint64(l)) + if m.RequestID != 0 { + n += 1 + sovTxn(uint64(m.RequestID)) } if m.Txn != nil { l = m.Txn.Size() @@ -3515,10 +3510,10 @@ func (m *TxnRequest) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: - if wireType != 2 { + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field RequestID", wireType) } - var byteLen int + m.RequestID = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowTxn @@ -3528,26 +3523,11 @@ func (m *TxnRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - byteLen |= int(b&0x7F) << shift + m.RequestID |= uint64(b&0x7F) << shift if b < 0x80 { break } } - if byteLen < 0 { - return ErrInvalidLengthTxn - } - postIndex := iNdEx + byteLen - if postIndex < 0 { - return ErrInvalidLengthTxn - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.RequestID = append(m.RequestID[:0], dAtA[iNdEx:postIndex]...) - if m.RequestID == nil { - m.RequestID = []byte{} - } - iNdEx = postIndex case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Txn", wireType) @@ -3923,10 +3903,10 @@ func (m *TxnResponse) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: - if wireType != 2 { + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field RequestID", wireType) } - var byteLen int + m.RequestID = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowTxn @@ -3936,26 +3916,11 @@ func (m *TxnResponse) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - byteLen |= int(b&0x7F) << shift + m.RequestID |= uint64(b&0x7F) << shift if b < 0x80 { break } } - if byteLen < 0 { - return ErrInvalidLengthTxn - } - postIndex := iNdEx + byteLen - if postIndex < 0 { - return ErrInvalidLengthTxn - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.RequestID = append(m.RequestID[:0], dAtA[iNdEx:postIndex]...) - if m.RequestID == nil { - m.RequestID = []byte{} - } - iNdEx = postIndex case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Txn", wireType) diff --git a/pkg/txn/rpc/sender.go b/pkg/txn/rpc/sender.go index 1760845f2145520636de2565d32496dd83ac6194..3995170c1ad9bb8fa7ec7ad5fc7400a0e6f0c94b 100644 --- a/pkg/txn/rpc/sender.go +++ b/pkg/txn/rpc/sender.go @@ -17,7 +17,6 @@ package rpc import ( "context" - "github.com/google/uuid" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/morpc" "github.com/matrixorigin/matrixone/pkg/logutil" @@ -50,7 +49,6 @@ func (s *sender) Close() error { func (s *sender) Send(ctx context.Context, requests []txn.TxnRequest) ([]txn.TxnResponse, error) { if len(requests) == 1 { - requests[0].RequestID = s.getUUID() resp, err := s.doSend(ctx, requests[0]) if err != nil { return nil, err @@ -113,11 +111,6 @@ func (s *sender) doSend(ctx context.Context, request txn.TxnRequest) (txn.TxnRes return *(v.(*txn.TxnResponse)), nil } -func (s *sender) getUUID() []byte { - id := uuid.New() - return id[:] -} - type executor struct { ctx context.Context stream morpc.Stream diff --git a/pkg/txn/rpc/sender_test.go b/pkg/txn/rpc/sender_test.go index 94842b08f56a3e0ad463d9ffbe9eb2da334529ec..e04fdc979651a546679e1f7fb0cf182cedfaac9e 100644 --- a/pkg/txn/rpc/sender_test.go +++ b/pkg/txn/rpc/sender_test.go @@ -15,7 +15,6 @@ package rpc import ( - "bytes" "context" "errors" "fmt" @@ -24,7 +23,6 @@ import ( "testing" "time" - "github.com/google/uuid" "github.com/matrixorigin/matrixone/pkg/common/morpc" "github.com/matrixorigin/matrixone/pkg/pb/metadata" "github.com/matrixorigin/matrixone/pkg/pb/txn" @@ -45,7 +43,7 @@ func TestSendWithSingleRequest(t *testing.T) { s.RegisterRequestHandler(func(request morpc.Message, sequence uint64, cs morpc.ClientSession) error { return cs.Write(&txn.TxnResponse{ - RequestID: request.ID(), + RequestID: request.GetID(), Method: txn.TxnMethod_Write, }, morpc.SendOptions{}) }) @@ -84,7 +82,7 @@ func TestSendWithMultiDN(t *testing.T) { s.RegisterRequestHandler(func(m morpc.Message, sequence uint64, cs morpc.ClientSession) error { request := m.(*txn.TxnRequest) return cs.Write(&txn.TxnResponse{ - RequestID: request.ID(), + RequestID: request.GetID(), CNOpResponse: &txn.CNOpResponse{Payload: []byte(fmt.Sprintf("%s-%d", request.GetTargetDN().Address, sequence))}, }, morpc.SendOptions{}) }) @@ -129,7 +127,7 @@ func TestSendWithMultiDN(t *testing.T) { } func TestNewExecutor(t *testing.T) { - ts := newTestStream(nil) + ts := newTestStream(1, nil) defer func() { assert.NoError(t, ts.Close()) }() @@ -139,7 +137,7 @@ func TestNewExecutor(t *testing.T) { } func TestNewExectorWithClosedStream(t *testing.T) { - ts := newTestStream(nil) + ts := newTestStream(1, nil) assert.NoError(t, ts.Close()) _, err := newExecutor(context.Background(), nil, ts) assert.Error(t, err) @@ -147,7 +145,7 @@ func TestNewExectorWithClosedStream(t *testing.T) { func TestExecute(t *testing.T) { var requests []morpc.Message - ts := newTestStream(func(m morpc.Message) (morpc.Message, error) { + ts := newTestStream(1, func(m morpc.Message) (morpc.Message, error) { requests = append(requests, m) return m, nil }) @@ -165,7 +163,7 @@ func TestExecute(t *testing.T) { assert.Equal(t, n, len(requests)) for i := 0; i < n; i++ { - assert.Equal(t, ts.id, requests[i].ID()) + assert.Equal(t, ts.id, requests[i].GetID()) } assert.Equal(t, n, len(exec.indexes)) @@ -175,7 +173,7 @@ func TestExecute(t *testing.T) { } func TestExecuteWithClosedStream(t *testing.T) { - ts := newTestStream(nil) + ts := newTestStream(1, nil) exec, err := newExecutor(context.Background(), nil, ts) assert.NoError(t, err) @@ -185,9 +183,9 @@ func TestExecuteWithClosedStream(t *testing.T) { } func TestWaitCompleted(t *testing.T) { - ts := newTestStream(func(m morpc.Message) (morpc.Message, error) { + ts := newTestStream(1, func(m morpc.Message) (morpc.Message, error) { req := m.(*txn.TxnRequest) - return &txn.TxnResponse{RequestID: m.ID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil + return &txn.TxnResponse{RequestID: m.GetID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil }) defer func() { assert.NoError(t, ts.Close()) @@ -208,9 +206,9 @@ func TestWaitCompleted(t *testing.T) { } func TestWaitCompletedWithContextDone(t *testing.T) { - ts := newTestStream(func(m morpc.Message) (morpc.Message, error) { + ts := newTestStream(1, func(m morpc.Message) (morpc.Message, error) { req := m.(*txn.TxnRequest) - return &txn.TxnResponse{RequestID: m.ID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil + return &txn.TxnResponse{RequestID: m.GetID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil }) defer func() { assert.NoError(t, ts.Close()) @@ -227,9 +225,9 @@ func TestWaitCompletedWithContextDone(t *testing.T) { } func TestWaitCompletedWithStreamClosed(t *testing.T) { - ts := newTestStream(func(m morpc.Message) (morpc.Message, error) { + ts := newTestStream(1, func(m morpc.Message) (morpc.Message, error) { req := m.(*txn.TxnRequest) - return &txn.TxnResponse{RequestID: m.ID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil + return &txn.TxnResponse{RequestID: m.GetID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil }) ctx, cancel := context.WithCancel(context.Background()) @@ -244,7 +242,7 @@ func TestWaitCompletedWithStreamClosed(t *testing.T) { } type testStream struct { - id []byte + id uint64 c chan morpc.Message handle func(morpc.Message) (morpc.Message, error) @@ -254,17 +252,16 @@ type testStream struct { } } -func newTestStream(handle func(morpc.Message) (morpc.Message, error)) *testStream { - id := uuid.New() +func newTestStream(id uint64, handle func(morpc.Message) (morpc.Message, error)) *testStream { return &testStream{ - id: id[:], + id: id, c: make(chan morpc.Message, 1024), handle: handle, } } func (s *testStream) Send(request morpc.Message, opts morpc.SendOptions) error { - if !bytes.Equal(s.id, request.ID()) { + if s.id != request.GetID() { panic("request.id != stream.id") } @@ -304,7 +301,7 @@ func (s *testStream) Close() error { return nil } -func (s *testStream) ID() []byte { +func (s *testStream) ID() uint64 { return s.id } diff --git a/proto/logservice.proto b/proto/logservice.proto index 5d5aca46168aeeb4e96801a3c9cca45e1a41339c..c77892293800bcc972ed3afe0d00e777a169a879 100644 --- a/proto/logservice.proto +++ b/proto/logservice.proto @@ -116,15 +116,16 @@ message LogRecord { }; message Request { - MethodType Method = 1; - string Name = 2; - uint64 ShardID = 3; - uint64 Index = 4; - uint64 MaxSize = 5; - int64 Timeout = 6; - - uint64 DNShardID = 7; - uint64 DNID = 8; + uint64 RequestID = 1; + MethodType Method = 2; + string Name = 3; + uint64 ShardID = 4; + uint64 Index = 5; + uint64 MaxSize = 6; + int64 Timeout = 7; + + uint64 DNShardID = 8; + uint64 DNID = 9; }; enum ErrorCode { @@ -147,12 +148,13 @@ enum ErrorCode { }; message Response { - MethodType Method = 1; - ErrorCode ErrorCode = 2; - string ErrorMessage = 3; - uint64 ShardID = 4; - uint64 Index = 5; - uint64 LastIndex = 6; + uint64 RequestID = 1; + MethodType Method = 2; + ErrorCode ErrorCode = 3; + string ErrorMessage = 4; + uint64 ShardID = 5; + uint64 Index = 6; + uint64 LastIndex = 7; }; message LogRecordResponse { diff --git a/proto/txn.proto b/proto/txn.proto index 6349fd995d8edd61386c74730e65bc05be1ff157..e5b3c8f2bd498d59dff78725fe1841a33cc7c0ba 100644 --- a/proto/txn.proto +++ b/proto/txn.proto @@ -153,7 +153,7 @@ enum TxnMethod { // 2. DN -> DN (TxnMethod.Prepare, TxnMethod.GetStatus, TxnMethod.CommitDNShard, TxnMethod.RollbackDNShard) message TxnRequest { // RequestID request id - bytes RequestID = 1; + uint64 RequestID = 1; // Txn transaction metadata TxnMeta Txn = 2 [(gogoproto.nullable) = false]; // TxnMethod TxnRequest opCode, select the Request defined below according to TxnMethod. @@ -179,7 +179,7 @@ message TxnRequest { // TxnResponse response of TxnRequest. message TxnResponse { // RequestID corresponding request id - bytes RequestID = 1; + uint64 RequestID = 1; // Txn transaction metadata. TxnResponse.TxnMeta and TxnRequest.TxnMeta may differ // in that the node initiating the TxnRequest needs to process the returned TxnMeta, // e.g. to determine whether the transaction is Aborted by the status of the returned