diff --git a/README.md b/README.md index 694f0091533f644b58438c6487dc53348add6e60..b54568ce0317a443d56b1d6d437e5f43334013bd 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ Finished List: * [AccessLogFilter](https://github.com/apache/dubbo-go/pull/214) * [TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237) * [ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246) + * [GenericServiceFilter](https://github.com/apache/dubbo-go/pull/291) - Invoke * [generic invoke](https://github.com/apache/dubbo-go/pull/122) diff --git a/cluster/router/condition_router.go b/cluster/router/condition_router.go index bd3ee21c8c63475888c12cfd0b0317e0492a7246..40208405ecc2f0850ab49f7d9f7c756d848d6134 100644 --- a/cluster/router/condition_router.go +++ b/cluster/router/condition_router.go @@ -24,7 +24,7 @@ import ( ) import ( - "github.com/dubbogo/gost/container/gxset" + gxset "github.com/dubbogo/gost/container/set" gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" ) diff --git a/common/constant/default.go b/common/constant/default.go index 6e0f8488783ebe66939436ca14670395e2719be7..992fc32748bb4fc7777cffecc9137663c681c3f7 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -46,7 +46,7 @@ const ( const ( DEFAULT_KEY = "default" PREFIX_DEFAULT_KEY = "default." - DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,execute,pshutdown" + DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,generic_service,execute,pshutdown" DEFAULT_REFERENCE_FILTERS = "cshutdown" GENERIC_REFERENCE_FILTERS = "generic" GENERIC = "$invoke" diff --git a/common/extension/filter.go b/common/extension/filter.go index 93f7f8cf7ccc4108fe1120b685fad36a2f9f83df..0b5c4b40aa78f7ea489a306f4a52efbb07243b41 100644 --- a/common/extension/filter.go +++ b/common/extension/filter.go @@ -19,12 +19,11 @@ package extension import ( "github.com/apache/dubbo-go/filter" - "github.com/apache/dubbo-go/filter/common" ) var ( filters = make(map[string]func() filter.Filter) - rejectedExecutionHandler = make(map[string]func() common.RejectedExecutionHandler) + rejectedExecutionHandler = make(map[string]func() filter.RejectedExecutionHandler) ) func SetFilter(name string, v func() filter.Filter) { @@ -38,11 +37,11 @@ func GetFilter(name string) filter.Filter { return filters[name]() } -func SetRejectedExecutionHandler(name string, creator func() common.RejectedExecutionHandler) { +func SetRejectedExecutionHandler(name string, creator func() filter.RejectedExecutionHandler) { rejectedExecutionHandler[name] = creator } -func GetRejectedExecutionHandler(name string) common.RejectedExecutionHandler { +func GetRejectedExecutionHandler(name string) filter.RejectedExecutionHandler { creator, ok := rejectedExecutionHandler[name] if !ok { panic("RejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " + diff --git a/common/extension/tps_limit.go b/common/extension/tps_limit.go index 151c33ad5e64ffa4059489e2cbcfae6f2e823328..8c131fafa3159047d25b43ae0a57d674418a2170 100644 --- a/common/extension/tps_limit.go +++ b/common/extension/tps_limit.go @@ -18,19 +18,19 @@ package extension import ( - "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/filter" ) var ( - tpsLimitStrategy = make(map[string]tps.TpsLimitStrategyCreator) - tpsLimiter = make(map[string]func() tps.TpsLimiter) + tpsLimitStrategy = make(map[string]filter.TpsLimitStrategyCreator) + tpsLimiter = make(map[string]func() filter.TpsLimiter) ) -func SetTpsLimiter(name string, creator func() tps.TpsLimiter) { +func SetTpsLimiter(name string, creator func() filter.TpsLimiter) { tpsLimiter[name] = creator } -func GetTpsLimiter(name string) tps.TpsLimiter { +func GetTpsLimiter(name string) filter.TpsLimiter { creator, ok := tpsLimiter[name] if !ok { panic("TpsLimiter for " + name + " is not existing, make sure you have import the package " + @@ -39,11 +39,11 @@ func GetTpsLimiter(name string) tps.TpsLimiter { return creator() } -func SetTpsLimitStrategy(name string, creator tps.TpsLimitStrategyCreator) { +func SetTpsLimitStrategy(name string, creator filter.TpsLimitStrategyCreator) { tpsLimitStrategy[name] = creator } -func GetTpsLimitStrategyCreator(name string) tps.TpsLimitStrategyCreator { +func GetTpsLimitStrategyCreator(name string) filter.TpsLimitStrategyCreator { creator, ok := tpsLimitStrategy[name] if !ok { panic("TpsLimitStrategy for " + name + " is not existing, make sure you have import the package " + diff --git a/common/url.go b/common/url.go index fadec7acf0ebb2b4dc4e816502b1b1697ba0e9bc..9ecb14013d8460f778cdfb7552e6b058220c5d6c 100644 --- a/common/url.go +++ b/common/url.go @@ -31,7 +31,7 @@ import ( ) import ( - "github.com/dubbogo/gost/container/gxset" + gxset "github.com/dubbogo/gost/container/set" "github.com/jinzhu/copier" perrors "github.com/pkg/errors" "github.com/satori/go.uuid" @@ -553,7 +553,7 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL { return true }) //loadBalance,cluster,retries strategy config - methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY}) + methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY, constant.TIMEOUT_KEY}) //remote timestamp if v := serviceUrl.GetParam(constant.TIMESTAMP_KEY, ""); len(v) > 0 { diff --git a/common/url_test.go b/common/url_test.go index 4d60d7f13f5d139e964b0837380f1054871c5d15..c70c58bc215b6449311d43f9f9cffeb89623f80c 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -239,7 +239,7 @@ func TestMergeUrl(t *testing.T) { serviceUrlParams.Set("test2", "1") serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin") serviceUrlParams.Set(constant.RETRIES_KEY, "2") - serviceUrlParams.Set("methods.testMethod."+constant.RETRIES_KEY, "2") + serviceUrlParams.Set(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, "2") referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams), WithMethods([]string{"testMethod"})) serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams)) @@ -248,7 +248,7 @@ func TestMergeUrl(t *testing.T) { assert.Equal(t, "1", mergedUrl.GetParam("test2", "")) assert.Equal(t, "1", mergedUrl.GetParam("test3", "")) assert.Equal(t, "1", mergedUrl.GetParam(constant.RETRIES_KEY, "")) - assert.Equal(t, "1", mergedUrl.GetParam("methods.testMethod."+constant.RETRIES_KEY, "")) + assert.Equal(t, "2", mergedUrl.GetParam(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, "")) } func TestURL_SetParams(t *testing.T) { diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go index fedb2c15ecdab62d17f0a4e83c45522f1c18acb0..83e2589c7b2869a5822c5e90de1699b3bd27df92 100644 --- a/config/graceful_shutdown.go +++ b/config/graceful_shutdown.go @@ -25,7 +25,7 @@ import ( ) import ( - "github.com/dubbogo/gost/container/gxset" + gxset "github.com/dubbogo/gost/container/set" ) import ( diff --git a/config/method_config.go b/config/method_config.go index 876abeeae0c7d37070c5938107d1bb1dd5dbbaa9..e10548e667e6a16d33690f011ebc9958af1eea71 100644 --- a/config/method_config.go +++ b/config/method_config.go @@ -37,6 +37,7 @@ type MethodConfig struct { ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"` ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"` Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` + RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` } func (c *MethodConfig) Prefix() string { diff --git a/config/reference_config.go b/config/reference_config.go index 4e0c56c0bc25e3b71b8edf015580cbe5ac5f0d9c..fde3f7daa5fcdb573e8b5a51606e2be7ef96dc1d 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -40,27 +40,28 @@ import ( ) type ReferenceConfig struct { - context context.Context - pxy *proxy.Proxy - id string - InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` - Check *bool `yaml:"check" json:"check,omitempty" property:"check"` - Url string `yaml:"url" json:"url,omitempty" property:"url"` - Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` - Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` - Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` - Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` - Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` - Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` - Group string `yaml:"group" json:"group,omitempty" property:"group"` - Version string `yaml:"version" json:"version,omitempty" property:"version"` - Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - Async bool `yaml:"async" json:"async,omitempty" property:"async"` - Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` - invoker protocol.Invoker - urls []*common.URL - Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"` - Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` + context context.Context + pxy *proxy.Proxy + id string + InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` + Check *bool `yaml:"check" json:"check,omitempty" property:"check"` + Url string `yaml:"url" json:"url,omitempty" property:"url"` + Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` + Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` + Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` + Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` + Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` + Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` + Group string `yaml:"group" json:"group,omitempty" property:"group"` + Version string `yaml:"version" json:"version,omitempty" property:"version"` + Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` + Async bool `yaml:"async" json:"async,omitempty" property:"async"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + invoker protocol.Invoker + urls []*common.URL + Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"` + Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` + RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` } func (c *ReferenceConfig) Prefix() string { @@ -174,6 +175,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.VERSION_KEY, refconfig.Version) urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + if len(refconfig.RequestTimeout) != 0 { + urlMap.Set(constant.TIMEOUT_KEY, refconfig.RequestTimeout) + } //getty invoke async or sync urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async)) urlMap.Set(constant.STICKY_KEY, strconv.FormatBool(refconfig.Sticky)) @@ -198,6 +202,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance) urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries) urlMap.Set("methods."+v.Name+"."+constant.STICKY_KEY, strconv.FormatBool(v.Sticky)) + if len(v.RequestTimeout) != 0 { + urlMap.Set("methods."+v.Name+"."+constant.TIMEOUT_KEY, v.RequestTimeout) + } } return urlMap diff --git a/config/testdata/consumer_config.yml b/config/testdata/consumer_config.yml index f44ea449fd16235050f6a7ba7823a87e24791780..2034186c0fa0ccf21c3f6fb9df0f5cfd69315113 100644 --- a/config/testdata/consumer_config.yml +++ b/config/testdata/consumer_config.yml @@ -41,9 +41,11 @@ references: interface : "com.ikurento.user.UserProvider" url: "dubbo://127.0.0.1:20000/UserProvider" cluster: "failover" + timeout: "3s" methods : - name: "GetUser" retries: "3" + timeout: "5s" params: "serviceid": "soa.com.ikurento.user.UserProvider" @@ -54,12 +56,21 @@ shutdown_conf: step_timeout: 10s protocol_conf: + # when you choose the Dubbo protocol, the following configuration takes effect dubbo: reconnect_interval: 0 + # reconnect_interval is the actual number of connections a session can use connection_number: 2 - heartbeat_period: "5s" - session_timeout: "20s" - pool_size: 64 + # heartbeat_period is heartbeat interval between server and client connection. + # Effective by client configuration + heartbeat_period: "30s" + # when the session is inactive for more than session_timeout, the session may be closed + session_timeout: "30s" + # a reference has the size of the session connection pool + # that is the maximum number of sessions it may have + pool_size: 4 + # dubbo-go uses getty as the network connection library. + # The following is the relevant configuration of getty pool_ttl: 600 # gr_pool_size is recommended to be set to [cpu core number] * 100 gr_pool_size: 1200 @@ -67,6 +78,8 @@ protocol_conf: queue_len: 64 # queue_number is recommended to be set to gr_pool_size / 20 queue_number: 60 + # dubbo-go uses getty as the network connection library. + # The following is the relevant configuration of getty getty_session_param: compress_encoding: false tcp_no_delay: true @@ -78,5 +91,7 @@ protocol_conf: tcp_read_timeout: "1s" tcp_write_timeout: "5s" wait_timeout: "1s" - max_msg_len: 1024 + # maximum len of data per request + # this refers to the total amount of data requested or returned + max_msg_len: 102400 session_name: "client" diff --git a/config_center/apollo/impl_test.go b/config_center/apollo/impl_test.go index e898be91ee356180f5967f9dd5a02df0dbcfb311..2a891a212e3d37a0a2a3217b9883d173d6373859 100644 --- a/config_center/apollo/impl_test.go +++ b/config_center/apollo/impl_test.go @@ -21,9 +21,11 @@ import ( "fmt" "net/http" "net/http/httptest" + "os" "strings" "sync" "testing" + "time" ) import ( @@ -171,6 +173,7 @@ func Test_GetConfig(t *testing.T) { mapContent, err := configuration.Parser().Parse(configs) assert.NoError(t, err) assert.Equal(t, "ikurento.com", mapContent["application.organization"]) + deleteMockJson(t) } func Test_GetConfigItem(t *testing.T) { @@ -180,6 +183,7 @@ func Test_GetConfigItem(t *testing.T) { configuration.SetParser(&parser.DefaultConfigurationParser{}) assert.NoError(t, err) assert.Equal(t, "ikurento.com", configs) + deleteMockJson(t) } func initMockApollo(t *testing.T) *apolloConfiguration { @@ -216,6 +220,7 @@ func TestAddListener(t *testing.T) { listener.wg.Wait() assert.Equal(t, "registries.hangzhouzk.username", listener.event) assert.Greater(t, listener.count, 0) + deleteMockJson(t) } func TestRemoveListener(t *testing.T) { @@ -244,6 +249,7 @@ func TestRemoveListener(t *testing.T) { }) assert.Equal(t, listenerCount, 0) assert.Equal(t, listener.count, 0) + deleteMockJson(t) } type apolloDataListener struct { @@ -260,3 +266,10 @@ func (l *apolloDataListener) Process(configType *config_center.ConfigChangeEvent l.count++ l.event = configType.Key } + +func deleteMockJson(t *testing.T) { + //because the file write in another goroutine,so have a break ... + time.Sleep(100 * time.Millisecond) + remove := os.Remove("mockDubbog.properties.json") + t.Log("remove result:", remove) +} diff --git a/config_center/configurator/override.go b/config_center/configurator/override.go index e85b4d3ec9d5e6f9f7163cefce3f328f8dcc225a..8e8fe5cc1ab91eb779a73f85e3a71984f0ba6798 100644 --- a/config_center/configurator/override.go +++ b/config_center/configurator/override.go @@ -21,7 +21,7 @@ import ( ) import ( - "github.com/dubbogo/gost/container/gxset" + gxset "github.com/dubbogo/gost/container/set" gxnet "github.com/dubbogo/gost/net" ) diff --git a/filter/common/impl/rejected_execution_handler_mock.go b/filter/common/rejected_execution_handler_mock.go similarity index 99% rename from filter/common/impl/rejected_execution_handler_mock.go rename to filter/common/rejected_execution_handler_mock.go index dace1894668d3a4a154a87bfbdbcc860a97a11ec..a5631af9f7600cae772437877b1ac9139655cc5f 100644 --- a/filter/common/impl/rejected_execution_handler_mock.go +++ b/filter/common/rejected_execution_handler_mock.go @@ -18,7 +18,7 @@ // Source: rejected_execution_handler.go // Package filter is a generated GoMock package. -package impl +package common import ( reflect "reflect" diff --git a/filter/common/impl/rejected_execution_handler_only_log.go b/filter/common/rejected_execution_handler_only_log.go similarity index 93% rename from filter/common/impl/rejected_execution_handler_only_log.go rename to filter/common/rejected_execution_handler_only_log.go index 8943433af1ba908fc834740163df78e3b2b6443a..65abe677f129fa2fbe412c7e3ea2b23de2f1ade6 100644 --- a/filter/common/impl/rejected_execution_handler_only_log.go +++ b/filter/common/rejected_execution_handler_only_log.go @@ -15,9 +15,10 @@ * limitations under the License. */ -package impl +package common import ( + "github.com/apache/dubbo-go/filter" "sync" ) @@ -26,7 +27,6 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" - filterCommon "github.com/apache/dubbo-go/filter/common" "github.com/apache/dubbo-go/protocol" ) @@ -61,7 +61,7 @@ func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL return &protocol.RPCResult{} } -func GetOnlyLogRejectedExecutionHandler() filterCommon.RejectedExecutionHandler { +func GetOnlyLogRejectedExecutionHandler() filter.RejectedExecutionHandler { onlyLogHandlerOnce.Do(func() { onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{} }) diff --git a/filter/common/impl/rejected_execution_handler_only_log_test.go b/filter/common/rejected_execution_handler_only_log_test.go similarity index 98% rename from filter/common/impl/rejected_execution_handler_only_log_test.go rename to filter/common/rejected_execution_handler_only_log_test.go index da54d8a106338dd4f21f9b01e66b031e3c311e01..0efc3d813771577d38fd5e7989255fc097b49a08 100644 --- a/filter/common/impl/rejected_execution_handler_only_log_test.go +++ b/filter/common/rejected_execution_handler_only_log_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package common import ( "net/url" diff --git a/filter/impl/access_log_filter.go b/filter/filter/access_log_filter.go similarity index 99% rename from filter/impl/access_log_filter.go rename to filter/filter/access_log_filter.go index 89fa34952f99057f1d8bb35794a57f9905f5f169..cce2c5050fcbc60bc45b7bc2751685a8d9677b76 100644 --- a/filter/impl/access_log_filter.go +++ b/filter/filter/access_log_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "os" diff --git a/filter/impl/access_log_filter_test.go b/filter/filter/access_log_filter_test.go similarity index 99% rename from filter/impl/access_log_filter_test.go rename to filter/filter/access_log_filter_test.go index 834d531f05f952c41abfe8e1c56c20c0285926b8..2c17021a9f17d3d99c48e5763087c0b03b490b93 100644 --- a/filter/impl/access_log_filter_test.go +++ b/filter/filter/access_log_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "context" diff --git a/filter/impl/active_filter.go b/filter/filter/active_filter.go similarity index 99% rename from filter/impl/active_filter.go rename to filter/filter/active_filter.go index 36a4e1a767ab7170ce8e5bebf2cfa4403f6ad4ff..e0f73c2b2facd53b23e491be2e5b123b5d33087d 100644 --- a/filter/impl/active_filter.go +++ b/filter/filter/active_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "github.com/apache/dubbo-go/common/extension" diff --git a/filter/impl/echo_filter.go b/filter/filter/echo_filter.go similarity index 99% rename from filter/impl/echo_filter.go rename to filter/filter/echo_filter.go index 18e42c8cb2b15acb27573c5e24f11a8b69e0d496..1515c0a99c77d7a4d9af93e141cbed9b529158d7 100644 --- a/filter/impl/echo_filter.go +++ b/filter/filter/echo_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "github.com/apache/dubbo-go/common/constant" diff --git a/filter/impl/echo_filter_test.go b/filter/filter/echo_filter_test.go similarity index 98% rename from filter/impl/echo_filter_test.go rename to filter/filter/echo_filter_test.go index e2e592974701ad18c5b01e884485c022ee2320b8..d57d54329f52955d283366f6edc1376a1a474bde 100644 --- a/filter/impl/echo_filter_test.go +++ b/filter/filter/echo_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "testing" diff --git a/filter/impl/execute_limit_filter.go b/filter/filter/execute_limit_filter.go similarity index 98% rename from filter/impl/execute_limit_filter.go rename to filter/filter/execute_limit_filter.go index 156af1b140283dd76c4867ca26e9b42ce8eb25c0..4b5ea7491c19a726f1d90b7588ac5a480cd38590 100644 --- a/filter/impl/execute_limit_filter.go +++ b/filter/filter/execute_limit_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "strconv" @@ -32,7 +32,7 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/filter" - _ "github.com/apache/dubbo-go/filter/common/impl" + _ "github.com/apache/dubbo-go/filter/common" "github.com/apache/dubbo-go/protocol" ) diff --git a/filter/impl/execute_limit_filter_test.go b/filter/filter/execute_limit_filter_test.go similarity index 99% rename from filter/impl/execute_limit_filter_test.go rename to filter/filter/execute_limit_filter_test.go index 5d729c0e6a1205902856eccfa6aa96b0bee0e790..326b13677b157fbba2495caf5699c246d0d62879 100644 --- a/filter/impl/execute_limit_filter_test.go +++ b/filter/filter/execute_limit_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "net/url" diff --git a/filter/impl/generic_filter.go b/filter/filter/generic_filter.go similarity index 90% rename from filter/impl/generic_filter.go rename to filter/filter/generic_filter.go index 35aadb11a444bda56109e238b17267f71ec2606b..9fb26f15aec7027d46526ed61a46d088a0b6b5e8 100644 --- a/filter/impl/generic_filter.go +++ b/filter/filter/generic_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "reflect" @@ -47,22 +47,21 @@ type GenericFilter struct{} func (ef *GenericFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 { oldArguments := invocation.Arguments() - var newParams []hessian.Object + if oldParams, ok := oldArguments[2].([]interface{}); ok { + newParams := make([]hessian.Object, 0, len(oldParams)) for i := range oldParams { newParams = append(newParams, hessian.Object(struct2MapAll(oldParams[i]))) } - } else { - return invoker.Invoke(invocation) - } - newArguments := []interface{}{ - oldArguments[0], - oldArguments[1], - newParams, + newArguments := []interface{}{ + oldArguments[0], + oldArguments[1], + newParams, + } + newInvocation := invocation2.NewRPCInvocation(invocation.MethodName(), newArguments, invocation.Attachments()) + newInvocation.SetReply(invocation.Reply()) + return invoker.Invoke(newInvocation) } - newInvocation := invocation2.NewRPCInvocation(invocation.MethodName(), newArguments, invocation.Attachments()) - newInvocation.SetReply(invocation.Reply()) - return invoker.Invoke(newInvocation) } return invoker.Invoke(invocation) } diff --git a/filter/impl/generic_filter_test.go b/filter/filter/generic_filter_test.go similarity index 99% rename from filter/impl/generic_filter_test.go rename to filter/filter/generic_filter_test.go index 9797c40df1f57017241675013620a53320e475ad..d5298adbd404d7a525bf66ef248cf54b525a6647 100644 --- a/filter/impl/generic_filter_test.go +++ b/filter/filter/generic_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "reflect" diff --git a/filter/filter/generic_service_filter.go b/filter/filter/generic_service_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..514a51f0b0f4c5d16109b97f74d1095e1842f658 --- /dev/null +++ b/filter/filter/generic_service_filter.go @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filter + +import ( + "reflect" + "strings" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + "github.com/mitchellh/mapstructure" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/protocol" + invocation2 "github.com/apache/dubbo-go/protocol/invocation" +) + +const ( + GENERIC_SERVICE = "generic_service" + GENERIC_SERIALIZATION_DEFAULT = "true" +) + +func init() { + extension.SetFilter(GENERIC_SERVICE, GetGenericServiceFilter) +} + +type GenericServiceFilter struct{} + +func (ef *GenericServiceFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + logger.Infof("invoking generic service filter.") + logger.Debugf("generic service filter methodName:%v,args:%v", invocation.MethodName(), len(invocation.Arguments())) + + if invocation.MethodName() != constant.GENERIC || len(invocation.Arguments()) != 3 { + return invoker.Invoke(invocation) + } + + var ( + ok bool + err error + methodName string + newParams []interface{} + genericKey string + argsType []reflect.Type + oldParams []hessian.Object + ) + + url := invoker.GetUrl() + methodName = invocation.Arguments()[0].(string) + // get service + svc := common.ServiceMap.GetService(url.Protocol, strings.TrimPrefix(url.Path, "/")) + // get method + method := svc.Method()[methodName] + if method == nil { + logger.Errorf("[Generic Service Filter] Don't have this method: %s", methodName) + return &protocol.RPCResult{} + } + argsType = method.ArgsType() + genericKey = invocation.AttachmentsByKey(constant.GENERIC_KEY, GENERIC_SERIALIZATION_DEFAULT) + if genericKey == GENERIC_SERIALIZATION_DEFAULT { + oldParams, ok = invocation.Arguments()[2].([]hessian.Object) + } else { + logger.Errorf("[Generic Service Filter] Don't support this generic: %s", genericKey) + return &protocol.RPCResult{} + } + if !ok { + logger.Errorf("[Generic Service Filter] wrong serialization") + return &protocol.RPCResult{} + } + if len(oldParams) != len(argsType) { + logger.Errorf("[Generic Service Filter] method:%s invocation arguments number was wrong", methodName) + return &protocol.RPCResult{} + } + // oldParams convert to newParams + newParams = make([]interface{}, len(oldParams)) + for i := range argsType { + newParam := reflect.New(argsType[i]).Interface() + err = mapstructure.Decode(oldParams[i], newParam) + newParam = reflect.ValueOf(newParam).Elem().Interface() + if err != nil { + logger.Errorf("[Generic Service Filter] decode arguments map to struct wrong: error{%v}", perrors.WithStack(err)) + return &protocol.RPCResult{} + } + newParams[i] = newParam + } + newInvocation := invocation2.NewRPCInvocation(methodName, newParams, invocation.Attachments()) + newInvocation.SetReply(invocation.Reply()) + return invoker.Invoke(newInvocation) +} + +func (ef *GenericServiceFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 && result.Result() != nil { + v := reflect.ValueOf(result.Result()) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + result.SetResult(struct2MapAll(v.Interface())) + } + return result +} + +func GetGenericServiceFilter() filter.Filter { + return &GenericServiceFilter{} +} diff --git a/filter/filter/generic_service_filter_test.go b/filter/filter/generic_service_filter_test.go new file mode 100644 index 0000000000000000000000000000000000000000..599a6a66d07ee0ed95545680ccb195f1a2fdeb68 --- /dev/null +++ b/filter/filter/generic_service_filter_test.go @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filter + +import ( + "context" + "errors" + "reflect" + "testing" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +type TestStruct struct { + AaAa string + BaBa string `m:"baBa"` + XxYy struct { + xxXx string `m:"xxXx"` + Xx string `m:"xx"` + } `m:"xxYy"` +} + +func (c *TestStruct) JavaClassName() string { + return "com.test.testStruct" +} + +type TestService struct { +} + +func (ts *TestService) MethodOne(ctx context.Context, test1 *TestStruct, test2 []TestStruct, + test3 interface{}, test4 []interface{}, test5 *string) (*TestStruct, error) { + if test1 == nil { + return nil, errors.New("param test1 is nil") + } + if test2 == nil { + return nil, errors.New("param test2 is nil") + } + if test3 == nil { + return nil, errors.New("param test3 is nil") + } + if test4 == nil { + return nil, errors.New("param test4 is nil") + } + if test5 == nil { + return nil, errors.New("param test5 is nil") + } + return &TestStruct{}, nil +} + +func (s *TestService) Reference() string { + return "com.test.Path" +} + +func TestGenericServiceFilter_Invoke(t *testing.T) { + hessian.RegisterPOJO(&TestStruct{}) + methodName := "$invoke" + m := make(map[string]interface{}) + m["AaAa"] = "nihao" + x := make(map[string]interface{}) + x["xxXX"] = "nihaoxxx" + m["XxYy"] = x + aurguments := []interface{}{ + "MethodOne", + nil, + []hessian.Object{ + hessian.Object(m), + hessian.Object(append(make([]map[string]interface{}, 1), m)), + hessian.Object("111"), + hessian.Object(append(make([]map[string]interface{}, 1), m)), + hessian.Object("222")}, + } + s := &TestService{} + _, _ = common.ServiceMap.Register("testprotocol", s) + rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil) + filter := GetGenericServiceFilter() + url, _ := common.NewURL(context.Background(), "testprotocol://127.0.0.1:20000/com.test.Path") + result := filter.Invoke(&proxy_factory.ProxyInvoker{BaseInvoker: *protocol.NewBaseInvoker(url)}, rpcInvocation) + assert.NotNil(t, result) + assert.Nil(t, result.Error()) +} + +func TestGenericServiceFilter_ResponseTestStruct(t *testing.T) { + ts := &TestStruct{ + AaAa: "aaa", + BaBa: "bbb", + XxYy: struct { + xxXx string `m:"xxXx"` + Xx string `m:"xx"` + }{}, + } + result := &protocol.RPCResult{ + Rest: ts, + } + aurguments := []interface{}{ + "MethodOne", + nil, + []hessian.Object{nil}, + } + filter := GetGenericServiceFilter() + methodName := "$invoke" + rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil) + r := filter.OnResponse(result, nil, rpcInvocation) + assert.NotNil(t, r.Result()) + assert.Equal(t, reflect.ValueOf(r.Result()).Kind(), reflect.Map) +} + +func TestGenericServiceFilter_ResponseString(t *testing.T) { + str := "111" + result := &protocol.RPCResult{ + Rest: str, + } + aurguments := []interface{}{ + "MethodOne", + nil, + []hessian.Object{nil}, + } + filter := GetGenericServiceFilter() + methodName := "$invoke" + rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil) + r := filter.OnResponse(result, nil, rpcInvocation) + assert.NotNil(t, r.Result()) + assert.Equal(t, reflect.ValueOf(r.Result()).Kind(), reflect.String) +} diff --git a/filter/impl/graceful_shutdown_filter.go b/filter/filter/graceful_shutdown_filter.go similarity index 95% rename from filter/impl/graceful_shutdown_filter.go rename to filter/filter/graceful_shutdown_filter.go index b912ea88e4ba4741b7d7fe36b8bbd3ba158abe63..c682c7ef79deef2e66178cf1c43ec87992e960dc 100644 --- a/filter/impl/graceful_shutdown_filter.go +++ b/filter/filter/graceful_shutdown_filter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "sync/atomic" @@ -27,7 +27,6 @@ import ( "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/filter" - "github.com/apache/dubbo-go/filter/common" "github.com/apache/dubbo-go/protocol" ) @@ -78,7 +77,7 @@ func (gf *gracefulShutdownFilter) rejectNewRequest() bool { return gf.shutdownConfig.RejectRequest } -func (gf *gracefulShutdownFilter) getRejectHandler() common.RejectedExecutionHandler { +func (gf *gracefulShutdownFilter) getRejectHandler() filter.RejectedExecutionHandler { handler := constant.DEFAULT_KEY if gf.shutdownConfig != nil && len(gf.shutdownConfig.RejectRequestHandler) > 0 { handler = gf.shutdownConfig.RejectRequestHandler diff --git a/filter/impl/graceful_shutdown_filter_test.go b/filter/filter/graceful_shutdown_filter_test.go similarity index 89% rename from filter/impl/graceful_shutdown_filter_test.go rename to filter/filter/graceful_shutdown_filter_test.go index 21da167ea0f201ea357c51cab0ecb4f8ebec0957..af57cd4ec8db7a32abc9cdc09724035d8f67cd36 100644 --- a/filter/impl/graceful_shutdown_filter_test.go +++ b/filter/filter/graceful_shutdown_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "net/url" @@ -31,8 +31,8 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config" - filterCommon "github.com/apache/dubbo-go/filter/common" - "github.com/apache/dubbo-go/filter/common/impl" + "github.com/apache/dubbo-go/filter" + common2 "github.com/apache/dubbo-go/filter/common" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -66,8 +66,8 @@ func TestGenericFilter_Invoke(t *testing.T) { assert.True(t, shutdownFilter.rejectNewRequest()) result = shutdownFilter.OnResponse(nil, protocol.NewBaseInvoker(*invokeUrl), invoc) - rejectHandler := &impl.OnlyLogRejectedExecutionHandler{} - extension.SetRejectedExecutionHandler("mock", func() filterCommon.RejectedExecutionHandler { + rejectHandler := &common2.OnlyLogRejectedExecutionHandler{} + extension.SetRejectedExecutionHandler("mock", func() filter.RejectedExecutionHandler { return rejectHandler }) assert.True(t, providerConfig.ShutdownConfig.RequestsFinished) diff --git a/filter/impl/hystrix_filter.go b/filter/filter/hystrix_filter.go similarity index 99% rename from filter/impl/hystrix_filter.go rename to filter/filter/hystrix_filter.go index 3fd9f87168616b69d5ec72460767890d6956c154..a7c57b4dd6c4a50f8ff90c6e22ff27cc5ef6658e 100644 --- a/filter/impl/hystrix_filter.go +++ b/filter/filter/hystrix_filter.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package impl +package filter import ( "fmt" diff --git a/filter/impl/hystrix_filter_test.go b/filter/filter/hystrix_filter_test.go similarity index 99% rename from filter/impl/hystrix_filter_test.go rename to filter/filter/hystrix_filter_test.go index d3a5183ede25d8a325bb1c73020edddd2ffbc638..37432940300e558eee971448c5829b2d6c8f2696 100644 --- a/filter/impl/hystrix_filter_test.go +++ b/filter/filter/hystrix_filter_test.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package impl +package filter import ( "regexp" diff --git a/filter/impl/token_filter.go b/filter/filter/token_filter.go similarity index 99% rename from filter/impl/token_filter.go rename to filter/filter/token_filter.go index d10dff5b761d0fbe40ff3a14a93ee8962d000e02..07b80f3402dbd63243b1c48e2d98c80a1f704362 100644 --- a/filter/impl/token_filter.go +++ b/filter/filter/token_filter.go @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package impl +package filter import ( "strings" diff --git a/filter/impl/token_filter_test.go b/filter/filter/token_filter_test.go similarity index 99% rename from filter/impl/token_filter_test.go rename to filter/filter/token_filter_test.go index 1473f274037699260725ff9ebb1b3d1377efb326..4434865de7918e41720fdd74eace32e9483901b6 100644 --- a/filter/impl/token_filter_test.go +++ b/filter/filter/token_filter_test.go @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package impl +package filter import ( "net/url" diff --git a/filter/impl/tps_limit_filter.go b/filter/filter/tps_limit_filter.go similarity index 95% rename from filter/impl/tps_limit_filter.go rename to filter/filter/tps_limit_filter.go index 3cb7381c8616abd61fe2ac306b59694a92715dda..ccccec00d4741481534185eaab290fc717864bd8 100644 --- a/filter/impl/tps_limit_filter.go +++ b/filter/filter/tps_limit_filter.go @@ -15,15 +15,15 @@ * limitations under the License. */ -package impl +package filter import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/filter" - _ "github.com/apache/dubbo-go/filter/common/impl" - _ "github.com/apache/dubbo-go/filter/impl/tps/impl" + _ "github.com/apache/dubbo-go/filter/common" + _ "github.com/apache/dubbo-go/filter/tps" "github.com/apache/dubbo-go/protocol" ) diff --git a/filter/impl/tps_limit_filter_test.go b/filter/filter/tps_limit_filter_test.go similarity index 84% rename from filter/impl/tps_limit_filter_test.go rename to filter/filter/tps_limit_filter_test.go index debdbd00dec97ed67d789bfc45103993c014ab4a..6acaab7036307299da18aefbccf011923df8e287 100644 --- a/filter/impl/tps_limit_filter_test.go +++ b/filter/filter/tps_limit_filter_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package filter import ( "net/url" @@ -23,6 +23,9 @@ import ( ) import ( + "github.com/apache/dubbo-go/filter" + common2 "github.com/apache/dubbo-go/filter/common" + "github.com/apache/dubbo-go/filter/tps" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" ) @@ -31,10 +34,6 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" - filterCommon "github.com/apache/dubbo-go/filter/common" - filterCommonImpl "github.com/apache/dubbo-go/filter/common/impl" - "github.com/apache/dubbo-go/filter/impl/tps" - "github.com/apache/dubbo-go/filter/impl/tps/impl" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -56,9 +55,9 @@ func TestTpsLimitFilter_Invoke_With_No_TpsLimiter(t *testing.T) { func TestGenericFilter_Invoke_With_Default_TpsLimiter(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockLimiter := impl.NewMockTpsLimiter(ctrl) + mockLimiter := tps.NewMockTpsLimiter(ctrl) mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(true).Times(1) - extension.SetTpsLimiter(constant.DEFAULT_KEY, func() tps.TpsLimiter { + extension.SetTpsLimiter(constant.DEFAULT_KEY, func() filter.TpsLimiter { return mockLimiter }) @@ -77,17 +76,17 @@ func TestGenericFilter_Invoke_With_Default_TpsLimiter(t *testing.T) { func TestGenericFilter_Invoke_With_Default_TpsLimiter_Not_Allow(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockLimiter := impl.NewMockTpsLimiter(ctrl) + mockLimiter := tps.NewMockTpsLimiter(ctrl) mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(false).Times(1) - extension.SetTpsLimiter(constant.DEFAULT_KEY, func() tps.TpsLimiter { + extension.SetTpsLimiter(constant.DEFAULT_KEY, func() filter.TpsLimiter { return mockLimiter }) mockResult := &protocol.RPCResult{} - mockRejectedHandler := filterCommonImpl.NewMockRejectedExecutionHandler(ctrl) + mockRejectedHandler := common2.NewMockRejectedExecutionHandler(ctrl) mockRejectedHandler.EXPECT().RejectedExecution(gomock.Any(), gomock.Any()).Return(mockResult).Times(1) - extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, func() filterCommon.RejectedExecutionHandler { + extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, func() filter.RejectedExecutionHandler { return mockRejectedHandler }) diff --git a/filter/common/rejected_execution_handler.go b/filter/rejected_execution_handler.go similarity index 98% rename from filter/common/rejected_execution_handler.go rename to filter/rejected_execution_handler.go index b993b8444c14c13ce9a8861c113dc02ca5fd335a..ce95b54b14d01e0aec6f6089799df8378b5bcca5 100644 --- a/filter/common/rejected_execution_handler.go +++ b/filter/rejected_execution_handler.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package common +package filter import ( "github.com/apache/dubbo-go/common" diff --git a/filter/impl/tps/impl/tps_limit_fix_window_strategy.go b/filter/tps/tps_limit_fix_window_strategy.go similarity index 96% rename from filter/impl/tps/impl/tps_limit_fix_window_strategy.go rename to filter/tps/tps_limit_fix_window_strategy.go index 285ecfa658cf838cc1140ba716bd72e1976b86fe..6ea5dc10333739848a96881b6dcf7e4bb54ccbe9 100644 --- a/filter/impl/tps/impl/tps_limit_fix_window_strategy.go +++ b/filter/tps/tps_limit_fix_window_strategy.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "sync/atomic" @@ -25,7 +25,7 @@ import ( import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/filter" ) const ( @@ -79,7 +79,7 @@ func (impl *FixedWindowTpsLimitStrategyImpl) IsAllowable() bool { type fixedWindowStrategyCreator struct{} -func (creator *fixedWindowStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { +func (creator *fixedWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy { return &FixedWindowTpsLimitStrategyImpl{ rate: int32(rate), interval: int64(interval) * int64(time.Millisecond), // convert to ns diff --git a/filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go b/filter/tps/tps_limit_fix_window_strategy_test.go similarity index 99% rename from filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go rename to filter/tps/tps_limit_fix_window_strategy_test.go index 7ef539ed3b2b93da5c56a05f606e75282226d1ef..5eaf2f707dcc9dd6cf325988242623dd5161c1a8 100644 --- a/filter/impl/tps/impl/tps_limit_fix_window_strategy_test.go +++ b/filter/tps/tps_limit_fix_window_strategy_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "testing" diff --git a/filter/impl/tps/impl/tps_limit_sliding_window_strategy.go b/filter/tps/tps_limit_sliding_window_strategy.go similarity index 96% rename from filter/impl/tps/impl/tps_limit_sliding_window_strategy.go rename to filter/tps/tps_limit_sliding_window_strategy.go index d1a5db6e259ffa63282065f881f6cc8360c8d25b..40ea2d14be91a948752455ad8e1a7e611354017a 100644 --- a/filter/impl/tps/impl/tps_limit_sliding_window_strategy.go +++ b/filter/tps/tps_limit_sliding_window_strategy.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "container/list" @@ -25,7 +25,7 @@ import ( import ( "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/filter" ) func init() { @@ -82,7 +82,7 @@ func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool { type slidingWindowStrategyCreator struct{} -func (creator *slidingWindowStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { +func (creator *slidingWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy { return &SlidingWindowTpsLimitStrategyImpl{ rate: rate, interval: int64(interval) * int64(time.Millisecond), diff --git a/filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go b/filter/tps/tps_limit_sliding_window_strategy_test.go similarity index 99% rename from filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go rename to filter/tps/tps_limit_sliding_window_strategy_test.go index 075f1d9d2be2d18edfee7dc8691b71da65f5da45..57342d1c443993c49c6124f0ef28dae5ebb203e8 100644 --- a/filter/impl/tps/impl/tps_limit_sliding_window_strategy_test.go +++ b/filter/tps/tps_limit_sliding_window_strategy_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "testing" diff --git a/filter/impl/tps/impl/tps_limit_strategy_mock.go b/filter/tps/tps_limit_strategy_mock.go similarity index 99% rename from filter/impl/tps/impl/tps_limit_strategy_mock.go rename to filter/tps/tps_limit_strategy_mock.go index a653fb287a2d89d8c6151889ca14b4b7b4832505..72c658fb9a5d48b6080900a4645d318dfd2b0c21 100644 --- a/filter/impl/tps/impl/tps_limit_strategy_mock.go +++ b/filter/tps/tps_limit_strategy_mock.go @@ -18,7 +18,7 @@ // Source: tps_limit_strategy.go // Package filter is a generated GoMock package. -package impl +package tps import ( gomock "github.com/golang/mock/gomock" diff --git a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go b/filter/tps/tps_limit_thread_safe_fix_window_strategy.go similarity index 95% rename from filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go rename to filter/tps/tps_limit_thread_safe_fix_window_strategy.go index 9a1b21a3349845e32cb0fe38b07a7f932ec4f454..faec9b6ec1466e0c2c7d5df9ca0dd82a965494ec 100644 --- a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy.go +++ b/filter/tps/tps_limit_thread_safe_fix_window_strategy.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "sync" @@ -23,7 +23,7 @@ import ( import ( "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/filter" ) func init() { @@ -62,7 +62,7 @@ type threadSafeFixedWindowStrategyCreator struct { fixedWindowStrategyCreator *fixedWindowStrategyCreator } -func (creator *threadSafeFixedWindowStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { +func (creator *threadSafeFixedWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy { fixedWindowStrategy := creator.fixedWindowStrategyCreator.Create(rate, interval).(*FixedWindowTpsLimitStrategyImpl) return &ThreadSafeFixedWindowTpsLimitStrategyImpl{ fixedWindow: fixedWindowStrategy, diff --git a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go b/filter/tps/tps_limit_thread_safe_fix_window_strategy_test.go similarity index 99% rename from filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go rename to filter/tps/tps_limit_thread_safe_fix_window_strategy_test.go index 129493962403e0028b09f9646054fda236c99ff7..90cd15201cd71aafcc50a1dfb801ece7a5dee26a 100644 --- a/filter/impl/tps/impl/tps_limit_thread_safe_fix_window_strategy_test.go +++ b/filter/tps/tps_limit_thread_safe_fix_window_strategy_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package impl +package tps import ( "testing" diff --git a/filter/impl/tps/impl/tps_limiter_method_service.go b/filter/tps/tps_limiter_method_service.go similarity index 96% rename from filter/impl/tps/impl/tps_limiter_method_service.go rename to filter/tps/tps_limiter_method_service.go index 426ae5994867c5a09653641870ebcef531c0d43c..ac4498a33d195128ad89828f9696b90cbd2db082 100644 --- a/filter/impl/tps/impl/tps_limiter_method_service.go +++ b/filter/tps/tps_limiter_method_service.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package impl +package tps import ( "fmt" @@ -30,7 +30,7 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/filter/impl/tps" + "github.com/apache/dubbo-go/filter" "github.com/apache/dubbo-go/protocol" ) @@ -127,7 +127,7 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio limitState, found := limiter.tpsState.Load(limitTarget) if found { - return limitState.(tps.TpsLimitStrategy).IsAllowable() + return limitState.(filter.TpsLimitStrategy).IsAllowable() } limitRate := getLimitConfig(methodLimitRateConfig, url, invocation, @@ -149,7 +149,7 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio url.GetParam(constant.TPS_LIMIT_STRATEGY_KEY, constant.DEFAULT_KEY)) limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig) limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator.Create(int(limitRate), int(limitInterval))) - return limitState.(tps.TpsLimitStrategy).IsAllowable() + return limitState.(filter.TpsLimitStrategy).IsAllowable() } func getLimitConfig(methodLevelConfig string, @@ -178,7 +178,7 @@ func getLimitConfig(methodLevelConfig string, var methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl var methodServiceTpsLimiterOnce sync.Once -func GetMethodServiceTpsLimiter() tps.TpsLimiter { +func GetMethodServiceTpsLimiter() filter.TpsLimiter { methodServiceTpsLimiterOnce.Do(func() { methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{ tpsState: concurrent.NewMap(), diff --git a/filter/impl/tps/impl/tps_limiter_method_service_test.go b/filter/tps/tps_limiter_method_service_test.go similarity index 97% rename from filter/impl/tps/impl/tps_limiter_method_service_test.go rename to filter/tps/tps_limiter_method_service_test.go index e747d4682d0a8bdee03da6f012fb76b7bd1e02af..441224a3e35147b85c3553871dcaa1fefd09db04 100644 --- a/filter/impl/tps/impl/tps_limiter_method_service_test.go +++ b/filter/tps/tps_limiter_method_service_test.go @@ -15,13 +15,14 @@ * limitations under the License. */ -package impl +package tps import ( "net/url" "testing" ) import ( + "github.com/apache/dubbo-go/filter" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" ) @@ -30,7 +31,6 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/filter/impl/tps" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -144,10 +144,10 @@ type mockStrategyCreator struct { rate int interval int t *testing.T - strategy tps.TpsLimitStrategy + strategy filter.TpsLimitStrategy } -func (creator *mockStrategyCreator) Create(rate int, interval int) tps.TpsLimitStrategy { +func (creator *mockStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy { assert.Equal(creator.t, creator.rate, rate) assert.Equal(creator.t, creator.interval, interval) return creator.strategy diff --git a/filter/impl/tps/impl/tps_limiter_mock.go b/filter/tps/tps_limiter_mock.go similarity index 99% rename from filter/impl/tps/impl/tps_limiter_mock.go rename to filter/tps/tps_limiter_mock.go index acd3a15d18baf10838faf57e141afe1711f0aebb..463b0988acbeb17a967c9803337a61c4914bce42 100644 --- a/filter/impl/tps/impl/tps_limiter_mock.go +++ b/filter/tps/tps_limiter_mock.go @@ -18,7 +18,7 @@ // Source: tps_limiter.go // Package filter is a generated GoMock package. -package impl +package tps import ( reflect "reflect" diff --git a/filter/impl/tps/tps_limit_strategy.go b/filter/tps_limit_strategy.go similarity index 98% rename from filter/impl/tps/tps_limit_strategy.go rename to filter/tps_limit_strategy.go index c55f008a09b3743f728ab0506c6b0095cbfd181c..1051c3d96d37619e0e507cc845f144a45a9bb421 100644 --- a/filter/impl/tps/tps_limit_strategy.go +++ b/filter/tps_limit_strategy.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package tps +package filter /* * please register your implementation by invoking SetTpsLimitStrategy diff --git a/filter/impl/tps/tps_limiter.go b/filter/tps_limiter.go similarity index 98% rename from filter/impl/tps/tps_limiter.go rename to filter/tps_limiter.go index 0622a957a8ba14fdebb52ff44ecf72da17703163..1d2b2341ac7d9b12f75d373909b0baa58bc7295f 100644 --- a/filter/impl/tps/tps_limiter.go +++ b/filter/tps_limiter.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package tps +package filter import ( "github.com/apache/dubbo-go/common" diff --git a/go.mod b/go.mod index c2a61f2db1484338bba7dd1bf00a9ff9de2125df..4d65602d9dc40fc200e140fd4a376534a8ea6a73 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/Workiva/go-datastructures v1.0.50 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect - github.com/apache/dubbo-go-hessian2 v1.2.5-0.20191029001541-894e45c9aaaa + github.com/apache/dubbo-go-hessian2 v1.3.1-0.20200106042936-0e1fc43955b2 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.13+incompatible @@ -13,7 +13,7 @@ require ( github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/creasty/defaults v1.3.0 github.com/dubbogo/getty v1.3.1 - github.com/dubbogo/gost v1.3.0 + github.com/dubbogo/gost v1.5.1 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/go-errors/errors v1.0.1 // indirect github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect @@ -31,6 +31,7 @@ require ( github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect github.com/magiconair/properties v1.8.1 + github.com/mitchellh/mapstructure v1.1.2 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb github.com/pkg/errors v0.8.1 diff --git a/go.sum b/go.sum index d19e1b2b91c5049195a44ec923512c8ecb577bbb..90f64a1f71feb5f4b22be52df90fc4321db41b4a 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= -github.com/apache/dubbo-go-hessian2 v1.2.5-0.20191029001541-894e45c9aaaa h1:11TO1wiM5bvGAVrmfN5atD8gZqUSPE1TBoIs8sI6Abk= -github.com/apache/dubbo-go-hessian2 v1.2.5-0.20191029001541-894e45c9aaaa/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= +github.com/apache/dubbo-go-hessian2 v1.3.1-0.20200106042936-0e1fc43955b2 h1:SFqItOYknm1g4iKgjp2GT7aKBtsgejD3hdVq0UnLUv0= +github.com/apache/dubbo-go-hessian2 v1.3.1-0.20200106042936-0e1fc43955b2/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -104,11 +104,9 @@ github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dubbogo/getty v1.3.1 h1:9fehwTo/D6+z6/+kADMbhbKeMkP80o/3g+XwV5lFLTY= github.com/dubbogo/getty v1.3.1/go.mod h1:dtLOEb1v6EMHsQNYRWEACiRLmTWB2kJGUAj1aXayPOg= -github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= -github.com/dubbogo/gost v1.3.0 h1:n90mIUWCPD69BqW8wJ43NDy0RgNxx02aAG4QJcJ785U= -github.com/dubbogo/gost v1.3.0/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= -github.com/dubbogo/gost v1.4.1 h1:O50pNGzo8P7+YnE9dyZzy7yrApgMjyWvTFts7c1NNuk= +github.com/dubbogo/gost v1.5.1 h1:oG5dzaWf1KYynBaBoUIOkgT+YD0niHV6xxI0Odq7hDg= +github.com/dubbogo/gost v1.5.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74 h1:2MIhn2R6oXQbgW5yHfS+d6YqyMfXiu2L55rFZC4UD/M= github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74/go.mod h1:UqXY1lYT/ERa4OEAywUqdok1T4RCRdArkhic1Opuavo= github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y= @@ -450,6 +448,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto= github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ= github.com/tent/http-link-go v0.0.0-20130702225549-ac974c61c2f9/go.mod h1:RHkNRtSLfOK7qBTHaeSX1D6BNpI3qw7NTxsmNr4RvN8= +github.com/tevid/gohamcrest v1.1.1 h1:ou+xSqlIw1xfGTg1uq1nif/htZ2S3EzRqLm2BP+tYU0= github.com/tevid/gohamcrest v1.1.1/go.mod h1:3UvtWlqm8j5JbwYZh80D/PVBt0mJ1eJiYgZMibh0H/k= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 81f392565f701d990dc1783d5d467814a0fba5bf..3e2a243103b888d8b94c2e50fe00daabb3d5a032 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -212,7 +212,15 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "") p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "") p.Service.Method = request.method + p.Service.Timeout = c.opts.RequestTimeout + var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "") + if len(timeout) != 0 { + if t, err := time.ParseDuration(timeout); err == nil { + p.Service.Timeout = t + } + } + p.Header.SerialID = byte(S_Dubbo) p.Body = hessian.NewRequest(request.args, request.atta) diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 758363117f1720a7fe89eb9745b415e506315db8..6b41d5e7d76d31ea23f08b77c841d0f87986bef7 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -89,11 +89,17 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { return perrors.Errorf("opts[0] is not of type *Client") } - pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) - if !ok { - return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) + if p.Header.Type&hessian.PackageRequest != 0x00 { + // size of this array must be '7' + // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272 + p.Body = make([]interface{}, 7) + } else { + pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) + if !ok { + return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) + } + p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply} } - p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply} } // read body diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 59d1ea05160696754b46dfead5713684aa7a94f7..eed22a29cde59b520f1b6ebf9b5baafabcac931f 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -19,10 +19,12 @@ package dubbo import ( "sync" + "time" ) import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" @@ -67,9 +69,17 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { } func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { + //default requestTimeout + var requestTimeout = config.GetConsumerConfig().RequestTimeout + + requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout) + if t, err := time.ParseDuration(requestTimeoutStr); err == nil { + requestTimeout = t + } + invoker := NewDubboInvoker(url, NewClient(Options{ ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, - RequestTimeout: config.GetConsumerConfig().RequestTimeout, + RequestTimeout: requestTimeout, })) dp.SetInvokers(invoker) logger.Infof("Refer service: %s", url.String()) diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index df9ab28e0e4b896b11b2345a83cae14401a70759..2e4b3999dfc08262a2cfb80f29c9a9e7bc2decf8 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -85,11 +85,17 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { } if p.Header.Type&hessian.PackageHeartbeat != 0x00 { - logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) - if p.Err != nil { - logger.Errorf("rpc heartbeat response{error: %#v}", p.Err) + if p.Header.Type&hessian.PackageResponse != 0x00 { + logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) + if p.Err != nil { + logger.Errorf("rpc heartbeat response{error: %#v}", p.Err) + } + h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) + } else { + logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) + p.Header.ResponseStatus = hessian.Response_OK + reply(session, p, hessian.PackageHeartbeat) } - h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) return } logger.Debugf("get rpc response{header: %#v, body: %#v}", p.Header, p.Body) @@ -199,7 +205,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { // heartbeat if p.Header.Type&hessian.PackageHeartbeat != 0x00 { logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) - h.reply(session, p, hessian.PackageHeartbeat) + reply(session, p, hessian.PackageHeartbeat) return } @@ -226,7 +232,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { if !twoway { return } - h.reply(session, p, hessian.PackageResponse) + reply(session, p, hessian.PackageResponse) } }() @@ -241,7 +247,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { logger.Errorf(err.Error()) p.Header.ResponseStatus = hessian.Response_OK p.Body = err - h.reply(session, p, hessian.PackageResponse) + reply(session, p, hessian.PackageResponse) return } invoker := exporter.(protocol.Exporter).GetInvoker() @@ -266,7 +272,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { if !twoway { return } - h.reply(session, p, hessian.PackageResponse) + reply(session, p, hessian.PackageResponse) } func (h *RpcServerHandler) OnCron(session getty.Session) { @@ -294,7 +300,7 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { } } -func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { +func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { resp := &DubboPackage{ Header: hessian.DubboHeader{ SerialID: req.Header.SerialID, diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index 930382cca8bac6955b516a88e93ce26d73e235fe..e9dff1cfc77fb34ba75e604334d9c7ab5cfa36d7 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -62,8 +62,10 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface return nil, 0, perrors.WithStack(err) } - pkg.Err = pkg.Body.(*hessian.Response).Exception - pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments) + if pkg.Header.Type&hessian.PackageRequest == 0x00 { + pkg.Err = pkg.Body.(*hessian.Response).Exception + pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments) + } return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go index c18345d413edb2d263f1acaef1741514b665f042..a2e7afe69a138e0cd6dbbe05b3f386647895ee15 100644 --- a/protocol/jsonrpc/jsonrpc_protocol.go +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -20,10 +20,12 @@ package jsonrpc import ( "strings" "sync" + "time" ) import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" @@ -66,9 +68,17 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { } func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker { + //default requestTimeout + var requestTimeout = config.GetConsumerConfig().RequestTimeout + + requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout) + if t, err := time.ParseDuration(requestTimeoutStr); err == nil { + requestTimeout = t + } + invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{ HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout, - HTTPTimeout: config.GetConsumerConfig().RequestTimeout, + HTTPTimeout: requestTimeout, })) jp.SetInvokers(invoker) logger.Infof("Refer service: %s", url.String()) diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 0752f5c3c3b174b188d5f1ba547f3155ef7fcfb1..5246bfc15be1237576555539d9a13b9e848e3497 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -26,7 +26,7 @@ import ( ) import ( - "github.com/dubbogo/gost/container/gxset" + gxset "github.com/dubbogo/gost/container/set" ) import (