From 08db8ed16508874079bc7c173745ab348a1d8242 Mon Sep 17 00:00:00 2001
From: fagongzi <zhangxu19830126@gmail.com>
Date: Wed, 29 Jun 2022 21:17:58 +0800
Subject: [PATCH] morpc: use uint64 as the ID of the message to avoid mem
 allocation (#3539)

* morpc: use uint64 as the ID of the message

* Fix DATA RACE

* Refector logservice morpc
---
 go.mod                                        |   2 +-
 go.sum                                        |   4 +-
 pkg/common/morpc/backend.go                   |  49 +--
 pkg/common/morpc/backend_test.go              |  34 ++-
 pkg/common/morpc/codec_test.go                |   4 +-
 pkg/common/morpc/examples/message/message.go  |  35 +--
 .../morpc/examples/message/message_test.go    |   2 +-
 pkg/common/morpc/examples/pingpong/main.go    |   2 +-
 pkg/common/morpc/examples/stream/main.go      |   2 +-
 pkg/common/morpc/future.go                    |   9 +-
 pkg/common/morpc/future_test.go               |  14 +-
 pkg/common/morpc/server.go                    |  19 +-
 pkg/common/morpc/server_test.go               |   6 +-
 pkg/common/morpc/types.go                     |  10 +-
 pkg/logservice/service.go                     |  17 +-
 pkg/pb/logservice/logservice.pb.go            | 284 +++++++++++-------
 pkg/pb/metadata/metadata.pb.go                |   5 +-
 pkg/pb/metric/metric.pb.go                    |   3 +-
 pkg/pb/plan/plan.pb.go                        |   5 +-
 pkg/pb/timestamp/timestamp.pb.go              |   3 +-
 pkg/pb/txn/txn.go                             |  14 +-
 pkg/pb/txn/txn.pb.go                          | 163 ++++------
 pkg/txn/rpc/sender.go                         |   7 -
 pkg/txn/rpc/sender_test.go                    |  39 ++-
 proto/logservice.proto                        |  32 +-
 proto/txn.proto                               |   4 +-
 26 files changed, 413 insertions(+), 355 deletions(-)

diff --git a/go.mod b/go.mod
index 1cd6d693d..9216a645c 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@ require (
 	github.com/cespare/xxhash/v2 v2.1.2
 	github.com/cockroachdb/pebble v0.0.0-20220407171941-2120d145e292
 	github.com/fagongzi/goetty v1.13.0
-	github.com/fagongzi/goetty/v2 v2.0.3-0.20220624135603-5923140593a7
+	github.com/fagongzi/goetty/v2 v2.0.3-0.20220629100032-f8e8c3c38772
 	github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d
 	github.com/go-sql-driver/mysql v1.6.0
 	github.com/gogo/protobuf v1.3.2
diff --git a/go.sum b/go.sum
index 41569ceec..7d1802ac8 100644
--- a/go.sum
+++ b/go.sum
@@ -213,8 +213,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
 github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
 github.com/fagongzi/goetty v1.13.0 h1:k1RT+32/O97zu370BhHk8sL/kSAqF2ebSY1Nj+x/dBg=
 github.com/fagongzi/goetty v1.13.0/go.mod h1:M1HA6OyQA3/4bV70W588Uek6sF7NxgxT/3gGhlf0yj0=
-github.com/fagongzi/goetty/v2 v2.0.3-0.20220624135603-5923140593a7 h1:BjZR38MOC8566qCTsU6Hign3a/jNkkx4kje9I5jBrrA=
-github.com/fagongzi/goetty/v2 v2.0.3-0.20220624135603-5923140593a7/go.mod h1:OwIBpVwRW1HjF/Jhc2Av3UvG2NygMg+bdqGxZaqwhU0=
+github.com/fagongzi/goetty/v2 v2.0.3-0.20220629100032-f8e8c3c38772 h1:J0QvYEFfz8rIwqvJTNphrHj+bdCPMdoPRj9SNAWFxjM=
+github.com/fagongzi/goetty/v2 v2.0.3-0.20220629100032-f8e8c3c38772/go.mod h1:OwIBpVwRW1HjF/Jhc2Av3UvG2NygMg+bdqGxZaqwhU0=
 github.com/fagongzi/util v0.0.0-20201116094402-221cc40c4593/go.mod h1:jYDIbpaqHXCCQ7QIDXRVfsQYAGKSNNb6N8BPTgdpcdE=
 github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d h1:1pILVCatHj3eVo9i52dZyY4BwjTmSIeN+/hoJh8rD0Y=
 github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d/go.mod h1:5cqSns2zMRcJeVGvAqeTrbXFqh5AqBFr5uVKP9T2kiE=
diff --git a/pkg/common/morpc/backend.go b/pkg/common/morpc/backend.go
index b69110110..e95b542ef 100644
--- a/pkg/common/morpc/backend.go
+++ b/pkg/common/morpc/backend.go
@@ -15,16 +15,13 @@
 package morpc
 
 import (
-	"bytes"
 	"context"
-	"encoding/hex"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/fagongzi/goetty/v2"
-	"github.com/fagongzi/util/hack"
-	"github.com/google/uuid"
 	"github.com/matrixorigin/matrixone/pkg/common/stop"
 	"github.com/matrixorigin/matrixone/pkg/logutil"
 	"go.uber.org/zap"
@@ -126,8 +123,12 @@ type remoteBackend struct {
 
 	mu struct {
 		sync.RWMutex
-		futures map[string]*Future
-		streams map[string]*stream
+		futures map[uint64]*Future
+		streams map[uint64]*stream
+	}
+
+	atomic struct {
+		id uint64
 	}
 }
 
@@ -157,8 +158,8 @@ func NewRemoteBackend(
 		},
 	}
 	rb.writeC = make(chan *Future, rb.options.bufferSize)
-	rb.mu.futures = make(map[string]*Future, rb.options.bufferSize)
-	rb.mu.streams = make(map[string]*stream, rb.options.bufferSize)
+	rb.mu.futures = make(map[uint64]*Future, rb.options.bufferSize)
+	rb.mu.streams = make(map[uint64]*stream, rb.options.bufferSize)
 	rb.conn = goetty.NewIOSession(rb.options.goettyOptions...)
 
 	if rb.options.connect {
@@ -211,6 +212,8 @@ func (rb *remoteBackend) adjust() {
 }
 
 func (rb *remoteBackend) Send(ctx context.Context, request Message, opts SendOptions) (*Future, error) {
+	request.SetID(rb.nextID())
+
 	f := rb.newFuture()
 	f.init(ctx, request, opts, false)
 	if err := rb.doSend(f); err != nil {
@@ -229,8 +232,8 @@ func (rb *remoteBackend) NewStream(receiveChanBuffer int) (Stream, error) {
 
 	rb.mu.Lock()
 	defer rb.mu.Unlock()
-	st := newStream(make(chan Message, receiveChanBuffer), rb.doSend)
-	rb.mu.streams[hack.SliceToString(st.ID())] = st
+	st := newStream(rb.nextID(), make(chan Message, receiveChanBuffer), rb.doSend)
+	rb.mu.streams[st.ID()] = st
 	return st, nil
 }
 
@@ -371,7 +374,7 @@ func (rb *remoteBackend) writeLoop(ctx context.Context) {
 						writeTimeout += f.opts.Timeout
 						if err := rb.conn.Write(f.request, goetty.WriteOptions{}); err != nil {
 							rb.logger.Error("write request failed",
-								zap.String("request-id", hex.EncodeToString(f.request.ID())),
+								zap.Uint64("request-id", f.request.GetID()),
 								zap.Error(err))
 							retry = true
 							written = 0
@@ -385,7 +388,7 @@ func (rb *remoteBackend) writeLoop(ctx context.Context) {
 					if err := rb.conn.Flush(writeTimeout); err != nil {
 						for _, f := range sendFutures {
 							rb.logger.Error("write request failed",
-								zap.String("request-id", hex.EncodeToString(f.request.ID())),
+								zap.Uint64("request-id", f.request.GetID()),
 								zap.Error(err))
 						}
 						retry = true
@@ -498,7 +501,7 @@ func (rb *remoteBackend) requestDone(response Message) {
 	rb.mu.Lock()
 	defer rb.mu.Unlock()
 
-	id := hack.SliceToString(response.ID())
+	id := response.GetID()
 	if f, ok := rb.mu.futures[id]; ok {
 		if !f.stream {
 			delete(rb.mu.futures, id)
@@ -508,18 +511,17 @@ func (rb *remoteBackend) requestDone(response Message) {
 }
 
 func (rb *remoteBackend) addFuture(f *Future) {
-	id := hack.SliceToString(f.request.ID())
 	rb.mu.Lock()
 	defer rb.mu.Unlock()
 
-	rb.mu.futures[id] = f
+	rb.mu.futures[f.request.GetID()] = f
 }
 
 func (rb *remoteBackend) releaseFuture(f *Future) {
 	rb.mu.Lock()
 	defer rb.mu.Unlock()
 
-	delete(rb.mu.futures, hack.SliceToString(f.id))
+	delete(rb.mu.futures, f.id)
 	if !f.stream {
 		f.reset()
 		rb.futurePool.Put(f)
@@ -593,6 +595,10 @@ func (rb *remoteBackend) newFuture() *Future {
 	return rb.futurePool.Get().(*Future)
 }
 
+func (rb *remoteBackend) nextID() uint64 {
+	return atomic.AddUint64(&rb.atomic.id, 1)
+}
+
 type goettyBasedBackendFactory struct {
 	codec   Codec
 	options []BackendOption
@@ -610,7 +616,7 @@ func (bf *goettyBasedBackendFactory) Create(remote string) (Backend, error) {
 }
 
 type stream struct {
-	id       []byte
+	id       uint64
 	ctx      context.Context
 	cancel   context.CancelFunc
 	c        chan Message
@@ -622,11 +628,10 @@ type stream struct {
 	}
 }
 
-func newStream(c chan Message, sendFunc func(f *Future) error) *stream {
+func newStream(id uint64, c chan Message, sendFunc func(f *Future) error) *stream {
 	ctx, cancel := context.WithCancel(context.Background())
-	id := uuid.New()
 	return &stream{
-		id:       id[:],
+		id:       id,
 		c:        c,
 		sendFunc: sendFunc,
 		ctx:      ctx,
@@ -635,7 +640,7 @@ func newStream(c chan Message, sendFunc func(f *Future) error) *stream {
 }
 
 func (s *stream) Send(request Message, opts SendOptions) error {
-	if !bytes.Equal(s.id, request.ID()) {
+	if s.id != request.GetID() {
 		panic("request.id != stream.id")
 	}
 
@@ -682,6 +687,6 @@ func (s *stream) Close() error {
 	return nil
 }
 
-func (s *stream) ID() []byte {
+func (s *stream) ID() uint64 {
 	return s.id
 }
diff --git a/pkg/common/morpc/backend_test.go b/pkg/common/morpc/backend_test.go
index 416066eb8..ac44cd90a 100644
--- a/pkg/common/morpc/backend_test.go
+++ b/pkg/common/morpc/backend_test.go
@@ -16,12 +16,14 @@ package morpc
 
 import (
 	"context"
+	"fmt"
 	"os"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/fagongzi/goetty/v2"
+	"github.com/fagongzi/goetty/v2/buf"
 	"github.com/matrixorigin/matrixone/pkg/logutil"
 	"github.com/stretchr/testify/assert"
 	"go.uber.org/zap"
@@ -40,7 +42,7 @@ func TestSend(t *testing.T) {
 		func(b *remoteBackend) {
 			ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 			defer cancel()
-			req := &testMessage{id: []byte("id1")}
+			req := newTestMessage(1)
 			f, err := b.Send(ctx, req, SendOptions{})
 			assert.NoError(t, err)
 			defer f.Close()
@@ -61,7 +63,7 @@ func TestSendWithResetConnAndRetry(t *testing.T) {
 		func(b *remoteBackend) {
 			ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 			defer cancel()
-			req := &testMessage{id: []byte("id1")}
+			req := &testMessage{id: 1}
 			f, err := b.Send(ctx, req, SendOptions{})
 			assert.NoError(t, err)
 			defer f.Close()
@@ -85,7 +87,7 @@ func TestSendWithTimeout(t *testing.T) {
 		func(b *remoteBackend) {
 			ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
 			defer cancel()
-			req := &testMessage{id: []byte("id1")}
+			req := &testMessage{id: 1}
 			f, err := b.Send(ctx, req, SendOptions{})
 			assert.NoError(t, err)
 			defer f.Close()
@@ -191,7 +193,7 @@ func TestBusy(t *testing.T) {
 
 			ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
 			defer cancel()
-			req := &testMessage{id: []byte("id1")}
+			req := &testMessage{id: 1}
 			f1, err := b.Send(ctx, req, SendOptions{})
 			assert.NoError(t, err)
 			defer f1.Close()
@@ -219,7 +221,7 @@ func TestBusy(t *testing.T) {
 func TestDoneWithClosedStreamCannotPanic(t *testing.T) {
 	var f *Future
 	c := make(chan Message, 1)
-	s := newStream(c, func(v *Future) error {
+	s := newStream(1, c, func(v *Future) error {
 		f = v
 		return nil
 	})
@@ -297,7 +299,7 @@ func (b *testBackend) Send(ctx context.Context, request Message, opts SendOption
 }
 
 func (b *testBackend) NewStream(bufferSize int) (Stream, error) {
-	return newStream(make(chan Message, bufferSize), func(f *Future) error {
+	return newStream(1, make(chan Message, bufferSize), func(f *Future) error {
 		return nil
 	}), nil
 }
@@ -306,33 +308,37 @@ func (b *testBackend) Close()     {}
 func (b *testBackend) Busy() bool { return b.busy }
 
 type testMessage struct {
-	id      []byte
+	id      uint64
 	payload []byte
 }
 
-func newTestMessage(id []byte) *testMessage {
+func newTestMessage(id uint64) *testMessage {
 	return &testMessage{id: id}
 }
 
-func (tm *testMessage) ID() []byte {
+func (tm *testMessage) SetID(id uint64) {
+	tm.id = id
+}
+
+func (tm *testMessage) GetID() uint64 {
 	return tm.id
 }
 
 func (tm *testMessage) DebugString() string {
-	return string(tm.id)
+	return fmt.Sprintf("%d", tm.id)
 }
 
 func (tm *testMessage) Size() int {
-	return len(tm.id)
+	return 8
 }
 
 func (tm *testMessage) MarshalTo(data []byte) (int, error) {
-	return copy(data, tm.id), nil
+	buf.Uint64ToBytesTo(tm.id, data)
+	return 8, nil
 }
 
 func (tm *testMessage) Unmarshal(data []byte) error {
-	tm.id = make([]byte, len(data))
-	copy(tm.id, data)
+	tm.id = buf.Byte2Uint64(data)
 	return nil
 }
 
diff --git a/pkg/common/morpc/codec_test.go b/pkg/common/morpc/codec_test.go
index f587753e0..f23d9bd39 100644
--- a/pkg/common/morpc/codec_test.go
+++ b/pkg/common/morpc/codec_test.go
@@ -25,7 +25,7 @@ func TestEncodeAndDecode(t *testing.T) {
 	codec := newTestCodec()
 	buf := buf.NewByteBuf(32)
 
-	msg := newTestMessage([]byte("1"))
+	msg := newTestMessage(1)
 	err := codec.Encode(msg, buf)
 	assert.NoError(t, err)
 
@@ -41,7 +41,7 @@ func TestEncodeAndDecodeWithPayload(t *testing.T) {
 	buf2 := buf.NewByteBuf(32)
 	buf1.SetSinkTo(buf2)
 
-	msg := newTestMessage([]byte("1"))
+	msg := newTestMessage(1)
 	msg.payload = []byte("payload")
 	err := codec.Encode(msg, buf1)
 	assert.NoError(t, err)
diff --git a/pkg/common/morpc/examples/message/message.go b/pkg/common/morpc/examples/message/message.go
index e52d9fe50..c2de16796 100644
--- a/pkg/common/morpc/examples/message/message.go
+++ b/pkg/common/morpc/examples/message/message.go
@@ -15,39 +15,36 @@
 package message
 
 import (
-	"encoding/hex"
-
 	"github.com/fagongzi/goetty/v2/buf"
 )
 
 // ExampleMessage example message
 type ExampleMessage struct {
-	MsgID   []byte
+	MsgID   uint64
 	Content string
 }
 
-func (tm *ExampleMessage) ID() []byte {
+func (tm *ExampleMessage) GetID() uint64 {
 	return tm.MsgID
 }
 
+func (tm *ExampleMessage) SetID(id uint64) {
+	tm.MsgID = id
+}
+
 func (tm *ExampleMessage) DebugString() string {
-	return hex.EncodeToString(tm.MsgID) + "," + string(tm.Content)
+	return ""
 }
 
 func (tm *ExampleMessage) Size() int {
-	return 8 + len(tm.MsgID) + len(tm.Content)
+	return 4 + 8 + len(tm.Content)
 }
 
 func (tm *ExampleMessage) MarshalTo(data []byte) (int, error) {
 	offset := 0
 
-	buf.Int2BytesTo(len(tm.MsgID), data[offset:])
-	offset += 4
-
-	if len(tm.MsgID) > 0 {
-		copy(data[offset:], tm.MsgID)
-		offset += len(tm.MsgID)
-	}
+	buf.Uint64ToBytesTo(tm.MsgID, data[offset:])
+	offset += 8
 
 	buf.Int2BytesTo(len(tm.Content), data[offset:])
 	offset += 4
@@ -63,16 +60,10 @@ func (tm *ExampleMessage) MarshalTo(data []byte) (int, error) {
 func (tm *ExampleMessage) Unmarshal(data []byte) error {
 	offset := 0
 
-	n := buf.Byte2Int(data)
-	offset += 4
-
-	tm.MsgID = make([]byte, n)
-	if n > 0 {
-		copy(tm.MsgID, data[offset:offset+n])
-		offset += n
-	}
+	tm.MsgID = buf.Byte2Uint64(data)
+	offset += 8
 
-	n = buf.Byte2Int(data[offset:])
+	n := buf.Byte2Int(data[offset:])
 	offset += 4
 
 	content := make([]byte, n)
diff --git a/pkg/common/morpc/examples/message/message_test.go b/pkg/common/morpc/examples/message/message_test.go
index d903783d9..bd4d77d0b 100644
--- a/pkg/common/morpc/examples/message/message_test.go
+++ b/pkg/common/morpc/examples/message/message_test.go
@@ -21,7 +21,7 @@ import (
 )
 
 func TestMarshal(t *testing.T) {
-	m := &ExampleMessage{MsgID: []byte{1}, Content: "hello"}
+	m := &ExampleMessage{MsgID: 1, Content: "hello"}
 
 	data := make([]byte, m.Size())
 	_, err := m.MarshalTo(data)
diff --git a/pkg/common/morpc/examples/pingpong/main.go b/pkg/common/morpc/examples/pingpong/main.go
index 39f092f06..f6b4430a8 100644
--- a/pkg/common/morpc/examples/pingpong/main.go
+++ b/pkg/common/morpc/examples/pingpong/main.go
@@ -47,7 +47,7 @@ func main() {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
 
-	f, err := cli.Send(ctx, addr, &message.ExampleMessage{MsgID: []byte{1}, Content: "hello"}, morpc.SendOptions{})
+	f, err := cli.Send(ctx, addr, &message.ExampleMessage{MsgID: 1, Content: "hello"}, morpc.SendOptions{})
 	if err != nil {
 		panic(err)
 	}
diff --git a/pkg/common/morpc/examples/stream/main.go b/pkg/common/morpc/examples/stream/main.go
index 695d453c1..dab911eb3 100644
--- a/pkg/common/morpc/examples/stream/main.go
+++ b/pkg/common/morpc/examples/stream/main.go
@@ -77,7 +77,7 @@ func startServer() error {
 		// send more message back
 		go func() {
 			for i := 0; i < 10; i++ {
-				if err := cs.Write(&message.ExampleMessage{MsgID: request.ID(), Content: fmt.Sprintf("stream-%d", i)},
+				if err := cs.Write(&message.ExampleMessage{MsgID: request.GetID(), Content: fmt.Sprintf("stream-%d", i)},
 					morpc.SendOptions{}); err != nil {
 					panic(err)
 				}
diff --git a/pkg/common/morpc/future.go b/pkg/common/morpc/future.go
index b713d7985..b4ae0f3ec 100644
--- a/pkg/common/morpc/future.go
+++ b/pkg/common/morpc/future.go
@@ -15,7 +15,6 @@
 package morpc
 
 import (
-	"bytes"
 	"context"
 	"runtime"
 	"sync"
@@ -36,7 +35,7 @@ func newFutureWithChan(c chan Message) *Future {
 
 // Future is used to obtain response data synchronously.
 type Future struct {
-	id          []byte
+	id          uint64
 	c           chan Message
 	releaseFunc func(*Future)
 	stream      bool
@@ -60,7 +59,7 @@ func (f *Future) init(ctx context.Context, request Message, opts SendOptions, st
 		}
 	}
 
-	f.id = request.ID()
+	f.id = request.GetID()
 	f.ctx = ctx
 	f.request = request
 	f.opts = opts
@@ -103,7 +102,7 @@ func (f *Future) done(response Message) {
 	defer f.mu.Unlock()
 
 	if !f.mu.closed && !f.timeout() {
-		if !bytes.Equal(response.ID(), f.id) {
+		if response.GetID() != f.id {
 			return
 		}
 
@@ -131,7 +130,7 @@ func (f *Future) unRef() {
 
 func (f *Future) reset() {
 	f.request = nil
-	f.id = nil
+	f.id = 0
 	f.ctx = nil
 	f.opts = SendOptions{}
 	f.stream = false
diff --git a/pkg/common/morpc/future_test.go b/pkg/common/morpc/future_test.go
index a394429b7..ee55eb58b 100644
--- a/pkg/common/morpc/future_test.go
+++ b/pkg/common/morpc/future_test.go
@@ -55,7 +55,7 @@ func TestNewFuture(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
 
-	req := newTestMessage([]byte("id"))
+	req := newTestMessage(1)
 	f := newFuture(nil)
 	f.init(ctx, req, SendOptions{}, false)
 	defer f.Close()
@@ -72,7 +72,7 @@ func TestReleaseFuture(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
 
-	req := newTestMessage([]byte("id"))
+	req := newTestMessage(1)
 	f := newFuture(func(f *Future) { f.reset() })
 	f.init(ctx, req, SendOptions{}, false)
 	f.c <- req
@@ -87,7 +87,7 @@ func TestGet(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
 
-	req := newTestMessage([]byte("id"))
+	req := newTestMessage(1)
 	f := newFuture(func(f *Future) { f.reset() })
 	f.init(ctx, req, SendOptions{}, false)
 	defer f.Close()
@@ -102,7 +102,7 @@ func TestGetWithTimeout(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 1)
 	defer cancel()
 
-	req := newTestMessage([]byte("id"))
+	req := newTestMessage(1)
 	f := newFuture(func(f *Future) { f.reset() })
 	f.init(ctx, req, SendOptions{}, false)
 	defer f.Close()
@@ -117,18 +117,18 @@ func TestGetWithInvalidResponse(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
 
-	req := newTestMessage([]byte("id"))
+	req := newTestMessage(1)
 	f := newFuture(func(f *Future) { f.reset() })
 	f.init(ctx, req, SendOptions{}, false)
 	defer f.Close()
 
-	f.done(newTestMessage([]byte("id2")))
+	f.done(newTestMessage(2))
 	assert.Equal(t, 0, len(f.c))
 }
 
 func TestTimeout(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
-	req := newTestMessage([]byte("id"))
+	req := newTestMessage(1)
 	f := newFuture(func(f *Future) { f.reset() })
 	f.init(ctx, req, SendOptions{}, false)
 	defer f.Close()
diff --git a/pkg/common/morpc/server.go b/pkg/common/morpc/server.go
index ed1e4b828..9789ca1d9 100644
--- a/pkg/common/morpc/server.go
+++ b/pkg/common/morpc/server.go
@@ -16,7 +16,6 @@ package morpc
 
 import (
 	"context"
-	"encoding/hex"
 	"fmt"
 	"sync"
 	"time"
@@ -98,6 +97,7 @@ func NewRPCServer(name, address string, codec Codec, options ...ServerOption) (R
 		goetty.WithAppSessionOptions(
 			goetty.WithCodec(codec, codec),
 			goetty.WithLogger(s.logger),
+			goetty.WithDisableReleaseOutBuf(), // release out buf when write loop reutrned
 		),
 	)
 	if err != nil {
@@ -165,7 +165,7 @@ func (s *server) onMessage(rs goetty.IOSession, value interface{}, sequence uint
 	if ce := s.logger.Check(zap.DebugLevel, "received request"); ce != nil {
 		ce.Write(zap.Uint64("sequence", sequence),
 			zap.String("client", rs.RemoteAddr()),
-			zap.String("request-id", hex.EncodeToString(request.ID())),
+			zap.Uint64("request-id", request.GetID()),
 			zap.String("request", request.DebugString()))
 	}
 
@@ -181,14 +181,17 @@ func (s *server) onMessage(rs goetty.IOSession, value interface{}, sequence uint
 	if ce := s.logger.Check(zap.DebugLevel, "handle request completed"); ce != nil {
 		ce.Write(zap.Uint64("sequence", sequence),
 			zap.String("client", rs.RemoteAddr()),
-			zap.String("request-id", hex.EncodeToString(request.ID())))
+			zap.Uint64("request-id", request.GetID()))
 	}
 	return nil
 }
 
 func (s *server) startWriteLoop(rs goetty.IOSession, cs *clientSession) error {
 	return s.stopper.RunTask(func(ctx context.Context) {
-		defer cs.close()
+		defer func() {
+			cs.close()
+			rs.OutBuf().Release()
+		}()
 
 		responses := make([]sendMessage, 0, s.options.batchSendSize)
 		fetch := func() {
@@ -240,7 +243,7 @@ func (s *server) startWriteLoop(rs goetty.IOSession, cs *clientSession) error {
 						timeout += sendResponses[idx].opts.Timeout
 						if err := rs.Write(sendResponses[idx].message, goetty.WriteOptions{}); err != nil {
 							s.logger.Error("write response failed",
-								zap.String("request-id", hex.EncodeToString(sendResponses[idx].message.ID())),
+								zap.Uint64("request-id", sendResponses[idx].message.GetID()),
 								zap.Error(err))
 							return
 						}
@@ -251,7 +254,7 @@ func (s *server) startWriteLoop(rs goetty.IOSession, cs *clientSession) error {
 						if err := rs.Flush(timeout); err != nil {
 							for idx := range sendResponses {
 								s.logger.Error("write response failed",
-									zap.String("request-id", hex.EncodeToString(sendResponses[idx].message.ID())),
+									zap.Uint64("request-id", sendResponses[idx].message.GetID()),
 									zap.Error(err))
 							}
 							return
@@ -260,8 +263,8 @@ func (s *server) startWriteLoop(rs goetty.IOSession, cs *clientSession) error {
 							var fields []zap.Field
 							fields = append(fields, zap.String("client", rs.RemoteAddr()))
 							for idx := range sendResponses {
-								fields = append(fields, zap.String("request-id",
-									hex.EncodeToString(sendResponses[idx].message.ID())))
+								fields = append(fields, zap.Uint64("request-id",
+									sendResponses[idx].message.GetID()))
 								fields = append(fields, zap.String("response",
 									sendResponses[idx].message.DebugString()))
 							}
diff --git a/pkg/common/morpc/server_test.go b/pkg/common/morpc/server_test.go
index b91af41bb..cbaa1293b 100644
--- a/pkg/common/morpc/server_test.go
+++ b/pkg/common/morpc/server_test.go
@@ -40,7 +40,7 @@ func TestHandleServer(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 		defer cancel()
 
-		req := &testMessage{id: []byte{1}}
+		req := newTestMessage(1)
 		f, err := c.Send(ctx, testAddr, req, SendOptions{})
 		assert.NoError(t, err)
 
@@ -65,7 +65,7 @@ func TestHandleServerWithPayloadMessage(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 		defer cancel()
 
-		req := &testMessage{id: []byte{1}, payload: []byte("payload")}
+		req := &testMessage{id: 1, payload: []byte("payload")}
 		f, err := c.Send(ctx, testAddr, req, SendOptions{})
 		assert.NoError(t, err)
 
@@ -91,7 +91,7 @@ func TestHandleServerWriteWithClosedSession(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 		defer cancel()
 
-		req := &testMessage{id: []byte{1}}
+		req := newTestMessage(1)
 		f, err := c.Send(ctx, testAddr, req, SendOptions{})
 		assert.NoError(t, err)
 
diff --git a/pkg/common/morpc/types.go b/pkg/common/morpc/types.go
index a8678840c..de1940680 100644
--- a/pkg/common/morpc/types.go
+++ b/pkg/common/morpc/types.go
@@ -32,9 +32,11 @@ var (
 // Message morpc is not a normal remote method call, rather it is a message-based asynchronous
 // driven framework.
 type Message interface {
-	// ID each message has a unique ID. If it is a message transmitted in stream, the ID must
-	// be set to Stream.ID
-	ID() []byte
+	// SetID each message has a unique ID in a RPCClient Backend. If it is a message transmitted
+	// in stream, the ID must be set to Stream.ID.
+	SetID(uint64)
+	// GetID returns ID of the message
+	GetID() uint64
 	// DebugString return debug string
 	DebugString() string
 	// Size size of message after marshal
@@ -140,7 +142,7 @@ type Backend interface {
 type Stream interface {
 	// ID returns the stream ID. All messages transmitted on the current stream need to use the
 	// stream ID as the message ID
-	ID() []byte
+	ID() uint64
 	// Send send message to stream
 	Send(request Message, opts SendOptions) error
 	// Receive returns a channel to read message from server. The channel will be closed after stream
diff --git a/pkg/logservice/service.go b/pkg/logservice/service.go
index 82f3aff4b..d683c10a7 100644
--- a/pkg/logservice/service.go
+++ b/pkg/logservice/service.go
@@ -52,8 +52,12 @@ type RPCRequest struct {
 
 var _ morpc.PayloadMessage = (*RPCRequest)(nil)
 
-func (r *RPCRequest) ID() []byte {
-	return nil
+func (r *RPCRequest) SetID(id uint64) {
+	r.RequestID = id
+}
+
+func (r *RPCRequest) GetID() uint64 {
+	return r.RequestID
 }
 
 func (r *RPCRequest) DebugString() string {
@@ -76,8 +80,12 @@ type RPCResponse struct {
 
 var _ morpc.PayloadMessage = (*RPCResponse)(nil)
 
-func (r *RPCResponse) ID() []byte {
-	return nil
+func (r *RPCResponse) SetID(id uint64) {
+	r.RequestID = id
+}
+
+func (r *RPCResponse) GetID() uint64 {
+	return r.RequestID
 }
 
 func (r *RPCResponse) DebugString() string {
@@ -167,6 +175,7 @@ func (s *Service) handleRPCRequest(req morpc.Message,
 		recs = MustMarshal(&records)
 	}
 	// FIXME: set timeout value
+	resp.RequestID = rr.RequestID
 	return cs.Write(&RPCResponse{
 		Response: resp,
 		payload:  recs,
diff --git a/pkg/pb/logservice/logservice.pb.go b/pkg/pb/logservice/logservice.pb.go
index f3c159cf7..a68618c03 100644
--- a/pkg/pb/logservice/logservice.pb.go
+++ b/pkg/pb/logservice/logservice.pb.go
@@ -5,11 +5,12 @@ package logservice
 
 import (
 	fmt "fmt"
-	_ "github.com/gogo/protobuf/gogoproto"
-	proto "github.com/gogo/protobuf/proto"
 	io "io"
 	math "math"
 	math_bits "math/bits"
+
+	_ "github.com/gogo/protobuf/gogoproto"
+	proto "github.com/gogo/protobuf/proto"
 )
 
 // Reference imports to suppress errors if they are not otherwise used.
@@ -637,14 +638,15 @@ func (m *LogRecord) GetData() []byte {
 }
 
 type Request struct {
-	Method               MethodType `protobuf:"varint,1,opt,name=Method,proto3,enum=logservice.MethodType" json:"Method,omitempty"`
-	Name                 string     `protobuf:"bytes,2,opt,name=Name,proto3" json:"Name,omitempty"`
-	ShardID              uint64     `protobuf:"varint,3,opt,name=ShardID,proto3" json:"ShardID,omitempty"`
-	Index                uint64     `protobuf:"varint,4,opt,name=Index,proto3" json:"Index,omitempty"`
-	MaxSize              uint64     `protobuf:"varint,5,opt,name=MaxSize,proto3" json:"MaxSize,omitempty"`
-	Timeout              int64      `protobuf:"varint,6,opt,name=Timeout,proto3" json:"Timeout,omitempty"`
-	DNShardID            uint64     `protobuf:"varint,7,opt,name=DNShardID,proto3" json:"DNShardID,omitempty"`
-	DNID                 uint64     `protobuf:"varint,8,opt,name=DNID,proto3" json:"DNID,omitempty"`
+	RequestID            uint64     `protobuf:"varint,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"`
+	Method               MethodType `protobuf:"varint,2,opt,name=Method,proto3,enum=logservice.MethodType" json:"Method,omitempty"`
+	Name                 string     `protobuf:"bytes,3,opt,name=Name,proto3" json:"Name,omitempty"`
+	ShardID              uint64     `protobuf:"varint,4,opt,name=ShardID,proto3" json:"ShardID,omitempty"`
+	Index                uint64     `protobuf:"varint,5,opt,name=Index,proto3" json:"Index,omitempty"`
+	MaxSize              uint64     `protobuf:"varint,6,opt,name=MaxSize,proto3" json:"MaxSize,omitempty"`
+	Timeout              int64      `protobuf:"varint,7,opt,name=Timeout,proto3" json:"Timeout,omitempty"`
+	DNShardID            uint64     `protobuf:"varint,8,opt,name=DNShardID,proto3" json:"DNShardID,omitempty"`
+	DNID                 uint64     `protobuf:"varint,9,opt,name=DNID,proto3" json:"DNID,omitempty"`
 	XXX_NoUnkeyedLiteral struct{}   `json:"-"`
 	XXX_unrecognized     []byte     `json:"-"`
 	XXX_sizecache        int32      `json:"-"`
@@ -683,6 +685,13 @@ func (m *Request) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_Request proto.InternalMessageInfo
 
+func (m *Request) GetRequestID() uint64 {
+	if m != nil {
+		return m.RequestID
+	}
+	return 0
+}
+
 func (m *Request) GetMethod() MethodType {
 	if m != nil {
 		return m.Method
@@ -740,12 +749,13 @@ func (m *Request) GetDNID() uint64 {
 }
 
 type Response struct {
-	Method               MethodType `protobuf:"varint,1,opt,name=Method,proto3,enum=logservice.MethodType" json:"Method,omitempty"`
-	ErrorCode            ErrorCode  `protobuf:"varint,2,opt,name=ErrorCode,proto3,enum=logservice.ErrorCode" json:"ErrorCode,omitempty"`
-	ErrorMessage         string     `protobuf:"bytes,3,opt,name=ErrorMessage,proto3" json:"ErrorMessage,omitempty"`
-	ShardID              uint64     `protobuf:"varint,4,opt,name=ShardID,proto3" json:"ShardID,omitempty"`
-	Index                uint64     `protobuf:"varint,5,opt,name=Index,proto3" json:"Index,omitempty"`
-	LastIndex            uint64     `protobuf:"varint,6,opt,name=LastIndex,proto3" json:"LastIndex,omitempty"`
+	RequestID            uint64     `protobuf:"varint,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"`
+	Method               MethodType `protobuf:"varint,2,opt,name=Method,proto3,enum=logservice.MethodType" json:"Method,omitempty"`
+	ErrorCode            ErrorCode  `protobuf:"varint,3,opt,name=ErrorCode,proto3,enum=logservice.ErrorCode" json:"ErrorCode,omitempty"`
+	ErrorMessage         string     `protobuf:"bytes,4,opt,name=ErrorMessage,proto3" json:"ErrorMessage,omitempty"`
+	ShardID              uint64     `protobuf:"varint,5,opt,name=ShardID,proto3" json:"ShardID,omitempty"`
+	Index                uint64     `protobuf:"varint,6,opt,name=Index,proto3" json:"Index,omitempty"`
+	LastIndex            uint64     `protobuf:"varint,7,opt,name=LastIndex,proto3" json:"LastIndex,omitempty"`
 	XXX_NoUnkeyedLiteral struct{}   `json:"-"`
 	XXX_unrecognized     []byte     `json:"-"`
 	XXX_sizecache        int32      `json:"-"`
@@ -784,6 +794,13 @@ func (m *Response) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_Response proto.InternalMessageInfo
 
+func (m *Response) GetRequestID() uint64 {
+	if m != nil {
+		return m.RequestID
+	}
+	return 0
+}
+
 func (m *Response) GetMethod() MethodType {
 	if m != nil {
 		return m.Method
@@ -894,72 +911,73 @@ func init() {
 func init() { proto.RegisterFile("logservice.proto", fileDescriptor_fd1040c5381ab5a7) }
 
 var fileDescriptor_fd1040c5381ab5a7 = []byte{
-	// 1031 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0x4f, 0x6b, 0xe3, 0x46,
-	0x14, 0x8f, 0x6c, 0xf9, 0xdf, 0xb3, 0xd7, 0x9d, 0x0c, 0xcd, 0x56, 0x0d, 0x4b, 0x36, 0x98, 0xb2,
-	0x84, 0x40, 0x1d, 0x48, 0x58, 0x28, 0x6d, 0xa1, 0x24, 0x96, 0xba, 0xeb, 0x25, 0x91, 0xc3, 0xd8,
-	0x39, 0xb4, 0x50, 0x96, 0x89, 0x35, 0x71, 0xd4, 0xb5, 0x35, 0xee, 0x68, 0x1c, 0xe2, 0x7e, 0x88,
-	0x7e, 0xae, 0x3d, 0xe6, 0xd2, 0xeb, 0xd2, 0x4d, 0x2f, 0xbd, 0xf5, 0x03, 0xf4, 0x52, 0x66, 0x46,
-	0x92, 0xa5, 0x84, 0xb4, 0xbb, 0xb7, 0x79, 0xbf, 0xf7, 0x47, 0xbf, 0xf7, 0xde, 0x6f, 0x06, 0x01,
-	0x9a, 0xf2, 0x49, 0xcc, 0xc4, 0x55, 0x38, 0x66, 0xdd, 0xb9, 0xe0, 0x92, 0x63, 0x58, 0x21, 0x9b,
-	0x5f, 0x4e, 0x42, 0x79, 0xb9, 0x38, 0xef, 0x8e, 0xf9, 0x6c, 0x6f, 0xc2, 0x27, 0x7c, 0x4f, 0x87,
-	0x9c, 0x2f, 0x2e, 0xb4, 0xa5, 0x0d, 0x7d, 0x32, 0xa9, 0x9d, 0xbf, 0x2d, 0x68, 0x1d, 0xf3, 0xc9,
-	0xf0, 0x92, 0x8a, 0xa0, 0x1f, 0x5d, 0x70, 0xec, 0x40, 0xcd, 0x18, 0xae, 0x63, 0x6d, 0x5b, 0x3b,
-	0x36, 0x49, 0x4d, 0x7c, 0x04, 0x75, 0xc2, 0xe6, 0xd3, 0x70, 0x4c, 0x63, 0xa7, 0xb4, 0x5d, 0xde,
-	0x69, 0xee, 0x3f, 0xeb, 0xe6, 0xa8, 0xe4, 0xab, 0x74, 0xd3, 0x40, 0x2f, 0x92, 0x62, 0x49, 0xb2,
-	0x3c, 0xfc, 0x29, 0x54, 0xbc, 0x39, 0x1f, 0x5f, 0x3a, 0x65, 0x5d, 0xdb, 0x18, 0x78, 0x13, 0xea,
-	0xc7, 0x8c, 0x06, 0x4c, 0xf4, 0x5d, 0xc7, 0xd6, 0x8e, 0xcc, 0xc6, 0x18, 0xec, 0x11, 0x13, 0x33,
-	0xa7, 0xa2, 0x71, 0x7d, 0xde, 0xfc, 0x06, 0x1e, 0x15, 0x3e, 0x80, 0x11, 0x94, 0xdf, 0xb0, 0x65,
-	0x42, 0x58, 0x1d, 0xd5, 0x87, 0xae, 0xe8, 0x74, 0xc1, 0x9c, 0xd2, 0xb6, 0xb5, 0xd3, 0x20, 0xc6,
-	0xf8, 0xba, 0xf4, 0x95, 0xd5, 0xb9, 0x82, 0xf6, 0x31, 0x9f, 0x24, 0xf9, 0xba, 0xe5, 0xef, 0x8b,
-	0x23, 0xd0, 0x65, 0x9a, 0xfb, 0xce, 0x43, 0xcd, 0x1d, 0xd5, 0xdf, 0xbe, 0x7b, 0xba, 0x76, 0xf3,
-	0xee, 0xa9, 0x45, 0x8a, 0xa3, 0x7b, 0x02, 0x8d, 0xb4, 0xac, 0xab, 0xbf, 0x6b, 0x93, 0x15, 0xd0,
-	0xf9, 0xdd, 0x82, 0x75, 0x15, 0x2e, 0xb9, 0x60, 0x2f, 0x19, 0x15, 0xf2, 0x9c, 0x51, 0xa9, 0xda,
-	0x3b, 0x3b, 0x4b, 0x66, 0xdd, 0x20, 0xfa, 0x8c, 0xb7, 0xa1, 0x49, 0xe8, 0x85, 0x3c, 0x0c, 0x02,
-	0xc1, 0xe2, 0x38, 0xe9, 0x20, 0x0f, 0xe1, 0x67, 0xd0, 0x1e, 0x1a, 0x66, 0x69, 0x50, 0x59, 0x07,
-	0xdd, 0x41, 0xf1, 0x17, 0xf0, 0xe8, 0x05, 0x8f, 0xe3, 0x70, 0x9e, 0x86, 0xd9, 0x3a, 0xac, 0x08,
-	0xe2, 0x6f, 0x73, 0x8b, 0xad, 0xe8, 0xc5, 0x6e, 0xde, 0xe9, 0x3d, 0x37, 0xad, 0x23, 0x5b, 0x75,
-	0xbf, 0x5a, 0x69, 0xc7, 0x83, 0xa6, 0xeb, 0x7f, 0x88, 0x7e, 0xfe, 0x7b, 0x3c, 0x3f, 0x01, 0x72,
-	0xfd, 0x0f, 0x18, 0xce, 0x73, 0xa8, 0xea, 0x82, 0xa9, 0x06, 0x3f, 0xcb, 0x53, 0xcd, 0x11, 0x49,
-	0x78, 0x26, 0xc1, 0x9d, 0x7f, 0x2c, 0xa8, 0x93, 0xe1, 0xc9, 0x50, 0x52, 0xc9, 0x94, 0x38, 0xfa,
-	0x51, 0xc0, 0xae, 0x13, 0x86, 0xc6, 0x50, 0xc3, 0x3a, 0x66, 0x34, 0x66, 0x2f, 0xf9, 0xd4, 0x48,
-	0xd1, 0x70, 0x2c, 0x82, 0x6a, 0xf4, 0x23, 0xb1, 0x88, 0xc6, 0x54, 0xb2, 0xc0, 0x14, 0x31, 0x52,
-	0xbe, 0x83, 0xe2, 0x57, 0xd0, 0x32, 0x89, 0x61, 0x2c, 0xb9, 0x58, 0x3a, 0xf6, 0xfd, 0x1b, 0x93,
-	0xf2, 0xe9, 0xe6, 0x03, 0xcd, 0x8d, 0x29, 0xe4, 0x6e, 0x7e, 0x07, 0xeb, 0xf7, 0x42, 0xfe, 0x4f,
-	0xf3, 0x76, 0x5e, 0xf3, 0x14, 0x1a, 0x7a, 0x8b, 0x63, 0x2e, 0x82, 0x07, 0xba, 0xdf, 0x05, 0x7b,
-	0xb4, 0x9c, 0x9b, 0xdc, 0xf6, 0xfe, 0xe3, 0x02, 0x4f, 0x9d, 0xa7, 0xbc, 0x44, 0xc7, 0xa8, 0xbd,
-	0xb8, 0x54, 0x52, 0xdd, 0x79, 0x8b, 0xe8, 0x73, 0xe7, 0x4f, 0x0b, 0x6a, 0x84, 0xfd, 0xb2, 0x60,
-	0xb1, 0xc4, 0x5d, 0xa8, 0x9e, 0x30, 0x79, 0xc9, 0x03, 0xfd, 0x89, 0x3b, 0xd5, 0x8c, 0x47, 0x57,
-	0x4b, 0xa2, 0x54, 0x3d, 0x9f, 0xce, 0xd2, 0xbb, 0xaa, 0xcf, 0x79, 0x1d, 0x95, 0x8b, 0x3a, 0xca,
-	0xf8, 0xdb, 0x79, 0xfe, 0x0e, 0xd4, 0x4e, 0xe8, 0xf5, 0x30, 0xfc, 0x95, 0x25, 0x4f, 0x45, 0x6a,
-	0x2a, 0xcf, 0x28, 0x9c, 0x31, 0xbe, 0x90, 0x4e, 0x75, 0xdb, 0xda, 0x29, 0x93, 0xd4, 0x54, 0x8a,
-	0x4c, 0x15, 0xe3, 0x3a, 0x35, 0xa3, 0xc8, 0x0c, 0xd0, 0x5d, 0xfa, 0x7d, 0xd7, 0xa9, 0x9b, 0x97,
-	0x47, 0x9d, 0x3b, 0xef, 0x95, 0x8c, 0x58, 0x3c, 0xe7, 0x51, 0xcc, 0x3e, 0xba, 0xcd, 0x03, 0x68,
-	0x78, 0x42, 0x70, 0xd1, 0xe3, 0x41, 0x3a, 0xe7, 0x8d, 0x7c, 0x4a, 0xe6, 0x24, 0xab, 0x38, 0xdc,
-	0x81, 0x96, 0x36, 0x4e, 0x58, 0x1c, 0xd3, 0x09, 0x4b, 0x2e, 0x7a, 0x01, 0xcb, 0xcf, 0xca, 0x7e,
-	0x60, 0x56, 0x95, 0xfc, 0xac, 0x9e, 0x40, 0xe3, 0x98, 0xc6, 0xd2, 0x78, 0xaa, 0xa6, 0xef, 0x0c,
-	0xe8, 0xbc, 0xd2, 0xef, 0x94, 0x59, 0x7a, 0xd6, 0xeb, 0x73, 0xb5, 0x5d, 0x85, 0xc4, 0x8e, 0xa5,
-	0x95, 0xbc, 0x71, 0xef, 0x89, 0x50, 0xde, 0xe4, 0xd6, 0xa5, 0xb1, 0xbb, 0xd7, 0x00, 0xab, 0x41,
-	0x60, 0x80, 0x6a, 0x8f, 0x78, 0x87, 0x23, 0x0f, 0xad, 0xe1, 0x26, 0xd4, 0x5c, 0x6f, 0x38, 0x22,
-	0x83, 0x1f, 0x90, 0xa5, 0x1c, 0x87, 0xa7, 0xa7, 0x9e, 0xef, 0xa2, 0x12, 0xae, 0x83, 0x4d, 0xbc,
-	0x43, 0x17, 0x95, 0x71, 0x0b, 0xea, 0x23, 0x72, 0xe6, 0xf7, 0x54, 0x82, 0x8d, 0x11, 0xb4, 0x5e,
-	0x78, 0xa3, 0xd7, 0x19, 0x52, 0x51, 0x25, 0x7a, 0x03, 0xdf, 0xf7, 0x7a, 0x23, 0x54, 0xc5, 0x6d,
-	0x80, 0xc4, 0x78, 0x4d, 0x06, 0xa8, 0xb6, 0xeb, 0x03, 0xac, 0x74, 0xab, 0xbc, 0x67, 0x31, 0x13,
-	0x06, 0x41, 0x6b, 0xaa, 0x74, 0x3f, 0x92, 0x4c, 0x44, 0x74, 0x8a, 0x2c, 0xfc, 0x09, 0x34, 0xf5,
-	0xfd, 0x3a, 0x9b, 0x07, 0x54, 0x32, 0x54, 0xc2, 0xeb, 0xc9, 0x53, 0x40, 0xd8, 0xcf, 0x6c, 0x2c,
-	0x59, 0x80, 0xca, 0xbb, 0xbf, 0x95, 0x72, 0xdb, 0x53, 0x9f, 0xf6, 0xb9, 0x36, 0x4d, 0x2b, 0x89,
-	0xa2, 0x90, 0xa5, 0x2a, 0xf7, 0x68, 0x34, 0x66, 0x53, 0x16, 0xa0, 0x92, 0x22, 0xdd, 0x8f, 0xae,
-	0xe8, 0x34, 0x0c, 0xf4, 0x46, 0x50, 0x19, 0x63, 0x68, 0x27, 0x48, 0x9a, 0x63, 0xe7, 0xb0, 0x53,
-	0xba, 0x9c, 0x72, 0x1a, 0xa0, 0x0a, 0x7e, 0x0c, 0xb8, 0x88, 0x29, 0x2d, 0xa3, 0xaa, 0xaa, 0x9f,
-	0xb1, 0xaa, 0x29, 0xa2, 0xba, 0xb0, 0xcf, 0x25, 0x61, 0x34, 0x58, 0xa2, 0xba, 0xfa, 0xe4, 0x70,
-	0x19, 0x4b, 0x36, 0xeb, 0x4d, 0x79, 0xcc, 0x02, 0xd4, 0xc0, 0x9f, 0xc3, 0x86, 0xde, 0xec, 0xe1,
-	0x54, 0xa8, 0x98, 0xec, 0xa1, 0x42, 0x81, 0x9a, 0xcb, 0x60, 0x21, 0x07, 0x17, 0x84, 0x46, 0x13,
-	0x86, 0xd4, 0xcd, 0x6e, 0xfb, 0x5c, 0xe6, 0x5e, 0x3c, 0x74, 0x81, 0x37, 0x00, 0x0d, 0xe4, 0x25,
-	0x13, 0xa6, 0xaa, 0x69, 0xfa, 0xaf, 0xda, 0x91, 0x77, 0xf3, 0x7e, 0x6b, 0xed, 0xed, 0xed, 0x96,
-	0x75, 0x73, 0xbb, 0x65, 0xfd, 0x71, 0xbb, 0x65, 0xfd, 0x78, 0x90, 0xfb, 0xf5, 0x98, 0x51, 0x29,
-	0xc2, 0x6b, 0x2e, 0xc2, 0x49, 0x18, 0xa5, 0x46, 0xc4, 0xf6, 0xe6, 0x6f, 0x26, 0x7b, 0xf3, 0xf3,
-	0xbd, 0x95, 0x74, 0xce, 0xab, 0xfa, 0x3f, 0xe4, 0xe0, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xb2,
-	0xc8, 0xc0, 0x42, 0xd6, 0x08, 0x00, 0x00,
+	// 1042 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x4f, 0x6f, 0xe3, 0x44,
+	0x14, 0xaf, 0x13, 0xe7, 0xdf, 0x4b, 0x36, 0x4c, 0x47, 0x74, 0x31, 0xd5, 0xaa, 0x5b, 0x45, 0x68,
+	0x55, 0x55, 0x22, 0x95, 0x5a, 0xad, 0x84, 0x00, 0x09, 0xb5, 0xb1, 0xd9, 0xcd, 0xaa, 0x75, 0xaa,
+	0x49, 0x7a, 0x00, 0x09, 0xad, 0xa6, 0xf1, 0x34, 0x35, 0x9b, 0x78, 0xc2, 0x78, 0x52, 0x35, 0xdc,
+	0xf8, 0x02, 0x7c, 0xae, 0x3d, 0xf6, 0xc2, 0x75, 0x05, 0x3d, 0x71, 0xe3, 0x03, 0x70, 0x00, 0xcd,
+	0x8c, 0xed, 0xd8, 0x2d, 0x85, 0x3d, 0x70, 0x9b, 0xf7, 0x7b, 0x7f, 0xe6, 0xbd, 0xdf, 0xfb, 0x79,
+	0x64, 0x40, 0x53, 0x3e, 0x89, 0x99, 0xb8, 0x0a, 0xc7, 0xac, 0x3b, 0x17, 0x5c, 0x72, 0x0c, 0x2b,
+	0x64, 0xf3, 0xd3, 0x49, 0x28, 0x2f, 0x17, 0xe7, 0xdd, 0x31, 0x9f, 0xed, 0x4d, 0xf8, 0x84, 0xef,
+	0xe9, 0x90, 0xf3, 0xc5, 0x85, 0xb6, 0xb4, 0xa1, 0x4f, 0x26, 0xb5, 0xf3, 0x87, 0x05, 0xad, 0x63,
+	0x3e, 0x19, 0x5e, 0x52, 0x11, 0xf4, 0xa3, 0x0b, 0x8e, 0x1d, 0xa8, 0x19, 0xc3, 0x75, 0xac, 0x6d,
+	0x6b, 0xc7, 0x26, 0xa9, 0x89, 0x8f, 0xa0, 0x4e, 0xd8, 0x7c, 0x1a, 0x8e, 0x69, 0xec, 0x94, 0xb6,
+	0xcb, 0x3b, 0xcd, 0xfd, 0x67, 0xdd, 0x5c, 0x2b, 0xf9, 0x2a, 0xdd, 0x34, 0xd0, 0x8b, 0xa4, 0x58,
+	0x92, 0x2c, 0x0f, 0x7f, 0x08, 0x15, 0x6f, 0xce, 0xc7, 0x97, 0x4e, 0x59, 0xd7, 0x36, 0x06, 0xde,
+	0x84, 0xfa, 0x31, 0xa3, 0x01, 0x13, 0x7d, 0xd7, 0xb1, 0xb5, 0x23, 0xb3, 0x31, 0x06, 0x7b, 0xc4,
+	0xc4, 0xcc, 0xa9, 0x68, 0x5c, 0x9f, 0x37, 0xbf, 0x80, 0x47, 0x85, 0x0b, 0x30, 0x82, 0xf2, 0x1b,
+	0xb6, 0x4c, 0x1a, 0x56, 0x47, 0x75, 0xd1, 0x15, 0x9d, 0x2e, 0x98, 0x53, 0xda, 0xb6, 0x76, 0x1a,
+	0xc4, 0x18, 0x9f, 0x97, 0x3e, 0xb3, 0x3a, 0x57, 0xd0, 0x3e, 0xe6, 0x93, 0x24, 0x5f, 0x8f, 0xfc,
+	0x75, 0x91, 0x02, 0x5d, 0xa6, 0xb9, 0xef, 0x3c, 0x34, 0xdc, 0x51, 0xfd, 0xed, 0xbb, 0xa7, 0x6b,
+	0x37, 0xef, 0x9e, 0x5a, 0xa4, 0x48, 0xdd, 0x13, 0x68, 0xa4, 0x65, 0x5d, 0x7d, 0xaf, 0x4d, 0x56,
+	0x40, 0xe7, 0x17, 0x0b, 0xd6, 0x55, 0xb8, 0xe4, 0x82, 0xbd, 0x64, 0x54, 0xc8, 0x73, 0x46, 0xa5,
+	0x1a, 0xef, 0xec, 0x2c, 0xe1, 0xba, 0x41, 0xf4, 0x19, 0x6f, 0x43, 0x93, 0xd0, 0x0b, 0x79, 0x18,
+	0x04, 0x82, 0xc5, 0x71, 0x32, 0x41, 0x1e, 0xc2, 0xcf, 0xa0, 0x3d, 0x34, 0x9d, 0xa5, 0x41, 0x65,
+	0x1d, 0x74, 0x07, 0xc5, 0x9f, 0xc0, 0xa3, 0x17, 0x3c, 0x8e, 0xc3, 0x79, 0x1a, 0x66, 0xeb, 0xb0,
+	0x22, 0x88, 0xbf, 0xcc, 0x2d, 0xb6, 0xa2, 0x17, 0xbb, 0x79, 0x67, 0xf6, 0x1c, 0x5b, 0x47, 0xb6,
+	0x9a, 0x7e, 0xb5, 0xd2, 0x8e, 0x07, 0x4d, 0xd7, 0x7f, 0x1f, 0xfd, 0xfc, 0x3b, 0x3d, 0xdf, 0x01,
+	0x72, 0xfd, 0xf7, 0x20, 0xe7, 0x39, 0x54, 0x75, 0xc1, 0x54, 0x83, 0x1f, 0xe5, 0x5b, 0xcd, 0x35,
+	0x92, 0xf4, 0x99, 0x04, 0x77, 0xfe, 0xb4, 0xa0, 0x4e, 0x86, 0x27, 0x43, 0x49, 0x25, 0x53, 0xe2,
+	0xe8, 0x47, 0x01, 0xbb, 0x4e, 0x3a, 0x34, 0x86, 0x22, 0xeb, 0x98, 0xd1, 0x98, 0xbd, 0xe4, 0x53,
+	0x23, 0x45, 0xd3, 0x63, 0x11, 0x54, 0xd4, 0x8f, 0xc4, 0x22, 0x1a, 0x53, 0xc9, 0x02, 0x53, 0xc4,
+	0x48, 0xf9, 0x0e, 0x8a, 0x5f, 0x41, 0xcb, 0x24, 0x86, 0xb1, 0xe4, 0x62, 0xe9, 0xd8, 0xf7, 0xbf,
+	0x98, 0xb4, 0x9f, 0x6e, 0x3e, 0xd0, 0x7c, 0x31, 0x85, 0xdc, 0xcd, 0xaf, 0x60, 0xfd, 0x5e, 0xc8,
+	0x7f, 0x69, 0xde, 0xce, 0x6b, 0x9e, 0x42, 0x43, 0x6f, 0x71, 0xcc, 0x45, 0xf0, 0xc0, 0xf4, 0xbb,
+	0x60, 0x8f, 0x96, 0x73, 0x93, 0xdb, 0xde, 0x7f, 0x5c, 0xe8, 0x53, 0xe7, 0x29, 0x2f, 0xd1, 0x31,
+	0x6a, 0x2f, 0x2e, 0x95, 0x54, 0x4f, 0xde, 0x22, 0xfa, 0xdc, 0xf9, 0xa9, 0x04, 0x35, 0xc2, 0x7e,
+	0x58, 0xb0, 0x58, 0x9a, 0x4d, 0xeb, 0x63, 0xa6, 0x82, 0x15, 0x80, 0xbb, 0x50, 0x3d, 0x61, 0xf2,
+	0x92, 0x07, 0xff, 0x74, 0x97, 0xf1, 0xe8, 0xbb, 0x92, 0x28, 0x75, 0x9b, 0x4f, 0x67, 0x2c, 0x91,
+	0xb8, 0x3e, 0xe7, 0x55, 0x66, 0x17, 0x55, 0x96, 0x4d, 0x57, 0xc9, 0x4f, 0xe7, 0x40, 0xed, 0x84,
+	0x5e, 0x0f, 0xc3, 0x1f, 0x99, 0x53, 0x35, 0xf1, 0x89, 0xa9, 0x3c, 0xa3, 0x70, 0xc6, 0xf8, 0x42,
+	0x3a, 0xb5, 0x6d, 0x6b, 0xa7, 0x4c, 0x52, 0x53, 0x4d, 0x91, 0xea, 0xc9, 0x75, 0xea, 0x66, 0x8a,
+	0x0c, 0xd0, 0x1c, 0xf8, 0x7d, 0xd7, 0x69, 0x98, 0x77, 0x49, 0x9d, 0x3b, 0x7f, 0x29, 0x91, 0xb1,
+	0x78, 0xce, 0xa3, 0x98, 0xfd, 0xcf, 0x24, 0x1c, 0x40, 0xc3, 0x13, 0x82, 0x8b, 0x1e, 0x0f, 0x0c,
+	0x13, 0xed, 0xfd, 0x8d, 0x7c, 0x4a, 0xe6, 0x24, 0xab, 0x38, 0xdc, 0x81, 0x96, 0x36, 0x4e, 0x58,
+	0x1c, 0xd3, 0x09, 0x4b, 0xbe, 0xfe, 0x02, 0x96, 0x67, 0xb2, 0xf2, 0x00, 0x93, 0xd5, 0x3c, 0x93,
+	0x4f, 0xa0, 0x71, 0x4c, 0x63, 0x69, 0x3c, 0x35, 0x33, 0x56, 0x06, 0x74, 0x5e, 0xe9, 0x37, 0xce,
+	0x08, 0x26, 0x63, 0xe2, 0xb9, 0x52, 0x86, 0x42, 0x62, 0xc7, 0xd2, 0x5f, 0xc1, 0xc6, 0xbd, 0xe7,
+	0x45, 0x79, 0x93, 0x2f, 0x36, 0x8d, 0xdd, 0xbd, 0x06, 0x58, 0x11, 0x81, 0x01, 0xaa, 0x3d, 0xe2,
+	0x1d, 0x8e, 0x3c, 0xb4, 0x86, 0x9b, 0x50, 0x73, 0xbd, 0xe1, 0x88, 0x0c, 0xbe, 0x41, 0x96, 0x72,
+	0x1c, 0x9e, 0x9e, 0x7a, 0xbe, 0x8b, 0x4a, 0xb8, 0x0e, 0x36, 0xf1, 0x0e, 0x5d, 0x54, 0xc6, 0x2d,
+	0xa8, 0x8f, 0xc8, 0x99, 0xdf, 0x53, 0x09, 0x36, 0x46, 0xd0, 0x7a, 0xe1, 0x8d, 0x5e, 0x67, 0x48,
+	0x45, 0x95, 0xe8, 0x0d, 0x7c, 0xdf, 0xeb, 0x8d, 0x50, 0x15, 0xb7, 0x01, 0x12, 0xe3, 0x35, 0x19,
+	0xa0, 0xda, 0xae, 0x0f, 0xb0, 0xd2, 0xbc, 0xf2, 0x9e, 0xc5, 0x4c, 0x18, 0x04, 0xad, 0xa9, 0xd2,
+	0xfd, 0x48, 0x32, 0x11, 0xd1, 0x29, 0xb2, 0xf0, 0x07, 0xd0, 0xd4, 0xdf, 0xe6, 0xd9, 0x3c, 0xa0,
+	0x92, 0xa1, 0x12, 0x5e, 0x4f, 0x9e, 0x11, 0xc2, 0xbe, 0x67, 0x63, 0xc9, 0x02, 0x54, 0xde, 0xfd,
+	0xb9, 0x94, 0xdb, 0x9e, 0xba, 0xda, 0xe7, 0xda, 0x34, 0xa3, 0x24, 0x7a, 0x43, 0x96, 0xaa, 0xdc,
+	0xa3, 0xd1, 0x98, 0x4d, 0x59, 0x80, 0x4a, 0xaa, 0xe9, 0x7e, 0x74, 0x45, 0xa7, 0x61, 0xa0, 0x37,
+	0x82, 0xca, 0x18, 0x43, 0x3b, 0x41, 0xd2, 0x1c, 0x3b, 0x87, 0x9d, 0xd2, 0xe5, 0x94, 0xd3, 0x00,
+	0x55, 0xf0, 0x63, 0xc0, 0x45, 0x4c, 0x29, 0x1d, 0x55, 0x55, 0xfd, 0xac, 0xab, 0x9a, 0x6a, 0x54,
+	0x17, 0xf6, 0xb9, 0x24, 0x8c, 0x06, 0x4b, 0x54, 0x57, 0x57, 0x0e, 0x97, 0xb1, 0x64, 0xb3, 0xde,
+	0x94, 0xc7, 0x2c, 0x40, 0x0d, 0xfc, 0x31, 0x6c, 0xe8, 0xcd, 0x1e, 0x4e, 0x85, 0x8a, 0xc9, 0x1e,
+	0x39, 0x14, 0x28, 0x5e, 0x06, 0x0b, 0x39, 0xb8, 0x20, 0x34, 0x9a, 0x30, 0xa4, 0x5e, 0x85, 0xb6,
+	0xcf, 0x65, 0xee, 0xb5, 0x44, 0x17, 0x78, 0x03, 0xd0, 0x40, 0x5e, 0x32, 0x61, 0xaa, 0x9a, 0xa1,
+	0x7f, 0xaf, 0x1d, 0x79, 0x37, 0xbf, 0x6d, 0xad, 0xbd, 0xbd, 0xdd, 0xb2, 0x6e, 0x6e, 0xb7, 0xac,
+	0x5f, 0x6f, 0xb7, 0xac, 0x6f, 0x0f, 0x72, 0xbf, 0x2d, 0x33, 0x2a, 0x45, 0x78, 0xcd, 0x45, 0x38,
+	0x09, 0xa3, 0xd4, 0x88, 0xd8, 0xde, 0xfc, 0xcd, 0x64, 0x6f, 0x7e, 0xbe, 0xb7, 0x92, 0xce, 0x79,
+	0x55, 0xff, 0xc3, 0x1c, 0xfc, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x8f, 0x96, 0x66, 0x53, 0x12, 0x09,
+	0x00, 0x00,
 }
 
 func (m *LogShardInfo) Marshal() (dAtA []byte, err error) {
@@ -1350,43 +1368,48 @@ func (m *Request) MarshalToSizedBuffer(dAtA []byte) (int, error) {
 	if m.DNID != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.DNID))
 		i--
-		dAtA[i] = 0x40
+		dAtA[i] = 0x48
 	}
 	if m.DNShardID != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.DNShardID))
 		i--
-		dAtA[i] = 0x38
+		dAtA[i] = 0x40
 	}
 	if m.Timeout != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.Timeout))
 		i--
-		dAtA[i] = 0x30
+		dAtA[i] = 0x38
 	}
 	if m.MaxSize != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.MaxSize))
 		i--
-		dAtA[i] = 0x28
+		dAtA[i] = 0x30
 	}
 	if m.Index != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.Index))
 		i--
-		dAtA[i] = 0x20
+		dAtA[i] = 0x28
 	}
 	if m.ShardID != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.ShardID))
 		i--
-		dAtA[i] = 0x18
+		dAtA[i] = 0x20
 	}
 	if len(m.Name) > 0 {
 		i -= len(m.Name)
 		copy(dAtA[i:], m.Name)
 		i = encodeVarintLogservice(dAtA, i, uint64(len(m.Name)))
 		i--
-		dAtA[i] = 0x12
+		dAtA[i] = 0x1a
 	}
 	if m.Method != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.Method))
 		i--
+		dAtA[i] = 0x10
+	}
+	if m.RequestID != 0 {
+		i = encodeVarintLogservice(dAtA, i, uint64(m.RequestID))
+		i--
 		dAtA[i] = 0x8
 	}
 	return len(dAtA) - i, nil
@@ -1419,33 +1442,38 @@ func (m *Response) MarshalToSizedBuffer(dAtA []byte) (int, error) {
 	if m.LastIndex != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.LastIndex))
 		i--
-		dAtA[i] = 0x30
+		dAtA[i] = 0x38
 	}
 	if m.Index != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.Index))
 		i--
-		dAtA[i] = 0x28
+		dAtA[i] = 0x30
 	}
 	if m.ShardID != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.ShardID))
 		i--
-		dAtA[i] = 0x20
+		dAtA[i] = 0x28
 	}
 	if len(m.ErrorMessage) > 0 {
 		i -= len(m.ErrorMessage)
 		copy(dAtA[i:], m.ErrorMessage)
 		i = encodeVarintLogservice(dAtA, i, uint64(len(m.ErrorMessage)))
 		i--
-		dAtA[i] = 0x1a
+		dAtA[i] = 0x22
 	}
 	if m.ErrorCode != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.ErrorCode))
 		i--
-		dAtA[i] = 0x10
+		dAtA[i] = 0x18
 	}
 	if m.Method != 0 {
 		i = encodeVarintLogservice(dAtA, i, uint64(m.Method))
 		i--
+		dAtA[i] = 0x10
+	}
+	if m.RequestID != 0 {
+		i = encodeVarintLogservice(dAtA, i, uint64(m.RequestID))
+		i--
 		dAtA[i] = 0x8
 	}
 	return len(dAtA) - i, nil
@@ -1683,6 +1711,9 @@ func (m *Request) Size() (n int) {
 	}
 	var l int
 	_ = l
+	if m.RequestID != 0 {
+		n += 1 + sovLogservice(uint64(m.RequestID))
+	}
 	if m.Method != 0 {
 		n += 1 + sovLogservice(uint64(m.Method))
 	}
@@ -1720,6 +1751,9 @@ func (m *Response) Size() (n int) {
 	}
 	var l int
 	_ = l
+	if m.RequestID != 0 {
+		n += 1 + sovLogservice(uint64(m.RequestID))
+	}
 	if m.Method != 0 {
 		n += 1 + sovLogservice(uint64(m.Method))
 	}
@@ -2891,6 +2925,25 @@ func (m *Request) Unmarshal(dAtA []byte) error {
 		}
 		switch fieldNum {
 		case 1:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field RequestID", wireType)
+			}
+			m.RequestID = 0
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowLogservice
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				m.RequestID |= uint64(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 2:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field Method", wireType)
 			}
@@ -2909,7 +2962,7 @@ func (m *Request) Unmarshal(dAtA []byte) error {
 					break
 				}
 			}
-		case 2:
+		case 3:
 			if wireType != 2 {
 				return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
 			}
@@ -2941,7 +2994,7 @@ func (m *Request) Unmarshal(dAtA []byte) error {
 			}
 			m.Name = string(dAtA[iNdEx:postIndex])
 			iNdEx = postIndex
-		case 3:
+		case 4:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field ShardID", wireType)
 			}
@@ -2960,7 +3013,7 @@ func (m *Request) Unmarshal(dAtA []byte) error {
 					break
 				}
 			}
-		case 4:
+		case 5:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
 			}
@@ -2979,7 +3032,7 @@ func (m *Request) Unmarshal(dAtA []byte) error {
 					break
 				}
 			}
-		case 5:
+		case 6:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field MaxSize", wireType)
 			}
@@ -2998,7 +3051,7 @@ func (m *Request) Unmarshal(dAtA []byte) error {
 					break
 				}
 			}
-		case 6:
+		case 7:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field Timeout", wireType)
 			}
@@ -3017,7 +3070,7 @@ func (m *Request) Unmarshal(dAtA []byte) error {
 					break
 				}
 			}
-		case 7:
+		case 8:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field DNShardID", wireType)
 			}
@@ -3036,7 +3089,7 @@ func (m *Request) Unmarshal(dAtA []byte) error {
 					break
 				}
 			}
-		case 8:
+		case 9:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field DNID", wireType)
 			}
@@ -3107,6 +3160,25 @@ func (m *Response) Unmarshal(dAtA []byte) error {
 		}
 		switch fieldNum {
 		case 1:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field RequestID", wireType)
+			}
+			m.RequestID = 0
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowLogservice
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				m.RequestID |= uint64(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 2:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field Method", wireType)
 			}
@@ -3125,7 +3197,7 @@ func (m *Response) Unmarshal(dAtA []byte) error {
 					break
 				}
 			}
-		case 2:
+		case 3:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field ErrorCode", wireType)
 			}
@@ -3144,7 +3216,7 @@ func (m *Response) Unmarshal(dAtA []byte) error {
 					break
 				}
 			}
-		case 3:
+		case 4:
 			if wireType != 2 {
 				return fmt.Errorf("proto: wrong wireType = %d for field ErrorMessage", wireType)
 			}
@@ -3176,7 +3248,7 @@ func (m *Response) Unmarshal(dAtA []byte) error {
 			}
 			m.ErrorMessage = string(dAtA[iNdEx:postIndex])
 			iNdEx = postIndex
-		case 4:
+		case 5:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field ShardID", wireType)
 			}
@@ -3195,7 +3267,7 @@ func (m *Response) Unmarshal(dAtA []byte) error {
 					break
 				}
 			}
-		case 5:
+		case 6:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
 			}
@@ -3214,7 +3286,7 @@ func (m *Response) Unmarshal(dAtA []byte) error {
 					break
 				}
 			}
-		case 6:
+		case 7:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field LastIndex", wireType)
 			}
diff --git a/pkg/pb/metadata/metadata.pb.go b/pkg/pb/metadata/metadata.pb.go
index cca8a473e..51843c03c 100644
--- a/pkg/pb/metadata/metadata.pb.go
+++ b/pkg/pb/metadata/metadata.pb.go
@@ -5,11 +5,12 @@ package metadata
 
 import (
 	fmt "fmt"
-	_ "github.com/gogo/protobuf/gogoproto"
-	proto "github.com/gogo/protobuf/proto"
 	io "io"
 	math "math"
 	math_bits "math/bits"
+
+	_ "github.com/gogo/protobuf/gogoproto"
+	proto "github.com/gogo/protobuf/proto"
 )
 
 // Reference imports to suppress errors if they are not otherwise used.
diff --git a/pkg/pb/metric/metric.pb.go b/pkg/pb/metric/metric.pb.go
index 70c4ea31c..5b8b208de 100644
--- a/pkg/pb/metric/metric.pb.go
+++ b/pkg/pb/metric/metric.pb.go
@@ -6,10 +6,11 @@ package metric
 import (
 	encoding_binary "encoding/binary"
 	fmt "fmt"
-	proto "github.com/gogo/protobuf/proto"
 	io "io"
 	math "math"
 	math_bits "math/bits"
+
+	proto "github.com/gogo/protobuf/proto"
 )
 
 // Reference imports to suppress errors if they are not otherwise used.
diff --git a/pkg/pb/plan/plan.pb.go b/pkg/pb/plan/plan.pb.go
index 188b55808..7d38841e5 100644
--- a/pkg/pb/plan/plan.pb.go
+++ b/pkg/pb/plan/plan.pb.go
@@ -6,11 +6,12 @@ package plan
 import (
 	encoding_binary "encoding/binary"
 	fmt "fmt"
-	_ "github.com/gogo/protobuf/gogoproto"
-	proto "github.com/gogo/protobuf/proto"
 	io "io"
 	math "math"
 	math_bits "math/bits"
+
+	_ "github.com/gogo/protobuf/gogoproto"
+	proto "github.com/gogo/protobuf/proto"
 )
 
 // Reference imports to suppress errors if they are not otherwise used.
diff --git a/pkg/pb/timestamp/timestamp.pb.go b/pkg/pb/timestamp/timestamp.pb.go
index 15548ce46..5e13dcc4f 100644
--- a/pkg/pb/timestamp/timestamp.pb.go
+++ b/pkg/pb/timestamp/timestamp.pb.go
@@ -5,10 +5,11 @@ package timestamp
 
 import (
 	fmt "fmt"
-	proto "github.com/gogo/protobuf/proto"
 	io "io"
 	math "math"
 	math_bits "math/bits"
+
+	proto "github.com/gogo/protobuf/proto"
 )
 
 // Reference imports to suppress errors if they are not otherwise used.
diff --git a/pkg/pb/txn/txn.go b/pkg/pb/txn/txn.go
index 65202882a..f205d4977 100644
--- a/pkg/pb/txn/txn.go
+++ b/pkg/pb/txn/txn.go
@@ -183,7 +183,12 @@ func (m TxnRequest) GetTargetDN() metadata.DNShard {
 }
 
 // ID implement morpc Messgae
-func (m TxnRequest) ID() []byte {
+func (m *TxnRequest) SetID(id uint64) {
+	m.RequestID = id
+}
+
+// ID implement morpc Messgae
+func (m *TxnRequest) GetID() uint64 {
 	return m.RequestID
 }
 
@@ -203,6 +208,11 @@ func (m *TxnRequest) SetPayloadField(data []byte) {
 }
 
 // ID implement morpc Messgae
-func (m TxnResponse) ID() []byte {
+func (m *TxnResponse) SetID(id uint64) {
+	m.RequestID = id
+}
+
+// ID implement morpc Messgae
+func (m *TxnResponse) GetID() uint64 {
 	return m.RequestID
 }
diff --git a/pkg/pb/txn/txn.pb.go b/pkg/pb/txn/txn.pb.go
index 8dbd37475..c0c5d9d16 100644
--- a/pkg/pb/txn/txn.pb.go
+++ b/pkg/pb/txn/txn.pb.go
@@ -5,13 +5,14 @@ package txn
 
 import (
 	fmt "fmt"
+	io "io"
+	math "math"
+	math_bits "math/bits"
+
 	_ "github.com/gogo/protobuf/gogoproto"
 	proto "github.com/gogo/protobuf/proto"
 	metadata "github.com/matrixorigin/matrixone/pkg/pb/metadata"
 	timestamp "github.com/matrixorigin/matrixone/pkg/pb/timestamp"
-	io "io"
-	math "math"
-	math_bits "math/bits"
 )
 
 // Reference imports to suppress errors if they are not otherwise used.
@@ -466,7 +467,7 @@ func (m *CNOpResponse) GetPayload() []byte {
 // 2. DN -> DN (TxnMethod.Prepare, TxnMethod.GetStatus, TxnMethod.CommitDNShard, TxnMethod.RollbackDNShard)
 type TxnRequest struct {
 	// RequestID request id
-	RequestID []byte `protobuf:"bytes,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"`
+	RequestID uint64 `protobuf:"varint,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"`
 	// Txn transaction metadata
 	Txn TxnMeta `protobuf:"bytes,2,opt,name=Txn,proto3" json:"Txn"`
 	// TxnMethod TxnRequest opCode, select the Request defined below according to TxnMethod.
@@ -525,11 +526,11 @@ func (m *TxnRequest) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_TxnRequest proto.InternalMessageInfo
 
-func (m *TxnRequest) GetRequestID() []byte {
+func (m *TxnRequest) GetRequestID() uint64 {
 	if m != nil {
 		return m.RequestID
 	}
-	return nil
+	return 0
 }
 
 func (m *TxnRequest) GetTxn() TxnMeta {
@@ -605,7 +606,7 @@ func (m *TxnRequest) GetRollbackDNShardRequest() *TxnRollbackDNShardRequest {
 // TxnResponse response of TxnRequest.
 type TxnResponse struct {
 	// RequestID corresponding request id
-	RequestID []byte `protobuf:"bytes,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"`
+	RequestID uint64 `protobuf:"varint,1,opt,name=RequestID,proto3" json:"RequestID,omitempty"`
 	// Txn transaction metadata. TxnResponse.TxnMeta and TxnRequest.TxnMeta may differ
 	// in that the node initiating the TxnRequest needs to process the returned TxnMeta,
 	// e.g. to determine whether the transaction is Aborted by the status of the returned
@@ -669,11 +670,11 @@ func (m *TxnResponse) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_TxnResponse proto.InternalMessageInfo
 
-func (m *TxnResponse) GetRequestID() []byte {
+func (m *TxnResponse) GetRequestID() uint64 {
 	if m != nil {
 		return m.RequestID
 	}
-	return nil
+	return 0
 }
 
 func (m *TxnResponse) GetTxn() *TxnMeta {
@@ -1379,7 +1380,7 @@ func init() {
 func init() { proto.RegisterFile("txn.proto", fileDescriptor_4f782e76b37adb9a) }
 
 var fileDescriptor_4f782e76b37adb9a = []byte{
-	// 1032 bytes of a gzipped FileDescriptorProto
+	// 1033 bytes of a gzipped FileDescriptorProto
 	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0xdd, 0x6e, 0xe3, 0x44,
 	0x14, 0xae, 0xf3, 0xeb, 0x9c, 0xfc, 0xac, 0x7b, 0xfa, 0x97, 0xad, 0x4a, 0xb6, 0xb2, 0x56, 0xab,
 	0x52, 0x41, 0xc2, 0xb6, 0x70, 0x03, 0x52, 0xa5, 0x6e, 0xb2, 0x2c, 0x95, 0xd8, 0xb4, 0x9a, 0x44,
@@ -1408,43 +1409,43 @@ var fileDescriptor_4f782e76b37adb9a = []byte{
 	0x98, 0xdc, 0x5c, 0x95, 0xa8, 0x15, 0xd6, 0xa1, 0x78, 0x45, 0x6f, 0xc7, 0x9c, 0x3a, 0x52, 0x05,
 	0x15, 0xa2, 0x97, 0xd8, 0x82, 0x42, 0x9f, 0x06, 0x43, 0x26, 0xd4, 0x2f, 0x4f, 0xdd, 0xa3, 0x0a,
 	0xb3, 0x8f, 0xa0, 0x12, 0x65, 0x9c, 0xfa, 0xdc, 0x9b, 0xc6, 0xa8, 0x8d, 0x18, 0xb5, 0xfd, 0x4f,
-	0x0e, 0xa0, 0x3f, 0xf3, 0x74, 0x6d, 0xb2, 0x35, 0xf2, 0x73, 0xae, 0xcf, 0x85, 0x41, 0xff, 0x93,
-	0xcc, 0xfd, 0xff, 0xe4, 0x19, 0x14, 0x5e, 0x33, 0x31, 0xe2, 0x8e, 0xac, 0x76, 0x49, 0xcc, 0x91,
-	0x95, 0x28, 0x2f, 0x22, 0xe4, 0xbe, 0x1c, 0xd3, 0xa1, 0x6c, 0x6e, 0x95, 0xc8, 0x6f, 0x6c, 0x42,
-	0xa9, 0xdd, 0x55, 0x09, 0x95, 0xde, 0x2c, 0x09, 0x5f, 0x6a, 0x20, 0x59, 0x84, 0xe0, 0x17, 0x50,
-	0x8d, 0x84, 0xa7, 0x31, 0x91, 0xdc, 0x76, 0x74, 0xca, 0x98, 0x93, 0xc4, 0x63, 0xf1, 0x1c, 0x1e,
-	0x11, 0x3e, 0x1e, 0x0f, 0xe8, 0xf5, 0x1b, 0x0d, 0x2f, 0x4a, 0xf8, 0x9e, 0x86, 0x27, 0xdc, 0x24,
-	0x19, 0x8f, 0x67, 0x50, 0x53, 0x87, 0x45, 0x33, 0x98, 0x92, 0x61, 0x57, 0x33, 0xc4, 0xbd, 0x24,
-	0x11, 0x8d, 0x1d, 0xb0, 0x5e, 0x31, 0xa1, 0x4e, 0xb9, 0x62, 0x28, 0x49, 0x86, 0xba, 0x66, 0x48,
-	0xfa, 0xc9, 0x0a, 0x02, 0xaf, 0x60, 0x3b, 0xda, 0x99, 0x3e, 0x5b, 0x8a, 0x09, 0x24, 0xd3, 0x41,
-	0xbc, 0x19, 0xf1, 0x18, 0xb2, 0x16, 0x89, 0xdf, 0xc0, 0xae, 0xde, 0x6a, 0x82, 0xb3, 0x2c, 0x39,
-	0x1b, 0xc9, 0x0e, 0x25, 0x58, 0x53, 0xd0, 0xf6, 0xef, 0x79, 0x28, 0x4b, 0xb9, 0x29, 0x61, 0xde,
-	0xaf, 0xb7, 0x46, 0xaa, 0xde, 0xde, 0x5f, 0x69, 0x1f, 0x82, 0xd9, 0x9f, 0x79, 0x2f, 0x83, 0x80,
-	0x07, 0x4a, 0x68, 0x55, 0x8d, 0x96, 0x46, 0x32, 0x77, 0xe3, 0x67, 0xf1, 0xd3, 0x34, 0x1f, 0x69,
-	0x0b, 0x5d, 0x46, 0x0e, 0x12, 0x3f, 0x74, 0x67, 0x50, 0xd3, 0x7a, 0x53, 0xc0, 0x62, 0x5c, 0x1b,
-	0x71, 0x2f, 0x49, 0x44, 0x87, 0xda, 0x58, 0xc8, 0x4d, 0x31, 0x98, 0x71, 0x6d, 0x24, 0xfd, 0x64,
-	0x05, 0x11, 0x8a, 0x7c, 0xae, 0x39, 0x45, 0x52, 0x8a, 0x8b, 0x3c, 0xe1, 0x26, 0xc9, 0x78, 0x7c,
-	0x05, 0x9b, 0x4b, 0x92, 0x53, 0x24, 0x91, 0xb6, 0x1e, 0xaf, 0x51, 0xa9, 0xa2, 0x59, 0xc5, 0x60,
-	0x0f, 0x76, 0x12, 0x6a, 0x53, 0x64, 0x91, 0xa8, 0x3e, 0x48, 0x11, 0xaa, 0x22, 0x5c, 0x8f, 0xc5,
-	0xef, 0x60, 0x6f, 0x45, 0x6c, 0x8a, 0xb6, 0x22, 0x69, 0x9f, 0xa4, 0x6a, 0x55, 0x11, 0xa7, 0xe1,
-	0xed, 0x09, 0x58, 0xc9, 0x19, 0x12, 0xbb, 0x71, 0x8c, 0xff, 0x7a, 0xe3, 0xac, 0xdc, 0x13, 0x99,
-	0x75, 0xf7, 0xc4, 0x16, 0x6c, 0xae, 0xa8, 0xc2, 0xbe, 0x00, 0x5c, 0x1d, 0x44, 0x0f, 0xaa, 0xc2,
-	0xde, 0x81, 0xad, 0x35, 0x9a, 0xb1, 0x7f, 0x91, 0x69, 0x13, 0x83, 0xe9, 0x39, 0x14, 0x15, 0x4e,
-	0x5d, 0xc1, 0xa9, 0xfc, 0x3a, 0xee, 0x41, 0x77, 0xb1, 0xbd, 0x2d, 0xb7, 0x97, 0x50, 0x9c, 0xfd,
-	0x95, 0xac, 0x74, 0x65, 0xce, 0xfd, 0xff, 0xa2, 0xec, 0x5d, 0xd8, 0x5e, 0xa7, 0x4e, 0xfb, 0x6b,
-	0xd8, 0x4b, 0x99, 0x88, 0x0f, 0xc9, 0xb2, 0x0f, 0xf5, 0x34, 0xd9, 0xda, 0x5d, 0x78, 0x9c, 0x3a,
-	0x27, 0x1f, 0x92, 0xeb, 0x00, 0xf6, 0xd3, 0xb5, 0x6c, 0xc3, 0x62, 0xac, 0x1d, 0xff, 0x00, 0xa5,
-	0xf9, 0x13, 0x12, 0x01, 0x0a, 0xe7, 0xd7, 0xc2, 0xfd, 0x99, 0x59, 0x1b, 0x58, 0x01, 0x53, 0x3f,
-	0xf1, 0x2c, 0x03, 0x6b, 0x00, 0x51, 0xe5, 0xc2, 0xf5, 0x86, 0x56, 0x06, 0xab, 0x50, 0x52, 0x6b,
-	0xe6, 0x58, 0xd9, 0x30, 0xf8, 0x7c, 0xc0, 0x03, 0xe9, 0xcc, 0x61, 0x19, 0x8a, 0x72, 0xc5, 0x1c,
-	0x2b, 0x7f, 0xfc, 0xab, 0x4c, 0xa0, 0x86, 0xac, 0x09, 0xb9, 0xf0, 0x11, 0x65, 0x6d, 0x60, 0x09,
-	0xf2, 0xf2, 0x79, 0x64, 0x19, 0x61, 0xd6, 0x88, 0xcb, 0xca, 0x84, 0x44, 0xba, 0x6a, 0x2b, 0x1b,
-	0x12, 0xa9, 0x1a, 0xac, 0x5c, 0x98, 0x72, 0xfe, 0x8b, 0xac, 0x3c, 0x6e, 0xea, 0x5b, 0x5d, 0xed,
-	0xce, 0x2a, 0xe0, 0xd6, 0xe2, 0xae, 0xd6, 0xc6, 0xe2, 0x8b, 0xb3, 0x77, 0x7f, 0x37, 0x8c, 0xb7,
-	0x77, 0x0d, 0xe3, 0xdd, 0x5d, 0xc3, 0xf8, 0xeb, 0xae, 0x61, 0x7c, 0xff, 0xd1, 0xd2, 0xa3, 0x7d,
-	0x42, 0x45, 0xe0, 0xce, 0x78, 0xe0, 0x0e, 0x5d, 0x4f, 0x2f, 0x3c, 0xd6, 0xf2, 0xdf, 0x0c, 0x5b,
-	0xfe, 0xa0, 0x25, 0x66, 0xde, 0xa0, 0x20, 0x5f, 0xe6, 0xa7, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff,
-	0x77, 0x9c, 0x72, 0xbc, 0xfb, 0x0b, 0x00, 0x00,
+	0x0e, 0xa0, 0x3f, 0xf3, 0x74, 0x6d, 0xb2, 0x35, 0xf2, 0x53, 0xe9, 0x33, 0x47, 0x16, 0x06, 0xfd,
+	0x4f, 0x32, 0xf7, 0xff, 0x93, 0x67, 0x50, 0x78, 0xcd, 0xc4, 0x88, 0x3b, 0xb2, 0xda, 0x25, 0x31,
+	0x47, 0x56, 0xa2, 0xbc, 0x88, 0x90, 0xfb, 0x72, 0x4c, 0x87, 0xb2, 0xb9, 0x55, 0x22, 0xbf, 0xb1,
+	0x09, 0xa5, 0x76, 0x57, 0x25, 0x54, 0x7a, 0xb3, 0x24, 0x7c, 0xa9, 0x81, 0x64, 0x11, 0x82, 0x5f,
+	0x40, 0x35, 0x12, 0x9e, 0xc6, 0x44, 0x72, 0xdb, 0xd1, 0x29, 0x63, 0x4e, 0x12, 0x8f, 0xc5, 0x73,
+	0x78, 0x44, 0xf8, 0x78, 0x3c, 0xa0, 0xd7, 0x6f, 0x34, 0xbc, 0x28, 0xe1, 0x7b, 0x1a, 0x9e, 0x70,
+	0x93, 0x64, 0x3c, 0x9e, 0x41, 0x4d, 0x1d, 0x16, 0xcd, 0x60, 0x4a, 0x86, 0x5d, 0xcd, 0x10, 0xf7,
+	0x92, 0x44, 0x34, 0x76, 0xc0, 0x7a, 0xc5, 0x84, 0x3a, 0xe5, 0x8a, 0xa1, 0x24, 0x19, 0xea, 0x9a,
+	0x21, 0xe9, 0x27, 0x2b, 0x08, 0xbc, 0x82, 0xed, 0x68, 0x67, 0xfa, 0x6c, 0x29, 0x26, 0x90, 0x4c,
+	0x07, 0xf1, 0x66, 0xc4, 0x63, 0xc8, 0x5a, 0x24, 0x7e, 0x03, 0xbb, 0x7a, 0xab, 0x09, 0xce, 0xb2,
+	0xe4, 0x6c, 0x24, 0x3b, 0x94, 0x60, 0x4d, 0x41, 0xdb, 0xbf, 0xe7, 0xa1, 0x2c, 0xe5, 0xa6, 0x84,
+	0x79, 0xbf, 0xde, 0x1a, 0xa9, 0x7a, 0x7b, 0x7f, 0xa5, 0x7d, 0x08, 0x66, 0x7f, 0xe6, 0xbd, 0x0c,
+	0x02, 0x1e, 0x28, 0xa1, 0x55, 0x35, 0x5a, 0x1a, 0xc9, 0xdc, 0x8d, 0x9f, 0xc5, 0x4f, 0xd3, 0x7c,
+	0xa4, 0x2d, 0x74, 0x19, 0x39, 0x48, 0xfc, 0xd0, 0x9d, 0x41, 0x4d, 0xeb, 0x4d, 0x01, 0x8b, 0x71,
+	0x6d, 0xc4, 0xbd, 0x24, 0x11, 0x1d, 0x6a, 0x63, 0x21, 0x37, 0xc5, 0x60, 0xc6, 0xb5, 0x91, 0xf4,
+	0x93, 0x15, 0x44, 0x28, 0xf2, 0xb9, 0xe6, 0x14, 0x49, 0x29, 0x2e, 0xf2, 0x84, 0x9b, 0x24, 0xe3,
+	0xf1, 0x15, 0x6c, 0x2e, 0x49, 0x4e, 0x91, 0x44, 0xda, 0x7a, 0xbc, 0x46, 0xa5, 0x8a, 0x66, 0x15,
+	0x83, 0x3d, 0xd8, 0x49, 0xa8, 0x4d, 0x91, 0x45, 0xa2, 0xfa, 0x20, 0x45, 0xa8, 0x8a, 0x70, 0x3d,
+	0x16, 0xbf, 0x83, 0xbd, 0x15, 0xb1, 0x29, 0xda, 0x8a, 0xa4, 0x7d, 0x92, 0xaa, 0x55, 0x45, 0x9c,
+	0x86, 0xb7, 0x27, 0x60, 0x25, 0x67, 0x48, 0xec, 0xc6, 0x31, 0xfe, 0xeb, 0x8d, 0xb3, 0x72, 0x4f,
+	0x64, 0xd6, 0xdd, 0x13, 0x5b, 0xb0, 0xb9, 0xa2, 0x0a, 0xfb, 0x02, 0x70, 0x75, 0x10, 0x3d, 0xa8,
+	0x0a, 0x7b, 0x07, 0xb6, 0xd6, 0x68, 0xc6, 0xfe, 0x45, 0xa6, 0x4d, 0x0c, 0xa6, 0xe7, 0x50, 0x54,
+	0x38, 0x75, 0x05, 0xa7, 0xf2, 0xeb, 0xb8, 0x07, 0xdd, 0xc5, 0xf6, 0xb6, 0xdc, 0x5e, 0x42, 0x71,
+	0xf6, 0x57, 0xb2, 0xd2, 0x95, 0x39, 0xf7, 0xff, 0x8b, 0xb2, 0x77, 0x61, 0x7b, 0x9d, 0x3a, 0xed,
+	0xaf, 0x61, 0x2f, 0x65, 0x22, 0x3e, 0x24, 0xcb, 0x3e, 0xd4, 0xd3, 0x64, 0x6b, 0x77, 0xe1, 0x71,
+	0xea, 0x9c, 0x7c, 0x48, 0xae, 0x03, 0xd8, 0x4f, 0xd7, 0xb2, 0x0d, 0x8b, 0xb1, 0x76, 0xfc, 0x03,
+	0x94, 0xe6, 0x4f, 0x48, 0x04, 0x28, 0x9c, 0x5f, 0x0b, 0xf7, 0x67, 0x66, 0x6d, 0x60, 0x05, 0x4c,
+	0xfd, 0xc4, 0xb3, 0x0c, 0xac, 0x01, 0x44, 0x95, 0x0b, 0xd7, 0x1b, 0x5a, 0x19, 0xac, 0x42, 0x49,
+	0xad, 0x99, 0x63, 0x65, 0xc3, 0xe0, 0xf3, 0x01, 0x0f, 0xa4, 0x33, 0x87, 0x65, 0x28, 0xca, 0x15,
+	0x73, 0xac, 0xfc, 0xf1, 0xaf, 0x32, 0x81, 0x1a, 0xb2, 0x26, 0xe4, 0xc2, 0x47, 0x94, 0xb5, 0x81,
+	0x25, 0xc8, 0xcb, 0xe7, 0x91, 0x65, 0x84, 0x59, 0x23, 0x2e, 0x2b, 0x13, 0x12, 0xe9, 0xaa, 0xad,
+	0x6c, 0x48, 0xa4, 0x6a, 0xb0, 0x72, 0x61, 0xca, 0xf9, 0x2f, 0xb2, 0xf2, 0xb8, 0xa9, 0x6f, 0x75,
+	0xb5, 0x3b, 0xab, 0x80, 0x5b, 0x8b, 0xbb, 0x5a, 0x1b, 0x8b, 0x2f, 0xce, 0xde, 0xfd, 0xdd, 0x30,
+	0xde, 0xde, 0x35, 0x8c, 0x77, 0x77, 0x0d, 0xe3, 0xaf, 0xbb, 0x86, 0xf1, 0xfd, 0x47, 0x4b, 0x8f,
+	0xf6, 0x09, 0x15, 0x81, 0x3b, 0xe3, 0x81, 0x3b, 0x74, 0x3d, 0xbd, 0xf0, 0x58, 0xcb, 0x7f, 0x33,
+	0x6c, 0xf9, 0x83, 0x96, 0x98, 0x79, 0x83, 0x82, 0x7c, 0x99, 0x9f, 0xfe, 0x1b, 0x00, 0x00, 0xff,
+	0xff, 0x8d, 0x41, 0xe2, 0x5d, 0xfb, 0x0b, 0x00, 0x00,
 }
 
 func (m *TxnMeta) Marshal() (dAtA []byte, err error) {
@@ -1824,12 +1825,10 @@ func (m *TxnRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
 	}
 	i--
 	dAtA[i] = 0x12
-	if len(m.RequestID) > 0 {
-		i -= len(m.RequestID)
-		copy(dAtA[i:], m.RequestID)
-		i = encodeVarintTxn(dAtA, i, uint64(len(m.RequestID)))
+	if m.RequestID != 0 {
+		i = encodeVarintTxn(dAtA, i, uint64(m.RequestID))
 		i--
-		dAtA[i] = 0xa
+		dAtA[i] = 0x8
 	}
 	return len(dAtA) - i, nil
 }
@@ -1976,12 +1975,10 @@ func (m *TxnResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
 		i--
 		dAtA[i] = 0x12
 	}
-	if len(m.RequestID) > 0 {
-		i -= len(m.RequestID)
-		copy(dAtA[i:], m.RequestID)
-		i = encodeVarintTxn(dAtA, i, uint64(len(m.RequestID)))
+	if m.RequestID != 0 {
+		i = encodeVarintTxn(dAtA, i, uint64(m.RequestID))
 		i--
-		dAtA[i] = 0xa
+		dAtA[i] = 0x8
 	}
 	return len(dAtA) - i, nil
 }
@@ -2545,9 +2542,8 @@ func (m *TxnRequest) Size() (n int) {
 	}
 	var l int
 	_ = l
-	l = len(m.RequestID)
-	if l > 0 {
-		n += 1 + l + sovTxn(uint64(l))
+	if m.RequestID != 0 {
+		n += 1 + sovTxn(uint64(m.RequestID))
 	}
 	l = m.Txn.Size()
 	n += 1 + l + sovTxn(uint64(l))
@@ -2597,9 +2593,8 @@ func (m *TxnResponse) Size() (n int) {
 	}
 	var l int
 	_ = l
-	l = len(m.RequestID)
-	if l > 0 {
-		n += 1 + l + sovTxn(uint64(l))
+	if m.RequestID != 0 {
+		n += 1 + sovTxn(uint64(m.RequestID))
 	}
 	if m.Txn != nil {
 		l = m.Txn.Size()
@@ -3515,10 +3510,10 @@ func (m *TxnRequest) Unmarshal(dAtA []byte) error {
 		}
 		switch fieldNum {
 		case 1:
-			if wireType != 2 {
+			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field RequestID", wireType)
 			}
-			var byteLen int
+			m.RequestID = 0
 			for shift := uint(0); ; shift += 7 {
 				if shift >= 64 {
 					return ErrIntOverflowTxn
@@ -3528,26 +3523,11 @@ func (m *TxnRequest) Unmarshal(dAtA []byte) error {
 				}
 				b := dAtA[iNdEx]
 				iNdEx++
-				byteLen |= int(b&0x7F) << shift
+				m.RequestID |= uint64(b&0x7F) << shift
 				if b < 0x80 {
 					break
 				}
 			}
-			if byteLen < 0 {
-				return ErrInvalidLengthTxn
-			}
-			postIndex := iNdEx + byteLen
-			if postIndex < 0 {
-				return ErrInvalidLengthTxn
-			}
-			if postIndex > l {
-				return io.ErrUnexpectedEOF
-			}
-			m.RequestID = append(m.RequestID[:0], dAtA[iNdEx:postIndex]...)
-			if m.RequestID == nil {
-				m.RequestID = []byte{}
-			}
-			iNdEx = postIndex
 		case 2:
 			if wireType != 2 {
 				return fmt.Errorf("proto: wrong wireType = %d for field Txn", wireType)
@@ -3923,10 +3903,10 @@ func (m *TxnResponse) Unmarshal(dAtA []byte) error {
 		}
 		switch fieldNum {
 		case 1:
-			if wireType != 2 {
+			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field RequestID", wireType)
 			}
-			var byteLen int
+			m.RequestID = 0
 			for shift := uint(0); ; shift += 7 {
 				if shift >= 64 {
 					return ErrIntOverflowTxn
@@ -3936,26 +3916,11 @@ func (m *TxnResponse) Unmarshal(dAtA []byte) error {
 				}
 				b := dAtA[iNdEx]
 				iNdEx++
-				byteLen |= int(b&0x7F) << shift
+				m.RequestID |= uint64(b&0x7F) << shift
 				if b < 0x80 {
 					break
 				}
 			}
-			if byteLen < 0 {
-				return ErrInvalidLengthTxn
-			}
-			postIndex := iNdEx + byteLen
-			if postIndex < 0 {
-				return ErrInvalidLengthTxn
-			}
-			if postIndex > l {
-				return io.ErrUnexpectedEOF
-			}
-			m.RequestID = append(m.RequestID[:0], dAtA[iNdEx:postIndex]...)
-			if m.RequestID == nil {
-				m.RequestID = []byte{}
-			}
-			iNdEx = postIndex
 		case 2:
 			if wireType != 2 {
 				return fmt.Errorf("proto: wrong wireType = %d for field Txn", wireType)
diff --git a/pkg/txn/rpc/sender.go b/pkg/txn/rpc/sender.go
index 1760845f2..3995170c1 100644
--- a/pkg/txn/rpc/sender.go
+++ b/pkg/txn/rpc/sender.go
@@ -17,7 +17,6 @@ package rpc
 import (
 	"context"
 
-	"github.com/google/uuid"
 	"github.com/matrixorigin/matrixone/pkg/common/moerr"
 	"github.com/matrixorigin/matrixone/pkg/common/morpc"
 	"github.com/matrixorigin/matrixone/pkg/logutil"
@@ -50,7 +49,6 @@ func (s *sender) Close() error {
 
 func (s *sender) Send(ctx context.Context, requests []txn.TxnRequest) ([]txn.TxnResponse, error) {
 	if len(requests) == 1 {
-		requests[0].RequestID = s.getUUID()
 		resp, err := s.doSend(ctx, requests[0])
 		if err != nil {
 			return nil, err
@@ -113,11 +111,6 @@ func (s *sender) doSend(ctx context.Context, request txn.TxnRequest) (txn.TxnRes
 	return *(v.(*txn.TxnResponse)), nil
 }
 
-func (s *sender) getUUID() []byte {
-	id := uuid.New()
-	return id[:]
-}
-
 type executor struct {
 	ctx       context.Context
 	stream    morpc.Stream
diff --git a/pkg/txn/rpc/sender_test.go b/pkg/txn/rpc/sender_test.go
index 94842b08f..e04fdc979 100644
--- a/pkg/txn/rpc/sender_test.go
+++ b/pkg/txn/rpc/sender_test.go
@@ -15,7 +15,6 @@
 package rpc
 
 import (
-	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -24,7 +23,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/google/uuid"
 	"github.com/matrixorigin/matrixone/pkg/common/morpc"
 	"github.com/matrixorigin/matrixone/pkg/pb/metadata"
 	"github.com/matrixorigin/matrixone/pkg/pb/txn"
@@ -45,7 +43,7 @@ func TestSendWithSingleRequest(t *testing.T) {
 
 	s.RegisterRequestHandler(func(request morpc.Message, sequence uint64, cs morpc.ClientSession) error {
 		return cs.Write(&txn.TxnResponse{
-			RequestID: request.ID(),
+			RequestID: request.GetID(),
 			Method:    txn.TxnMethod_Write,
 		}, morpc.SendOptions{})
 	})
@@ -84,7 +82,7 @@ func TestSendWithMultiDN(t *testing.T) {
 		s.RegisterRequestHandler(func(m morpc.Message, sequence uint64, cs morpc.ClientSession) error {
 			request := m.(*txn.TxnRequest)
 			return cs.Write(&txn.TxnResponse{
-				RequestID:    request.ID(),
+				RequestID:    request.GetID(),
 				CNOpResponse: &txn.CNOpResponse{Payload: []byte(fmt.Sprintf("%s-%d", request.GetTargetDN().Address, sequence))},
 			}, morpc.SendOptions{})
 		})
@@ -129,7 +127,7 @@ func TestSendWithMultiDN(t *testing.T) {
 }
 
 func TestNewExecutor(t *testing.T) {
-	ts := newTestStream(nil)
+	ts := newTestStream(1, nil)
 	defer func() {
 		assert.NoError(t, ts.Close())
 	}()
@@ -139,7 +137,7 @@ func TestNewExecutor(t *testing.T) {
 }
 
 func TestNewExectorWithClosedStream(t *testing.T) {
-	ts := newTestStream(nil)
+	ts := newTestStream(1, nil)
 	assert.NoError(t, ts.Close())
 	_, err := newExecutor(context.Background(), nil, ts)
 	assert.Error(t, err)
@@ -147,7 +145,7 @@ func TestNewExectorWithClosedStream(t *testing.T) {
 
 func TestExecute(t *testing.T) {
 	var requests []morpc.Message
-	ts := newTestStream(func(m morpc.Message) (morpc.Message, error) {
+	ts := newTestStream(1, func(m morpc.Message) (morpc.Message, error) {
 		requests = append(requests, m)
 		return m, nil
 	})
@@ -165,7 +163,7 @@ func TestExecute(t *testing.T) {
 
 	assert.Equal(t, n, len(requests))
 	for i := 0; i < n; i++ {
-		assert.Equal(t, ts.id, requests[i].ID())
+		assert.Equal(t, ts.id, requests[i].GetID())
 	}
 
 	assert.Equal(t, n, len(exec.indexes))
@@ -175,7 +173,7 @@ func TestExecute(t *testing.T) {
 }
 
 func TestExecuteWithClosedStream(t *testing.T) {
-	ts := newTestStream(nil)
+	ts := newTestStream(1, nil)
 
 	exec, err := newExecutor(context.Background(), nil, ts)
 	assert.NoError(t, err)
@@ -185,9 +183,9 @@ func TestExecuteWithClosedStream(t *testing.T) {
 }
 
 func TestWaitCompleted(t *testing.T) {
-	ts := newTestStream(func(m morpc.Message) (morpc.Message, error) {
+	ts := newTestStream(1, func(m morpc.Message) (morpc.Message, error) {
 		req := m.(*txn.TxnRequest)
-		return &txn.TxnResponse{RequestID: m.ID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil
+		return &txn.TxnResponse{RequestID: m.GetID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil
 	})
 	defer func() {
 		assert.NoError(t, ts.Close())
@@ -208,9 +206,9 @@ func TestWaitCompleted(t *testing.T) {
 }
 
 func TestWaitCompletedWithContextDone(t *testing.T) {
-	ts := newTestStream(func(m morpc.Message) (morpc.Message, error) {
+	ts := newTestStream(1, func(m morpc.Message) (morpc.Message, error) {
 		req := m.(*txn.TxnRequest)
-		return &txn.TxnResponse{RequestID: m.ID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil
+		return &txn.TxnResponse{RequestID: m.GetID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil
 	})
 	defer func() {
 		assert.NoError(t, ts.Close())
@@ -227,9 +225,9 @@ func TestWaitCompletedWithContextDone(t *testing.T) {
 }
 
 func TestWaitCompletedWithStreamClosed(t *testing.T) {
-	ts := newTestStream(func(m morpc.Message) (morpc.Message, error) {
+	ts := newTestStream(1, func(m morpc.Message) (morpc.Message, error) {
 		req := m.(*txn.TxnRequest)
-		return &txn.TxnResponse{RequestID: m.ID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil
+		return &txn.TxnResponse{RequestID: m.GetID(), CNOpResponse: &txn.CNOpResponse{Payload: req.CNRequest.Payload}}, nil
 	})
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -244,7 +242,7 @@ func TestWaitCompletedWithStreamClosed(t *testing.T) {
 }
 
 type testStream struct {
-	id     []byte
+	id     uint64
 	c      chan morpc.Message
 	handle func(morpc.Message) (morpc.Message, error)
 
@@ -254,17 +252,16 @@ type testStream struct {
 	}
 }
 
-func newTestStream(handle func(morpc.Message) (morpc.Message, error)) *testStream {
-	id := uuid.New()
+func newTestStream(id uint64, handle func(morpc.Message) (morpc.Message, error)) *testStream {
 	return &testStream{
-		id:     id[:],
+		id:     id,
 		c:      make(chan morpc.Message, 1024),
 		handle: handle,
 	}
 }
 
 func (s *testStream) Send(request morpc.Message, opts morpc.SendOptions) error {
-	if !bytes.Equal(s.id, request.ID()) {
+	if s.id != request.GetID() {
 		panic("request.id != stream.id")
 	}
 
@@ -304,7 +301,7 @@ func (s *testStream) Close() error {
 	return nil
 }
 
-func (s *testStream) ID() []byte {
+func (s *testStream) ID() uint64 {
 	return s.id
 }
 
diff --git a/proto/logservice.proto b/proto/logservice.proto
index 5d5aca461..c77892293 100644
--- a/proto/logservice.proto
+++ b/proto/logservice.proto
@@ -116,15 +116,16 @@ message LogRecord {
 };
 
 message Request {
-  MethodType Method   = 1;
-  string Name         = 2;
-  uint64 ShardID      = 3;
-  uint64 Index        = 4;
-  uint64 MaxSize      = 5;
-  int64 Timeout       = 6;
-
-  uint64 DNShardID    = 7;
-  uint64 DNID         = 8;
+  uint64 RequestID    = 1;
+  MethodType Method   = 2;
+  string Name         = 3;
+  uint64 ShardID      = 4;
+  uint64 Index        = 5;
+  uint64 MaxSize      = 6;
+  int64 Timeout       = 7;
+
+  uint64 DNShardID    = 8;
+  uint64 DNID         = 9;
 };
 
 enum ErrorCode {
@@ -147,12 +148,13 @@ enum ErrorCode {
 };
 
 message Response {
-  MethodType Method   = 1;
-  ErrorCode ErrorCode = 2;
-  string ErrorMessage = 3;
-  uint64 ShardID      = 4;
-  uint64 Index        = 5;
-  uint64 LastIndex    = 6;
+  uint64 RequestID    = 1;
+  MethodType Method   = 2;
+  ErrorCode ErrorCode = 3;
+  string ErrorMessage = 4;
+  uint64 ShardID      = 5;
+  uint64 Index        = 6;
+  uint64 LastIndex    = 7;
 };
 
 message LogRecordResponse {
diff --git a/proto/txn.proto b/proto/txn.proto
index 6349fd995..e5b3c8f2b 100644
--- a/proto/txn.proto
+++ b/proto/txn.proto
@@ -153,7 +153,7 @@ enum TxnMethod {
 // 2. DN -> DN (TxnMethod.Prepare, TxnMethod.GetStatus, TxnMethod.CommitDNShard, TxnMethod.RollbackDNShard)
 message TxnRequest {
     // RequestID request id
-    bytes                       RequestID                = 1;
+    uint64                      RequestID                = 1;
     // Txn transaction metadata
     TxnMeta                     Txn                      = 2 [(gogoproto.nullable) = false];
     // TxnMethod TxnRequest opCode, select the Request defined below according to TxnMethod.
@@ -179,7 +179,7 @@ message TxnRequest {
 // TxnResponse response of TxnRequest.
 message TxnResponse {
     // RequestID corresponding request id
-    bytes                        RequestID                 = 1;
+    uint64                       RequestID                 = 1;
     // Txn transaction metadata. TxnResponse.TxnMeta and TxnRequest.TxnMeta may differ 
     // in that the node initiating the TxnRequest needs to process the returned TxnMeta, 
     // e.g. to determine whether the transaction is Aborted by the status of the returned 
-- 
GitLab