Skip to content
Snippets Groups Projects
Commit 5713270a authored by 邹毅贤's avatar 邹毅贤
Browse files

modify async callback

parent 85abcfd7
No related branches found
No related tags found
No related merge requests found
...@@ -22,7 +22,6 @@ import ( ...@@ -22,7 +22,6 @@ import (
"reflect" "reflect"
"strings" "strings"
"sync" "sync"
"time"
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
) )
...@@ -40,25 +39,17 @@ type RPCService interface { ...@@ -40,25 +39,17 @@ type RPCService interface {
Reference() string // rpc service id or reference id Reference() string // rpc service id or reference id
} }
// callback interface for async //AsyncCallbackService callback interface for async
type AsyncCallbackService interface { type AsyncCallbackService interface {
CallBack(response CallResponse) // callback CallBack(response CallbackResponse) // callback
} }
type Options struct { //CallbackResponse for different protocol
// connect timeout type CallbackResponse interface {
ConnectTimeout time.Duration
// request timeout
RequestTimeout time.Duration
} }
type CallResponse struct { //AsyncCallback async callback method
Opts Options type AsyncCallback func(response CallbackResponse)
Cause error
Start time.Time // invoke(call) start time == write start time
ReadStart time.Time // read start time, write duration = ReadStart - Start
Reply interface{}
}
// for lowercase func // for lowercase func
// func MethodMapper() map[string][string] { // func MethodMapper() map[string][string] {
......
...@@ -44,7 +44,7 @@ func GetProviderService(name string) common.RPCService { ...@@ -44,7 +44,7 @@ func GetProviderService(name string) common.RPCService {
return proServices[name] return proServices[name]
} }
func GetCallback(name string) func(common.CallResponse) { func GetCallback(name string) func(response common.CallbackResponse) {
service := GetConsumerService(name) service := GetConsumerService(name)
if sv, ok := service.(common.AsyncCallbackService); ok { if sv, ok := service.(common.AsyncCallbackService); ok {
return sv.CallBack return sv.CallBack
......
...@@ -107,19 +107,22 @@ func setClientGrpool() { ...@@ -107,19 +107,22 @@ func setClientGrpool() {
} }
type Options struct { type Options struct {
common.Options
// connect timeout // connect timeout
ConnectTimeout time.Duration ConnectTimeout time.Duration
// request timeout // request timeout
RequestTimeout time.Duration RequestTimeout time.Duration
} }
type CallResponse struct { //AsyncCallbackResponse async response for dubbo
common.CallResponse type AsyncCallbackResponse struct {
common.CallbackResponse
Opts Options
Cause error
Start time.Time // invoke(call) start time == write start time
ReadStart time.Time // read start time, write duration = ReadStart - Start
Reply interface{}
} }
type AsyncCallback func(response common.CallResponse)
type Client struct { type Client struct {
opts Options opts Options
conf ClientConfig conf ClientConfig
...@@ -196,12 +199,12 @@ func (c *Client) Call(request *Request, response *Response) error { ...@@ -196,12 +199,12 @@ func (c *Client) Call(request *Request, response *Response) error {
return perrors.WithStack(c.call(ct, request, response, nil)) return perrors.WithStack(c.call(ct, request, response, nil))
} }
func (c *Client) AsyncCall(request *Request, callback AsyncCallback, response *Response) error { func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error {
return perrors.WithStack(c.call(CT_TwoWay, request, response, callback)) return perrors.WithStack(c.call(CT_TwoWay, request, response, callback))
} }
func (c *Client) call(ct CallType, request *Request, response *Response, callback AsyncCallback) error { func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error {
p := &DubboPackage{} p := &DubboPackage{}
p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/") p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
......
...@@ -144,8 +144,9 @@ func TestClient_AsyncCall(t *testing.T) { ...@@ -144,8 +144,9 @@ func TestClient_AsyncCall(t *testing.T) {
user := &User{} user := &User{}
lock := sync.Mutex{} lock := sync.Mutex{}
lock.Lock() lock.Lock()
err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallResponse) { err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallbackResponse) {
assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) r := response.(AsyncCallbackResponse)
assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
lock.Unlock() lock.Unlock()
}, NewResponse(user, nil)) }, NewResponse(user, nil))
assert.NoError(t, err) assert.NoError(t, err)
......
...@@ -110,7 +110,7 @@ type PendingResponse struct { ...@@ -110,7 +110,7 @@ type PendingResponse struct {
err error err error
start time.Time start time.Time
readStart time.Time readStart time.Time
callback AsyncCallback callback common.AsyncCallback
response *Response response *Response
done chan struct{} done chan struct{}
} }
...@@ -123,8 +123,8 @@ func NewPendingResponse() *PendingResponse { ...@@ -123,8 +123,8 @@ func NewPendingResponse() *PendingResponse {
} }
} }
func (r PendingResponse) GetCallResponse() common.CallResponse { func (r PendingResponse) GetCallResponse() common.CallbackResponse {
return common.CallResponse{ return AsyncCallbackResponse{
Cause: r.err, Cause: r.err,
Start: r.start, Start: r.start,
ReadStart: r.readStart, ReadStart: r.readStart,
......
...@@ -75,7 +75,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { ...@@ -75,7 +75,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
} }
response := NewResponse(inv.Reply(), nil) response := NewResponse(inv.Reply(), nil)
if async { if async {
if callBack, ok := inv.CallBack().(func(response common.CallResponse)); ok { if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
} else { } else {
result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())) result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()))
......
...@@ -67,8 +67,9 @@ func TestDubboInvoker_Invoke(t *testing.T) { ...@@ -67,8 +67,9 @@ func TestDubboInvoker_Invoke(t *testing.T) {
// AsyncCall // AsyncCall
lock := sync.Mutex{} lock := sync.Mutex{}
lock.Lock() lock.Lock()
inv.SetCallBack(func(response common.CallResponse) { inv.SetCallBack(func(response common.CallbackResponse) {
assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) r := response.(AsyncCallbackResponse)
assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
lock.Unlock() lock.Unlock()
}) })
res = invoker.Invoke(inv) res = invoker.Invoke(inv)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment