Skip to content
Snippets Groups Projects
Commit 7e55002a authored by fangyincheng's avatar fangyincheng Committed by GitHub
Browse files

Merge pull request #284 from pantianying/develop_req_timeout_config

Ftr: Develop req timeout config
parents f6c8ac7a e3e5d1ac
No related branches found
No related tags found
No related merge requests found
......@@ -523,7 +523,7 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL {
return true
})
//loadBalance,cluster,retries strategy config
methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY})
methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY, constant.TIMEOUT_KEY})
//remote timestamp
if v := serviceUrl.GetParam(constant.TIMESTAMP_KEY, ""); len(v) > 0 {
......
......@@ -239,7 +239,7 @@ func TestMergeUrl(t *testing.T) {
serviceUrlParams.Set("test2", "1")
serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin")
serviceUrlParams.Set(constant.RETRIES_KEY, "2")
serviceUrlParams.Set("methods.testMethod."+constant.RETRIES_KEY, "2")
serviceUrlParams.Set(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, "2")
referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams), WithMethods([]string{"testMethod"}))
serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams))
......@@ -248,7 +248,7 @@ func TestMergeUrl(t *testing.T) {
assert.Equal(t, "1", mergedUrl.GetParam("test2", ""))
assert.Equal(t, "1", mergedUrl.GetParam("test3", ""))
assert.Equal(t, "1", mergedUrl.GetParam(constant.RETRIES_KEY, ""))
assert.Equal(t, "1", mergedUrl.GetParam("methods.testMethod."+constant.RETRIES_KEY, ""))
assert.Equal(t, "2", mergedUrl.GetParam(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, ""))
}
func TestURL_SetParams(t *testing.T) {
......
......@@ -37,6 +37,7 @@ type MethodConfig struct {
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
}
func (c *MethodConfig) Prefix() string {
......
......@@ -40,27 +40,28 @@ import (
)
type ReferenceConfig struct {
context context.Context
pxy *proxy.Proxy
id string
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
context context.Context
pxy *proxy.Proxy
id string
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
}
func (c *ReferenceConfig) Prefix() string {
......@@ -174,6 +175,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.VERSION_KEY, refconfig.Version)
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
if len(refconfig.RequestTimeout) != 0 {
urlMap.Set(constant.TIMEOUT_KEY, refconfig.RequestTimeout)
}
//getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async))
urlMap.Set(constant.STICKY_KEY, strconv.FormatBool(refconfig.Sticky))
......@@ -198,6 +202,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries)
urlMap.Set("methods."+v.Name+"."+constant.STICKY_KEY, strconv.FormatBool(v.Sticky))
if len(v.RequestTimeout) != 0 {
urlMap.Set("methods."+v.Name+"."+constant.TIMEOUT_KEY, v.RequestTimeout)
}
}
return urlMap
......
......@@ -41,9 +41,11 @@ references:
interface : "com.ikurento.user.UserProvider"
url: "dubbo://127.0.0.1:20000/UserProvider"
cluster: "failover"
timeout: "3s"
methods :
- name: "GetUser"
retries: "3"
timeout: "5s"
params:
"serviceid":
"soa.com.ikurento.user.UserProvider"
......@@ -54,12 +56,21 @@ shutdown_conf:
step_timeout: 10s
protocol_conf:
# when you choose the Dubbo protocol, the following configuration takes effect
dubbo:
reconnect_interval: 0
# reconnect_interval is the actual number of connections a session can use
connection_number: 2
heartbeat_period: "5s"
session_timeout: "20s"
pool_size: 64
# heartbeat_period is heartbeat interval between server and client connection.
# Effective by client configuration
heartbeat_period: "30s"
# when the session is inactive for more than session_timeout, the session may be closed
session_timeout: "30s"
# a reference has the size of the session connection pool
# that is the maximum number of sessions it may have
pool_size: 4
# dubbo-go uses getty as the network connection library.
# The following is the relevant configuration of getty
pool_ttl: 600
# gr_pool_size is recommended to be set to [cpu core number] * 100
gr_pool_size: 1200
......@@ -67,6 +78,8 @@ protocol_conf:
queue_len: 64
# queue_number is recommended to be set to gr_pool_size / 20
queue_number: 60
# dubbo-go uses getty as the network connection library.
# The following is the relevant configuration of getty
getty_session_param:
compress_encoding: false
tcp_no_delay: true
......@@ -78,5 +91,7 @@ protocol_conf:
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
max_msg_len: 1024
# maximum len of data per request
# this refers to the total amount of data requested or returned
max_msg_len: 102400
session_name: "client"
......@@ -212,7 +212,15 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
p.Service.Method = request.method
p.Service.Timeout = c.opts.RequestTimeout
var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "")
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
p.Service.Timeout = t
}
}
p.Header.SerialID = byte(S_Dubbo)
p.Body = hessian.NewRequest(request.args, request.atta)
......
......@@ -19,10 +19,12 @@ package dubbo
import (
"sync"
"time"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
......@@ -67,9 +69,17 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
}
func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout
requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
requestTimeout = t
}
invoker := NewDubboInvoker(url, NewClient(Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
RequestTimeout: config.GetConsumerConfig().RequestTimeout,
RequestTimeout: requestTimeout,
}))
dp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
......
......@@ -20,10 +20,12 @@ package jsonrpc
import (
"strings"
"sync"
"time"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
......@@ -66,9 +68,17 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
}
func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout
requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
requestTimeout = t
}
invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{
HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout,
HTTPTimeout: config.GetConsumerConfig().RequestTimeout,
HTTPTimeout: requestTimeout,
}))
jp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment