Skip to content
Snippets Groups Projects
Commit 58e7e86d authored by cvictory's avatar cvictory
Browse files

fix review issue: add comment and some optimization

parent d48fd1e3
No related branches found
No related tags found
No related merge requests found
Showing with 142 additions and 94 deletions
......@@ -17,12 +17,9 @@
package config
import (
"github.com/creasty/defaults"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/creasty/defaults"
)
// ApplicationConfig ...
......@@ -40,7 +37,7 @@ func (*ApplicationConfig) Prefix() string {
return constant.DUBBO + ".application."
}
// Id ...
// ID ...
func (c *ApplicationConfig) Id() string {
return ""
}
......
......@@ -42,10 +42,11 @@ const (
func init() {
codec := &DubboCodec{}
// this is for registry dubboCodec of dubbo protocol
remoting.NewCodec("dubbo", codec)
}
// DubboPackage ...
// DubboPackage. this is for hessian encode/decode. If we refactor hessian, it will also be refactored.
type DubboPackage struct {
Header hessian.DubboHeader
Service hessian.Service
......@@ -53,6 +54,7 @@ type DubboPackage struct {
Err error
}
// String of DubboPackage
func (p DubboPackage) String() string {
return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
}
......@@ -103,9 +105,11 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) err
return perrors.WithStack(err)
}
// DubboCodec. It is implements remoting.Codec
type DubboCodec struct {
}
// encode request for transport
func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
if request.Event {
return c.encodeHeartbeartReqeust(request)
......@@ -127,12 +131,13 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er
timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, "3000"))
if err != nil {
panic(err)
// it will be wrapped in readwrite.Write .
return nil, err
}
p.Service.Timeout = time.Duration(timeout)
p.Header.SerialID = byte(S_Dubbo)
p.Header.ID = request.Id
p.Header.ID = request.ID
if request.TwoWay {
p.Header.Type = hessian.PackageRequest_TwoWay
} else {
......@@ -150,10 +155,12 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er
return bytes.NewBuffer(pkg), nil
}
// encode heartbeart request
func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) {
pkg := &DubboPackage{}
pkg.Body = []interface{}{}
pkg.Header.ID = request.Id
pkg.Header.ID = request.ID
pkg.Header.Type = hessian.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)
......@@ -166,6 +173,8 @@ func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.
return bytes.NewBuffer(byt), nil
}
// encode response
func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) {
var ptype = hessian.PackageResponse
if response.IsHeartbeat() {
......@@ -175,7 +184,7 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer,
Header: hessian.DubboHeader{
SerialID: response.SerialID,
Type: ptype,
ID: response.Id,
ID: response.ID,
ResponseStatus: response.Status,
},
}
......@@ -196,19 +205,21 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer,
return bytes.NewBuffer(pkg), nil
}
// Decode data, including request and response.
func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
if c.isRequest(data) {
req, len, err := c.decodeRequest(data)
if err != nil {
return remoting.DecodeResult{}, len, err
return remoting.DecodeResult{}, len, perrors.WithStack(err)
}
return remoting.DecodeResult{IsRequest: true, Result: req}, len, err
return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err)
} else {
resp, len, err := c.decodeResponse(data)
if err != nil {
return remoting.DecodeResult{}, len, err
return remoting.DecodeResult{}, len, perrors.WithStack(err)
}
return remoting.DecodeResult{IsRequest: false, Result: resp}, len, err
return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err)
}
}
func (c *DubboCodec) isRequest(data []byte) bool {
......@@ -218,6 +229,7 @@ func (c *DubboCodec) isRequest(data []byte) bool {
return true
}
// decode request
func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) {
pkg := &DubboPackage{
Body: make([]interface{}, 7),
......@@ -236,7 +248,7 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error)
return request, 0, perrors.WithStack(err)
}
request = &remoting.Request{
Id: pkg.Header.ID,
ID: pkg.Header.ID,
SerialID: pkg.Header.SerialID,
TwoWay: pkg.Header.Type&hessian.PackageRequest_TwoWay != 0x00,
Event: pkg.Header.Type&hessian.PackageHeartbeat != 0x00,
......@@ -282,6 +294,7 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error)
return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
}
// decode response
func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error) {
pkg := &DubboPackage{}
buf := bytes.NewBuffer(data)
......@@ -289,6 +302,7 @@ func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error
err := pkg.Unmarshal(buf, response)
if err != nil {
originErr := perrors.Cause(err)
// if the data is very big, so the receive need much times.
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
return nil, 0, originErr
}
......@@ -297,7 +311,7 @@ func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error
return nil, 0, perrors.WithStack(err)
}
response = &remoting.Response{
Id: pkg.Header.ID,
ID: pkg.Header.ID,
//Version: pkg.Header.,
SerialID: pkg.Header.SerialID,
Status: pkg.Header.ResponseStatus,
......
......@@ -48,17 +48,19 @@ var (
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY}
)
// DubboInvoker ...
// DubboInvoker. It is implement of protocol.Invoker. One dubboInvoker refer to one service and ip.
type DubboInvoker struct {
protocol.BaseInvoker
// the exchange layer, it is focus on network communication.
client *remoting.ExchangeClient
quitOnce sync.Once
timeout time.Duration
// timeout for service(interface) level.
timeout time.Duration
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
}
// NewDubboInvoker ...
// NewDubboInvoker constructor
func NewDubboInvoker(url common.URL, client *remoting.ExchangeClient) *DubboInvoker {
requestTimeout := config.GetConsumerConfig().RequestTimeout
......@@ -143,6 +145,7 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti
return t
}
}
// set timeout into invocation at method level
invocation.SetAttachments(constant.TIMEOUT_KEY, strconv.Itoa(int(di.timeout.Milliseconds())))
return di.timeout
}
......
......@@ -70,7 +70,7 @@ func TestDubboInvoker_Invoke(t *testing.T) {
r := response.(remoting.AsyncCallbackResponse)
rst := *r.Reply.(*remoting.Response).Result.(*protocol.RPCResult)
assert.Equal(t, User{Id: "1", Name: "username"}, *(rst.Rest.(*User)))
//assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
//assert.Equal(t, User{ID: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
lock.Unlock()
})
res = invoker.Invoke(context.Background(), inv)
......
......@@ -41,6 +41,8 @@ const (
)
var (
// Make the connection can be shared.
// It will create one connection for one address (ip+port)
exchangeClientMap *sync.Map = new(sync.Map)
)
......
......@@ -19,12 +19,10 @@ package invocation
import (
"bytes"
"github.com/apache/dubbo-go/common/constant"
"reflect"
"sync"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)
......@@ -145,12 +143,12 @@ func (r *RPCInvocation) SetCallBack(c interface{}) {
func (r *RPCInvocation) ServiceKey() string {
intf := r.AttachmentsByKey(constant.INTERFACE_KEY, "")
if intf == "" {
if len(intf) == 0 {
return ""
}
buf := &bytes.Buffer{}
group := r.AttachmentsByKey(constant.GROUP_KEY, "")
if group != "" {
if len(group) != 0 {
buf.WriteString(group)
buf.WriteString("/")
}
......@@ -158,7 +156,7 @@ func (r *RPCInvocation) ServiceKey() string {
buf.WriteString(intf)
version := r.AttachmentsByKey(constant.VERSION_KEY, "")
if version != "" && version != "0.0.0" {
if len(version) != 0 && version != "0.0.0" {
buf.WriteString(":")
buf.WriteString(version)
}
......
......@@ -22,19 +22,15 @@ import (
"strings"
"testing"
"time"
)
import (
"github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/protocol"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
type (
......@@ -71,7 +67,7 @@ func TestHTTPClient_Call(t *testing.T) {
// call GetUser
ctx := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Proxy-ID": "dubbogo",
"X-Services": url.Path,
"X-Method": "GetUser",
})
......@@ -85,7 +81,7 @@ func TestHTTPClient_Call(t *testing.T) {
// call GetUser0
ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Proxy-ID": "dubbogo",
"X-Services": url.Path,
"X-Method": "GetUser0",
})
......@@ -98,7 +94,7 @@ func TestHTTPClient_Call(t *testing.T) {
// call GetUser1
ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Proxy-ID": "dubbogo",
"X-Services": url.Path,
"X-Method": "GetUser1",
})
......@@ -110,7 +106,7 @@ func TestHTTPClient_Call(t *testing.T) {
// call GetUser2
ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Proxy-ID": "dubbogo",
"X-Services": url.Path,
"X-Method": "GetUser2",
})
......@@ -122,7 +118,7 @@ func TestHTTPClient_Call(t *testing.T) {
// call GetUser3
ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Proxy-ID": "dubbogo",
"X-Services": url.Path,
"X-Method": "GetUser3",
})
......@@ -134,7 +130,7 @@ func TestHTTPClient_Call(t *testing.T) {
// call GetUser4
ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Proxy-ID": "dubbogo",
"X-Services": url.Path,
"X-Method": "GetUser4",
})
......@@ -145,7 +141,7 @@ func TestHTTPClient_Call(t *testing.T) {
assert.Equal(t, &User{Id: "", Name: ""}, reply)
ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Proxy-ID": "dubbogo",
"X-Services": url.Path,
"X-Method": "GetUser4",
})
......
......@@ -19,13 +19,12 @@ package jsonrpc
import (
"context"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
)
......@@ -54,7 +53,7 @@ func (ji *JsonrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invoca
url := ji.GetUrl()
req := ji.client.NewRequest(url, inv.MethodName(), inv.Arguments())
ctxNew := context.WithValue(ctx, constant.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Proxy-ID": "dubbogo",
"X-Services": url.Path,
"X-Method": inv.MethodName(),
})
......
......@@ -24,39 +24,46 @@ import (
)
var (
// generate request ID for global use
sequence atomic.Uint64
)
func init() {
// init request ID
sequence.Store(0)
}
func SequenceId() uint64 {
// increse 2 for every request.
return sequence.Add(2)
}
// Request ...
type Request struct {
Id int64
Version string
ID int64
// protocol version
Version string
// serial ID
SerialID byte
Data interface{}
TwoWay bool
Event bool
broken bool
// Data
Data interface{}
TwoWay bool
Event bool
// it is used to judge the request is unbroken
// broken bool
}
// NewRequest ...
// NewRequest
func NewRequest(version string) *Request {
return &Request{
Id: int64(SequenceId()),
ID: int64(SequenceId()),
Version: version,
}
}
// Response ...
type Response struct {
Id int64
ID int64
Version string
SerialID byte
Status uint8
......@@ -65,14 +72,15 @@ type Response struct {
Result interface{}
}
// NewResponse ...
// NewResponse
func NewResponse(id int64, version string) *Response {
return &Response{
Id: id,
ID: id,
Version: version,
}
}
// the response is heartbeat
func (response *Response) IsHeartbeat() bool {
return response.Event && response.Result == nil
}
......@@ -92,6 +100,7 @@ type AsyncCallbackResponse struct {
Reply interface{}
}
// the client sends requst to server, there is one pendingResponse at client side to wait the response from server
type PendingResponse struct {
seq int64
Err error
......
......@@ -26,29 +26,39 @@ import (
)
var (
// store requestID and response
pendingResponses *sync.Map = new(sync.Map)
)
type SequenceType int64
type ExchangeClient struct {
ConnectTimeout time.Duration
address string
client Client
}
// It is interface of client for network communication.
// If you use getty as network communication, you should define GettyClient that implements this interface.
type Client interface {
SetExchangeClient(client *ExchangeClient)
// responseHandler is used to deal with msg
SetResponseHandler(responseHandler ResponseHandler)
// connect url
Connect(url common.URL) error
// close
Close()
// send request to server.
Request(request *Request, timeout time.Duration, response *PendingResponse) error
}
// This is abstraction level. it is like facade.
type ExchangeClient struct {
ConnectTimeout time.Duration
address string
client Client
}
// handle the message from server
type ResponseHandler interface {
Handler(response *Response)
}
// create ExchangeClient
func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration) *ExchangeClient {
exchangeClient := &ExchangeClient{
ConnectTimeout: connectTimeout,
......@@ -67,6 +77,7 @@ func NewExchangeClient(url common.URL, client Client, connectTimeout time.Durati
return exchangeClient
}
// two way request
func (client *ExchangeClient) Request(invocation *protocol.Invocation, url common.URL, timeout time.Duration,
result *protocol.RPCResult) error {
request := NewRequest("2.0.2")
......@@ -74,8 +85,8 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url commo
request.Event = false
request.TwoWay = true
rsp := NewPendingResponse(request.Id)
rsp.response = NewResponse(request.Id, "2.0.2")
rsp := NewPendingResponse(request.ID)
rsp.response = NewResponse(request.ID, "2.0.2")
rsp.Reply = (*invocation).Reply()
AddPendingResponse(rsp)
......@@ -88,6 +99,7 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url commo
return nil
}
// async two way request
func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url common.URL, timeout time.Duration,
callback common.AsyncCallback, result *protocol.RPCResult) error {
request := NewRequest("2.0.2")
......@@ -95,8 +107,8 @@ func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url
request.Event = false
request.TwoWay = true
rsp := NewPendingResponse(request.Id)
rsp.response = NewResponse(request.Id, "2.0.2")
rsp := NewPendingResponse(request.ID)
rsp.response = NewResponse(request.ID, "2.0.2")
rsp.Callback = callback
rsp.Reply = (*invocation).Reply()
AddPendingResponse(rsp)
......@@ -110,15 +122,15 @@ func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url
return nil
}
// oneway
// oneway request
func (client *ExchangeClient) Send(invocation *protocol.Invocation, timeout time.Duration) error {
request := NewRequest("2.0.2")
request.Data = invocation
request.Event = false
request.TwoWay = false
rsp := NewPendingResponse(request.Id)
rsp.response = NewResponse(request.Id, "2.0.2")
rsp := NewPendingResponse(request.ID)
rsp.response = NewResponse(request.ID, "2.0.2")
err := client.client.Request(request, timeout, rsp)
if err != nil {
......@@ -127,13 +139,15 @@ func (client *ExchangeClient) Send(invocation *protocol.Invocation, timeout time
return nil
}
// close client
func (client *ExchangeClient) Close() {
client.client.Close()
}
// handle the response from server
func (client *ExchangeClient) Handler(response *Response) {
pendingResponse := removePendingResponse(SequenceType(response.Id))
pendingResponse := removePendingResponse(SequenceType(response.ID))
if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *response)
return
......@@ -149,10 +163,12 @@ func (client *ExchangeClient) Handler(response *Response) {
}
}
// store response into map
func AddPendingResponse(pr *PendingResponse) {
pendingResponses.Store(SequenceType(pr.seq), pr)
}
// get and remove response
func removePendingResponse(seq SequenceType) *PendingResponse {
if pendingResponses == nil {
return nil
......@@ -164,6 +180,7 @@ func removePendingResponse(seq SequenceType) *PendingResponse {
return nil
}
// get response
func GetPendingResponse(seq SequenceType) *PendingResponse {
if presp, ok := pendingResponses.Load(seq); ok {
return presp.(*PendingResponse)
......
......@@ -20,16 +20,21 @@ import (
"github.com/apache/dubbo-go/common"
)
// It is interface of server for network communication.
// If you use getty as network communication, you should define GettyServer that implements this interface.
type Server interface {
//invoke once for connection
Start()
//it is for destroy
Stop()
}
// This is abstraction level. it is like facade.
type ExchangeServer struct {
Server Server
}
// Create ExchangeServer
func NewExchangeServer(url common.URL, server Server) *ExchangeServer {
exchangServer := &ExchangeServer{
Server: server,
......@@ -37,10 +42,12 @@ func NewExchangeServer(url common.URL, server Server) *ExchangeServer {
return exchangServer
}
// start server
func (server *ExchangeServer) Start() {
server.Server.Start()
}
// stop server
func (server *ExchangeServer) Stop() {
server.Server.Stop()
}
......@@ -16,7 +16,8 @@
*/
package getty
// copy from dubbo/dubbo_codec.go
// copy from dubbo/dubbo_codec.go .
// it is used to unit test.
import (
"bufio"
"bytes"
......@@ -129,7 +130,8 @@ func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer
timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, "3000"))
if err != nil {
panic(err)
// it will be wrapped in readwrite.Write .
return nil, err
}
p.Service.Timeout = time.Duration(timeout)
//var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "")
......@@ -140,7 +142,7 @@ func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer
//}
p.Header.SerialID = byte(S_Dubbo)
p.Header.ID = request.Id
p.Header.ID = request.ID
if request.TwoWay {
p.Header.Type = hessian.PackageRequest_TwoWay
} else {
......@@ -161,7 +163,7 @@ func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer
func (c *DubboTestCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) {
pkg := &DubboPackage{}
pkg.Body = []interface{}{}
pkg.Header.ID = request.Id
pkg.Header.ID = request.ID
pkg.Header.Type = hessian.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)
......@@ -183,7 +185,7 @@ func (c *DubboTestCodec) EncodeResponse(response *remoting.Response) (*bytes.Buf
Header: hessian.DubboHeader{
SerialID: response.SerialID,
Type: ptype,
ID: response.Id,
ID: response.ID,
ResponseStatus: response.Status,
},
}
......@@ -249,7 +251,7 @@ func (c *DubboTestCodec) decodeRequest(data []byte) (*remoting.Request, int, err
return request, 0, perrors.WithStack(err)
}
request = &remoting.Request{
Id: pkg.Header.ID,
ID: pkg.Header.ID,
SerialID: pkg.Header.SerialID,
TwoWay: pkg.Header.Type&hessian.PackageRequest_TwoWay != 0x00,
Event: pkg.Header.Type&hessian.PackageHeartbeat != 0x00,
......@@ -330,7 +332,7 @@ func (c *DubboTestCodec) decodeResponse(data []byte) (*remoting.Response, int, e
return response, 0, perrors.WithStack(err)
}
response = &remoting.Response{
Id: pkg.Header.ID,
ID: pkg.Header.ID,
//Version: pkg.Header.,
SerialID: pkg.Header.SerialID,
Status: pkg.Header.ResponseStatus,
......
......@@ -44,6 +44,7 @@ var (
clientGrpool *gxsync.TaskPool
)
// it is init client for single protocol.
func initClient(protocol string) {
if protocol == "" {
return
......@@ -84,7 +85,7 @@ func initClient(protocol string) {
rand.Seed(time.Now().UnixNano())
}
// SetClientConf ...
// SetClientConf: config ClientConf
func SetClientConf(c ClientConfig) {
clientConf = &c
err := clientConf.CheckValidity()
......@@ -102,13 +103,14 @@ func setClientGrpool() {
}
}
// Options ...
// Options : param config
type Options struct {
// connect timeout
// remove request timeout, it will be calulate for every request
ConnectTimeout time.Duration
}
// Client ...
// Client : some configuration for network communication.
type Client struct {
addr string
opts Options
......@@ -119,7 +121,7 @@ type Client struct {
ExchangeClient *remoting.ExchangeClient
}
// NewClient ...
// create client
func NewClient(opt Options) *Client {
switch {
case opt.ConnectTimeout == 0:
......@@ -153,6 +155,7 @@ func (c *Client) Connect(url common.URL) error {
return err
}
// close network connection
func (c *Client) Close() {
if c.pool != nil {
c.pool.close()
......@@ -160,6 +163,7 @@ func (c *Client) Close() {
c.pool = nil
}
// send request
func (c *Client) Request(request *remoting.Request, timeout time.Duration, response *remoting.PendingResponse) error {
var (
......@@ -212,7 +216,7 @@ func (c *Client) heartbeat(session getty.Session) error {
req := remoting.NewRequest("2.0.2")
req.TwoWay = true
req.Event = true
resp := remoting.NewPendingResponse(req.Id)
resp := remoting.NewPendingResponse(req.ID)
remoting.AddPendingResponse(resp)
return c.transfer(session, req, 3*time.Second)
}
......
......@@ -123,7 +123,7 @@ func testGetBigPkg(t *testing.T, c *Client) {
request.Data = invocation
request.Event = false
request.TwoWay = true
pendingResponse := remoting.NewPendingResponse(request.Id)
pendingResponse := remoting.NewPendingResponse(request.ID)
pendingResponse.Reply = user
remoting.AddPendingResponse(pendingResponse)
err = c.Request(request, 8*time.Second, pendingResponse)
......@@ -145,7 +145,7 @@ func testGetUser(t *testing.T, c *Client) {
request.Data = invocation
request.Event = false
request.TwoWay = true
pendingResponse := remoting.NewPendingResponse(request.Id)
pendingResponse := remoting.NewPendingResponse(request.ID)
pendingResponse.Reply = user
remoting.AddPendingResponse(pendingResponse)
err = c.Request(request, 3*time.Second, pendingResponse)
......@@ -167,8 +167,8 @@ func testGetUser0(t *testing.T, c *Client) {
request.Data = invocation
request.Event = false
request.TwoWay = true
rsp := remoting.NewPendingResponse(request.Id)
rsp.SetResponse(remoting.NewResponse(request.Id, "2.0.2"))
rsp := remoting.NewPendingResponse(request.ID)
rsp.SetResponse(remoting.NewResponse(request.ID, "2.0.2"))
remoting.AddPendingResponse(rsp)
rsp.Reply = user
err = c.Request(request, 3*time.Second, rsp)
......@@ -187,7 +187,7 @@ func testGetUser1(t *testing.T, c *Client) {
request.Data = invocation
request.Event = false
request.TwoWay = true
pendingResponse := remoting.NewPendingResponse(request.Id)
pendingResponse := remoting.NewPendingResponse(request.ID)
user := &User{}
pendingResponse.Reply = user
remoting.AddPendingResponse(pendingResponse)
......@@ -206,7 +206,7 @@ func testGetUser2(t *testing.T, c *Client) {
request.Data = invocation
request.Event = false
request.TwoWay = true
pendingResponse := remoting.NewPendingResponse(request.Id)
pendingResponse := remoting.NewPendingResponse(request.ID)
remoting.AddPendingResponse(pendingResponse)
err = c.Request(request, 3*time.Second, pendingResponse)
assert.EqualError(t, err, "error")
......@@ -223,7 +223,7 @@ func testGetUser3(t *testing.T, c *Client) {
request.Data = invocation
request.Event = false
request.TwoWay = true
pendingResponse := remoting.NewPendingResponse(request.Id)
pendingResponse := remoting.NewPendingResponse(request.ID)
user2 := []interface{}{}
pendingResponse.Reply = &user2
remoting.AddPendingResponse(pendingResponse)
......@@ -242,7 +242,7 @@ func testGetUser4(t *testing.T, c *Client) {
request.Data = invocation
request.Event = false
request.TwoWay = true
pendingResponse := remoting.NewPendingResponse(request.Id)
pendingResponse := remoting.NewPendingResponse(request.ID)
user2 := []interface{}{}
pendingResponse.Reply = &user2
remoting.AddPendingResponse(pendingResponse)
......@@ -262,7 +262,7 @@ func testGetUser5(t *testing.T, c *Client) {
request.Data = invocation
request.Event = false
request.TwoWay = true
pendingResponse := remoting.NewPendingResponse(request.Id)
pendingResponse := remoting.NewPendingResponse(request.ID)
user3 := map[interface{}]interface{}{}
pendingResponse.Reply = &user3
remoting.AddPendingResponse(pendingResponse)
......@@ -285,7 +285,7 @@ func testGetUser6(t *testing.T, c *Client) {
request.Data = invocation
request.Event = false
request.TwoWay = true
pendingResponse := remoting.NewPendingResponse(request.Id)
pendingResponse := remoting.NewPendingResponse(request.ID)
pendingResponse.Reply = user
remoting.AddPendingResponse(pendingResponse)
err = c.Request(request, 3*time.Second, pendingResponse)
......@@ -306,7 +306,7 @@ func testGetUser61(t *testing.T, c *Client) {
request.Data = invocation
request.Event = false
request.TwoWay = true
pendingResponse := remoting.NewPendingResponse(request.Id)
pendingResponse := remoting.NewPendingResponse(request.ID)
pendingResponse.Reply = user
remoting.AddPendingResponse(pendingResponse)
err = c.Request(request, 3*time.Second, pendingResponse)
......@@ -325,8 +325,8 @@ func testClient_AsyncCall(t *testing.T, svr *Server, url common.URL, client *Cli
request.Data = invocation
request.Event = false
request.TwoWay = true
rsp := remoting.NewPendingResponse(request.Id)
rsp.SetResponse(remoting.NewResponse(request.Id, "2.0.2"))
rsp := remoting.NewPendingResponse(request.ID)
rsp.SetResponse(remoting.NewResponse(request.ID, "2.0.2"))
remoting.AddPendingResponse(rsp)
rsp.Reply = user
rsp.Callback = func(response common.CallbackResponse) {
......
......@@ -101,7 +101,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
req := result.Result.(*remoting.Request)
if req.Event {
logger.Debugf("get rpc heartbeat request{%#v}", req)
resp := remoting.NewResponse(req.Id, req.Version)
resp := remoting.NewResponse(req.ID, req.Version)
resp.Status = hessian.Response_OK
resp.Event = req.Event
resp.SerialID = req.SerialID
......@@ -230,7 +230,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
}
req := decodeResult.Result.(*remoting.Request)
resp := remoting.NewResponse(req.Id, req.Version)
resp := remoting.NewResponse(req.ID, req.Version)
resp.Status = hessian.Response_OK
resp.Event = req.Event
resp.SerialID = req.SerialID
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment