Skip to content
Snippets Groups Projects
Commit e31ffd67 authored by fangyincheng's avatar fangyincheng
Browse files

Merge remote-tracking branch 'apache/develop' into develop

parents 92af46e8 2c86ec0a
No related branches found
No related tags found
No related merge requests found
...@@ -73,6 +73,7 @@ Finished List: ...@@ -73,6 +73,7 @@ Finished List:
* [AccessLogFilter](https://github.com/apache/dubbo-go/pull/214) * [AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)
* [TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237) * [TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237)
* [ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246) * [ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246)
* [GenericServiceFilter](https://github.com/apache/dubbo-go/pull/291)
- Invoke - Invoke
* [generic invoke](https://github.com/apache/dubbo-go/pull/122) * [generic invoke](https://github.com/apache/dubbo-go/pull/122)
......
...@@ -523,7 +523,7 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { ...@@ -523,7 +523,7 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL {
return true return true
}) })
//loadBalance,cluster,retries strategy config //loadBalance,cluster,retries strategy config
methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY}) methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY, constant.TIMEOUT_KEY})
//remote timestamp //remote timestamp
if v := serviceUrl.GetParam(constant.TIMESTAMP_KEY, ""); len(v) > 0 { if v := serviceUrl.GetParam(constant.TIMESTAMP_KEY, ""); len(v) > 0 {
......
...@@ -239,7 +239,7 @@ func TestMergeUrl(t *testing.T) { ...@@ -239,7 +239,7 @@ func TestMergeUrl(t *testing.T) {
serviceUrlParams.Set("test2", "1") serviceUrlParams.Set("test2", "1")
serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin") serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin")
serviceUrlParams.Set(constant.RETRIES_KEY, "2") serviceUrlParams.Set(constant.RETRIES_KEY, "2")
serviceUrlParams.Set("methods.testMethod."+constant.RETRIES_KEY, "2") serviceUrlParams.Set(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, "2")
referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams), WithMethods([]string{"testMethod"})) referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams), WithMethods([]string{"testMethod"}))
serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams)) serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams))
...@@ -248,7 +248,7 @@ func TestMergeUrl(t *testing.T) { ...@@ -248,7 +248,7 @@ func TestMergeUrl(t *testing.T) {
assert.Equal(t, "1", mergedUrl.GetParam("test2", "")) assert.Equal(t, "1", mergedUrl.GetParam("test2", ""))
assert.Equal(t, "1", mergedUrl.GetParam("test3", "")) assert.Equal(t, "1", mergedUrl.GetParam("test3", ""))
assert.Equal(t, "1", mergedUrl.GetParam(constant.RETRIES_KEY, "")) assert.Equal(t, "1", mergedUrl.GetParam(constant.RETRIES_KEY, ""))
assert.Equal(t, "1", mergedUrl.GetParam("methods.testMethod."+constant.RETRIES_KEY, "")) assert.Equal(t, "2", mergedUrl.GetParam(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, ""))
} }
func TestURL_SetParams(t *testing.T) { func TestURL_SetParams(t *testing.T) {
......
...@@ -37,6 +37,7 @@ type MethodConfig struct { ...@@ -37,6 +37,7 @@ type MethodConfig struct {
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"` ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"` ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
} }
func (c *MethodConfig) Prefix() string { func (c *MethodConfig) Prefix() string {
......
...@@ -40,27 +40,28 @@ import ( ...@@ -40,27 +40,28 @@ import (
) )
type ReferenceConfig struct { type ReferenceConfig struct {
context context.Context context context.Context
pxy *proxy.Proxy pxy *proxy.Proxy
id string id string
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"` Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"` Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"` Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"` Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Async bool `yaml:"async" json:"async,omitempty" property:"async"` Async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker invoker protocol.Invoker
urls []*common.URL urls []*common.URL
Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"` Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
} }
func (c *ReferenceConfig) Prefix() string { func (c *ReferenceConfig) Prefix() string {
...@@ -174,6 +175,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { ...@@ -174,6 +175,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.VERSION_KEY, refconfig.Version) urlMap.Set(constant.VERSION_KEY, refconfig.Version)
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
if len(refconfig.RequestTimeout) != 0 {
urlMap.Set(constant.TIMEOUT_KEY, refconfig.RequestTimeout)
}
//getty invoke async or sync //getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async)) urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async))
urlMap.Set(constant.STICKY_KEY, strconv.FormatBool(refconfig.Sticky)) urlMap.Set(constant.STICKY_KEY, strconv.FormatBool(refconfig.Sticky))
...@@ -198,6 +202,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { ...@@ -198,6 +202,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance) urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries) urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries)
urlMap.Set("methods."+v.Name+"."+constant.STICKY_KEY, strconv.FormatBool(v.Sticky)) urlMap.Set("methods."+v.Name+"."+constant.STICKY_KEY, strconv.FormatBool(v.Sticky))
if len(v.RequestTimeout) != 0 {
urlMap.Set("methods."+v.Name+"."+constant.TIMEOUT_KEY, v.RequestTimeout)
}
} }
return urlMap return urlMap
......
...@@ -41,9 +41,11 @@ references: ...@@ -41,9 +41,11 @@ references:
interface : "com.ikurento.user.UserProvider" interface : "com.ikurento.user.UserProvider"
url: "dubbo://127.0.0.1:20000/UserProvider" url: "dubbo://127.0.0.1:20000/UserProvider"
cluster: "failover" cluster: "failover"
timeout: "3s"
methods : methods :
- name: "GetUser" - name: "GetUser"
retries: "3" retries: "3"
timeout: "5s"
params: params:
"serviceid": "serviceid":
"soa.com.ikurento.user.UserProvider" "soa.com.ikurento.user.UserProvider"
...@@ -54,12 +56,21 @@ shutdown_conf: ...@@ -54,12 +56,21 @@ shutdown_conf:
step_timeout: 10s step_timeout: 10s
protocol_conf: protocol_conf:
# when you choose the Dubbo protocol, the following configuration takes effect
dubbo: dubbo:
reconnect_interval: 0 reconnect_interval: 0
# reconnect_interval is the actual number of connections a session can use
connection_number: 2 connection_number: 2
heartbeat_period: "5s" # heartbeat_period is heartbeat interval between server and client connection.
session_timeout: "20s" # Effective by client configuration
pool_size: 64 heartbeat_period: "30s"
# when the session is inactive for more than session_timeout, the session may be closed
session_timeout: "30s"
# a reference has the size of the session connection pool
# that is the maximum number of sessions it may have
pool_size: 4
# dubbo-go uses getty as the network connection library.
# The following is the relevant configuration of getty
pool_ttl: 600 pool_ttl: 600
# gr_pool_size is recommended to be set to [cpu core number] * 100 # gr_pool_size is recommended to be set to [cpu core number] * 100
gr_pool_size: 1200 gr_pool_size: 1200
...@@ -67,6 +78,8 @@ protocol_conf: ...@@ -67,6 +78,8 @@ protocol_conf:
queue_len: 64 queue_len: 64
# queue_number is recommended to be set to gr_pool_size / 20 # queue_number is recommended to be set to gr_pool_size / 20
queue_number: 60 queue_number: 60
# dubbo-go uses getty as the network connection library.
# The following is the relevant configuration of getty
getty_session_param: getty_session_param:
compress_encoding: false compress_encoding: false
tcp_no_delay: true tcp_no_delay: true
...@@ -78,5 +91,7 @@ protocol_conf: ...@@ -78,5 +91,7 @@ protocol_conf:
tcp_read_timeout: "1s" tcp_read_timeout: "1s"
tcp_write_timeout: "5s" tcp_write_timeout: "5s"
wait_timeout: "1s" wait_timeout: "1s"
max_msg_len: 1024 # maximum len of data per request
# this refers to the total amount of data requested or returned
max_msg_len: 102400
session_name: "client" session_name: "client"
...@@ -212,7 +212,15 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac ...@@ -212,7 +212,15 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "") p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "") p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
p.Service.Method = request.method p.Service.Method = request.method
p.Service.Timeout = c.opts.RequestTimeout p.Service.Timeout = c.opts.RequestTimeout
var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "")
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
p.Service.Timeout = t
}
}
p.Header.SerialID = byte(S_Dubbo) p.Header.SerialID = byte(S_Dubbo)
p.Body = hessian.NewRequest(request.args, request.atta) p.Body = hessian.NewRequest(request.args, request.atta)
......
...@@ -19,10 +19,12 @@ package dubbo ...@@ -19,10 +19,12 @@ package dubbo
import ( import (
"sync" "sync"
"time"
) )
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config"
...@@ -67,9 +69,17 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { ...@@ -67,9 +69,17 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
} }
func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout
requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
requestTimeout = t
}
invoker := NewDubboInvoker(url, NewClient(Options{ invoker := NewDubboInvoker(url, NewClient(Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
RequestTimeout: config.GetConsumerConfig().RequestTimeout, RequestTimeout: requestTimeout,
})) }))
dp.SetInvokers(invoker) dp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String()) logger.Infof("Refer service: %s", url.String())
......
...@@ -20,10 +20,12 @@ package jsonrpc ...@@ -20,10 +20,12 @@ package jsonrpc
import ( import (
"strings" "strings"
"sync" "sync"
"time"
) )
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config"
...@@ -66,9 +68,17 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { ...@@ -66,9 +68,17 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
} }
func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker { func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout
requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
requestTimeout = t
}
invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{ invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{
HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout, HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout,
HTTPTimeout: config.GetConsumerConfig().RequestTimeout, HTTPTimeout: requestTimeout,
})) }))
jp.SetInvokers(invoker) jp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String()) logger.Infof("Refer service: %s", url.String())
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment