Skip to content
Snippets Groups Projects
Commit 4a49f8c5 authored by 邹毅贤's avatar 邹毅贤
Browse files

add async callback

parent 23a25a7d
No related branches found
No related tags found
No related merge requests found
......@@ -23,7 +23,7 @@ import (
)
type ProxyFactory interface {
GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy
GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy
GetInvoker(url common.URL) protocol.Invoker
}
......
......@@ -54,11 +54,11 @@ type DefaultProxyFactory struct {
func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory {
return &DefaultProxyFactory{}
}
func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy {
func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy {
//create proxy
attachments := map[string]string{}
attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false")
return proxy.NewProxy(invoker, nil, attachments)
return proxy.NewProxy(invoker, callBack, attachments)
}
func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker {
return &ProxyInvoker{
......
......@@ -33,7 +33,7 @@ import (
func Test_GetProxy(t *testing.T) {
proxyFactory := NewDefaultProxyFactory()
url := common.NewURLWithOptions()
proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), url)
proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), nil, url)
assert.NotNil(t, proxy)
}
......
......@@ -22,6 +22,7 @@ import (
"reflect"
"strings"
"sync"
"time"
"unicode"
"unicode/utf8"
)
......@@ -39,6 +40,26 @@ type RPCService interface {
Reference() string // rpc service id or reference id
}
// callback interface for async
type AsyncCallbackService interface {
CallBack(response CallResponse) // callback
}
type Options struct {
// connect timeout
ConnectTimeout time.Duration
// request timeout
RequestTimeout time.Duration
}
type CallResponse struct {
Opts Options
Cause error
Start time.Time // invoke(call) start time == write start time
ReadStart time.Time // read start time, write duration = ReadStart - Start
Reply interface{}
}
// for lowercase func
// func MethodMapper() map[string][string] {
// return map[string][string]{}
......
......@@ -55,7 +55,7 @@ type ReferenceConfig struct {
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
async bool `yaml:"async" json:"async,omitempty" property:"async"`
Async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
......@@ -140,8 +140,10 @@ func (refconfig *ReferenceConfig) Refer() {
}
}
callback := GetCallback(refconfig.id)
//create proxy
refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url)
refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, callback, url)
}
// @v is service provider implemented RPCService
......@@ -169,7 +171,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
//getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async))
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async))
//application info
urlMap.Set(constant.APPLICATION_KEY, consumerConfig.ApplicationConfig.Name)
......
......@@ -93,6 +93,7 @@ func doInitConsumer() {
Retries: "3",
Group: "huadong_idc",
Version: "1.0.0",
Async: true,
Methods: []*MethodConfig{
{
Name: "GetUser",
......
......@@ -43,3 +43,11 @@ func GetConsumerService(name string) common.RPCService {
func GetProviderService(name string) common.RPCService {
return proServices[name]
}
func GetCallback(name string) func(common.CallResponse) {
service := GetConsumerService(name)
if sv, ok := service.(common.AsyncCallbackService); ok {
return sv.CallBack
}
return nil
}
......@@ -107,6 +107,7 @@ func setClientGrpool() {
}
type Options struct {
common.Options
// connect timeout
ConnectTimeout time.Duration
// request timeout
......@@ -114,14 +115,10 @@ type Options struct {
}
type CallResponse struct {
Opts Options
Cause error
Start time.Time // invoke(call) start time == write start time
ReadStart time.Time // read start time, write duration = ReadStart - Start
Reply interface{}
common.CallResponse
}
type AsyncCallback func(response CallResponse)
type AsyncCallback func(response common.CallResponse)
type Client struct {
opts Options
......
......@@ -26,6 +26,7 @@ import (
import (
"github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go/common"
perrors "github.com/pkg/errors"
)
......@@ -122,8 +123,8 @@ func NewPendingResponse() *PendingResponse {
}
}
func (r PendingResponse) GetCallResponse() CallResponse {
return CallResponse{
func (r PendingResponse) GetCallResponse() common.CallResponse {
return common.CallResponse{
Cause: r.err,
Start: r.start,
ReadStart: r.readStart,
......
......@@ -75,7 +75,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
}
response := NewResponse(inv.Reply(), nil)
if async {
if callBack, ok := inv.CallBack().(func(response CallResponse)); ok {
if callBack, ok := inv.CallBack().(func(response common.CallResponse)); ok {
result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
} else {
result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()))
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment