Skip to content
Snippets Groups Projects
Commit 32cf1c88 authored by vito.he's avatar vito.he
Browse files

Add:add check configuration for client

parent f0f69abe
No related branches found
No related tags found
No related merge requests found
......@@ -18,11 +18,13 @@ import (
import (
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/version"
)
var (
consumerConfig *ConsumerConfig
providerConfig *ProviderConfig
maxWait = 3
)
// loaded comsumer & provider config from xxx.yml, and log config from xxx.xml
......@@ -137,7 +139,7 @@ type ConsumerConfig struct {
Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"`
RequestTimeout time.Duration
Check *bool `yaml:"check" json:"check,omitempty"`
// application
ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"`
......@@ -178,9 +180,6 @@ type ProviderConfig struct {
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty"`
}
func SetProviderConfig(p ProviderConfig) {
providerConfig = &p
}
func GetProviderConfig() ProviderConfig {
if providerConfig == nil {
log.Warn("providerConfig is nil!")
......@@ -231,6 +230,30 @@ func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) {
con.Implement(rpcService)
refMap[con.InterfaceName] = con
}
//wait for invoker is available, if wait over default 3s, then panic
var count int
checkok := true
for {
for _, refconfig := range consumerConfig.References {
if ((refconfig.Check != nil && *refconfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check == nil)) && //default to true
!refconfig.invoker.IsAvailable() {
checkok = false
count++
if count > maxWait {
panic(fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, version.Version))
}
time.Sleep(time.Second * 1)
break
}
}
if checkok {
break
}
checkok = true
}
}
// service config
......
# dubbo client yaml configure file
# pprof
pprof_enabled : true
pprof_port : 10086
# client
request_timeout : "3500ms"
# connect timeout
connect_timeout : "100ms"
# application config
application_config:
organization : "ikurento.com"
name : "BDTService"
module : "dubbogo user-info client"
version : "0.0.1"
owner : "ZX"
environment : "dev"
registries :
- id: "hangzhouzk"
type: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2181"
username: ""
password: ""
# default: "true"
- id: "shanghaizk"
type: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2181"
username: ""
password: ""
references:
- registries :
- "hangzhouzk"
- "shanghaizk"
protocol : "dubbo"
interface : "com.ikurento.user.UserProvider"
cluster: "failover"
methods :
- name: "GetUser"
retries: 3
protocol_conf:
dubbo:
connection_number: 2
heartbeat_period: "5s"
session_timeout: "20s"
fail_fast_timeout: "5s"
pool_size: 64
pool_ttl: 600
getty_session_param:
compress_encoding: false
tcp_no_delay: true
tcp_keep_alive: true
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
max_msg_len: 1024
session_name: "client"
# dubbo server yaml configure file
# pprof
pprof_enabled : true
pprof_port : 20080
# application config
application_config:
organization : "ikurento.com"
name : "BDTService"
module : "dubbogo user-info server"
version : "0.0.1"
owner : "ZX"
environment : "dev"
registries :
- id: "hangzhouzk"
type: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2181"
username: ""
password: ""
- id: "shanghaizk"
type: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2181"
username: ""
password: ""
services:
- registries:
- "hangzhouzk"
- "shanghaizk"
protocol : "dubbo,jsonrpc"
# 相当于dubbo.xml中的interface
interface : "com.ikurento.user.UserProvider"
loadbalance: "random"
warmup: "100"
cluster: "failover"
methods:
- name: "GetUser"
retries: 1
loadbalance: "random"
protocols:
- name: "dubbo"
# while using dubbo protocol, ip cannot is 127.0.0.1, because client of java-dubbo will get 'connection refuse'
ip : "127.0.0.1"
port : 20000
- name: "jsonrpc"
ip: "127.0.0.1"
port: 20001
protocol_conf:
dubbo:
session_number: 700
fail_fast_timeout: "5s"
session_timeout: "20s"
getty_session_param:
compress_encoding: false
tcp_no_delay: true
tcp_keep_alive: true
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
max_msg_len: 1024
session_name: "server"
......@@ -20,6 +20,7 @@ type ReferenceConfig struct {
context context.Context
pxy *proxy.Proxy
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty"`
Check *bool `yaml:"check" json:"check,omitempty"`
Protocol string `yaml:"protocol" json:"protocol,omitempty"`
Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"`
Cluster string `yaml:"cluster" json:"cluster,omitempty"`
......@@ -41,7 +42,16 @@ type ConfigRegistry string
func NewReferenceConfig(ctx context.Context) *ReferenceConfig {
return &ReferenceConfig{context: ctx}
}
func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
type rf ReferenceConfig
raw := rf{} // Put your defaults here
if err := unmarshal(&raw); err != nil {
return err
}
*refconfig = ReferenceConfig(raw)
return nil
}
func (refconfig *ReferenceConfig) Refer() {
//首先是user specified SubURL, could be peer-to-peer address, or register center's address.
......@@ -65,6 +75,7 @@ func (refconfig *ReferenceConfig) Refer() {
cluster := extension.GetCluster("registryAware")
refconfig.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}
//create proxy
attachments := map[string]string{}
attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false")
......
......@@ -8,7 +8,7 @@ pprof_port : 10086
request_timeout : "100ms"
# connect timeout
connect_timeout : "100ms"
check: true
# application config
application_config:
organization : "ikurento.com"
......@@ -29,7 +29,7 @@ registries :
- id: "shanghaizk"
type: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2181"
address: "127.0.0.1:2182"
username: ""
password: ""
......
......@@ -3,7 +3,7 @@
# pprof
pprof_enabled : true
pprof_port : 10086
check: true
# client
request_timeout : "3s"
# connect timeout
......@@ -29,7 +29,7 @@ registries :
- id: "shanghaizk"
type: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2181"
address: "127.0.0.1:2182"
username: ""
password: ""
......
......@@ -190,7 +190,16 @@ func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.In
}
func (dir *registryDirectory) IsAvailable() bool {
return dir.BaseDirectory.IsAvailable()
if !dir.BaseDirectory.IsAvailable() {
return dir.BaseDirectory.IsAvailable()
} else {
for _, ivk := range dir.cacheInvokers {
if ivk.IsAvailable() {
return true
}
}
}
return false
}
func (dir *registryDirectory) Destroy() {
......
......@@ -77,7 +77,7 @@ func TestSubscribe_Group(t *testing.T) {
assert.Len(t, registryDirectory.cacheInvokers, 2)
}
func Test_Destory(t *testing.T) {
func Test_Destroy(t *testing.T) {
registryDirectory, _ := normalRegistryDir()
time.Sleep(1e9)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment