From 498453c7c2d5556734d3dce43d7351eadb06f804 Mon Sep 17 00:00:00 2001 From: Joe Zou <yixian.zou@gmail.com> Date: Wed, 4 Dec 2019 17:02:05 +0800 Subject: [PATCH] modify async callback --- common/rpc_service.go | 21 ++++++--------------- config/service.go | 2 +- protocol/dubbo/client.go | 17 ++++++++++------- protocol/dubbo/client_test.go | 5 +++-- protocol/dubbo/codec.go | 6 +++--- protocol/dubbo/dubbo_invoker.go | 2 +- protocol/dubbo/dubbo_invoker_test.go | 5 +++-- 7 files changed, 27 insertions(+), 31 deletions(-) diff --git a/common/rpc_service.go b/common/rpc_service.go index a642ea0ae..4c9f083dd 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -22,7 +22,6 @@ import ( "reflect" "strings" "sync" - "time" "unicode" "unicode/utf8" ) @@ -40,25 +39,17 @@ type RPCService interface { Reference() string // rpc service id or reference id } -// callback interface for async +//AsyncCallbackService callback interface for async type AsyncCallbackService interface { - CallBack(response CallResponse) // callback + CallBack(response CallbackResponse) // callback } -type Options struct { - // connect timeout - ConnectTimeout time.Duration - // request timeout - RequestTimeout time.Duration +//CallbackResponse for different protocol +type CallbackResponse interface { } -type CallResponse struct { - Opts Options - Cause error - Start time.Time // invoke(call) start time == write start time - ReadStart time.Time // read start time, write duration = ReadStart - Start - Reply interface{} -} +//AsyncCallback async callback method +type AsyncCallback func(response CallbackResponse) // for lowercase func // func MethodMapper() map[string][string] { diff --git a/config/service.go b/config/service.go index ec21bac88..f1b51790c 100644 --- a/config/service.go +++ b/config/service.go @@ -44,7 +44,7 @@ func GetProviderService(name string) common.RPCService { return proServices[name] } -func GetCallback(name string) func(common.CallResponse) { +func GetCallback(name string) func(response common.CallbackResponse) { service := GetConsumerService(name) if sv, ok := service.(common.AsyncCallbackService); ok { return sv.CallBack diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 1e00f699b..1365838f3 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -107,19 +107,22 @@ func setClientGrpool() { } type Options struct { - common.Options // connect timeout ConnectTimeout time.Duration // request timeout RequestTimeout time.Duration } -type CallResponse struct { - common.CallResponse +//AsyncCallbackResponse async response for dubbo +type AsyncCallbackResponse struct { + common.CallbackResponse + Opts Options + Cause error + Start time.Time // invoke(call) start time == write start time + ReadStart time.Time // read start time, write duration = ReadStart - Start + Reply interface{} } -type AsyncCallback func(response common.CallResponse) - type Client struct { opts Options conf ClientConfig @@ -196,12 +199,12 @@ func (c *Client) Call(request *Request, response *Response) error { return perrors.WithStack(c.call(ct, request, response, nil)) } -func (c *Client) AsyncCall(request *Request, callback AsyncCallback, response *Response) error { +func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error { return perrors.WithStack(c.call(CT_TwoWay, request, response, callback)) } -func (c *Client) call(ct CallType, request *Request, response *Response, callback AsyncCallback) error { +func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error { p := &DubboPackage{} p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/") diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index 90dbf9c5f..3f8a8ee98 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -144,8 +144,9 @@ func TestClient_AsyncCall(t *testing.T) { user := &User{} lock := sync.Mutex{} lock.Lock() - err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallResponse) { - assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) + err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallbackResponse) { + r := response.(AsyncCallbackResponse) + assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) lock.Unlock() }, NewResponse(user, nil)) assert.NoError(t, err) diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 1874b94c4..758363117 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -110,7 +110,7 @@ type PendingResponse struct { err error start time.Time readStart time.Time - callback AsyncCallback + callback common.AsyncCallback response *Response done chan struct{} } @@ -123,8 +123,8 @@ func NewPendingResponse() *PendingResponse { } } -func (r PendingResponse) GetCallResponse() common.CallResponse { - return common.CallResponse{ +func (r PendingResponse) GetCallResponse() common.CallbackResponse { + return AsyncCallbackResponse{ Cause: r.err, Start: r.start, ReadStart: r.readStart, diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index df27da4e8..da1212610 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -75,7 +75,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } response := NewResponse(inv.Reply(), nil) if async { - if callBack, ok := inv.CallBack().(func(response common.CallResponse)); ok { + if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) } else { result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())) diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index d2553884d..e22903c1f 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -67,8 +67,9 @@ func TestDubboInvoker_Invoke(t *testing.T) { // AsyncCall lock := sync.Mutex{} lock.Lock() - inv.SetCallBack(func(response common.CallResponse) { - assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) + inv.SetCallBack(func(response common.CallbackResponse) { + r := response.(AsyncCallbackResponse) + assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) lock.Unlock() }) res = invoker.Invoke(inv) -- GitLab