diff --git a/pkg/logservice/client.go b/pkg/logservice/client.go index d87f5c956c3fc7eeb9deb2a0c4b12d74de8ed697..59c1631229a71c0c58525ffa1e90a8691d53296a 100644 --- a/pkg/logservice/client.go +++ b/pkg/logservice/client.go @@ -517,7 +517,7 @@ func getRPCClient(ctx context.Context, target string, pool *sync.Pool) (morpc.RP // we set connection timeout to a constant value so if ctx's deadline is much // larger, then we can ensure that all specified potential nodes have a chance // to be attempted - codec := morpc.NewMessageCodec(mf, defaultWriteSocketSize) + codec := morpc.NewMessageCodecWithChecksum(mf, defaultWriteSocketSize) bf := morpc.NewGoettyBasedBackendFactory(codec, backendOpts...) return morpc.NewClient(bf, clientOpts...) } diff --git a/pkg/logservice/service.go b/pkg/logservice/service.go index 9f35afcf360230612533ad89fc7af91beefb0859..279030cfe3c0f638cbf63dba1163fe36707e2ec3 100644 --- a/pkg/logservice/service.go +++ b/pkg/logservice/service.go @@ -99,7 +99,7 @@ func NewService(cfg Config, opts ...Option) (*Service, error) { return pool.Get().(*RPCRequest) } // TODO: check and fix all these magic numbers - codec := morpc.NewMessageCodec(mf, 16*1024) + codec := morpc.NewMessageCodecWithChecksum(mf, 16*1024) server, err := morpc.NewRPCServer(LogServiceRPCName, cfg.ServiceListenAddress, codec, morpc.WithServerGoettyOptions(goetty.WithSessionReleaseMsgFunc(func(i interface{}) { respPool.Put(i.(morpc.RPCMessage).Message) diff --git a/pkg/txn/rpc/sender.go b/pkg/txn/rpc/sender.go index 017848521b721b02546bfa0fd0f1cc59a309b3d0..d00cff4422e78a132bcf7446d806203d9c658d54 100644 --- a/pkg/txn/rpc/sender.go +++ b/pkg/txn/rpc/sender.go @@ -102,7 +102,7 @@ func NewSender(logger *zap.Logger, options ...SenderOption) (TxnSender, error) { }, } - codec := morpc.NewMessageCodec(func() morpc.Message { return s.acquireResponse() }, + codec := morpc.NewMessageCodecWithChecksum(func() morpc.Message { return s.acquireResponse() }, s.options.payloadCopyBufferSize) bf := morpc.NewGoettyBasedBackendFactory(codec, s.options.backendCreateOptions...) client, err := morpc.NewClient(bf, s.options.clientOptions...) diff --git a/pkg/txn/rpc/sender_test.go b/pkg/txn/rpc/sender_test.go index 382ce0fe87f980dd2927321b803dd456790ef98e..71a2e5d8a087ecab114af08ed588a32e3d774225 100644 --- a/pkg/txn/rpc/sender_test.go +++ b/pkg/txn/rpc/sender_test.go @@ -256,7 +256,7 @@ func TestNewSenderWithOptions(t *testing.T) { func newTestTxnServer(t assert.TestingT, addr string) morpc.RPCServer { assert.NoError(t, os.RemoveAll(addr[7:])) - codec := morpc.NewMessageCodec(func() morpc.Message { return &txn.TxnRequest{} }, 0) + codec := morpc.NewMessageCodecWithChecksum(func() morpc.Message { return &txn.TxnRequest{} }, 0) s, err := morpc.NewRPCServer("test-txn-server", addr, codec) assert.NoError(t, err) assert.NoError(t, s.Start()) diff --git a/pkg/txn/rpc/server.go b/pkg/txn/rpc/server.go index f004fa839ac9f26d59bcf33d61e85de07f502197..f1b641d8c41aed1f77eeb5e8a60771cdb0857b75 100644 --- a/pkg/txn/rpc/server.go +++ b/pkg/txn/rpc/server.go @@ -58,7 +58,7 @@ func NewTxnServer(address string, logger *zap.Logger) (TxnServer, error) { } rpc, err := morpc.NewRPCServer("txn-server", address, - morpc.NewMessageCodec(s.acquireRequest, 16*1024), + morpc.NewMessageCodecWithChecksum(s.acquireRequest, 16*1024), morpc.WithServerLogger(s.logger), morpc.WithServerGoettyOptions(goetty.WithSessionReleaseMsgFunc(func(v interface{}) { m := v.(morpc.RPCMessage)