diff --git a/common/proxy/proxy_factory.go b/common/proxy/proxy_factory.go index 2567e0ee09cf7fa5aef7fde46872eb88205d8e45..cd6ef3d1db126db02e304cbf7c579bf6cde3014f 100644 --- a/common/proxy/proxy_factory.go +++ b/common/proxy/proxy_factory.go @@ -23,7 +23,7 @@ import ( ) type ProxyFactory interface { - GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy + GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy GetInvoker(url common.URL) protocol.Invoker } diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index bafba60b400ec59d99e2d68ecf4d067c906ba6fb..bf87e6380dccbee810dda135f79b02b64172ee5d 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -54,11 +54,11 @@ type DefaultProxyFactory struct { func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory { return &DefaultProxyFactory{} } -func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { +func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy { //create proxy attachments := map[string]string{} attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") - return proxy.NewProxy(invoker, nil, attachments) + return proxy.NewProxy(invoker, callBack, attachments) } func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { return &ProxyInvoker{ diff --git a/common/proxy/proxy_factory/default_test.go b/common/proxy/proxy_factory/default_test.go index b6a6b675baf992b2d64ffd19291ee2dc009bd1e3..cba7757e4bb51447be28f1d531f2a6fa1aa33a9e 100644 --- a/common/proxy/proxy_factory/default_test.go +++ b/common/proxy/proxy_factory/default_test.go @@ -33,7 +33,7 @@ import ( func Test_GetProxy(t *testing.T) { proxyFactory := NewDefaultProxyFactory() url := common.NewURLWithOptions() - proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), url) + proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), nil, url) assert.NotNil(t, proxy) } diff --git a/common/rpc_service.go b/common/rpc_service.go index 4741a6fa3c0daef97f044f639a5e64a38fe4a187..a642ea0ae9d8a46041d4d9dbc3e389b8e8823292 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -22,6 +22,7 @@ import ( "reflect" "strings" "sync" + "time" "unicode" "unicode/utf8" ) @@ -39,6 +40,26 @@ type RPCService interface { Reference() string // rpc service id or reference id } +// callback interface for async +type AsyncCallbackService interface { + CallBack(response CallResponse) // callback +} + +type Options struct { + // connect timeout + ConnectTimeout time.Duration + // request timeout + RequestTimeout time.Duration +} + +type CallResponse struct { + Opts Options + Cause error + Start time.Time // invoke(call) start time == write start time + ReadStart time.Time // read start time, write duration = ReadStart - Start + Reply interface{} +} + // for lowercase func // func MethodMapper() map[string][string] { // return map[string][string]{} diff --git a/config/reference_config.go b/config/reference_config.go index 8703c459bab306f98beb1668a1f9438126586f24..1e469d037ceb91d8270fa2c8f60cc3431a04fba6 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -55,7 +55,7 @@ type ReferenceConfig struct { Group string `yaml:"group" json:"group,omitempty" property:"group"` Version string `yaml:"version" json:"version,omitempty" property:"version"` Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - async bool `yaml:"async" json:"async,omitempty" property:"async"` + Async bool `yaml:"async" json:"async,omitempty" property:"async"` Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` invoker protocol.Invoker urls []*common.URL @@ -140,8 +140,10 @@ func (refconfig *ReferenceConfig) Refer() { } } + callback := GetCallback(refconfig.id) + //create proxy - refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url) + refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, callback, url) } // @v is service provider implemented RPCService @@ -169,7 +171,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) //getty invoke async or sync - urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async)) + urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async)) //application info urlMap.Set(constant.APPLICATION_KEY, consumerConfig.ApplicationConfig.Name) diff --git a/config/reference_config_test.go b/config/reference_config_test.go index a81dbf06cef7d275cf6af4a7f651ff8d1600a3c9..2cb9793c1815889fde2333a682ade75278866579 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -93,6 +93,7 @@ func doInitConsumer() { Retries: "3", Group: "huadong_idc", Version: "1.0.0", + Async: true, Methods: []*MethodConfig{ { Name: "GetUser", diff --git a/config/service.go b/config/service.go index 2bceac4a8c20bb598dc2607c90c8206e4a448808..ec21bac883caac02ef46582cf958586b0a418432 100644 --- a/config/service.go +++ b/config/service.go @@ -43,3 +43,11 @@ func GetConsumerService(name string) common.RPCService { func GetProviderService(name string) common.RPCService { return proServices[name] } + +func GetCallback(name string) func(common.CallResponse) { + service := GetConsumerService(name) + if sv, ok := service.(common.AsyncCallbackService); ok { + return sv.CallBack + } + return nil +} diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index ba74d86c0c38ba02ec5e87423e0fe8990dae486b..1e00f699b2b59b2bdef861d6ce5811a2879851f7 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -107,6 +107,7 @@ func setClientGrpool() { } type Options struct { + common.Options // connect timeout ConnectTimeout time.Duration // request timeout @@ -114,14 +115,10 @@ type Options struct { } type CallResponse struct { - Opts Options - Cause error - Start time.Time // invoke(call) start time == write start time - ReadStart time.Time // read start time, write duration = ReadStart - Start - Reply interface{} + common.CallResponse } -type AsyncCallback func(response CallResponse) +type AsyncCallback func(response common.CallResponse) type Client struct { opts Options diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index a878ffd91e29d6949870ec25fed9481f301b435a..1874b94c473d68751dd00a5748608254969b8599 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -26,6 +26,7 @@ import ( import ( "github.com/apache/dubbo-go-hessian2" + "github.com/apache/dubbo-go/common" perrors "github.com/pkg/errors" ) @@ -122,8 +123,8 @@ func NewPendingResponse() *PendingResponse { } } -func (r PendingResponse) GetCallResponse() CallResponse { - return CallResponse{ +func (r PendingResponse) GetCallResponse() common.CallResponse { + return common.CallResponse{ Cause: r.err, Start: r.start, ReadStart: r.readStart, diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index bc321a97a4271c147d9317145d9f1aa76ca27902..df27da4e88215b32a53e1cc53743f8de544f2d50 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -75,7 +75,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } response := NewResponse(inv.Reply(), nil) if async { - if callBack, ok := inv.CallBack().(func(response CallResponse)); ok { + if callBack, ok := inv.CallBack().(func(response common.CallResponse)); ok { result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) } else { result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()))