diff --git a/go.mod b/go.mod
index c99ccf52af7c6bf45ede489c61016020d4c2b0c7..a0851e16b312ce318a4a0a34d50b56519c271110 100644
--- a/go.mod
+++ b/go.mod
@@ -12,8 +12,7 @@ require (
github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a
github.com/cespare/xxhash/v2 v2.1.2
github.com/docker/go-units v0.4.0
- github.com/fagongzi/goetty v1.13.0
- github.com/fagongzi/goetty/v2 v2.0.3-0.20220811042451-6d175dc0a90f
+ github.com/fagongzi/goetty/v2 v2.0.3-0.20220812142536-dfcb3d33cfdc
github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d
github.com/go-sql-driver/mysql v1.6.0
github.com/gogo/protobuf v1.3.2
diff --git a/go.sum b/go.sum
index 73b5ee4d56ed3236430f0c079bc5e7936e5e1137..6ed769c11aee76da504fea2b7340e6d1d6bf6ef6 100644
--- a/go.sum
+++ b/go.sum
@@ -160,10 +160,9 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
-github.com/fagongzi/goetty v1.13.0 h1:k1RT+32/O97zu370BhHk8sL/kSAqF2ebSY1Nj+x/dBg=
github.com/fagongzi/goetty v1.13.0/go.mod h1:M1HA6OyQA3/4bV70W588Uek6sF7NxgxT/3gGhlf0yj0=
-github.com/fagongzi/goetty/v2 v2.0.3-0.20220811042451-6d175dc0a90f h1:D8nf+Mml+TpO2GFodU1Pm+WaSHbqvl2Eft7CMWsYi9I=
-github.com/fagongzi/goetty/v2 v2.0.3-0.20220811042451-6d175dc0a90f/go.mod h1:OwIBpVwRW1HjF/Jhc2Av3UvG2NygMg+bdqGxZaqwhU0=
+github.com/fagongzi/goetty/v2 v2.0.3-0.20220812142536-dfcb3d33cfdc h1:OBW9mZf4muo1ShBEH1zZXUFxVqSve7Hc3uRFg2+66JQ=
+github.com/fagongzi/goetty/v2 v2.0.3-0.20220812142536-dfcb3d33cfdc/go.mod h1:OwIBpVwRW1HjF/Jhc2Av3UvG2NygMg+bdqGxZaqwhU0=
github.com/fagongzi/util v0.0.0-20201116094402-221cc40c4593/go.mod h1:jYDIbpaqHXCCQ7QIDXRVfsQYAGKSNNb6N8BPTgdpcdE=
github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d h1:1pILVCatHj3eVo9i52dZyY4BwjTmSIeN+/hoJh8rD0Y=
github.com/fagongzi/util v0.0.0-20210923134909-bccc37b5040d/go.mod h1:5cqSns2zMRcJeVGvAqeTrbXFqh5AqBFr5uVKP9T2kiE=
diff --git a/pkg/frontend/codec.go b/pkg/frontend/codec.go
index c110e55a53391ccc8aeec9decfa309762c50f46a..971d7829ed8d306059814eae7feffbd03260387a 100644
--- a/pkg/frontend/codec.go
+++ b/pkg/frontend/codec.go
@@ -16,16 +16,16 @@ package frontend
import (
"fmt"
+ "io"
- "github.com/fagongzi/goetty/buf"
- "github.com/fagongzi/goetty/codec"
+ "github.com/fagongzi/goetty/v2/buf"
+ "github.com/fagongzi/goetty/v2/codec"
)
const PacketHeaderLength = 4
-func NewSqlCodec() (codec.Encoder, codec.Decoder) {
- c := &sqlCodec{}
- return c, c
+func NewSqlCodec() codec.Codec {
+ return &sqlCodec{}
}
type sqlCodec struct {
@@ -37,43 +37,23 @@ type Packet struct {
Payload []byte
}
-func (c *sqlCodec) Decode(in *buf.ByteBuf) (bool, interface{}, error) {
+func (c *sqlCodec) Decode(in *buf.ByteBuf) (interface{}, bool, error) {
readable := in.Readable()
if readable < PacketHeaderLength {
- return false, nil, nil
- }
-
- header, err := in.PeekN(0, PacketHeaderLength)
- if err != nil {
- return false, "", err
+ return nil, false, nil
}
+ header := in.PeekN(0, PacketHeaderLength)
length := int32(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16)
sequenceID := int8(header[3])
if readable < int(length)+PacketHeaderLength {
- return false, nil, nil
- }
-
- err = in.Skip(PacketHeaderLength)
- if err != nil {
- return true, nil, err
- }
-
- err = in.MarkN(int(length))
- if err != nil {
- if length == 0 {
- packet := &Packet{
- Length: 0,
- SequenceID: sequenceID,
- Payload: make([]byte, 0),
- }
- return true, packet, nil
- }
- return false, nil, err
+ return nil, false, nil
}
- _, payload, _ := in.ReadMarkedBytes()
+ in.Skip(PacketHeaderLength)
+ in.SetMarkIndex(in.GetReadIndex() + int(length))
+ payload := in.ReadMarkedData()
packet := &Packet{
Length: length,
@@ -81,10 +61,10 @@ func (c *sqlCodec) Decode(in *buf.ByteBuf) (bool, interface{}, error) {
Payload: payload,
}
- return true, packet, nil
+ return packet, true, nil
}
-func (c *sqlCodec) Encode(data interface{}, out *buf.ByteBuf) error {
+func (c *sqlCodec) Encode(data interface{}, out *buf.ByteBuf, writer io.Writer) error {
x := data.([]byte)
xlen := len(x)
tlen, err := out.Write(data.([]byte))
diff --git a/pkg/frontend/iopackage_test.go b/pkg/frontend/iopackage_test.go
index 9734ed1c2e87bfeae6972cee14085605a32f1d2a..96031889aee78b90b4d9a81684719e83a3d60d17 100644
--- a/pkg/frontend/iopackage_test.go
+++ b/pkg/frontend/iopackage_test.go
@@ -22,9 +22,9 @@ import (
"testing"
"time"
- "github.com/fagongzi/goetty"
- "github.com/fagongzi/goetty/codec"
- "github.com/fagongzi/goetty/codec/simple"
+ "github.com/fagongzi/goetty/v2"
+ "github.com/fagongzi/goetty/v2/codec"
+ "github.com/fagongzi/goetty/v2/codec/simple"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/smartystreets/goconvey/convey"
)
@@ -239,7 +239,7 @@ func echoHandler(session goetty.IOSession, msg interface{}, received uint64) err
return errors.New("convert to string failed")
}
- err := session.WriteAndFlush(value)
+ err := session.Write(value, goetty.WriteOptions{Flush: true})
if err != nil {
return err
}
@@ -247,11 +247,11 @@ func echoHandler(session goetty.IOSession, msg interface{}, received uint64) err
}
func echoServer(handler func(goetty.IOSession, interface{}, uint64) error, aware goetty.IOSessionAware,
- encoder codec.Encoder, decoder codec.Decoder) {
- echoServer, err := goetty.NewTCPApplication("127.0.0.1:6001", handler,
+ codec codec.Codec) {
+ echoServer, err := goetty.NewApplication("127.0.0.1:6001", handler,
goetty.WithAppSessionOptions(
- goetty.WithCodec(encoder, decoder),
- goetty.WithLogger(logutil.GetGlobalLogger())),
+ goetty.WithSessionCodec(codec),
+ goetty.WithSessionLogger(logutil.GetGlobalLogger())),
goetty.WithAppSessionAware(aware))
if err != nil {
panic(err)
@@ -275,9 +275,8 @@ func echoServer(handler func(goetty.IOSession, interface{}, uint64) error, aware
func echoClient() {
addrPort := "localhost:6001"
- encoder, decoder := simple.NewStringCodec()
- io := goetty.NewIOSession(goetty.WithCodec(encoder, decoder))
- _, err := io.Connect(addrPort, time.Second*3)
+ io := goetty.NewIOSession(goetty.WithSessionCodec(simple.NewStringCodec()))
+ err := io.Connect(addrPort, time.Second*3)
if err != nil {
fmt.Println("connect server failed.", err.Error())
return
@@ -286,13 +285,13 @@ func echoClient() {
alphabet := [10]string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
for i := 0; i < 10; i++ {
- err := io.WriteAndFlush(alphabet[i])
+ err := io.Write(alphabet[i], goetty.WriteOptions{Flush: true})
if err != nil {
fmt.Println("client writes packet failed.", err.Error())
break
}
fmt.Printf("client writes %s.\n", alphabet[i])
- data, err := io.Read()
+ data, err := io.Read(goetty.ReadOptions{})
if err != nil {
fmt.Println("client reads packet failed.", err.Error())
break
@@ -315,12 +314,11 @@ func echoClient() {
}
func TestIOPackageImpl_ReadPacket(t *testing.T) {
- encoder, decoder := simple.NewStringCodec()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
- echoServer(echoHandler, nil, encoder, decoder)
+ echoServer(echoHandler, nil, simple.NewStringCodec())
}()
to := NewTimeout(1*time.Minute, false)
diff --git a/pkg/frontend/load_test.go b/pkg/frontend/load_test.go
index 58fa8b199ea2cf1b4a1d2f276045c300c16c96cc..044abeabb7e6c8294037c93041a9cc64a629c655 100644
--- a/pkg/frontend/load_test.go
+++ b/pkg/frontend/load_test.go
@@ -18,14 +18,14 @@ import (
"context"
"errors"
"fmt"
- "github.com/matrixorigin/matrixone/pkg/config"
"os"
"sync/atomic"
"testing"
"time"
- "github.com/fagongzi/goetty/buf"
+ "github.com/fagongzi/goetty/v2/buf"
"github.com/golang/mock/gomock"
+ "github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/nulls"
"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -173,7 +173,7 @@ func Test_load(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
cws := []ComputationWrapper{}
@@ -358,7 +358,7 @@ func Test_load(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
cws := []*tree.Load{}
diff --git a/pkg/frontend/mysql_cmd_executor_test.go b/pkg/frontend/mysql_cmd_executor_test.go
index 008017b3f431eebae6f69fd521cfa9ccb0c79201..82abf1c22556b7d61b06cd17dbcce8d7972ecab6 100644
--- a/pkg/frontend/mysql_cmd_executor_test.go
+++ b/pkg/frontend/mysql_cmd_executor_test.go
@@ -17,14 +17,11 @@ package frontend
import (
"context"
"fmt"
- "github.com/matrixorigin/matrixone/pkg/config"
- "github.com/matrixorigin/matrixone/pkg/util/trace"
"testing"
- "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
-
- "github.com/fagongzi/goetty/buf"
+ "github.com/fagongzi/goetty/v2/buf"
"github.com/golang/mock/gomock"
+ "github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/nulls"
"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -33,6 +30,8 @@ import (
mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test"
"github.com/matrixorigin/matrixone/pkg/sql/parsers"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect"
+ "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
+ "github.com/matrixorigin/matrixone/pkg/util/trace"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/mmu/guest"
"github.com/matrixorigin/matrixone/pkg/vm/process"
@@ -63,7 +62,7 @@ func Test_mce(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
use_t := mock_frontend.NewMockComputationWrapper(ctrl)
stmts, err := parsers.Parse(dialect.MYSQL, "use T")
@@ -292,7 +291,7 @@ func Test_mce_selfhandle(t *testing.T) {
eng.EXPECT().StartTxn(nil).Return(txn, nil).AnyTimes()
ioses := mock_frontend.NewMockIOSession(ctrl)
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
pu, err := getParameterUnit("test/system_vars_config.toml", eng)
if err != nil {
@@ -337,7 +336,7 @@ func Test_mce_selfhandle(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
pu, err := getParameterUnit("test/system_vars_config.toml", eng)
if err != nil {
@@ -439,7 +438,7 @@ func Test_getDataFromPipeline(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
pu, err := getParameterUnit("test/system_vars_config.toml", eng)
if err != nil {
@@ -703,7 +702,7 @@ func Test_handleSelectVariables(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
pu, err := getParameterUnit("test/system_vars_config.toml", eng)
if err != nil {
@@ -741,7 +740,7 @@ func Test_handleShowVariables(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
pu, err := getParameterUnit("test/system_vars_config.toml", eng)
if err != nil {
@@ -788,7 +787,7 @@ func Test_handleShowCreateTable(t *testing.T) {
defer ctrl.Finish()
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
eng := mock_frontend.NewMockEngine(ctrl)
pu, err := getParameterUnit("test/system_vars_config.toml", eng)
@@ -823,7 +822,7 @@ func Test_handleShowCreateDatabase(t *testing.T) {
defer ctrl.Finish()
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
eng := mock_frontend.NewMockEngine(ctrl)
pu, err := getParameterUnit("test/system_vars_config.toml", eng)
@@ -854,7 +853,7 @@ func Test_handleShowColumns(t *testing.T) {
defer ctrl.Finish()
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
eng := mock_frontend.NewMockEngine(ctrl)
pu, err := getParameterUnit("test/system_vars_config.toml", eng)
@@ -891,7 +890,7 @@ func runTestHandle(funName string, t *testing.T, handleFun func(*MysqlCmdExecuto
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
pu, err := getParameterUnit("test/system_vars_config.toml", eng)
if err != nil {
@@ -983,7 +982,7 @@ func Test_CMD_FIELD_LIST(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
pu, err := getParameterUnit("test/system_vars_config.toml", eng)
if err != nil {
diff --git a/pkg/frontend/mysql_protocol.go b/pkg/frontend/mysql_protocol.go
index d44281696753adbea166d186025bbb8a0192894e..8e3536b773b26162964bf9f8babe3cb74ba5fd83 100644
--- a/pkg/frontend/mysql_protocol.go
+++ b/pkg/frontend/mysql_protocol.go
@@ -25,7 +25,7 @@ import (
"time"
"unicode"
- "github.com/fagongzi/goetty"
+ "github.com/fagongzi/goetty/v2"
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/defines"
@@ -1031,7 +1031,7 @@ func (mp *MysqlProtocolImpl) negotiateAuthenticationMethod() ([]byte, error) {
return nil, err
}
- read, err := mp.tcpConn.Read()
+ read, err := mp.tcpConn.Read(goetty.ReadOptions{})
if err != nil {
return nil, err
}
@@ -1472,7 +1472,8 @@ func (mp *MysqlProtocolImpl) flushOutBuffer() error {
if mp.bytesInOutBuffer >= mp.untilBytesInOutbufToFlush {
mp.flushCount++
mp.writeBytes += uint64(mp.bytesInOutBuffer)
- err := mp.tcpConn.Flush()
+ // FIXME: use a suitable timeout value
+ err := mp.tcpConn.Flush(0)
if err != nil {
return err
}
@@ -1489,16 +1490,16 @@ func (mp *MysqlProtocolImpl) openPacket() error {
outbuf := mp.tcpConn.OutBuf()
n := 4
- outbuf.Expansion(n)
+ outbuf.Grow(n)
writeIdx := outbuf.GetWriteIndex()
mp.beginWriteIndex = writeIdx
writeIdx += n
mp.bytesInOutBuffer += n
- err := outbuf.SetWriterIndex(writeIdx)
+ outbuf.SetWriteIndex(writeIdx)
if mp.enableLog {
logutil.Infof("openPacket curWriteIdx %d\n", outbuf.GetWriteIndex())
}
- return err
+ return nil
}
// fill the packet with data
@@ -1528,16 +1529,13 @@ func (mp *MysqlProtocolImpl) fillPacket(elems ...byte) error {
if curLen < 0 {
return fmt.Errorf("needLen %d < 0. hasDataLen %d n - i %d", curLen, hasDataLen, n-i)
}
- outbuf.Expansion(curLen)
+ outbuf.Grow(curLen)
buf = outbuf.RawBuf()
writeIdx := outbuf.GetWriteIndex()
copy(buf[writeIdx:], elems[i:i+curLen])
writeIdx += curLen
mp.bytesInOutBuffer += curLen
- err = outbuf.SetWriterIndex(writeIdx)
- if err != nil {
- return err
- }
+ outbuf.SetWriteIndex(writeIdx)
if mp.enableLog {
logutil.Infof("fillPacket curWriteIdx %d\n", outbuf.GetWriteIndex())
}
@@ -1718,7 +1716,7 @@ func (mp *MysqlProtocolImpl) writePackets(payload []byte) error {
//send packet
var packet = append(header[:], payload[i:i+curLen]...)
- err := mp.tcpConn.WriteAndFlush(packet)
+ err := mp.tcpConn.Write(packet, goetty.WriteOptions{Flush: true})
if err != nil {
return err
}
@@ -1734,7 +1732,7 @@ func (mp *MysqlProtocolImpl) writePackets(payload []byte) error {
header[3] = mp.sequenceId
//send header / zero-sized packet
- err := mp.tcpConn.WriteAndFlush(header[:])
+ err := mp.tcpConn.Write(header[:], goetty.WriteOptions{Flush: true})
if err != nil {
return err
}
diff --git a/pkg/frontend/mysql_protocol_test.go b/pkg/frontend/mysql_protocol_test.go
index 07ee221da55b08c4845879793a0465a6abd76763..04c87fce5c9d32f404192210d04dec0bebbeca92 100644
--- a/pkg/frontend/mysql_protocol_test.go
+++ b/pkg/frontend/mysql_protocol_test.go
@@ -16,6 +16,7 @@ package frontend
import (
"bytes"
+ "database/sql"
"encoding/binary"
"errors"
"fmt"
@@ -26,22 +27,19 @@ import (
"testing"
"time"
- "github.com/fagongzi/goetty/buf"
+ "github.com/fagongzi/goetty/v2"
+ "github.com/fagongzi/goetty/v2/buf"
+ _ "github.com/go-sql-driver/mysql"
"github.com/golang/mock/gomock"
fuzz "github.com/google/gofuzz"
- "github.com/matrixorigin/matrixone/pkg/container/types"
- mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test"
- "github.com/stretchr/testify/require"
-
- "database/sql"
-
- "github.com/fagongzi/goetty"
- _ "github.com/go-sql-driver/mysql"
"github.com/matrixorigin/matrixone/pkg/config"
+ "github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/defines"
+ mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test"
"github.com/matrixorigin/matrixone/pkg/vm/mempool"
"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
"github.com/smartystreets/goconvey/convey"
+ "github.com/stretchr/testify/require"
)
type TestRoutineManager struct {
@@ -277,15 +275,13 @@ func TestMysqlClientProtocol_Handshake(t *testing.T) {
rm := NewRoutineManager(pu)
rm.SetSkipCheckUser(true)
- encoder, decoder := NewSqlCodec()
-
wg := sync.WaitGroup{}
wg.Add(1)
//running server
go func() {
defer wg.Done()
- echoServer(rm.Handler, rm, encoder, decoder)
+ echoServer(rm.Handler, rm, NewSqlCodec())
}()
to := NewTimeout(1*time.Minute, false)
@@ -971,7 +967,7 @@ func (tRM *TestRoutineManager) resultsetHandler(rs goetty.IOSession, msg interfa
payload := packet.Payload
for uint32(length) == MaxPayloadSize {
var err error
- msg, err = pro.tcpConn.Read()
+ msg, err = pro.tcpConn.Read(goetty.ReadOptions{})
if err != nil {
return errors.New("read msg error")
}
@@ -1206,7 +1202,6 @@ func TestMysqlResultSet(t *testing.T) {
config.Mempool = mempool.New( /*int(config.GlobalSystemVariables.GetMempoolMaxSize()), int(config.GlobalSystemVariables.GetMempoolFactor())*/ )
pu := config.NewParameterUnit(&config.GlobalSystemVariables, config.HostMmu, config.Mempool, config.StorageEngine, config.ClusterNodes)
- encoder, decoder := NewSqlCodec()
trm := NewTestRoutineManager(pu)
wg := sync.WaitGroup{}
@@ -1214,7 +1209,7 @@ func TestMysqlResultSet(t *testing.T) {
go func() {
defer wg.Done()
- echoServer(trm.resultsetHandler, trm, encoder, decoder)
+ echoServer(trm.resultsetHandler, trm, NewSqlCodec())
}()
to := NewTimeout(1*time.Minute, false)
@@ -1423,7 +1418,7 @@ func Test_writePackets(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ioses := mock_frontend.NewMockIOSession(ctrl)
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
sv, err := getSystemVariables("test/system_vars_config.toml")
if err != nil {
@@ -1440,7 +1435,7 @@ func Test_writePackets(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
cnt := 0
- ioses.EXPECT().WriteAndFlush(gomock.Any()).DoAndReturn(func(msg interface{}) error {
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(msg interface{}, opts goetty.WriteOptions) error {
if cnt == 0 {
cnt++
return nil
@@ -1465,7 +1460,7 @@ func Test_writePackets(t *testing.T) {
defer ctrl.Finish()
ioses := mock_frontend.NewMockIOSession(ctrl)
- ioses.EXPECT().WriteAndFlush(gomock.Any()).DoAndReturn(func(msg interface{}) error {
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(msg interface{}, opts goetty.WriteOptions) error {
return fmt.Errorf("write and flush failed.")
}).AnyTimes()
@@ -1507,7 +1502,7 @@ func Test_openpacket(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().Flush().Return(nil).AnyTimes()
+ ioses.EXPECT().Flush(gomock.Any()).Return(nil).AnyTimes()
sv, err := getSystemVariables("test/system_vars_config.toml")
if err != nil {
@@ -1553,7 +1548,7 @@ func Test_openpacket(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().Flush().Return(nil).AnyTimes()
+ ioses.EXPECT().Flush(gomock.Any()).Return(nil).AnyTimes()
sv, err := getSystemVariables("test/system_vars_config.toml")
if err != nil {
@@ -1657,7 +1652,7 @@ func Test_resultset(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
sv, err := getSystemVariables("test/system_vars_config.toml")
if err != nil {
@@ -1678,7 +1673,7 @@ func Test_resultset(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
sv, err := getSystemVariables("test/system_vars_config.toml")
if err != nil {
@@ -1699,8 +1694,7 @@ func Test_resultset(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
-
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
sv, err := getSystemVariables("test/system_vars_config.toml")
if err != nil {
@@ -1726,8 +1720,7 @@ func Test_send_packet(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
-
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
sv, err := getSystemVariables("test/system_vars_config.toml")
if err != nil {
@@ -1745,8 +1738,7 @@ func Test_send_packet(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
-
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
sv, err := getSystemVariables("test/system_vars_config.toml")
if err != nil {
@@ -1899,8 +1891,8 @@ func Test_analyse41resp(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ioses := mock_frontend.NewMockIOSession(ctrl)
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
- ioses.EXPECT().Read().Return(new(Packet), nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Read(gomock.Any()).Return(new(Packet), nil).AnyTimes()
sv, err := getSystemVariables("test/system_vars_config.toml")
if err != nil {
t.Error(err)
@@ -1999,7 +1991,7 @@ func Test_handleHandshake(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ioses := mock_frontend.NewMockIOSession(ctrl)
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
var IO IOPackageImpl
var SV = &config.SystemVariables{}
@@ -2029,7 +2021,7 @@ func Test_handleHandshake_Recover(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ioses := mock_frontend.NewMockIOSession(ctrl)
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
convey.Convey("handleHandshake succ", t, func() {
var IO IOPackageImpl
diff --git a/pkg/frontend/protocol.go b/pkg/frontend/protocol.go
index bf541504ebd7b6d7789defd342e2fc641ddcb555..2364ca74de3066dcee53253cdf13aad0935e2630 100644
--- a/pkg/frontend/protocol.go
+++ b/pkg/frontend/protocol.go
@@ -20,7 +20,7 @@ import (
"net"
"sync"
- "github.com/fagongzi/goetty"
+ "github.com/fagongzi/goetty/v2"
"github.com/matrixorigin/matrixone/pkg/logutil"
)
@@ -211,7 +211,7 @@ func (cpi *ProtocolImpl) SetTcpConnection(tcp goetty.IOSession) {
}
func (cpi *ProtocolImpl) Peer() (string, string) {
- addr := cpi.tcpConn.RemoteAddr()
+ addr := cpi.tcpConn.RemoteAddress()
host, port, err := net.SplitHostPort(addr)
if err != nil {
logutil.Errorf("get peer host:port failed. error:%v ", err)
diff --git a/pkg/frontend/protocol_test.go b/pkg/frontend/protocol_test.go
index 5a51ad4882220912a9fa5a7f55212d30832ef5a5..30204f9819ded2b1bb3debd9db6b43b35ca7a169 100644
--- a/pkg/frontend/protocol_test.go
+++ b/pkg/frontend/protocol_test.go
@@ -18,9 +18,9 @@ import (
"errors"
"testing"
- "github.com/fagongzi/goetty"
- "github.com/fagongzi/goetty/buf"
- "github.com/fagongzi/goetty/codec/simple"
+ "github.com/fagongzi/goetty/v2"
+ "github.com/fagongzi/goetty/v2/buf"
+ "github.com/fagongzi/goetty/v2/codec/simple"
"github.com/golang/mock/gomock"
mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test"
"github.com/smartystreets/goconvey/convey"
@@ -40,8 +40,7 @@ func Test_protocol(t *testing.T) {
convey.So(res.GetCategory(), convey.ShouldEqual, 2)
cpi := &ProtocolImpl{}
- encoder, decoder := simple.NewStringCodec()
- io := goetty.NewIOSession(goetty.WithCodec(encoder, decoder))
+ io := goetty.NewIOSession(goetty.WithSessionCodec(simple.NewStringCodec()))
cpi.tcpConn = io
str1, str2 := cpi.Peer()
@@ -62,7 +61,7 @@ func Test_SendResponse(t *testing.T) {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
mp := &MysqlProtocolImpl{}
mp.io = iopackage
diff --git a/pkg/frontend/routine_manager.go b/pkg/frontend/routine_manager.go
index 54733c2366800b1b20be784a044af737c3ba290a..cedcc636868e3e830496006d20f64b42c2e87a2e 100644
--- a/pkg/frontend/routine_manager.go
+++ b/pkg/frontend/routine_manager.go
@@ -18,7 +18,7 @@ import (
"errors"
"sync"
- "github.com/fagongzi/goetty"
+ "github.com/fagongzi/goetty/v2"
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/logutil"
)
@@ -66,7 +66,7 @@ func (rm *RoutineManager) Created(rs goetty.IOSession) {
rm.rwlock.Lock()
defer rm.rwlock.Unlock()
-
+ rs.Ref()
rm.clients[rs] = routine
}
@@ -131,7 +131,7 @@ func (rm *RoutineManager) Handler(rs goetty.IOSession, msg interface{}, received
payload := packet.Payload
for uint32(length) == MaxPayloadSize {
var err error
- msg, err = protocol.tcpConn.Read()
+ msg, err = protocol.tcpConn.Read(goetty.ReadOptions{})
if err != nil {
return errors.New("read msg error")
}
diff --git a/pkg/frontend/server.go b/pkg/frontend/server.go
index ef69bf34af9087599610fc65d565655f73ee788f..9368afe67e8334fd29fd2271aa4279de9c6dba9e 100644
--- a/pkg/frontend/server.go
+++ b/pkg/frontend/server.go
@@ -18,7 +18,7 @@ import (
"fmt"
"sync/atomic"
- "github.com/fagongzi/goetty"
+ "github.com/fagongzi/goetty/v2"
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/logutil"
)
@@ -59,14 +59,15 @@ func nextConnectionID() uint32 {
}
func NewMOServer(addr string, pu *config.ParameterUnit) *MOServer {
- encoder, decoder := NewSqlCodec()
+ codec := NewSqlCodec()
rm := NewRoutineManager(pu)
// TODO asyncFlushBatch
- app, err := goetty.NewTCPApplication(addr, rm.Handler,
+ app, err := goetty.NewApplication(addr, rm.Handler,
+ goetty.WithAppLogger(logutil.GetGlobalLogger()),
goetty.WithAppSessionOptions(
- goetty.WithCodec(encoder, decoder),
- goetty.WithLogger(logutil.GetGlobalLogger()),
- goetty.WithBufSize(1024*1024, 1024*1024)),
+ goetty.WithSessionCodec(codec),
+ goetty.WithSessionLogger(logutil.GetGlobalLogger()),
+ goetty.WithSessionRWBUfferSize(1024*1024, 1024*1024)),
goetty.WithAppSessionAware(rm))
if err != nil {
logutil.Panicf("start server failed with %+v", err)
diff --git a/pkg/frontend/session_test.go b/pkg/frontend/session_test.go
index decd879baa4e1ce49ff399a1b6c19ce2549ecdcf..724ef21e125806234d23a0530d91d07e63e9e963 100644
--- a/pkg/frontend/session_test.go
+++ b/pkg/frontend/session_test.go
@@ -16,14 +16,15 @@ package frontend
import (
"errors"
- "github.com/fagongzi/goetty/buf"
+ "testing"
+
+ "github.com/fagongzi/goetty/v2/buf"
"github.com/golang/mock/gomock"
"github.com/matrixorigin/matrixone/pkg/config"
mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/moengine"
"github.com/smartystreets/goconvey/convey"
- "testing"
)
func TestTxnHandler_NewTxn(t *testing.T) {
@@ -126,7 +127,7 @@ func TestSession_TxnBegin(t *testing.T) {
genSession := func(ctrl *gomock.Controller, gSysVars *GlobalSystemVariables) *Session {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
proto := NewMysqlClientProtocol(0, ioses, 1024, nil)
return NewSession(proto, nil, nil, nil, gSysVars)
}
@@ -162,7 +163,7 @@ func TestVariables(t *testing.T) {
genSession := func(ctrl *gomock.Controller, gSysVars *GlobalSystemVariables) *Session {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
proto := NewMysqlClientProtocol(0, ioses, 1024, nil)
return NewSession(proto, nil, nil, nil, gSysVars)
}
@@ -429,7 +430,7 @@ func TestSession_TxnCompilerContext(t *testing.T) {
genSession := func(ctrl *gomock.Controller, gSysVars *GlobalSystemVariables) *Session {
ioses := mock_frontend.NewMockIOSession(ctrl)
ioses.EXPECT().OutBuf().Return(buf.NewByteBuf(1024)).AnyTimes()
- ioses.EXPECT().WriteAndFlush(gomock.Any()).Return(nil).AnyTimes()
+ ioses.EXPECT().Write(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
proto := NewMysqlClientProtocol(0, ioses, 1024, nil)
return NewSession(proto, nil, nil, nil, gSysVars)
}
diff --git a/pkg/frontend/test/iosession_mock.go b/pkg/frontend/test/iosession_mock.go
index 1fbf5abb62eaa83837e26c848791ea19ac491baf..59e57a145423a8f315024f906c1cb4c6192bf34e 100644
--- a/pkg/frontend/test/iosession_mock.go
+++ b/pkg/frontend/test/iosession_mock.go
@@ -1,7 +1,7 @@
// Code generated by MockGen. DO NOT EDIT.
-// Source: session.go
+// Source: ../../../vendor/github.com/fagongzi/goetty/v2/session.go
-// Package mock_goetty is a generated GoMock package.
+// Package mock_frontend is a generated GoMock package.
package mock_frontend
import (
@@ -9,8 +9,9 @@ import (
reflect "reflect"
time "time"
- buf "github.com/fagongzi/goetty/buf"
+ buf "github.com/fagongzi/goetty/v2/buf"
gomock "github.com/golang/mock/gomock"
+ goetty "github.com/fagongzi/goetty/v2"
)
// MockIOSession is a mock of IOSession interface.
@@ -51,12 +52,11 @@ func (mr *MockIOSessionMockRecorder) Close() *gomock.Call {
}
// Connect mocks base method.
-func (m *MockIOSession) Connect(addr string, timeout time.Duration) (bool, error) {
+func (m *MockIOSession) Connect(addr string, timeout time.Duration) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Connect", addr, timeout)
- ret0, _ := ret[0].(bool)
- ret1, _ := ret[1].(error)
- return ret0, ret1
+ ret0, _ := ret[0].(error)
+ return ret0
}
// Connect indicates an expected call of Connect.
@@ -79,32 +79,32 @@ func (mr *MockIOSessionMockRecorder) Connected() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connected", reflect.TypeOf((*MockIOSession)(nil).Connected))
}
-// Flush mocks base method.
-func (m *MockIOSession) Flush() error {
+// Disconnect mocks base method.
+func (m *MockIOSession) Disconnect() error {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "Flush")
+ ret := m.ctrl.Call(m, "Disconnect")
ret0, _ := ret[0].(error)
return ret0
}
-// Flush indicates an expected call of Flush.
-func (mr *MockIOSessionMockRecorder) Flush() *gomock.Call {
+// Disconnect indicates an expected call of Disconnect.
+func (mr *MockIOSessionMockRecorder) Disconnect() *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockIOSession)(nil).Flush))
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disconnect", reflect.TypeOf((*MockIOSession)(nil).Disconnect))
}
-// GetAttr mocks base method.
-func (m *MockIOSession) GetAttr(key string) interface{} {
+// Flush mocks base method.
+func (m *MockIOSession) Flush(timeout time.Duration) error {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "GetAttr", key)
- ret0, _ := ret[0].(interface{})
+ ret := m.ctrl.Call(m, "Flush", timeout)
+ ret0, _ := ret[0].(error)
return ret0
}
-// GetAttr indicates an expected call of GetAttr.
-func (mr *MockIOSessionMockRecorder) GetAttr(key interface{}) *gomock.Call {
+// Flush indicates an expected call of Flush.
+func (mr *MockIOSessionMockRecorder) Flush(timeout interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAttr", reflect.TypeOf((*MockIOSession)(nil).GetAttr), key)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockIOSession)(nil).Flush), timeout)
}
// ID mocks base method.
@@ -121,20 +121,6 @@ func (mr *MockIOSessionMockRecorder) ID() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockIOSession)(nil).ID))
}
-// InBuf mocks base method.
-func (m *MockIOSession) InBuf() *buf.ByteBuf {
- m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "InBuf")
- ret0, _ := ret[0].(*buf.ByteBuf)
- return ret0
-}
-
-// InBuf indicates an expected call of InBuf.
-func (mr *MockIOSessionMockRecorder) InBuf() *gomock.Call {
- mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InBuf", reflect.TypeOf((*MockIOSession)(nil).InBuf))
-}
-
// OutBuf mocks base method.
func (m *MockIOSession) OutBuf() *buf.ByteBuf {
m.ctrl.T.Helper()
@@ -150,12 +136,11 @@ func (mr *MockIOSessionMockRecorder) OutBuf() *gomock.Call {
}
// RawConn mocks base method.
-func (m *MockIOSession) RawConn() (net.Conn, error) {
+func (m *MockIOSession) RawConn() net.Conn {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RawConn")
ret0, _ := ret[0].(net.Conn)
- ret1, _ := ret[1].(error)
- return ret0, ret1
+ return ret0
}
// RawConn indicates an expected call of RawConn.
@@ -165,84 +150,68 @@ func (mr *MockIOSessionMockRecorder) RawConn() *gomock.Call {
}
// Read mocks base method.
-func (m *MockIOSession) Read() (interface{}, error) {
+func (m *MockIOSession) Read(option goetty.ReadOptions) (any, error) {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "Read")
- ret0, _ := ret[0].(interface{})
+ ret := m.ctrl.Call(m, "Read", option)
+ ret0, _ := ret[0].(any)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Read indicates an expected call of Read.
-func (mr *MockIOSessionMockRecorder) Read() *gomock.Call {
+func (mr *MockIOSessionMockRecorder) Read(option interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockIOSession)(nil).Read))
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockIOSession)(nil).Read), option)
}
-// RemoteAddr mocks base method.
-func (m *MockIOSession) RemoteAddr() string {
+// Ref mocks base method.
+func (m *MockIOSession) Ref() {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "RemoteAddr")
- ret0, _ := ret[0].(string)
- return ret0
+ m.ctrl.Call(m, "Ref")
}
-// RemoteAddr indicates an expected call of RemoteAddr.
-func (mr *MockIOSessionMockRecorder) RemoteAddr() *gomock.Call {
+// Ref indicates an expected call of Ref.
+func (mr *MockIOSessionMockRecorder) Ref() *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoteAddr", reflect.TypeOf((*MockIOSession)(nil).RemoteAddr))
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ref", reflect.TypeOf((*MockIOSession)(nil).Ref))
}
-// RemoteIP mocks base method.
-func (m *MockIOSession) RemoteIP() string {
+// RemoteAddress mocks base method.
+func (m *MockIOSession) RemoteAddress() string {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "RemoteIP")
+ ret := m.ctrl.Call(m, "RemoteAddress")
ret0, _ := ret[0].(string)
return ret0
}
-// RemoteIP indicates an expected call of RemoteIP.
-func (mr *MockIOSessionMockRecorder) RemoteIP() *gomock.Call {
+// RemoteAddress indicates an expected call of RemoteAddress.
+func (mr *MockIOSessionMockRecorder) RemoteAddress() *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoteIP", reflect.TypeOf((*MockIOSession)(nil).RemoteIP))
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoteAddress", reflect.TypeOf((*MockIOSession)(nil).RemoteAddress))
}
-// SetAttr mocks base method.
-func (m *MockIOSession) SetAttr(key string, value interface{}) {
+// UseConn mocks base method.
+func (m *MockIOSession) UseConn(arg0 net.Conn) {
m.ctrl.T.Helper()
- m.ctrl.Call(m, "SetAttr", key, value)
+ m.ctrl.Call(m, "UseConn", arg0)
}
-// SetAttr indicates an expected call of SetAttr.
-func (mr *MockIOSessionMockRecorder) SetAttr(key, value interface{}) *gomock.Call {
+// UseConn indicates an expected call of UseConn.
+func (mr *MockIOSessionMockRecorder) UseConn(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAttr", reflect.TypeOf((*MockIOSession)(nil).SetAttr), key, value)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UseConn", reflect.TypeOf((*MockIOSession)(nil).UseConn), arg0)
}
// Write mocks base method.
-func (m *MockIOSession) Write(msg interface{}) error {
+func (m *MockIOSession) Write(msg any, options goetty.WriteOptions) error {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "Write", msg)
+ ret := m.ctrl.Call(m, "Write", msg, options)
ret0, _ := ret[0].(error)
return ret0
}
// Write indicates an expected call of Write.
-func (mr *MockIOSessionMockRecorder) Write(msg interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockIOSession)(nil).Write), msg)
-}
-
-// WriteAndFlush mocks base method.
-func (m *MockIOSession) WriteAndFlush(msg interface{}) error {
- m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "WriteAndFlush", msg)
- ret0, _ := ret[0].(error)
- return ret0
-}
-
-// WriteAndFlush indicates an expected call of WriteAndFlush.
-func (mr *MockIOSessionMockRecorder) WriteAndFlush(msg interface{}) *gomock.Call {
+func (mr *MockIOSessionMockRecorder) Write(msg, options interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteAndFlush", reflect.TypeOf((*MockIOSession)(nil).WriteAndFlush), msg)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockIOSession)(nil).Write), msg, options)
}