diff --git a/common/url.go b/common/url.go
index e7762355a56837796e0eedc4f67a96d3a851c632..7a854293142bb237df7faa0a081104bfabdd0bb4 100644
--- a/common/url.go
+++ b/common/url.go
@@ -523,7 +523,7 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL {
return true
})
//loadBalance,cluster,retries strategy config
- methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY})
+ methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY, constant.TIMEOUT_KEY})
//remote timestamp
if v := serviceUrl.GetParam(constant.TIMESTAMP_KEY, ""); len(v) > 0 {
diff --git a/common/url_test.go b/common/url_test.go
index 4d60d7f13f5d139e964b0837380f1054871c5d15..c70c58bc215b6449311d43f9f9cffeb89623f80c 100644
--- a/common/url_test.go
+++ b/common/url_test.go
@@ -239,7 +239,7 @@ func TestMergeUrl(t *testing.T) {
serviceUrlParams.Set("test2", "1")
serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin")
serviceUrlParams.Set(constant.RETRIES_KEY, "2")
- serviceUrlParams.Set("methods.testMethod."+constant.RETRIES_KEY, "2")
+ serviceUrlParams.Set(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, "2")
referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams), WithMethods([]string{"testMethod"}))
serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams))
@@ -248,7 +248,7 @@ func TestMergeUrl(t *testing.T) {
assert.Equal(t, "1", mergedUrl.GetParam("test2", ""))
assert.Equal(t, "1", mergedUrl.GetParam("test3", ""))
assert.Equal(t, "1", mergedUrl.GetParam(constant.RETRIES_KEY, ""))
- assert.Equal(t, "1", mergedUrl.GetParam("methods.testMethod."+constant.RETRIES_KEY, ""))
+ assert.Equal(t, "2", mergedUrl.GetParam(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, ""))
}
func TestURL_SetParams(t *testing.T) {
diff --git a/config/method_config.go b/config/method_config.go
index 876abeeae0c7d37070c5938107d1bb1dd5dbbaa9..e10548e667e6a16d33690f011ebc9958af1eea71 100644
--- a/config/method_config.go
+++ b/config/method_config.go
@@ -37,6 +37,7 @@ type MethodConfig struct {
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
+ RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
}
func (c *MethodConfig) Prefix() string {
diff --git a/config/reference_config.go b/config/reference_config.go
index 4e0c56c0bc25e3b71b8edf015580cbe5ac5f0d9c..fde3f7daa5fcdb573e8b5a51606e2be7ef96dc1d 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -40,27 +40,28 @@ import (
)
type ReferenceConfig struct {
- context context.Context
- pxy *proxy.Proxy
- id string
- InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
- Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
- Url string `yaml:"url" json:"url,omitempty" property:"url"`
- Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
- Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
- Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
- Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
- Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
- Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
- Group string `yaml:"group" json:"group,omitempty" property:"group"`
- Version string `yaml:"version" json:"version,omitempty" property:"version"`
- Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
- Async bool `yaml:"async" json:"async,omitempty" property:"async"`
- Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
- invoker protocol.Invoker
- urls []*common.URL
- Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"`
- Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
+ context context.Context
+ pxy *proxy.Proxy
+ id string
+ InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
+ Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
+ Url string `yaml:"url" json:"url,omitempty" property:"url"`
+ Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
+ Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
+ Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
+ Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
+ Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
+ Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
+ Group string `yaml:"group" json:"group,omitempty" property:"group"`
+ Version string `yaml:"version" json:"version,omitempty" property:"version"`
+ Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
+ Async bool `yaml:"async" json:"async,omitempty" property:"async"`
+ Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
+ invoker protocol.Invoker
+ urls []*common.URL
+ Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"`
+ Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
+ RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
}
func (c *ReferenceConfig) Prefix() string {
@@ -174,6 +175,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.VERSION_KEY, refconfig.Version)
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
+ if len(refconfig.RequestTimeout) != 0 {
+ urlMap.Set(constant.TIMEOUT_KEY, refconfig.RequestTimeout)
+ }
//getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async))
urlMap.Set(constant.STICKY_KEY, strconv.FormatBool(refconfig.Sticky))
@@ -198,6 +202,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries)
urlMap.Set("methods."+v.Name+"."+constant.STICKY_KEY, strconv.FormatBool(v.Sticky))
+ if len(v.RequestTimeout) != 0 {
+ urlMap.Set("methods."+v.Name+"."+constant.TIMEOUT_KEY, v.RequestTimeout)
+ }
}
return urlMap
diff --git a/config/testdata/consumer_config.yml b/config/testdata/consumer_config.yml
index f44ea449fd16235050f6a7ba7823a87e24791780..2034186c0fa0ccf21c3f6fb9df0f5cfd69315113 100644
--- a/config/testdata/consumer_config.yml
+++ b/config/testdata/consumer_config.yml
@@ -41,9 +41,11 @@ references:
interface : "com.ikurento.user.UserProvider"
url: "dubbo://127.0.0.1:20000/UserProvider"
cluster: "failover"
+ timeout: "3s"
methods :
- name: "GetUser"
retries: "3"
+ timeout: "5s"
params:
"serviceid":
"soa.com.ikurento.user.UserProvider"
@@ -54,12 +56,21 @@ shutdown_conf:
step_timeout: 10s
protocol_conf:
+ # when you choose the Dubbo protocol, the following configuration takes effect
dubbo:
reconnect_interval: 0
+ # reconnect_interval is the actual number of connections a session can use
connection_number: 2
- heartbeat_period: "5s"
- session_timeout: "20s"
- pool_size: 64
+ # heartbeat_period is heartbeat interval between server and client connection.
+ # Effective by client configuration
+ heartbeat_period: "30s"
+ # when the session is inactive for more than session_timeout, the session may be closed
+ session_timeout: "30s"
+ # a reference has the size of the session connection pool
+ # that is the maximum number of sessions it may have
+ pool_size: 4
+ # dubbo-go uses getty as the network connection library.
+ # The following is the relevant configuration of getty
pool_ttl: 600
# gr_pool_size is recommended to be set to [cpu core number] * 100
gr_pool_size: 1200
@@ -67,6 +78,8 @@ protocol_conf:
queue_len: 64
# queue_number is recommended to be set to gr_pool_size / 20
queue_number: 60
+ # dubbo-go uses getty as the network connection library.
+ # The following is the relevant configuration of getty
getty_session_param:
compress_encoding: false
tcp_no_delay: true
@@ -78,5 +91,7 @@ protocol_conf:
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
- max_msg_len: 1024
+ # maximum len of data per request
+ # this refers to the total amount of data requested or returned
+ max_msg_len: 102400
session_name: "client"
diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go
index 81f392565f701d990dc1783d5d467814a0fba5bf..3e2a243103b888d8b94c2e50fe00daabb3d5a032 100644
--- a/protocol/dubbo/client.go
+++ b/protocol/dubbo/client.go
@@ -212,7 +212,15 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
p.Service.Method = request.method
+
p.Service.Timeout = c.opts.RequestTimeout
+ var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "")
+ if len(timeout) != 0 {
+ if t, err := time.ParseDuration(timeout); err == nil {
+ p.Service.Timeout = t
+ }
+ }
+
p.Header.SerialID = byte(S_Dubbo)
p.Body = hessian.NewRequest(request.args, request.atta)
diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go
index 59d1ea05160696754b46dfead5713684aa7a94f7..eed22a29cde59b520f1b6ebf9b5baafabcac931f 100644
--- a/protocol/dubbo/dubbo_protocol.go
+++ b/protocol/dubbo/dubbo_protocol.go
@@ -19,10 +19,12 @@ package dubbo
import (
"sync"
+ "time"
)
import (
"github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
@@ -67,9 +69,17 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
}
func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
+ //default requestTimeout
+ var requestTimeout = config.GetConsumerConfig().RequestTimeout
+
+ requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
+ if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
+ requestTimeout = t
+ }
+
invoker := NewDubboInvoker(url, NewClient(Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
- RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+ RequestTimeout: requestTimeout,
}))
dp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go
index c18345d413edb2d263f1acaef1741514b665f042..a2e7afe69a138e0cd6dbbe05b3f386647895ee15 100644
--- a/protocol/jsonrpc/jsonrpc_protocol.go
+++ b/protocol/jsonrpc/jsonrpc_protocol.go
@@ -20,10 +20,12 @@ package jsonrpc
import (
"strings"
"sync"
+ "time"
)
import (
"github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
@@ -66,9 +68,17 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
}
func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker {
+ //default requestTimeout
+ var requestTimeout = config.GetConsumerConfig().RequestTimeout
+
+ requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
+ if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
+ requestTimeout = t
+ }
+
invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{
HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout,
- HTTPTimeout: config.GetConsumerConfig().RequestTimeout,
+ HTTPTimeout: requestTimeout,
}))
jp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())