Skip to content
Snippets Groups Projects
Commit 966d27c8 authored by cvictory's avatar cvictory
Browse files

refactor getty

parent 49c963db
No related branches found
No related tags found
No related merge requests found
...@@ -15,77 +15,6 @@ import ( ...@@ -15,77 +15,6 @@ import (
"time" "time"
) )
////SerialID serial ID
//type SerialID byte
//type SequenceType int64
//
//const (
// // S_Dubbo dubbo serial id
// S_Dubbo SerialID = 2
//)
//
//// DubboPackage ...
//type DubboPackage struct {
// Header hessian.DubboHeader
// Service hessian.Service
// Body interface{}
// Err error
//}
//
//func (p DubboPackage) String() string {
// return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
//}
//
//// Marshal ...
//func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
// codec := hessian.NewHessianCodec(nil)
//
// pkg, err := codec.Write(p.Service, p.Header, p.Body)
// if err != nil {
// return nil, perrors.WithStack(err)
// }
//
// return bytes.NewBuffer(pkg), nil
//}
//
// Unmarshal ...
//func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error {
// // fix issue https://github.com/apache/dubbo-go/issues/380
// bufLen := buf.Len()
// if bufLen < hessian.HEADER_LENGTH {
// return perrors.WithStack(hessian.ErrHeaderNotEnough)
// }
//
// codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
//
// // read header
// err := codec.ReadHeader(&p.Header)
// if err != nil {
// return perrors.WithStack(err)
// }
//
// if resp != nil { // for client
// if p.Header.Type&hessian.PackageRequest != 0x00 {
// // size of this array must be '7'
// // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
// p.Body = make([]interface{}, 7)
// } else {
// //pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
// //if !ok {
// // return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
// //}
// p.Body = &hessian.Response{RspObj: resp.Reply}
// }
// }
//
// // read body
// err = codec.ReadBody(p.Body)
// return perrors.WithStack(err)
//}
/////////////////////////////////////////
/////////////////////////////////////////
//SerialID serial ID //SerialID serial ID
type SerialID byte type SerialID byte
...@@ -94,24 +23,10 @@ const ( ...@@ -94,24 +23,10 @@ const (
S_Dubbo SerialID = 2 S_Dubbo SerialID = 2
) )
//CallType call type func init() {
type CallType int32 codec := &DubboCodec{}
remoting.NewCodec("dubbo", codec)
const ( }
// CT_UNKNOWN unknown call type
CT_UNKNOWN CallType = 0
// CT_OneWay call one way
CT_OneWay CallType = 1
// CT_TwoWay call in request/response
CT_TwoWay CallType = 2
)
////////////////////////////////////////////
// dubbo package
////////////////////////////////////////////
// SequenceType ...
type SequenceType int64
// DubboPackage ... // DubboPackage ...
type DubboPackage struct { type DubboPackage struct {
...@@ -138,7 +53,7 @@ func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { ...@@ -138,7 +53,7 @@ func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
} }
// Unmarshal ... // Unmarshal ...
func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error {
// fix issue https://github.com/apache/dubbo-go/issues/380 // fix issue https://github.com/apache/dubbo-go/issues/380
bufLen := buf.Len() bufLen := buf.Len()
if bufLen < hessian.HEADER_LENGTH { if bufLen < hessian.HEADER_LENGTH {
...@@ -153,22 +68,17 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { ...@@ -153,22 +68,17 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
return perrors.WithStack(err) return perrors.WithStack(err)
} }
if len(opts) != 0 { // for client if resp != nil { // for client
client, ok := opts[0].(*Client)
if !ok {
return perrors.Errorf("opts[0] is not of type *Client")
}
if p.Header.Type&hessian.PackageRequest != 0x00 { if p.Header.Type&hessian.PackageRequest != 0x00 {
// size of this array must be '7' // size of this array must be '7'
// https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272 // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
p.Body = make([]interface{}, 7) p.Body = make([]interface{}, 7)
} else { } else {
pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) pendingRsp := remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID))
if !ok { if pendingRsp == nil {
return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
} }
p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply} p.Body = &hessian.Response{RspObj: pendingRsp.Reply}
} }
} }
...@@ -176,11 +86,21 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { ...@@ -176,11 +86,21 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
err = codec.ReadBody(p.Body) err = codec.ReadBody(p.Body)
return perrors.WithStack(err) return perrors.WithStack(err)
} }
type DubboCodec struct { type DubboCodec struct {
} }
func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) { func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
invocation := request.Data.(invocation.RPCInvocation) if request.Event {
return c.encodeHeartbeartReqeust(request)
}
invoc, ok := request.Data.(*protocol.Invocation)
if !ok {
logger.Errorf("encode request failed for parameter type :%+v", request)
return nil, perrors.Errorf("encode request failed for parameter type :%+v", request)
}
invocation := *invoc
p := &DubboPackage{} p := &DubboPackage{}
p.Service.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "") p.Service.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
...@@ -220,8 +140,24 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er ...@@ -220,8 +140,24 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er
return bytes.NewBuffer(pkg), nil return bytes.NewBuffer(pkg), nil
} }
func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) {
pkg := &DubboPackage{}
pkg.Body = []interface{}{}
pkg.Header.ID = request.Id
pkg.Header.Type = hessian.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)
codec := hessian.NewHessianCodec(nil)
byt, err := codec.Write(pkg.Service, pkg.Header, pkg.Body)
if err != nil {
return nil, perrors.WithStack(err)
}
return bytes.NewBuffer(byt), nil
}
func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) { func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) {
var ptype hessian.PackageType = hessian.PackageResponse var ptype = hessian.PackageResponse
if response.IsHeartbeat() { if response.IsHeartbeat() {
ptype = hessian.PackageHeartbeat ptype = hessian.PackageHeartbeat
} }
...@@ -233,7 +169,13 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, ...@@ -233,7 +169,13 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer,
ResponseStatus: response.Status, ResponseStatus: response.Status,
}, },
} }
resp.Body = response.Result if !response.IsHeartbeat() {
resp.Body = &hessian.Response{
RspObj: response.Result.(protocol.RPCResult).Rest,
Exception: response.Result.(protocol.RPCResult).Err,
}
}
//if response.Header.Type&hessian.PackageRequest != 0x00 { //if response.Header.Type&hessian.PackageRequest != 0x00 {
// resp.Body = req.Body // resp.Body = req.Body
//} else { //} else {
...@@ -259,7 +201,7 @@ func (c *DubboCodec) DecodeRequest(data []byte) (*remoting.Request, int, error) ...@@ -259,7 +201,7 @@ func (c *DubboCodec) DecodeRequest(data []byte) (*remoting.Request, int, error)
originErr := perrors.Cause(err) originErr := perrors.Cause(err)
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
//FIXME //FIXME
return request, 0, originErr return request, 0, originErr
} }
logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err) logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
...@@ -268,7 +210,8 @@ func (c *DubboCodec) DecodeRequest(data []byte) (*remoting.Request, int, error) ...@@ -268,7 +210,8 @@ func (c *DubboCodec) DecodeRequest(data []byte) (*remoting.Request, int, error)
request = &remoting.Request{ request = &remoting.Request{
Id: pkg.Header.ID, Id: pkg.Header.ID,
SerialID: pkg.Header.SerialID, SerialID: pkg.Header.SerialID,
TwoWay: false, TwoWay: pkg.Header.Type&hessian.PackageRequest_TwoWay != 0x00,
Event: pkg.Header.Type&hessian.PackageHeartbeat != 0x00,
} }
if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 { if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 {
// convert params of request // convert params of request
...@@ -330,15 +273,16 @@ func (c *DubboCodec) DecodeRequest(data []byte) (*remoting.Request, int, error) ...@@ -330,15 +273,16 @@ func (c *DubboCodec) DecodeRequest(data []byte) (*remoting.Request, int, error)
} }
return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
} }
func (c *DubboCodec) DecodeResponse(data []byte) (*remoting.Response, int, error) { func (c *DubboCodec) DecodeResponse(data []byte) (*remoting.Response, int, error) {
pkg := &DubboPackage{} pkg := &DubboPackage{}
buf := bytes.NewBuffer(data) buf := bytes.NewBuffer(data)
var response *remoting.Response response := &remoting.Response{}
err := pkg.Unmarshal(buf, response) err := pkg.Unmarshal(buf, response)
if err != nil { if err != nil {
originErr := perrors.Cause(err) originErr := perrors.Cause(err)
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
return response, 0, nil return response, 0, nil
} }
logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err) logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
...@@ -349,7 +293,7 @@ func (c *DubboCodec) DecodeResponse(data []byte) (*remoting.Response, int, error ...@@ -349,7 +293,7 @@ func (c *DubboCodec) DecodeResponse(data []byte) (*remoting.Response, int, error
//Version: pkg.Header., //Version: pkg.Header.,
SerialID: pkg.Header.SerialID, SerialID: pkg.Header.SerialID,
Status: pkg.Header.ResponseStatus, Status: pkg.Header.ResponseStatus,
Event: (pkg.Header.Type | hessian.PackageHeartbeat) != 0, Event: (pkg.Header.Type & hessian.PackageHeartbeat) != 0,
} }
var error error var error error
if pkg.Header.Type&hessian.PackageHeartbeat != 0x00 { if pkg.Header.Type&hessian.PackageHeartbeat != 0x00 {
...@@ -368,6 +312,7 @@ func (c *DubboCodec) DecodeResponse(data []byte) (*remoting.Response, int, error ...@@ -368,6 +312,7 @@ func (c *DubboCodec) DecodeResponse(data []byte) (*remoting.Response, int, error
} }
logger.Debugf("get rpc response{header: %#v, body: %#v}", pkg.Header, pkg.Body) logger.Debugf("get rpc response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
rpcResult := &protocol.RPCResult{} rpcResult := &protocol.RPCResult{}
response.Result = rpcResult
if pkg.Header.Type&hessian.PackageRequest == 0x00 { if pkg.Header.Type&hessian.PackageRequest == 0x00 {
if pkg.Err != nil { if pkg.Err != nil {
rpcResult.Err = pkg.Err rpcResult.Err = pkg.Err
......
...@@ -39,6 +39,10 @@ const ( ...@@ -39,6 +39,10 @@ const (
DUBBO = "dubbo" DUBBO = "dubbo"
) )
var (
exchangeClientMap *sync.Map = new(sync.Map)
)
func init() { func init() {
extension.SetProtocol(DUBBO, GetProtocol) extension.SetProtocol(DUBBO, GetProtocol)
} }
...@@ -89,9 +93,7 @@ func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { ...@@ -89,9 +93,7 @@ func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
// ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, // ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
// RequestTimeout: requestTimeout, // RequestTimeout: requestTimeout,
//})) //}))
invoker := NewDubboInvoker(url, remoting.NewExchangeClient(url, getty.NewClient(getty.Options{ invoker := NewDubboInvoker(url, getExchangeClient(url))
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
}), config.GetConsumerConfig().ConnectTimeout))
dp.SetInvokers(invoker) dp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String()) logger.Infof("Refer service: %s", url.String())
return invoker return invoker
...@@ -124,7 +126,7 @@ func (dp *DubboProtocol) openServer(url common.URL) { ...@@ -124,7 +126,7 @@ func (dp *DubboProtocol) openServer(url common.URL) {
handler := func(invocation *invocation.RPCInvocation) protocol.RPCResult { handler := func(invocation *invocation.RPCInvocation) protocol.RPCResult {
return doHandleRequest(invocation) return doHandleRequest(invocation)
} }
srv := remoting.NewExchangeServer(url, handler) srv := remoting.NewExchangeServer(url, getty.NewServer(url, handler))
dp.serverMap[url.Location] = srv dp.serverMap[url.Location] = srv
srv.Start() srv.Start()
} }
...@@ -158,8 +160,8 @@ func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult ...@@ -158,8 +160,8 @@ func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult
// //
//args := p.Body.(map[string]interface{})["args"].([]interface{}) //args := p.Body.(map[string]interface{})["args"].([]interface{})
//inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments) //inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments)
// FIXME
ctx := rebuildCtx(rpcInvocation) ctx := getty.RebuildCtx(rpcInvocation)
invokeResult := invoker.Invoke(ctx, rpcInvocation) invokeResult := invoker.Invoke(ctx, rpcInvocation)
if err := invokeResult.Error(); err != nil { if err := invokeResult.Error(); err != nil {
...@@ -176,3 +178,25 @@ func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult ...@@ -176,3 +178,25 @@ func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult
} }
return result return result
} }
func getExchangeClient(url common.URL) *remoting.ExchangeClient {
clientTmp, ok := exchangeClientMap.Load(url.Location)
if !ok {
exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
}), config.GetConsumerConfig().ConnectTimeout)
exchangeClientMap.Store(url.Location, exchangeClientTmp)
return exchangeClientTmp
}
exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
if !ok {
exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
}), config.GetConsumerConfig().ConnectTimeout)
exchangeClientMap.Store(url.Location, exchangeClientTmp)
return exchangeClientTmp
}
return exchangeClient
}
...@@ -16,84 +16,84 @@ ...@@ -16,84 +16,84 @@
*/ */
package dubbo package dubbo
//
import ( //import (
"testing" // "testing"
) //)
//
import ( //import (
"github.com/stretchr/testify/assert" // "github.com/stretchr/testify/assert"
) //)
//
import ( //import (
"github.com/apache/dubbo-go/common" // "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant" // "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol" // "github.com/apache/dubbo-go/protocol"
) //)
//
func TestDubboProtocol_Export(t *testing.T) { //func TestDubboProtocol_Export(t *testing.T) {
// Export // // Export
proto := GetProtocol() // proto := GetProtocol()
srvConf = &ServerConfig{} // srvConf = &ServerConfig{}
url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" + // url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + // "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + // "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + // "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245") // "side=provider&timeout=3000&timestamp=1556509797245")
assert.NoError(t, err) // assert.NoError(t, err)
exporter := proto.Export(protocol.NewBaseInvoker(url)) // exporter := proto.Export(protocol.NewBaseInvoker(url))
//
// make sure url // // make sure url
eq := exporter.GetInvoker().GetUrl().URLEqual(url) // eq := exporter.GetInvoker().GetUrl().URLEqual(url)
assert.True(t, eq) // assert.True(t, eq)
//
// second service: the same path and the different version // // second service: the same path and the different version
url2, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ // url2, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ // "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ // "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ // "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+
"side=provider&timeout=3000&timestamp=1556509797245", common.WithParamsValue(constant.VERSION_KEY, "v1.1")) // "side=provider&timeout=3000&timestamp=1556509797245", common.WithParamsValue(constant.VERSION_KEY, "v1.1"))
assert.NoError(t, err) // assert.NoError(t, err)
exporter2 := proto.Export(protocol.NewBaseInvoker(url2)) // exporter2 := proto.Export(protocol.NewBaseInvoker(url2))
// make sure url // // make sure url
eq2 := exporter2.GetInvoker().GetUrl().URLEqual(url2) // eq2 := exporter2.GetInvoker().GetUrl().URLEqual(url2)
assert.True(t, eq2) // assert.True(t, eq2)
//
// make sure exporterMap after 'Unexport' // // make sure exporterMap after 'Unexport'
_, ok := proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey()) // _, ok := proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey())
assert.True(t, ok) // assert.True(t, ok)
exporter.Unexport() // exporter.Unexport()
_, ok = proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey()) // _, ok = proto.(*DubboProtocol).ExporterMap().Load(url.ServiceKey())
assert.False(t, ok) // assert.False(t, ok)
//
// make sure serverMap after 'Destroy' // // make sure serverMap after 'Destroy'
_, ok = proto.(*DubboProtocol).serverMap[url.Location] // _, ok = proto.(*DubboProtocol).serverMap[url.Location]
assert.True(t, ok) // assert.True(t, ok)
proto.Destroy() // proto.Destroy()
_, ok = proto.(*DubboProtocol).serverMap[url.Location] // _, ok = proto.(*DubboProtocol).serverMap[url.Location]
assert.False(t, ok) // assert.False(t, ok)
} //}
//
func TestDubboProtocol_Refer(t *testing.T) { //func TestDubboProtocol_Refer(t *testing.T) {
// Refer // // Refer
proto := GetProtocol() // proto := GetProtocol()
url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" + // url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + // "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + // "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + // "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245") // "side=provider&timeout=3000&timestamp=1556509797245")
assert.NoError(t, err) // assert.NoError(t, err)
clientConf = &ClientConfig{} // clientConf = &ClientConfig{}
invoker := proto.Refer(url) // invoker := proto.Refer(url)
//
// make sure url // // make sure url
eq := invoker.GetUrl().URLEqual(url) // eq := invoker.GetUrl().URLEqual(url)
assert.True(t, eq) // assert.True(t, eq)
//
// make sure invokers after 'Destroy' // // make sure invokers after 'Destroy'
invokersLen := len(proto.(*DubboProtocol).Invokers()) // invokersLen := len(proto.(*DubboProtocol).Invokers())
assert.Equal(t, 1, invokersLen) // assert.Equal(t, 1, invokersLen)
proto.Destroy() // proto.Destroy()
invokersLen = len(proto.(*DubboProtocol).Invokers()) // invokersLen = len(proto.(*DubboProtocol).Invokers())
assert.Equal(t, 0, invokersLen) // assert.Equal(t, 0, invokersLen)
} //}
...@@ -7,22 +7,22 @@ import ( ...@@ -7,22 +7,22 @@ import (
type Codec interface { type Codec interface {
EncodeRequest(request *Request) (*bytes.Buffer, error) EncodeRequest(request *Request) (*bytes.Buffer, error)
EncodeResponse(response *Response) (*bytes.Buffer, error) EncodeResponse(response *Response) (*bytes.Buffer, error)
DecodeRequest(*bytes.Buffer) (*Request, int, error) DecodeRequest(data []byte) (*Request, int, error)
DecodeResponse(*bytes.Buffer) (*Response, int, error) DecodeResponse(data []byte) (*Response, int, error)
} }
var ( var (
codec map[string]*Codec codec map[string]Codec
) )
func init() { func init() {
codec = make(map[string]*Codec, 2) codec = make(map[string]Codec, 2)
} }
func NewCodec(protocol string, codecTmp *Codec) { func NewCodec(protocol string, codecTmp Codec) {
codec[protocol] = codecTmp codec[protocol] = codecTmp
} }
func GetCodec(protocol string) *Codec { func GetCodec(protocol string) Codec {
return codec[protocol] return codec[protocol]
} }
...@@ -37,11 +37,11 @@ func NewRequest(version string) *Request { ...@@ -37,11 +37,11 @@ func NewRequest(version string) *Request {
} }
} }
func (request *Request) SetHeartbeat(isHeartbeat bool) { //func (request *Request) SetHeartbeat(isHeartbeat bool) {
if isHeartbeat { // if isHeartbeat {
//
} // }
} //}
// Response ... // Response ...
type Response struct { type Response struct {
...@@ -52,7 +52,6 @@ type Response struct { ...@@ -52,7 +52,6 @@ type Response struct {
Event bool Event bool
Error error Error error
Result interface{} Result interface{}
Reply interface{}
} }
// NewResponse ... // NewResponse ...
...@@ -91,12 +90,14 @@ type PendingResponse struct { ...@@ -91,12 +90,14 @@ type PendingResponse struct {
ReadStart time.Time ReadStart time.Time
callback common.AsyncCallback callback common.AsyncCallback
response *Response response *Response
Reply interface{}
Done chan struct{} Done chan struct{}
} }
// NewPendingResponse ... // NewPendingResponse ...
func NewPendingResponse() *PendingResponse { func NewPendingResponse(id int64) *PendingResponse {
return &PendingResponse{ return &PendingResponse{
seq: id,
start: time.Now(), start: time.Now(),
response: &Response{}, response: &Response{},
Done: make(chan struct{}), Done: make(chan struct{}),
...@@ -112,20 +113,3 @@ func (r PendingResponse) GetCallResponse() common.CallbackResponse { ...@@ -112,20 +113,3 @@ func (r PendingResponse) GetCallResponse() common.CallbackResponse {
Reply: r.response, Reply: r.response,
} }
} }
type Client interface {
//invoke once for connection
Connect(url common.URL)
Close()
Request(request *Request, timeout time.Duration, callback common.AsyncCallback, response *PendingResponse) error
}
type Server interface {
//invoke once for connection
Open(url common.URL)
}
type ResponseHandler interface {
Handler(response *Response)
}
...@@ -8,13 +8,29 @@ import ( ...@@ -8,13 +8,29 @@ import (
"time" "time"
) )
var (
pendingResponses *sync.Map = new(sync.Map)
)
type SequenceType int64 type SequenceType int64
type ExchangeClient struct { type ExchangeClient struct {
ConnectTimeout time.Duration ConnectTimeout time.Duration
address string address string
client Client client Client
pendingResponses *sync.Map }
type Client interface {
SetExchangeClient(client *ExchangeClient)
SetResponseHandler(responseHandler ResponseHandler)
//invoke once for connection
Connect(url common.URL)
Close()
Request(request *Request, timeout time.Duration, callback common.AsyncCallback, response *PendingResponse) error
}
type ResponseHandler interface {
Handler(response *Response)
} }
func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration) *ExchangeClient { func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration) *ExchangeClient {
...@@ -23,20 +39,23 @@ func NewExchangeClient(url common.URL, client Client, connectTimeout time.Durati ...@@ -23,20 +39,23 @@ func NewExchangeClient(url common.URL, client Client, connectTimeout time.Durati
address: url.Location, address: url.Location,
client: client, client: client,
} }
client.SetExchangeClient(exchangeClient)
client.Connect(url) client.Connect(url)
client.SetResponseHandler(exchangeClient)
return exchangeClient return exchangeClient
} }
func (client *ExchangeClient) Request(invocation *protocol.Invocation, url common.URL, timeout time.Duration, func (client *ExchangeClient) Request(invocation *protocol.Invocation, url common.URL, timeout time.Duration,
result *protocol.RPCResult) error { result *protocol.RPCResult) error {
requestId := int64(SequenceId())
request := NewRequest("2.0.2") request := NewRequest("2.0.2")
request.Data = invocation request.Data = invocation
request.Event = false request.Event = false
request.TwoWay = true request.TwoWay = true
rsp := NewPendingResponse() rsp := NewPendingResponse(request.Id)
rsp.response = NewResponse(requestId, "2.0.2") rsp.response = NewResponse(request.Id, "2.0.2")
rsp.Reply = (*invocation).Reply()
AddPendingResponse(rsp)
//rsp.callback = invo //rsp.callback = invo
err := client.client.Request(request, timeout, nil, rsp) err := client.client.Request(request, timeout, nil, rsp)
...@@ -44,22 +63,23 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url commo ...@@ -44,22 +63,23 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url commo
result.Err = err result.Err = err
return err return err
} }
result.Rest = rsp.response result.Rest = rsp.response.Result
//result.Attrs = rsp.response. //result.Attrs = rsp.response.
return nil return nil
} }
func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url common.URL, timeout time.Duration, func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url common.URL, timeout time.Duration,
callback common.AsyncCallback, result *protocol.RPCResult) error { callback common.AsyncCallback, result *protocol.RPCResult) error {
requestId := int64(SequenceId())
request := NewRequest("2.0.2") request := NewRequest("2.0.2")
request.Data = invocation request.Data = invocation
request.Event = false request.Event = false
request.TwoWay = true request.TwoWay = true
rsp := NewPendingResponse() rsp := NewPendingResponse(request.Id)
rsp.response = NewResponse(requestId, "2.0.2") rsp.response = NewResponse(request.Id, "2.0.2")
rsp.callback = callback rsp.callback = callback
rsp.Reply = (*invocation).Reply()
AddPendingResponse(rsp)
err := client.client.Request(request, timeout, nil, rsp) err := client.client.Request(request, timeout, nil, rsp)
if err != nil { if err != nil {
...@@ -79,7 +99,7 @@ func (client *ExchangeClient) Send(invocation *protocol.Invocation, timeout time ...@@ -79,7 +99,7 @@ func (client *ExchangeClient) Send(invocation *protocol.Invocation, timeout time
request.Event = false request.Event = false
request.TwoWay = false request.TwoWay = false
rsp := NewPendingResponse() rsp := NewPendingResponse(request.Id)
rsp.response = NewResponse(requestId, "2.0.2") rsp.response = NewResponse(requestId, "2.0.2")
err := client.client.Request(request, timeout, nil, rsp) err := client.client.Request(request, timeout, nil, rsp)
...@@ -96,7 +116,7 @@ func (client *ExchangeClient) Close() { ...@@ -96,7 +116,7 @@ func (client *ExchangeClient) Close() {
func (client *ExchangeClient) Handler(response *Response) { func (client *ExchangeClient) Handler(response *Response) {
pendingResponse := client.removePendingResponse(SequenceType(response.Id)) pendingResponse := removePendingResponse(SequenceType(response.Id))
if pendingResponse == nil { if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *response) logger.Errorf("failed to get pending response context for response package %s", *response)
return return
...@@ -111,16 +131,23 @@ func (client *ExchangeClient) Handler(response *Response) { ...@@ -111,16 +131,23 @@ func (client *ExchangeClient) Handler(response *Response) {
} }
} }
func (client *ExchangeClient) addPendingResponse(pr *PendingResponse) { func AddPendingResponse(pr *PendingResponse) {
client.pendingResponses.Store(SequenceType(pr.seq), pr) pendingResponses.Store(SequenceType(pr.seq), pr)
} }
func (client *ExchangeClient) removePendingResponse(seq SequenceType) *PendingResponse { func removePendingResponse(seq SequenceType) *PendingResponse {
if client.pendingResponses == nil { if pendingResponses == nil {
return nil return nil
} }
if presp, ok := client.pendingResponses.Load(seq); ok { if presp, ok := pendingResponses.Load(seq); ok {
client.pendingResponses.Delete(seq) pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}
func GetPendingResponse(seq SequenceType) *PendingResponse {
if presp, ok := pendingResponses.Load(seq); ok {
return presp.(*PendingResponse) return presp.(*PendingResponse)
} }
return nil return nil
......
...@@ -2,18 +2,19 @@ package remoting ...@@ -2,18 +2,19 @@ package remoting
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting/getty"
) )
type Server interface {
//invoke once for connection
Start()
Stop()
}
type ExchangeServer struct { type ExchangeServer struct {
Server *getty.Server Server Server
} }
func NewExchangeServer(url common.URL, handler func(*invocation.RPCInvocation) protocol.RPCResult) *ExchangeServer { func NewExchangeServer(url common.URL, server Server) *ExchangeServer {
server := getty.NewServer(url, handler)
exchangServer := &ExchangeServer{ exchangServer := &ExchangeServer{
Server: server, Server: server,
} }
...@@ -21,9 +22,9 @@ func NewExchangeServer(url common.URL, handler func(*invocation.RPCInvocation) p ...@@ -21,9 +22,9 @@ func NewExchangeServer(url common.URL, handler func(*invocation.RPCInvocation) p
} }
func (server *ExchangeServer) Start() { func (server *ExchangeServer) Start() {
server.Server.Start() (server.Server).Start()
} }
func (server *ExchangeServer) Stop() { func (server *ExchangeServer) Stop() {
server.Server.Stop() (server.Server).Stop()
} }
...@@ -19,6 +19,7 @@ package getty ...@@ -19,6 +19,7 @@ package getty
import ( import (
"github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting"
"gopkg.in/yaml.v2"
"math/rand" "math/rand"
"time" "time"
) )
...@@ -27,7 +28,6 @@ import ( ...@@ -27,7 +28,6 @@ import (
"github.com/dubbogo/getty" "github.com/dubbogo/getty"
gxsync "github.com/dubbogo/gost/sync" gxsync "github.com/dubbogo/gost/sync"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
"gopkg.in/yaml.v2"
) )
import ( import (
...@@ -47,7 +47,7 @@ var ( ...@@ -47,7 +47,7 @@ var (
clientGrpool *gxsync.TaskPool clientGrpool *gxsync.TaskPool
) )
func doInit(protocol string) { func initClient(protocol string) {
if protocol == "" { if protocol == "" {
return return
} }
...@@ -134,8 +134,9 @@ type Client struct { ...@@ -134,8 +134,9 @@ type Client struct {
opts Options opts Options
conf ClientConfig conf ClientConfig
pool *gettyRPCClientPool pool *gettyRPCClientPool
codec *remoting.Codec codec remoting.Codec
responseHandler *remoting.ResponseHandler responseHandler remoting.ResponseHandler
ExchangeClient *remoting.ExchangeClient
//sequence atomic.Uint64 //sequence atomic.Uint64
//pendingResponses *sync.Map //pendingResponses *sync.Map
} }
...@@ -149,18 +150,24 @@ func NewClient(opt Options) *Client { ...@@ -149,18 +150,24 @@ func NewClient(opt Options) *Client {
c := &Client{ c := &Client{
opts: opt, opts: opt,
conf: *clientConf,
} }
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
return c return c
} }
func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) {
c.ExchangeClient = client
}
func (c *Client) SetResponseHandler(responseHandler remoting.ResponseHandler) {
c.responseHandler = responseHandler
}
func (c *Client) Connect(url common.URL) { func (c *Client) Connect(url common.URL) {
doInit(url.Protocol) initClient(url.Protocol)
c.conf = *clientConf
// new client
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
// codec // codec
c.codec = remoting.GetCodec(url.Protocol) c.codec = remoting.GetCodec(url.Protocol)
c.addr = url.Ip + ":" + url.Port c.addr = url.Location
} }
func (c *Client) Close() { func (c *Client) Close() {
if c.pool != nil { if c.pool != nil {
...@@ -340,13 +347,13 @@ func (c *Client) heartbeat(session getty.Session) error { ...@@ -340,13 +347,13 @@ func (c *Client) heartbeat(session getty.Session) error {
req := remoting.NewRequest("2.0.2") req := remoting.NewRequest("2.0.2")
req.TwoWay = true req.TwoWay = true
req.Event = true req.Event = true
resp := remoting.NewPendingResponse() resp := remoting.NewPendingResponse(req.Id)
remoting.AddPendingResponse(resp)
return c.transfer(session, req, 3*time.Second, resp) return c.transfer(session, req, 3*time.Second, resp)
} }
func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration, func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration,
rsp *remoting.PendingResponse) error { rsp *remoting.PendingResponse) error {
//sequence = c.sequence.Add(1) //sequence = c.sequence.Add(1)
// //
//if pkg == nil { //if pkg == nil {
......
...@@ -107,7 +107,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -107,7 +107,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
if p.Error != nil { if p.Error != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", p.Error) logger.Errorf("rpc heartbeat response{error: %#v}", p.Error)
} }
(*h.conn.pool.rpcClient.responseHandler).Handler(p) (h.conn.pool.rpcClient.responseHandler).Handler(p)
//FIXME //FIXME
//if p.Header.Type&hessian.PackageResponse != 0x00 { //if p.Header.Type&hessian.PackageResponse != 0x00 {
// logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) // logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
...@@ -126,7 +126,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -126,7 +126,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
h.conn.updateSession(session) h.conn.updateSession(session)
(*h.conn.pool.rpcClient.responseHandler).Handler(p) (h.conn.pool.rpcClient.responseHandler).Handler(p)
// //
//pendingResponse := h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) //pendingResponse := h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
...@@ -239,6 +239,9 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -239,6 +239,9 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
} }
resp := remoting.NewResponse(req.Id, req.Version) resp := remoting.NewResponse(req.Id, req.Version)
resp.Status = hessian.Response_OK resp.Status = hessian.Response_OK
resp.Event = req.Event
resp.SerialID = req.SerialID
resp.Version = "2.0.2"
// heartbeat // heartbeat
if req.Event { if req.Event {
...@@ -275,11 +278,11 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -275,11 +278,11 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
}() }()
invoc, ok := req.Data.(invocation.RPCInvocation) invoc, ok := req.Data.(*invocation.RPCInvocation)
if !ok { if !ok {
} }
result := h.server.requestHandler(&invoc) result := h.server.requestHandler(invoc)
if !req.TwoWay { if !req.TwoWay {
return return
} }
...@@ -316,7 +319,7 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { ...@@ -316,7 +319,7 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
// rebuildCtx rebuild the context by attachment. // rebuildCtx rebuild the context by attachment.
// Once we decided to transfer more context's key-value, we should change this. // Once we decided to transfer more context's key-value, we should change this.
// now we only support rebuild the tracing context // now we only support rebuild the tracing context
func rebuildCtx(inv *invocation.RPCInvocation) context.Context { func RebuildCtx(inv *invocation.RPCInvocation) context.Context {
ctx := context.Background() ctx := context.Background()
// actually, if user do not use any opentracing framework, the err will not be nil. // actually, if user do not use any opentracing framework, the err will not be nil.
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package getty package getty
import ( import (
"bytes"
"github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting"
"reflect" "reflect"
) )
...@@ -49,9 +48,8 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { ...@@ -49,9 +48,8 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler {
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
//pkg := &DubboPackage{} //pkg := &DubboPackage{}
//p.client.ExchangeClient.GetPendingResponse(remoting.SequenceType())
buf := bytes.NewBuffer(data) resp, length, err := (p.client.codec).DecodeResponse(data)
resp, length, err := (*p.client.codec).DecodeResponse(buf)
//err := pkg.Unmarshal(buf, p.client) //err := pkg.Unmarshal(buf, p.client)
if err != nil { if err != nil {
originErr := perrors.Cause(err) originErr := perrors.Cause(err)
...@@ -78,7 +76,7 @@ func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]by ...@@ -78,7 +76,7 @@ func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]by
return nil, perrors.New("invalid rpc request") return nil, perrors.New("invalid rpc request")
} }
buf, err := (*p.client.codec).EncodeRequest(req) buf, err := (p.client.codec).EncodeRequest(req)
if err != nil { if err != nil {
logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err)) logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err))
return nil, perrors.WithStack(err) return nil, perrors.WithStack(err)
...@@ -91,18 +89,21 @@ func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]by ...@@ -91,18 +89,21 @@ func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]by
// RpcServerPackageHandler // RpcServerPackageHandler
//////////////////////////////////////////// ////////////////////////////////////////////
var ( //var (
rpcServerPkgHandler = &RpcServerPackageHandler{} // rpcServerPkgHandler = &RpcServerPackageHandler{}
) //)
// RpcServerPackageHandler ... // RpcServerPackageHandler ...
type RpcServerPackageHandler struct { type RpcServerPackageHandler struct {
server *Server server *Server
} }
func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler {
return &RpcServerPackageHandler{server: server}
}
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
buf := bytes.NewBuffer(data) req, length, err := (p.server.codec).DecodeRequest(data)
req, length, err := (*p.server.codec).DecodeRequest(buf)
//resp,len, err := (*p.).DecodeResponse(buf) //resp,len, err := (*p.).DecodeResponse(buf)
if err != nil { if err != nil {
...@@ -126,7 +127,7 @@ func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]by ...@@ -126,7 +127,7 @@ func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]by
return nil, perrors.New("invalid rpc response") return nil, perrors.New("invalid rpc response")
} }
buf, err := (*p.server.codec).EncodeResponse(res) buf, err := (p.server.codec).EncodeResponse(res)
if err != nil { if err != nil {
logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err)) logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err))
return nil, perrors.WithStack(err) return nil, perrors.WithStack(err)
......
...@@ -105,7 +105,7 @@ func SetServerGrpool() { ...@@ -105,7 +105,7 @@ func SetServerGrpool() {
type Server struct { type Server struct {
conf ServerConfig conf ServerConfig
addr string addr string
codec *remoting.Codec codec remoting.Codec
tcpServer getty.Server tcpServer getty.Server
rpcHandler *RpcServerHandler rpcHandler *RpcServerHandler
requestHandler func(*invocation.RPCInvocation) protocol.RPCResult requestHandler func(*invocation.RPCInvocation) protocol.RPCResult
...@@ -114,7 +114,7 @@ type Server struct { ...@@ -114,7 +114,7 @@ type Server struct {
// NewServer ... // NewServer ...
func NewServer(url common.URL, handlers func(*invocation.RPCInvocation) protocol.RPCResult) *Server { func NewServer(url common.URL, handlers func(*invocation.RPCInvocation) protocol.RPCResult) *Server {
//init //init
doInit(url.Protocol) initServer(url.Protocol)
s := &Server{ s := &Server{
conf: *srvConf, conf: *srvConf,
...@@ -153,7 +153,7 @@ func (s *Server) newSession(session getty.Session) error { ...@@ -153,7 +153,7 @@ func (s *Server) newSession(session getty.Session) error {
session.SetName(conf.GettySessionParam.SessionName) session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(rpcServerPkgHandler) session.SetPkgHandler(NewRpcServerPackageHandler(s))
session.SetEventListener(s.rpcHandler) session.SetEventListener(s.rpcHandler)
session.SetWQLen(conf.GettySessionParam.PkgWQSize) session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment