diff --git a/common/logger/logger.go b/common/logger/logger.go index e8dd5a608b68c7cbde0d8c6b85c1601c847fea01..b0afd9c10224125da6af31206a9f5e1ed4dd31a5 100644 --- a/common/logger/logger.go +++ b/common/logger/logger.go @@ -85,8 +85,6 @@ func InitLog(logConfFile string) error { InitLogger(conf) - // set getty log - getty.SetLogger(logger) return nil } @@ -112,6 +110,9 @@ func InitLogger(conf *zap.Config) { } zapLogger, _ := zapLoggerConfig.Build(zap.AddCallerSkip(1)) logger = zapLogger.Sugar() + + // set getty log + getty.SetLogger(logger) } func SetLogger(log Logger) { diff --git a/config/config_loader.go b/config/config_loader.go index 3e73061890e410fe2f36cca2751b0e3ff3fb758b..0dd0fb7f963e09a4402f4fd01c8a6b91959a7ca3 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -46,11 +46,11 @@ func init() { confConFile = os.Getenv(constant.CONF_CONSUMER_FILE_PATH) confProFile = os.Getenv(constant.CONF_PROVIDER_FILE_PATH) - if errCon := consumerInit(confConFile); errCon != nil { + if errCon := ConsumerInit(confConFile); errCon != nil { log.Printf("[consumerInit] %#v", errCon) consumerConfig = nil } - if errPro := providerInit(confProFile); errPro != nil { + if errPro := ProviderInit(confProFile); errPro != nil { log.Printf("[providerInit] %#v", errPro) providerConfig = nil } @@ -58,11 +58,6 @@ func init() { // Dubbo Init func Load() { - var ( - refMap map[string]*ReferenceConfig - srvMap map[string]*ServiceConfig - ) - // reference config if consumerConfig == nil { logger.Warnf("consumerConfig is nil!") @@ -70,7 +65,6 @@ func Load() { if err := configCenterRefreshConsumer(); err != nil { logger.Errorf("[consumer config center refresh] %#v", err) } - refMap = make(map[string]*ReferenceConfig) for key, ref := range consumerConfig.References { rpcService := GetConsumerService(key) @@ -81,8 +75,6 @@ func Load() { ref.id = key ref.Refer() ref.Implement(rpcService) - refMap[key] = ref - } //wait for invoker is available, if wait over default 3s, then panic var count int @@ -122,7 +114,6 @@ func Load() { if err := configCenterRefreshProvider(); err != nil { logger.Errorf("[provider config center refresh] %#v", err) } - srvMap = make(map[string]*ServiceConfig) for key, svs := range providerConfig.Services { rpcService := GetProviderService(key) if rpcService == nil { @@ -134,7 +125,6 @@ func Load() { if err := svs.Export(); err != nil { panic(fmt.Sprintf("service %s export failed! ", key)) } - srvMap[key] = svs } } } @@ -146,5 +136,5 @@ func GetRPCService(name string) common.RPCService { // create rpc service for consumer func RPCService(service common.RPCService) { - providerConfig.Services[service.Reference()].Implement(service) + consumerConfig.References[service.Reference()].Implement(service) } diff --git a/config/config_loader_test.go b/config/config_loader_test.go index fb4f229328ed24059e6fb33489cc701ef7d0a5bd..107fea0b1d737f7be92d3e0042b6eebb7add78ed 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -45,9 +45,9 @@ func TestConfigLoader(t *testing.T) { assert.Nil(t, providerConfig) assert.Equal(t, ProviderConfig{}, GetProviderConfig()) - err = consumerInit(conPath) + err = ConsumerInit(conPath) assert.NoError(t, err) - err = providerInit(proPath) + err = ProviderInit(proPath) assert.NoError(t, err) assert.NotNil(t, consumerConfig) @@ -129,10 +129,10 @@ func TestConfigLoaderWithConfigCenter(t *testing.T) { assert.Nil(t, providerConfig) assert.Equal(t, ProviderConfig{}, GetProviderConfig()) - err = consumerInit(conPath) + err = ConsumerInit(conPath) configCenterRefreshConsumer() assert.NoError(t, err) - err = providerInit(proPath) + err = ProviderInit(proPath) configCenterRefreshProvider() assert.NoError(t, err) diff --git a/config/consumer_config.go b/config/consumer_config.go index 5d3aec18e91ab3d9284c00fac3838d16414f2755..737339a0ad195201327eb4fac3445b18eb4bbf26 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -61,6 +61,7 @@ func (*ConsumerConfig) Prefix() string { func SetConsumerConfig(c ConsumerConfig) { consumerConfig = &c } + func GetConsumerConfig() ConsumerConfig { if consumerConfig == nil { logger.Warnf("consumerConfig is nil!") @@ -69,7 +70,7 @@ func GetConsumerConfig() ConsumerConfig { return *consumerConfig } -func consumerInit(confConFile string) error { +func ConsumerInit(confConFile string) error { if confConFile == "" { return perrors.Errorf("application configure(consumer) file name is nil") } diff --git a/config/provider_config.go b/config/provider_config.go index fc7a4d50d2ede6c3a64dade9c90914e3b5d51779..a504eea237dc47f66c4ed27d334ce5eea5c87d45 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -63,7 +63,7 @@ func GetProviderConfig() ProviderConfig { return *providerConfig } -func providerInit(confProFile string) error { +func ProviderInit(confProFile string) error { if len(confProFile) == 0 { return perrors.Errorf("application configure(provider) file name is nil") } diff --git a/config/reference_config.go b/config/reference_config.go index f76c973ed53d0fec9e92e33ed6dbb8dcfdcd3a6a..a5b7d50db51df02cd1871b3fb6d0bd2d6a545dd2 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -61,8 +61,9 @@ func (c *ReferenceConfig) Prefix() string { return constant.ReferenceConfigPrefix + c.InterfaceName + "." } -func NewReferenceConfig(ctx context.Context) *ReferenceConfig { - return &ReferenceConfig{context: ctx} +// The only way to get a new ReferenceConfig +func NewReferenceConfig(id string, ctx context.Context) *ReferenceConfig { + return &ReferenceConfig{id: id, context: ctx} } func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { diff --git a/config/service_config.go b/config/service_config.go index 151829f270f3a8a9e9e57440da680581ac2ea232..76913319f6f0ac27ec36ebaa2d89db09bbd79c7c 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -68,8 +68,12 @@ func (c *ServiceConfig) Prefix() string { return constant.ServiceConfigPrefix + c.InterfaceName + "." } -func NewServiceConfig() *ServiceConfig { +// The only way to get a new ServiceConfig +func NewServiceConfig(id string, context context.Context) *ServiceConfig { + return &ServiceConfig{ + context: context, + id: id, unexported: atomic.NewBool(false), exported: atomic.NewBool(false), } diff --git a/config/testdata/consumer_config.yml b/config/testdata/consumer_config.yml index 96baa1a621dbf9138d186cb0355445fa00e5fdd5..85e0eb2fe980b461c3eea1741d53c3b5064b8d47 100644 --- a/config/testdata/consumer_config.yml +++ b/config/testdata/consumer_config.yml @@ -54,7 +54,6 @@ protocol_conf: connection_number: 2 heartbeat_period: "5s" session_timeout: "20s" - fail_fast_timeout: "5s" pool_size: 64 pool_ttl: 600 getty_session_param: diff --git a/config/testdata/consumer_config_with_configcenter.yml b/config/testdata/consumer_config_with_configcenter.yml index 6a60c3083793eee05cb386392cbe2973f72b08fd..1a33ec3971c231012265d0ce078aedf2f851a215 100644 --- a/config/testdata/consumer_config_with_configcenter.yml +++ b/config/testdata/consumer_config_with_configcenter.yml @@ -21,7 +21,6 @@ protocol_conf: connection_number: 2 heartbeat_period: "5s" session_timeout: "20s" - fail_fast_timeout: "5s" pool_size: 64 pool_ttl: 600 getty_session_param: diff --git a/config/testdata/provider_config.yml b/config/testdata/provider_config.yml index 621de175ab27956239390828f239eb364479455c..3b6e43a51adabfa5ed2abbc3ef0f261ade576127 100644 --- a/config/testdata/provider_config.yml +++ b/config/testdata/provider_config.yml @@ -55,7 +55,6 @@ protocols: protocol_conf: dubbo: session_number: 700 - fail_fast_timeout: "5s" session_timeout: "20s" getty_session_param: compress_encoding: false diff --git a/examples/dubbo/go-client/profiles/dev/client.yml b/examples/dubbo/go-client/profiles/dev/client.yml index 3ff83a4482fd1217817aa60e86866d4b037238a2..e1131df77e0709b78e897c8a2686c60f2b3bd8e5 100644 --- a/examples/dubbo/go-client/profiles/dev/client.yml +++ b/examples/dubbo/go-client/profiles/dev/client.yml @@ -66,7 +66,6 @@ protocol_conf: connection_number: 2 heartbeat_period: "5s" session_timeout: "20s" - fail_fast_timeout: "5s" pool_size: 64 pool_ttl: 600 getty_session_param: diff --git a/examples/dubbo/go-client/profiles/release/client.yml b/examples/dubbo/go-client/profiles/release/client.yml index 18fef2e78ff7ee15122e6a3ac421ffd230deec0e..dabde06c753174e41a3655833877169a124f4ee3 100644 --- a/examples/dubbo/go-client/profiles/release/client.yml +++ b/examples/dubbo/go-client/profiles/release/client.yml @@ -66,7 +66,6 @@ protocol_conf: connection_number: 2 heartbeat_period: "5s" session_timeout: "20s" - fail_fast_timeout: "5s" pool_size: 64 pool_ttl: 600 getty_session_param: diff --git a/examples/dubbo/go-client/profiles/test/client.yml b/examples/dubbo/go-client/profiles/test/client.yml index 44b50fad65168da92940e2e7faac01c6fd6c6165..e0b742eff4c4652114bc43cbf315365cac41d242 100644 --- a/examples/dubbo/go-client/profiles/test/client.yml +++ b/examples/dubbo/go-client/profiles/test/client.yml @@ -66,7 +66,6 @@ protocol_conf: connection_number: 2 heartbeat_period: "5s" session_timeout: "20s" - fail_fast_timeout: "5s" pool_size: 64 pool_ttl: 600 getty_session_param: diff --git a/examples/dubbo/go-server/profiles/dev/server.yml b/examples/dubbo/go-server/profiles/dev/server.yml index 6ccceea62117a79f1d0b49fabc4b02306c4ee9a9..2f0decaec6b1ad8bc46d1cea5f3c3d04dc8bc144 100644 --- a/examples/dubbo/go-server/profiles/dev/server.yml +++ b/examples/dubbo/go-server/profiles/dev/server.yml @@ -76,7 +76,6 @@ protocols: protocol_conf: dubbo: session_number: 700 - fail_fast_timeout: "5s" session_timeout: "20s" getty_session_param: compress_encoding: false diff --git a/examples/dubbo/go-server/profiles/release/server.yml b/examples/dubbo/go-server/profiles/release/server.yml index d759946a070663719f64809ddf65d19d812312c4..92306b706785c55e03c5a496b613b32ca361875c 100644 --- a/examples/dubbo/go-server/profiles/release/server.yml +++ b/examples/dubbo/go-server/profiles/release/server.yml @@ -75,7 +75,6 @@ protocols: protocol_conf: dubbo: session_number: 700 - fail_fast_timeout: "5s" session_timeout: "20s" getty_session_param: compress_encoding: false diff --git a/examples/dubbo/go-server/profiles/test/server.yml b/examples/dubbo/go-server/profiles/test/server.yml index f2cd38dc023578621ca5e4cd8c872b25b9e44ffc..03140d6bde21b79bd63719328cff0a5732ab47b3 100644 --- a/examples/dubbo/go-server/profiles/test/server.yml +++ b/examples/dubbo/go-server/profiles/test/server.yml @@ -75,7 +75,6 @@ protocols: protocol_conf: dubbo: session_number: 700 - fail_fast_timeout: "5s" session_timeout: "20s" getty_session_param: compress_encoding: false diff --git a/examples/dubbo/with-configcenter-go-client/profiles/dev/client.yml b/examples/dubbo/with-configcenter-go-client/profiles/dev/client.yml index c809041f14d15230a02f154571c02a038ee2ca92..1ebdf5bb5c97c3f45d9a0b551afd78866868d4d0 100644 --- a/examples/dubbo/with-configcenter-go-client/profiles/dev/client.yml +++ b/examples/dubbo/with-configcenter-go-client/profiles/dev/client.yml @@ -21,7 +21,6 @@ protocol_conf: connection_number: 2 heartbeat_period: "5s" session_timeout: "20s" - fail_fast_timeout: "5s" pool_size: 64 pool_ttl: 600 getty_session_param: diff --git a/examples/dubbo/with-configcenter-go-client/profiles/release/client.yml b/examples/dubbo/with-configcenter-go-client/profiles/release/client.yml index c809041f14d15230a02f154571c02a038ee2ca92..1ebdf5bb5c97c3f45d9a0b551afd78866868d4d0 100644 --- a/examples/dubbo/with-configcenter-go-client/profiles/release/client.yml +++ b/examples/dubbo/with-configcenter-go-client/profiles/release/client.yml @@ -21,7 +21,6 @@ protocol_conf: connection_number: 2 heartbeat_period: "5s" session_timeout: "20s" - fail_fast_timeout: "5s" pool_size: 64 pool_ttl: 600 getty_session_param: diff --git a/examples/dubbo/with-configcenter-go-client/profiles/test/client.yml b/examples/dubbo/with-configcenter-go-client/profiles/test/client.yml index c809041f14d15230a02f154571c02a038ee2ca92..1ebdf5bb5c97c3f45d9a0b551afd78866868d4d0 100644 --- a/examples/dubbo/with-configcenter-go-client/profiles/test/client.yml +++ b/examples/dubbo/with-configcenter-go-client/profiles/test/client.yml @@ -21,7 +21,6 @@ protocol_conf: connection_number: 2 heartbeat_period: "5s" session_timeout: "20s" - fail_fast_timeout: "5s" pool_size: 64 pool_ttl: 600 getty_session_param: diff --git a/examples/dubbo/with-configcenter-go-server/profiles/dev/server.yml b/examples/dubbo/with-configcenter-go-server/profiles/dev/server.yml index 05f758592f5aa69e097f19d14299e15bc9bf5f0f..a2ae2fe5d89519056e07fbc11d7d34407d449fed 100644 --- a/examples/dubbo/with-configcenter-go-server/profiles/dev/server.yml +++ b/examples/dubbo/with-configcenter-go-server/profiles/dev/server.yml @@ -23,7 +23,6 @@ services: protocol_conf: dubbo: session_number: 700 - fail_fast_timeout: "5s" session_timeout: "20s" getty_session_param: compress_encoding: false diff --git a/examples/dubbo/with-configcenter-go-server/profiles/release/server.yml b/examples/dubbo/with-configcenter-go-server/profiles/release/server.yml index 05f758592f5aa69e097f19d14299e15bc9bf5f0f..a2ae2fe5d89519056e07fbc11d7d34407d449fed 100644 --- a/examples/dubbo/with-configcenter-go-server/profiles/release/server.yml +++ b/examples/dubbo/with-configcenter-go-server/profiles/release/server.yml @@ -23,7 +23,6 @@ services: protocol_conf: dubbo: session_number: 700 - fail_fast_timeout: "5s" session_timeout: "20s" getty_session_param: compress_encoding: false diff --git a/examples/dubbo/with-configcenter-go-server/profiles/test/server.yml b/examples/dubbo/with-configcenter-go-server/profiles/test/server.yml index 05f758592f5aa69e097f19d14299e15bc9bf5f0f..a2ae2fe5d89519056e07fbc11d7d34407d449fed 100644 --- a/examples/dubbo/with-configcenter-go-server/profiles/test/server.yml +++ b/examples/dubbo/with-configcenter-go-server/profiles/test/server.yml @@ -23,7 +23,6 @@ services: protocol_conf: dubbo: session_number: 700 - fail_fast_timeout: "5s" session_timeout: "20s" getty_session_param: compress_encoding: false diff --git a/examples/jsonrpc/go-client/profiles/dev/client.yml b/examples/jsonrpc/go-client/profiles/dev/client.yml index c8f2ee3a33adccf5764c0285bc9643d24b5da23e..788e06eecd6014eefaf12913b6394e5e0a95efdf 100644 --- a/examples/jsonrpc/go-client/profiles/dev/client.yml +++ b/examples/jsonrpc/go-client/profiles/dev/client.yml @@ -58,27 +58,3 @@ references: methods: - name: "GetUser" retries: 3 - -protocol_conf: - dubbo: - reconnect_interval: 0 - connection_number: 2 - heartbeat_period: "5s" - session_timeout: "20s" - fail_fast_timeout: "5s" - pool_size: 64 - pool_ttl: 600 - getty_session_param: - compress_encoding: false - tcp_no_delay: true - tcp_keep_alive: true - keep_alive_period: "120s" - tcp_r_buf_size: 262144 - tcp_w_buf_size: 65536 - pkg_rq_size: 1024 - pkg_wq_size: 512 - tcp_read_timeout: "1s" - tcp_write_timeout: "5s" - wait_timeout: "1s" - max_msg_len: 10240 - session_name: "client" diff --git a/examples/jsonrpc/go-client/profiles/release/client.yml b/examples/jsonrpc/go-client/profiles/release/client.yml index e521f4c58c86deea07471b12fd91d66087cee796..0084e5b04d48fea480f22df8e031eb91e1d6e835 100644 --- a/examples/jsonrpc/go-client/profiles/release/client.yml +++ b/examples/jsonrpc/go-client/profiles/release/client.yml @@ -58,27 +58,3 @@ references: methods: - name: "GetUser" retries: 3 - -protocol_conf: - dubbo: - reconnect_interval: 0 - connection_number: 2 - heartbeat_period: "5s" - session_timeout: "20s" - fail_fast_timeout: "5s" - pool_size: 64 - pool_ttl: 600 - getty_session_param: - compress_encoding: false - tcp_no_delay: true - tcp_keep_alive: true - keep_alive_period: "120s" - tcp_r_buf_size: 262144 - tcp_w_buf_size: 65536 - pkg_rq_size: 1024 - pkg_wq_size: 512 - tcp_read_timeout: "1s" - tcp_write_timeout: "5s" - wait_timeout: "1s" - max_msg_len: 10240 - session_name: "client" diff --git a/examples/jsonrpc/go-client/profiles/test/client.yml b/examples/jsonrpc/go-client/profiles/test/client.yml index 086f7b96995c3ab3272f3a73af09d6cba68db1a6..3efdedad4ab8acffb9d8724273deb8c12117837d 100644 --- a/examples/jsonrpc/go-client/profiles/test/client.yml +++ b/examples/jsonrpc/go-client/profiles/test/client.yml @@ -58,27 +58,3 @@ references: methods: - name: "GetUser" retries: 3 - -protocol_conf: - dubbo: - reconnect_interval: 0 - connection_number: 2 - heartbeat_period: "5s" - session_timeout: "20s" - fail_fast_timeout: "5s" - pool_size: 64 - pool_ttl: 600 - getty_session_param: - compress_encoding: false - tcp_no_delay: true - tcp_keep_alive: true - keep_alive_period: "120s" - tcp_r_buf_size: 262144 - tcp_w_buf_size: 65536 - pkg_rq_size: 1024 - pkg_wq_size: 512 - tcp_read_timeout: "1s" - tcp_write_timeout: "5s" - wait_timeout: "1s" - max_msg_len: 10240 - session_name: "client" diff --git a/go.mod b/go.mod index 8fd3c215fbb38b9c54a6570630f5ef32ad8f56c1..75a3cea2c4cd3ca6f4a970c285aff6ba6ad1bb9b 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/apache/dubbo-go require ( - github.com/dubbogo/getty v1.0.7 + github.com/dubbogo/getty v1.1.2-0.20190703052458-5e72c69fddc7 github.com/dubbogo/gost v1.0.0 github.com/dubbogo/hessian2 v1.1.2 github.com/magiconair/properties v1.8.1 diff --git a/go.sum b/go.sum index 859e9aab1c882f3864668b45c6ac47ce7f01b262..3cc46c3f275b72829e8ab254f2c9a8d66198def2 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,12 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dubbogo/getty v1.0.7 h1:5Hg+JwXyCKm9Yr4yJkm98ahhnoa8c2h6br5QJxwQ+YU= -github.com/dubbogo/getty v1.0.7/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo= +github.com/dubbogo/getty v1.1.2-0.20190703052458-5e72c69fddc7 h1:WDzbrHOfdO78mFp3RJgGRM0u2+DEwrVzo58InIdgcX8= +github.com/dubbogo/getty v1.1.2-0.20190703052458-5e72c69fddc7/go.mod h1:DfbsJXdXj6DliCAEgrRbwC7J5rz+2sOPGKKEvlF6OjE= github.com/dubbogo/gost v1.0.0 h1:obKvpJYdrIY2BidHYwYoj2E50OtwCDqVVVTcH2nnhAY= github.com/dubbogo/gost v1.0.0/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= +github.com/dubbogo/gostd v0.0.0-20190625030817-87d8669125b7 h1:Wmt8yQMGkNx4GDUvU4CA+dwIsDwgi+DbP28NZV2ruqQ= +github.com/dubbogo/gostd v0.0.0-20190625030817-87d8669125b7/go.mod h1:lQ7PmKvs6xplvjzEEMKw8XmP20D9raD+wFfzxkKaBd4= github.com/dubbogo/hessian2 v1.1.2 h1:SRkPzIwVv2D+ZUOCE2XuI5kANoL01ShhAheLcc3usJE= github.com/dubbogo/hessian2 v1.1.2/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 8ba46e7b0b321095652fb7c6d0c1c7403e1fea1e..ebbc28e1cb99730c9d37bc2356c003bcb0b799f0 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -93,53 +93,15 @@ func GetClientConf() ClientConfig { return *clientConf } -type CallOptions struct { +type Options struct { + // connect timeout + ConnectTimeout time.Duration // request timeout RequestTimeout time.Duration - // response timeout - ResponseTimeout time.Duration - // serial ID - SerialID SerialID - Meta map[interface{}]interface{} } -type CallOption func(*CallOptions) - -//func WithCallRequestTimeout(d time.Duration) CallOption { -// return func(o *CallOptions) { -// o.RequestTimeout = d -// } -//} -// -//func WithCallResponseTimeout(d time.Duration) CallOption { -// return func(o *CallOptions) { -// o.ResponseTimeout = d -// } -//} -// -//func WithCallSerialID(s SerialID) CallOption { -// return func(o *CallOptions) { -// o.SerialID = s -// } -//} -// -//func WithCallMeta_All(callMeta map[interface{}]interface{}) CallOption { -// return func(o *CallOptions) { -// o.Meta = callMeta -// } -//} - -//func WithCallMeta(k, v interface{}) CallOption { -// return func(o *CallOptions) { -// if o.Meta == nil { -// o.Meta = make(map[interface{}]interface{}) -// } -// o.Meta[k] = v -// } -//} - type CallResponse struct { - Opts CallOptions + Opts Options Cause error Start time.Time // invoke(call) start time == write start time ReadStart time.Time // read start time, write duration = ReadStart - Start @@ -149,6 +111,7 @@ type CallResponse struct { type AsyncCallback func(response CallResponse) type Client struct { + opts Options conf ClientConfig pool *gettyRPCClientPool sequence atomic.Uint64 @@ -156,9 +119,18 @@ type Client struct { pendingResponses *sync.Map } -func NewClient() *Client { +func NewClient(opt Options) *Client { + + switch { + case opt.ConnectTimeout == 0: + opt.ConnectTimeout = 3e9 + fallthrough + case opt.RequestTimeout == 0: + opt.RequestTimeout = 3e9 + } c := &Client{ + opts: opt, pendingResponses: new(sync.Map), conf: *clientConf, } @@ -168,64 +140,38 @@ func NewClient() *Client { } // call one way -func (c *Client) CallOneway(addr string, svcUrl common.URL, method string, args interface{}, opts ...CallOption) error { - var copts CallOptions - - for _, o := range opts { - o(&copts) - } +func (c *Client) CallOneway(addr string, svcUrl common.URL, method string, args interface{}) error { - return perrors.WithStack(c.call(CT_OneWay, addr, svcUrl, method, args, nil, nil, copts)) + return perrors.WithStack(c.call(CT_OneWay, addr, svcUrl, method, args, nil, nil)) } // if @reply is nil, the transport layer will get the response without notify the invoker. -func (c *Client) Call(addr string, svcUrl common.URL, method string, args, reply interface{}, opts ...CallOption) error { - var copts CallOptions - - for _, o := range opts { - o(&copts) - } +func (c *Client) Call(addr string, svcUrl common.URL, method string, args, reply interface{}) error { ct := CT_TwoWay if reply == nil { ct = CT_OneWay } - return perrors.WithStack(c.call(ct, addr, svcUrl, method, args, reply, nil, copts)) + return perrors.WithStack(c.call(ct, addr, svcUrl, method, args, reply, nil)) } func (c *Client) AsyncCall(addr string, svcUrl common.URL, method string, args interface{}, - callback AsyncCallback, reply interface{}, opts ...CallOption) error { - - var copts CallOptions - for _, o := range opts { - o(&copts) - } + callback AsyncCallback, reply interface{}) error { - return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts)) + return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback)) } func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string, - args, reply interface{}, callback AsyncCallback, opts CallOptions) error { - - if opts.RequestTimeout == 0 { - opts.RequestTimeout = c.conf.GettySessionParam.tcpWriteTimeout - } - if opts.ResponseTimeout == 0 { - opts.ResponseTimeout = c.conf.GettySessionParam.tcpReadTimeout - } + args, reply interface{}, callback AsyncCallback) error { p := &DubboPackage{} p.Service.Path = strings.TrimPrefix(svcUrl.Path, "/") p.Service.Interface = svcUrl.GetParam(constant.INTERFACE_KEY, "") p.Service.Version = svcUrl.GetParam(constant.VERSION_KEY, "") p.Service.Method = method - p.Service.Timeout = opts.RequestTimeout - if opts.SerialID == 0 { - p.Header.SerialID = byte(S_Dubbo) - } else { - p.Header.SerialID = byte(opts.SerialID) - } + p.Service.Timeout = c.opts.RequestTimeout + p.Header.SerialID = byte(S_Dubbo) p.Body = args var rsp *PendingResponse @@ -234,7 +180,6 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string rsp = NewPendingResponse() rsp.reply = reply rsp.callback = callback - rsp.opts = opts } else { p.Header.Type = hessian.PackageRequest } @@ -245,13 +190,15 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string conn *gettyRPCClient ) conn, session, err = c.selectSession(addr) - if err != nil || session == nil { - logger.Warnf("%s, %v", errSessionNotExist.Error(), err) + if err != nil { + return perrors.WithStack(err) + } + if session == nil { return errSessionNotExist } defer c.pool.release(conn, err) - if err = c.transfer(session, p, rsp, opts); err != nil { + if err = c.transfer(session, p, rsp); err != nil { return perrors.WithStack(err) } @@ -260,7 +207,7 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string } select { - case <-time.After(opts.ResponseTimeout): + case <-time.After(c.opts.RequestTimeout): err = errClientReadTimeout c.removePendingResponse(SequenceType(rsp.seq)) case <-rsp.done: @@ -286,11 +233,11 @@ func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, err } func (c *Client) heartbeat(session getty.Session) error { - return c.transfer(session, nil, NewPendingResponse(), CallOptions{}) + return c.transfer(session, nil, NewPendingResponse()) } func (c *Client) transfer(session getty.Session, pkg *DubboPackage, - rsp *PendingResponse, opts CallOptions) error { + rsp *PendingResponse) error { var ( sequence uint64 @@ -313,7 +260,7 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage, c.addPendingResponse(rsp) } - err = session.WritePkg(pkg, opts.RequestTimeout) + err = session.WritePkg(pkg, c.opts.RequestTimeout) if err != nil { c.removePendingResponse(SequenceType(rsp.seq)) } else if rsp != nil { // cond2 diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index 2f9697dc757f36e53cfd985f4cef6258c6371095..a0da21d5dd6298b27880441be9782941a66b8892 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -42,6 +42,10 @@ func TestClient_CallOneway(t *testing.T) { c := &Client{ pendingResponses: new(sync.Map), conf: *clientConf, + opts: Options{ + ConnectTimeout: 3e9, + RequestTimeout: 6e9, + }, } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) @@ -59,6 +63,10 @@ func TestClient_Call(t *testing.T) { c := &Client{ pendingResponses: new(sync.Map), conf: *clientConf, + opts: Options{ + ConnectTimeout: 3e9, + RequestTimeout: 10e9, + }, } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) @@ -120,6 +128,10 @@ func TestClient_AsyncCall(t *testing.T) { c := &Client{ pendingResponses: new(sync.Map), conf: *clientConf, + opts: Options{ + ConnectTimeout: 3e9, + RequestTimeout: 6e9, + }, } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) @@ -152,7 +164,6 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { ConnectionNum: 2, HeartbeatPeriod: "5s", SessionTimeout: "20s", - FailFastTimeout: "5s", PoolTTL: 600, PoolSize: 64, GettySessionParam: GettySessionParam{ @@ -173,9 +184,8 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { }) assert.NoError(t, clientConf.CheckValidity()) SetServerConfig(ServerConfig{ - SessionNumber: 700, - SessionTimeout: "20s", - FailFastTimeout: "5s", + SessionNumber: 700, + SessionTimeout: "20s", GettySessionParam: GettySessionParam{ CompressEncoding: false, TcpNoDelay: true, diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index cb57fc896f5e1c0be99a1e5978841ae17503064b..790397da35ced7cff76e4b4027ddba883ed9f250 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -112,7 +112,6 @@ type PendingResponse struct { readStart time.Time callback AsyncCallback reply interface{} - opts CallOptions done chan struct{} } @@ -125,7 +124,6 @@ func NewPendingResponse() *PendingResponse { func (r PendingResponse) GetCallResponse() CallResponse { return CallResponse{ - Opts: r.opts, Cause: r.err, Start: r.start, ReadStart: r.readStart, diff --git a/protocol/dubbo/config.go b/protocol/dubbo/config.go index 0af1a4a41116fca06d6793fddaf7f69ade2f30c0..e59b7f2fd85fb216adb86a8167980abed45be9e3 100644 --- a/protocol/dubbo/config.go +++ b/protocol/dubbo/config.go @@ -48,32 +48,17 @@ type ( // Config holds supported types by the multiconfig package ServerConfig struct { - // local address - //AppName string `default:"rpc-server" yaml:"app_name" json:"app_name,omitempty"` - //Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"` - //Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]` - //ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"` - // session SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` sessionTimeout time.Duration SessionNumber int `default:"1000" yaml:"session_number" json:"session_number,omitempty"` - // app - FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"` - failFastTimeout time.Duration - // session tcp parameters GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` } // Config holds supported types by the multiconfig package ClientConfig struct { - // local address - //AppName string `default:"rpc-client" yaml:"app_name" json:"app_name,omitempty"` - //Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"` - //ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"` - ReconnectInterval int `default:"0" yaml:"reconnect_interval" json:"reconnect_interval,omitempty"` // session pool @@ -87,10 +72,6 @@ type ( SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` sessionTimeout time.Duration - // app - FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"` - failFastTimeout time.Duration - // Connection Pool PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"` PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"` @@ -125,6 +106,8 @@ func (c *GettySessionParam) CheckValidity() error { func (c *ClientConfig) CheckValidity() error { var err error + c.ReconnectInterval = c.ReconnectInterval * 1e6 + if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil { return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod) } @@ -133,10 +116,6 @@ func (c *ClientConfig) CheckValidity() error { return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout) } - if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil { - return perrors.WithMessagef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout) - } - return perrors.WithStack(c.GettySessionParam.CheckValidity()) } @@ -147,9 +126,5 @@ func (c *ServerConfig) CheckValidity() error { return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout) } - if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil { - return perrors.WithMessagef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout) - } - return perrors.WithStack(c.GettySessionParam.CheckValidity()) } diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index e88e782730e14387169a5201d94fe898cfbba0a7..09a4c128b600e605de616a65027da9b2ce6fcb20 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -38,6 +38,10 @@ func TestDubboInvoker_Invoke(t *testing.T) { c := &Client{ pendingResponses: new(sync.Map), conf: *clientConf, + opts: Options{ + ConnectTimeout: 3e9, + RequestTimeout: 6e9, + }, } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index a2df3d91b2fa6b1ef8907ecba8832368d0613b8e..4438a0b3d0e32127536b818806d190a2d2a5a2ba 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -21,6 +21,7 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/protocol" "sync" ) @@ -63,7 +64,10 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { } func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { - invoker := NewDubboInvoker(url, NewClient()) + invoker := NewDubboInvoker(url, NewClient(Options{ + ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, + RequestTimeout: config.GetConsumerConfig().RequestTimeout, + })) dp.SetInvokers(invoker) logger.Infof("Refer service: %s", url.String()) return invoker diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index de205fa75f5fe8e8ef75c7c0f12cc2f78b3c397b..23e860c19c1ec0540da54e99a4d6c84496c18849 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -62,16 +62,18 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*ge getty.WithReconnectInterval(pool.rpcClient.conf.ReconnectInterval), ), } - c.gettyClient.RunEventLoop(c.newSession) + go c.gettyClient.RunEventLoop(c.newSession) idx := 1 + times := int(pool.rpcClient.opts.ConnectTimeout / 1e6) for { idx++ if c.isAvailable() { break } - if idx > 5000 { - return nil, perrors.New(fmt.Sprintf("failed to create client connection to %s in 5 seconds", addr)) + if idx > times { + c.gettyClient.Close() + return nil, perrors.New(fmt.Sprintf("failed to create client connection to %s in %f seconds", addr, float32(times)/1000)) } time.Sleep(1e6) } @@ -166,7 +168,9 @@ func (c *gettyRPCClient) removeSession(session getty.Session) { } logger.Infof("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) if len(c.sessions) == 0 { + c.pool.Lock() c.close() // -> pool.remove(c) + c.pool.Unlock() } } @@ -193,8 +197,8 @@ func (c *gettyRPCClient) getClientRpcSession(session getty.Session) (rpcSession, err error rpcSession rpcSession ) - c.lock.Lock() - defer c.lock.Unlock() + c.lock.RLock() + defer c.lock.RUnlock() if c.sessions == nil { return rpcSession, errClientClosed } @@ -241,11 +245,11 @@ func (c *gettyRPCClient) close() error { type gettyRPCClientPool struct { rpcClient *Client - size int // []*gettyRPCClient鏁扮粍鐨剆ize - ttl int64 // 姣忎釜gettyRPCClient鐨勬湁鏁堟湡鏃堕棿. pool瀵硅薄浼氬湪getConn鏃舵墽琛宼tl妫€鏌� + size int // size of []*gettyRPCClient + ttl int64 // ttl of every gettyRPCClient, it is checked when getConn sync.Mutex - conns []*gettyRPCClient // 浠嶽]*gettyRPCClient 鍙key鏄繛鎺ュ湴鍧€锛岃€寁alue鏄搴旇繖涓湴鍧€鐨勮繛鎺ユ暟缁� + conns []*gettyRPCClient } func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool { @@ -270,7 +274,6 @@ func (p *gettyRPCClientPool) close() { func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) { p.Lock() - defer p.Unlock() if p.conns == nil { return nil, errClientPoolClosed } @@ -287,10 +290,11 @@ func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPC } conn.created = now //update created time + p.Unlock() return conn, nil } - // create new conn + p.Unlock() return newGettyRPCClientConn(p, protocol, addr) }