From 412bafb4e6e65302ab7cd858580724c02b4c0b81 Mon Sep 17 00:00:00 2001 From: pantianying <601666418@qq.com> Date: Mon, 6 Jan 2020 16:18:54 +0800 Subject: [PATCH] add code --- common/url_test.go | 4 ++-- config/method_config.go | 2 +- config/reference_config.go | 4 ++-- config/testdata/consumer_config.yml | 23 +++++++++++++++++++---- protocol/dubbo/client.go | 8 ++++++++ protocol/dubbo/dubbo_protocol.go | 4 ++-- protocol/jsonrpc/jsonrpc_protocol.go | 12 +++++++++++- 7 files changed, 45 insertions(+), 12 deletions(-) diff --git a/common/url_test.go b/common/url_test.go index 41fd374a4..ad56aa2d5 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -227,7 +227,7 @@ func TestMergeUrl(t *testing.T) { serviceUrlParams.Set("test2", "1") serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin") serviceUrlParams.Set(constant.RETRIES_KEY, "2") - serviceUrlParams.Set("methods.testMethod."+constant.RETRIES_KEY, "2") + serviceUrlParams.Set(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, "2") referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams), WithMethods([]string{"testMethod"})) serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams)) @@ -236,7 +236,7 @@ func TestMergeUrl(t *testing.T) { assert.Equal(t, "1", mergedUrl.GetParam("test2", "")) assert.Equal(t, "1", mergedUrl.GetParam("test3", "")) assert.Equal(t, "1", mergedUrl.GetParam(constant.RETRIES_KEY, "")) - assert.Equal(t, "1", mergedUrl.GetParam("methods.testMethod."+constant.RETRIES_KEY, "")) + assert.Equal(t, "2", mergedUrl.GetParam(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, "")) } func TestURL_SetParams(t *testing.T) { diff --git a/config/method_config.go b/config/method_config.go index 769069a78..32b14a610 100644 --- a/config/method_config.go +++ b/config/method_config.go @@ -36,7 +36,7 @@ type MethodConfig struct { TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"` ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"` ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"` - RequestTimeout string `yaml:"request_timeout" json:"request_timeout,omitempty" property:"request_timeout"` + RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` } func (c *MethodConfig) Prefix() string { diff --git a/config/reference_config.go b/config/reference_config.go index 65517f69f..a31ae22da 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -60,7 +60,7 @@ type ReferenceConfig struct { invoker protocol.Invoker urls []*common.URL Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"` - RequestTimeout string `yaml:"request_timeout" json:"request_timeout,omitempty" property:"request_timeout"` + RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` } func (c *ReferenceConfig) Prefix() string { @@ -169,7 +169,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.VERSION_KEY, refconfig.Version) urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) - if refconfig.RequestTimeout != "" { + if len(refconfig.RequestTimeout) != 0 { urlMap.Set(constant.TIMEOUT_KEY, refconfig.RequestTimeout) } diff --git a/config/testdata/consumer_config.yml b/config/testdata/consumer_config.yml index f44ea449f..2034186c0 100644 --- a/config/testdata/consumer_config.yml +++ b/config/testdata/consumer_config.yml @@ -41,9 +41,11 @@ references: interface : "com.ikurento.user.UserProvider" url: "dubbo://127.0.0.1:20000/UserProvider" cluster: "failover" + timeout: "3s" methods : - name: "GetUser" retries: "3" + timeout: "5s" params: "serviceid": "soa.com.ikurento.user.UserProvider" @@ -54,12 +56,21 @@ shutdown_conf: step_timeout: 10s protocol_conf: + # when you choose the Dubbo protocol, the following configuration takes effect dubbo: reconnect_interval: 0 + # reconnect_interval is the actual number of connections a session can use connection_number: 2 - heartbeat_period: "5s" - session_timeout: "20s" - pool_size: 64 + # heartbeat_period is heartbeat interval between server and client connection. + # Effective by client configuration + heartbeat_period: "30s" + # when the session is inactive for more than session_timeout, the session may be closed + session_timeout: "30s" + # a reference has the size of the session connection pool + # that is the maximum number of sessions it may have + pool_size: 4 + # dubbo-go uses getty as the network connection library. + # The following is the relevant configuration of getty pool_ttl: 600 # gr_pool_size is recommended to be set to [cpu core number] * 100 gr_pool_size: 1200 @@ -67,6 +78,8 @@ protocol_conf: queue_len: 64 # queue_number is recommended to be set to gr_pool_size / 20 queue_number: 60 + # dubbo-go uses getty as the network connection library. + # The following is the relevant configuration of getty getty_session_param: compress_encoding: false tcp_no_delay: true @@ -78,5 +91,7 @@ protocol_conf: tcp_read_timeout: "1s" tcp_write_timeout: "5s" wait_timeout: "1s" - max_msg_len: 1024 + # maximum len of data per request + # this refers to the total amount of data requested or returned + max_msg_len: 102400 session_name: "client" diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 817dbab57..ca04c2db8 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -212,7 +212,15 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "") p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "") p.Service.Method = request.method + p.Service.Timeout = c.opts.RequestTimeout + var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "") + if len(timeout) != 0 { + if t, err := time.ParseDuration(timeout); err == nil { + p.Service.Timeout = t + } + } + p.Header.SerialID = byte(S_Dubbo) p.Body = hessian.NewRequest(request.args, request.atta) diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 08e28ba89..76ad2f56a 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -69,8 +69,8 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { } func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { - //default - requestTimeout := config.GetConsumerConfig().RequestTimeout + //default requestTimeout + var requestTimeout = config.GetConsumerConfig().RequestTimeout requestTimeoutStr := url.GetParam(constant.REQUEST_TIMEOUT, config.GetConsumerConfig().Request_Timeout) if t, err := time.ParseDuration(requestTimeoutStr); err == nil { diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go index c18345d41..2be7e7396 100644 --- a/protocol/jsonrpc/jsonrpc_protocol.go +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -18,8 +18,10 @@ package jsonrpc import ( + "github.com/apache/dubbo-go/common/constant" "strings" "sync" + "time" ) import ( @@ -66,9 +68,17 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { } func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker { + //default requestTimeout + var requestTimeout = config.GetConsumerConfig().RequestTimeout + + requestTimeoutStr := url.GetParam(constant.REQUEST_TIMEOUT, config.GetConsumerConfig().Request_Timeout) + if t, err := time.ParseDuration(requestTimeoutStr); err == nil { + requestTimeout = t + } + invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{ HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout, - HTTPTimeout: config.GetConsumerConfig().RequestTimeout, + HTTPTimeout: requestTimeout, })) jp.SetInvokers(invoker) logger.Infof("Refer service: %s", url.String()) -- GitLab