Skip to content
Snippets Groups Projects
Unverified Commit a373f993 authored by fagongzi's avatar fagongzi Committed by GitHub
Browse files

morpc: active checksum in cn, dn and logservice (#4592)

parent 85948be3
No related branches found
No related tags found
No related merge requests found
...@@ -517,7 +517,7 @@ func getRPCClient(ctx context.Context, target string, pool *sync.Pool) (morpc.RP ...@@ -517,7 +517,7 @@ func getRPCClient(ctx context.Context, target string, pool *sync.Pool) (morpc.RP
// we set connection timeout to a constant value so if ctx's deadline is much // we set connection timeout to a constant value so if ctx's deadline is much
// larger, then we can ensure that all specified potential nodes have a chance // larger, then we can ensure that all specified potential nodes have a chance
// to be attempted // to be attempted
codec := morpc.NewMessageCodec(mf, defaultWriteSocketSize) codec := morpc.NewMessageCodecWithChecksum(mf, defaultWriteSocketSize)
bf := morpc.NewGoettyBasedBackendFactory(codec, backendOpts...) bf := morpc.NewGoettyBasedBackendFactory(codec, backendOpts...)
return morpc.NewClient(bf, clientOpts...) return morpc.NewClient(bf, clientOpts...)
} }
...@@ -99,7 +99,7 @@ func NewService(cfg Config, opts ...Option) (*Service, error) { ...@@ -99,7 +99,7 @@ func NewService(cfg Config, opts ...Option) (*Service, error) {
return pool.Get().(*RPCRequest) return pool.Get().(*RPCRequest)
} }
// TODO: check and fix all these magic numbers // TODO: check and fix all these magic numbers
codec := morpc.NewMessageCodec(mf, 16*1024) codec := morpc.NewMessageCodecWithChecksum(mf, 16*1024)
server, err := morpc.NewRPCServer(LogServiceRPCName, cfg.ServiceListenAddress, codec, server, err := morpc.NewRPCServer(LogServiceRPCName, cfg.ServiceListenAddress, codec,
morpc.WithServerGoettyOptions(goetty.WithSessionReleaseMsgFunc(func(i interface{}) { morpc.WithServerGoettyOptions(goetty.WithSessionReleaseMsgFunc(func(i interface{}) {
respPool.Put(i.(morpc.RPCMessage).Message) respPool.Put(i.(morpc.RPCMessage).Message)
......
...@@ -102,7 +102,7 @@ func NewSender(logger *zap.Logger, options ...SenderOption) (TxnSender, error) { ...@@ -102,7 +102,7 @@ func NewSender(logger *zap.Logger, options ...SenderOption) (TxnSender, error) {
}, },
} }
codec := morpc.NewMessageCodec(func() morpc.Message { return s.acquireResponse() }, codec := morpc.NewMessageCodecWithChecksum(func() morpc.Message { return s.acquireResponse() },
s.options.payloadCopyBufferSize) s.options.payloadCopyBufferSize)
bf := morpc.NewGoettyBasedBackendFactory(codec, s.options.backendCreateOptions...) bf := morpc.NewGoettyBasedBackendFactory(codec, s.options.backendCreateOptions...)
client, err := morpc.NewClient(bf, s.options.clientOptions...) client, err := morpc.NewClient(bf, s.options.clientOptions...)
......
...@@ -256,7 +256,7 @@ func TestNewSenderWithOptions(t *testing.T) { ...@@ -256,7 +256,7 @@ func TestNewSenderWithOptions(t *testing.T) {
func newTestTxnServer(t assert.TestingT, addr string) morpc.RPCServer { func newTestTxnServer(t assert.TestingT, addr string) morpc.RPCServer {
assert.NoError(t, os.RemoveAll(addr[7:])) assert.NoError(t, os.RemoveAll(addr[7:]))
codec := morpc.NewMessageCodec(func() morpc.Message { return &txn.TxnRequest{} }, 0) codec := morpc.NewMessageCodecWithChecksum(func() morpc.Message { return &txn.TxnRequest{} }, 0)
s, err := morpc.NewRPCServer("test-txn-server", addr, codec) s, err := morpc.NewRPCServer("test-txn-server", addr, codec)
assert.NoError(t, err) assert.NoError(t, err)
assert.NoError(t, s.Start()) assert.NoError(t, s.Start())
......
...@@ -58,7 +58,7 @@ func NewTxnServer(address string, logger *zap.Logger) (TxnServer, error) { ...@@ -58,7 +58,7 @@ func NewTxnServer(address string, logger *zap.Logger) (TxnServer, error) {
} }
rpc, err := morpc.NewRPCServer("txn-server", address, rpc, err := morpc.NewRPCServer("txn-server", address,
morpc.NewMessageCodec(s.acquireRequest, 16*1024), morpc.NewMessageCodecWithChecksum(s.acquireRequest, 16*1024),
morpc.WithServerLogger(s.logger), morpc.WithServerLogger(s.logger),
morpc.WithServerGoettyOptions(goetty.WithSessionReleaseMsgFunc(func(v interface{}) { morpc.WithServerGoettyOptions(goetty.WithSessionReleaseMsgFunc(func(v interface{}) {
m := v.(morpc.RPCMessage) m := v.(morpc.RPCMessage)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment