diff --git a/README.md b/README.md index 694f0091533f644b58438c6487dc53348add6e60..b54568ce0317a443d56b1d6d437e5f43334013bd 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ Finished List: * [AccessLogFilter](https://github.com/apache/dubbo-go/pull/214) * [TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237) * [ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246) + * [GenericServiceFilter](https://github.com/apache/dubbo-go/pull/291) - Invoke * [generic invoke](https://github.com/apache/dubbo-go/pull/122) diff --git a/common/url.go b/common/url.go index e7762355a56837796e0eedc4f67a96d3a851c632..7a854293142bb237df7faa0a081104bfabdd0bb4 100644 --- a/common/url.go +++ b/common/url.go @@ -523,7 +523,7 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { return true }) //loadBalance,cluster,retries strategy config - methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY}) + methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY, constant.TIMEOUT_KEY}) //remote timestamp if v := serviceUrl.GetParam(constant.TIMESTAMP_KEY, ""); len(v) > 0 { diff --git a/common/url_test.go b/common/url_test.go index 4d60d7f13f5d139e964b0837380f1054871c5d15..c70c58bc215b6449311d43f9f9cffeb89623f80c 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -239,7 +239,7 @@ func TestMergeUrl(t *testing.T) { serviceUrlParams.Set("test2", "1") serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin") serviceUrlParams.Set(constant.RETRIES_KEY, "2") - serviceUrlParams.Set("methods.testMethod."+constant.RETRIES_KEY, "2") + serviceUrlParams.Set(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, "2") referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams), WithMethods([]string{"testMethod"})) serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams)) @@ -248,7 +248,7 @@ func TestMergeUrl(t *testing.T) { assert.Equal(t, "1", mergedUrl.GetParam("test2", "")) assert.Equal(t, "1", mergedUrl.GetParam("test3", "")) assert.Equal(t, "1", mergedUrl.GetParam(constant.RETRIES_KEY, "")) - assert.Equal(t, "1", mergedUrl.GetParam("methods.testMethod."+constant.RETRIES_KEY, "")) + assert.Equal(t, "2", mergedUrl.GetParam(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, "")) } func TestURL_SetParams(t *testing.T) { diff --git a/config/method_config.go b/config/method_config.go index 876abeeae0c7d37070c5938107d1bb1dd5dbbaa9..e10548e667e6a16d33690f011ebc9958af1eea71 100644 --- a/config/method_config.go +++ b/config/method_config.go @@ -37,6 +37,7 @@ type MethodConfig struct { ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"` ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"` Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` + RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` } func (c *MethodConfig) Prefix() string { diff --git a/config/reference_config.go b/config/reference_config.go index 4e0c56c0bc25e3b71b8edf015580cbe5ac5f0d9c..fde3f7daa5fcdb573e8b5a51606e2be7ef96dc1d 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -40,27 +40,28 @@ import ( ) type ReferenceConfig struct { - context context.Context - pxy *proxy.Proxy - id string - InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` - Check *bool `yaml:"check" json:"check,omitempty" property:"check"` - Url string `yaml:"url" json:"url,omitempty" property:"url"` - Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` - Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` - Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` - Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` - Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` - Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` - Group string `yaml:"group" json:"group,omitempty" property:"group"` - Version string `yaml:"version" json:"version,omitempty" property:"version"` - Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - Async bool `yaml:"async" json:"async,omitempty" property:"async"` - Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` - invoker protocol.Invoker - urls []*common.URL - Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"` - Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` + context context.Context + pxy *proxy.Proxy + id string + InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` + Check *bool `yaml:"check" json:"check,omitempty" property:"check"` + Url string `yaml:"url" json:"url,omitempty" property:"url"` + Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` + Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` + Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` + Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` + Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` + Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` + Group string `yaml:"group" json:"group,omitempty" property:"group"` + Version string `yaml:"version" json:"version,omitempty" property:"version"` + Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` + Async bool `yaml:"async" json:"async,omitempty" property:"async"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + invoker protocol.Invoker + urls []*common.URL + Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"` + Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` + RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` } func (c *ReferenceConfig) Prefix() string { @@ -174,6 +175,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.VERSION_KEY, refconfig.Version) urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + if len(refconfig.RequestTimeout) != 0 { + urlMap.Set(constant.TIMEOUT_KEY, refconfig.RequestTimeout) + } //getty invoke async or sync urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async)) urlMap.Set(constant.STICKY_KEY, strconv.FormatBool(refconfig.Sticky)) @@ -198,6 +202,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance) urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries) urlMap.Set("methods."+v.Name+"."+constant.STICKY_KEY, strconv.FormatBool(v.Sticky)) + if len(v.RequestTimeout) != 0 { + urlMap.Set("methods."+v.Name+"."+constant.TIMEOUT_KEY, v.RequestTimeout) + } } return urlMap diff --git a/config/testdata/consumer_config.yml b/config/testdata/consumer_config.yml index f44ea449fd16235050f6a7ba7823a87e24791780..2034186c0fa0ccf21c3f6fb9df0f5cfd69315113 100644 --- a/config/testdata/consumer_config.yml +++ b/config/testdata/consumer_config.yml @@ -41,9 +41,11 @@ references: interface : "com.ikurento.user.UserProvider" url: "dubbo://127.0.0.1:20000/UserProvider" cluster: "failover" + timeout: "3s" methods : - name: "GetUser" retries: "3" + timeout: "5s" params: "serviceid": "soa.com.ikurento.user.UserProvider" @@ -54,12 +56,21 @@ shutdown_conf: step_timeout: 10s protocol_conf: + # when you choose the Dubbo protocol, the following configuration takes effect dubbo: reconnect_interval: 0 + # reconnect_interval is the actual number of connections a session can use connection_number: 2 - heartbeat_period: "5s" - session_timeout: "20s" - pool_size: 64 + # heartbeat_period is heartbeat interval between server and client connection. + # Effective by client configuration + heartbeat_period: "30s" + # when the session is inactive for more than session_timeout, the session may be closed + session_timeout: "30s" + # a reference has the size of the session connection pool + # that is the maximum number of sessions it may have + pool_size: 4 + # dubbo-go uses getty as the network connection library. + # The following is the relevant configuration of getty pool_ttl: 600 # gr_pool_size is recommended to be set to [cpu core number] * 100 gr_pool_size: 1200 @@ -67,6 +78,8 @@ protocol_conf: queue_len: 64 # queue_number is recommended to be set to gr_pool_size / 20 queue_number: 60 + # dubbo-go uses getty as the network connection library. + # The following is the relevant configuration of getty getty_session_param: compress_encoding: false tcp_no_delay: true @@ -78,5 +91,7 @@ protocol_conf: tcp_read_timeout: "1s" tcp_write_timeout: "5s" wait_timeout: "1s" - max_msg_len: 1024 + # maximum len of data per request + # this refers to the total amount of data requested or returned + max_msg_len: 102400 session_name: "client" diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 81f392565f701d990dc1783d5d467814a0fba5bf..3e2a243103b888d8b94c2e50fe00daabb3d5a032 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -212,7 +212,15 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "") p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "") p.Service.Method = request.method + p.Service.Timeout = c.opts.RequestTimeout + var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "") + if len(timeout) != 0 { + if t, err := time.ParseDuration(timeout); err == nil { + p.Service.Timeout = t + } + } + p.Header.SerialID = byte(S_Dubbo) p.Body = hessian.NewRequest(request.args, request.atta) diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 59d1ea05160696754b46dfead5713684aa7a94f7..eed22a29cde59b520f1b6ebf9b5baafabcac931f 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -19,10 +19,12 @@ package dubbo import ( "sync" + "time" ) import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" @@ -67,9 +69,17 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { } func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { + //default requestTimeout + var requestTimeout = config.GetConsumerConfig().RequestTimeout + + requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout) + if t, err := time.ParseDuration(requestTimeoutStr); err == nil { + requestTimeout = t + } + invoker := NewDubboInvoker(url, NewClient(Options{ ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, - RequestTimeout: config.GetConsumerConfig().RequestTimeout, + RequestTimeout: requestTimeout, })) dp.SetInvokers(invoker) logger.Infof("Refer service: %s", url.String()) diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go index c18345d413edb2d263f1acaef1741514b665f042..a2e7afe69a138e0cd6dbbe05b3f386647895ee15 100644 --- a/protocol/jsonrpc/jsonrpc_protocol.go +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -20,10 +20,12 @@ package jsonrpc import ( "strings" "sync" + "time" ) import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" @@ -66,9 +68,17 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { } func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker { + //default requestTimeout + var requestTimeout = config.GetConsumerConfig().RequestTimeout + + requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout) + if t, err := time.ParseDuration(requestTimeoutStr); err == nil { + requestTimeout = t + } + invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{ HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout, - HTTPTimeout: config.GetConsumerConfig().RequestTimeout, + HTTPTimeout: requestTimeout, })) jp.SetInvokers(invoker) logger.Infof("Refer service: %s", url.String())