Skip to content
Snippets Groups Projects
Commit 289a9cdb authored by fangyincheng's avatar fangyincheng
Browse files

Fix:fix bug

parent 3b0cb6a3
No related branches found
No related tags found
No related merge requests found
Showing
with 324 additions and 489 deletions
...@@ -16,7 +16,7 @@ import ( ...@@ -16,7 +16,7 @@ import (
// rpc service interface // rpc service interface
type RPCService interface { type RPCService interface {
Service() string // Service interfaceName Service() string // Service InterfaceName
Version() string Version() string
} }
......
...@@ -82,16 +82,11 @@ func consumerInit(confConFile string) error { ...@@ -82,16 +82,11 @@ func consumerInit(confConFile string) error {
if err != nil { if err != nil {
return fmt.Errorf("ioutil.ReadFile(file:%s) = error:%s", confConFile, jerrors.ErrorStack(err)) return fmt.Errorf("ioutil.ReadFile(file:%s) = error:%s", confConFile, jerrors.ErrorStack(err))
} }
consumerConfig = &ConsumerConfig{}
err = yaml.Unmarshal(confFileStream, consumerConfig) err = yaml.Unmarshal(confFileStream, consumerConfig)
if err != nil { if err != nil {
return fmt.Errorf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err)) return fmt.Errorf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err))
} }
//动态加载service config end
//for _, config := range consumerConfig.Registries {
// if config.Timeout, err = time.ParseDuration(config.TimeoutStr); err != nil {
// return fmt.Errorf("time.ParseDuration(Registry_Config.Timeout:%#v) = error:%s", config.TimeoutStr, err)
// }
//}
gxlog.CInfo("consumer config{%#v}\n", consumerConfig) gxlog.CInfo("consumer config{%#v}\n", consumerConfig)
return nil return nil
...@@ -110,6 +105,7 @@ func providerInit(confProFile string) error { ...@@ -110,6 +105,7 @@ func providerInit(confProFile string) error {
if err != nil { if err != nil {
return fmt.Errorf("ioutil.ReadFile(file:%s) = error:%s", confProFile, jerrors.ErrorStack(err)) return fmt.Errorf("ioutil.ReadFile(file:%s) = error:%s", confProFile, jerrors.ErrorStack(err))
} }
providerConfig = &ProviderConfig{}
err = yaml.Unmarshal(confFileStream, providerConfig) err = yaml.Unmarshal(confFileStream, providerConfig)
if err != nil { if err != nil {
return fmt.Errorf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err)) return fmt.Errorf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err))
...@@ -156,6 +152,10 @@ func SetConsumerConfig(c ConsumerConfig) { ...@@ -156,6 +152,10 @@ func SetConsumerConfig(c ConsumerConfig) {
consumerConfig = &c consumerConfig = &c
} }
func GetConsumerConfig() ConsumerConfig { func GetConsumerConfig() ConsumerConfig {
if consumerConfig == nil {
log.Warn("consumerConfig is nil!")
return ConsumerConfig{}
}
return *consumerConfig return *consumerConfig
} }
...@@ -164,6 +164,10 @@ func GetConsumerConfig() ConsumerConfig { ...@@ -164,6 +164,10 @@ func GetConsumerConfig() ConsumerConfig {
///////////////////////// /////////////////////////
type ProviderConfig struct { type ProviderConfig struct {
// pprof
Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"`
Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"`
ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` ApplicationConfig ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
Path string `yaml:"path" json:"path,omitempty"` Path string `yaml:"path" json:"path,omitempty"`
Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"` Registries []RegistryConfig `yaml:"registries" json:"registries,omitempty"`
...@@ -175,21 +179,25 @@ func SetProviderConfig(p ProviderConfig) { ...@@ -175,21 +179,25 @@ func SetProviderConfig(p ProviderConfig) {
providerConfig = &p providerConfig = &p
} }
func GetProviderConfig() ProviderConfig { func GetProviderConfig() ProviderConfig {
if providerConfig == nil {
log.Warn("providerConfig is nil!")
return ProviderConfig{}
}
return *providerConfig return *providerConfig
} }
type ProtocolConfig struct { type ProtocolConfig struct {
name string `required:"true" yaml:"name" json:"name,omitempty"` Name string `required:"true" yaml:"name" json:"name,omitempty"`
ip string `required:"true" yaml:"ip" json:"ip,omitempty"` Ip string `required:"true" yaml:"ip" json:"ip,omitempty"`
port string `required:"true" yaml:"port" json:"port,omitempty"` Port string `required:"true" yaml:"port" json:"port,omitempty"`
contextPath string `required:"true" yaml:"contextPath" json:"contextPath,omitempty"` ContextPath string `required:"true" yaml:"contextPath" json:"contextPath,omitempty"`
} }
func loadProtocol(protocolsIds string, protocols []ProtocolConfig) []ProtocolConfig { func loadProtocol(protocolsIds string, protocols []ProtocolConfig) []ProtocolConfig {
returnProtocols := []ProtocolConfig{} returnProtocols := []ProtocolConfig{}
for _, v := range strings.Split(protocolsIds, ",") { for _, v := range strings.Split(protocolsIds, ",") {
for _, prot := range protocols { for _, prot := range protocols {
if v == prot.name { if v == prot.Name {
returnProtocols = append(returnProtocols, prot) returnProtocols = append(returnProtocols, prot)
} }
} }
...@@ -200,27 +208,37 @@ func loadProtocol(protocolsIds string, protocols []ProtocolConfig) []ProtocolCon ...@@ -200,27 +208,37 @@ func loadProtocol(protocolsIds string, protocols []ProtocolConfig) []ProtocolCon
// Dubbo Init // Dubbo Init
func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) { func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) {
refMap := make(map[string]*ReferenceConfig) var refMap map[string]*ReferenceConfig
srvMap := make(map[string]*ServiceConfig) var srvMap map[string]*ServiceConfig
// reference config // reference config
length := len(consumerConfig.References) if consumerConfig == nil {
for index := 0; index < length; index++ { log.Warn("consumerConfig is nil!")
con := &consumerConfig.References[index] } else {
con.Implement(conServices[con.interfaceName]) refMap = make(map[string]*ReferenceConfig)
con.Refer() length := len(consumerConfig.References)
refMap[con.interfaceName] = con for index := 0; index < length; index++ {
con := &consumerConfig.References[index]
con.Implement(conServices[con.InterfaceName])
con.Refer()
refMap[con.InterfaceName] = con
}
} }
// service config // service config
length = len(providerConfig.Services) if providerConfig == nil {
for index := 0; index < length; index++ { log.Warn("providerConfig is nil!")
pro := &providerConfig.Services[index] } else {
pro.Implement(proServices[pro.interfaceName]) srvMap = make(map[string]*ServiceConfig)
if err := pro.Export(); err != nil { length := len(providerConfig.Services)
panic(fmt.Sprintf("service %s export failed! ", pro.interfaceName)) for index := 0; index < length; index++ {
pro := &providerConfig.Services[index]
pro.Implement(proServices[pro.InterfaceName])
if err := pro.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! ", pro.InterfaceName))
}
srvMap[pro.InterfaceName] = pro
} }
srvMap[pro.interfaceName] = pro
} }
return refMap, srvMap return refMap, srvMap
......
...@@ -51,10 +51,8 @@ services: ...@@ -51,10 +51,8 @@ services:
protocols: protocols:
- name: "dubbo" - name: "dubbo"
# 如果是127.0.0.1, java-client将无法连接到go-server
ip : "192.168.56.1" ip : "192.168.56.1"
port : 20000 port : 20000
# 本server能够提供所有支持同样的Protocol的servicelist的服务
- name: "jsonrpc" - name: "jsonrpc"
ip: "127.0.0.1" ip: "127.0.0.1"
port: 20001 port: 20001
......
...@@ -2,13 +2,13 @@ package support ...@@ -2,13 +2,13 @@ package support
import ( import (
"context" "context"
"github.com/dubbo/dubbo-go/cluster/directory"
"net/url" "net/url"
"strconv" "strconv"
"time" "time"
) )
import ( import (
"github.com/dubbo/dubbo-go/cluster/directory"
"github.com/dubbo/dubbo-go/common/constant" "github.com/dubbo/dubbo-go/common/constant"
"github.com/dubbo/dubbo-go/common/extension" "github.com/dubbo/dubbo-go/common/extension"
"github.com/dubbo/dubbo-go/common/proxy" "github.com/dubbo/dubbo-go/common/proxy"
...@@ -19,25 +19,24 @@ import ( ...@@ -19,25 +19,24 @@ import (
type ReferenceConfig struct { type ReferenceConfig struct {
context context.Context context context.Context
pxy *proxy.Proxy pxy *proxy.Proxy
interfaceName string `required:"true" yaml:"interface" json:"interface,omitempty"` InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty"`
protocol string `yaml:"protocol" json:"protocol,omitempty"` Protocol string `yaml:"protocol" json:"protocol,omitempty"`
registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"` Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"`
cluster string `yaml:"cluster" json:"cluster,omitempty"` Cluster string `yaml:"cluster" json:"cluster,omitempty"`
loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"` Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"`
retries int64 `yaml:"retries" json:"retries,omitempty"` Retries int64 `yaml:"retries" json:"retries,omitempty"`
group string `yaml:"group" json:"group,omitempty"` Group string `yaml:"group" json:"group,omitempty"`
version string `yaml:"version" json:"version,omitempty"` Version string `yaml:"version" json:"version,omitempty"`
methods []struct { Methods []struct {
name string `yaml:"name" json:"name,omitempty"` Name string `yaml:"name" json:"name,omitempty"`
retries int64 `yaml:"retries" json:"retries,omitempty"` Retries int64 `yaml:"retries" json:"retries,omitempty"`
loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"` Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"`
} `yaml:"methods" json:"methods,omitempty"` } `yaml:"methods" json:"methods,omitempty"`
async bool `yaml:"async" json:"async,omitempty"` async bool `yaml:"async" json:"async,omitempty"`
invoker protocol.Invoker invoker protocol.Invoker
} }
type ConfigRegistry struct {
string type ConfigRegistry string
}
func NewReferenceConfig(ctx context.Context) *ReferenceConfig { func NewReferenceConfig(ctx context.Context) *ReferenceConfig {
return &ReferenceConfig{context: ctx} return &ReferenceConfig{context: ctx}
...@@ -47,8 +46,8 @@ func (refconfig *ReferenceConfig) Refer() { ...@@ -47,8 +46,8 @@ func (refconfig *ReferenceConfig) Refer() {
//首先是user specified SubURL, could be peer-to-peer address, or register center's address. //首先是user specified SubURL, could be peer-to-peer address, or register center's address.
//其次是assemble SubURL from register center's configuration模式 //其次是assemble SubURL from register center's configuration模式
regUrls := loadRegistries(refconfig.registries, consumerConfig.Registries, config.CONSUMER) regUrls := loadRegistries(refconfig.Registries, consumerConfig.Registries, config.CONSUMER)
url := config.NewURLWithOptions(refconfig.interfaceName, config.WithProtocol(refconfig.protocol), config.WithParams(refconfig.getUrlMap())) url := config.NewURLWithOptions(refconfig.InterfaceName, config.WithProtocol(refconfig.Protocol), config.WithParams(refconfig.getUrlMap()))
//set url to regUrls //set url to regUrls
for _, regUrl := range regUrls { for _, regUrl := range regUrls {
...@@ -81,11 +80,11 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { ...@@ -81,11 +80,11 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap := url.Values{} urlMap := url.Values{}
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10)) urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, refconfig.cluster) urlMap.Set(constant.CLUSTER_KEY, refconfig.Cluster)
urlMap.Set(constant.LOADBALANCE_KEY, refconfig.loadbalance) urlMap.Set(constant.LOADBALANCE_KEY, refconfig.Loadbalance)
urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(refconfig.retries, 10)) urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(refconfig.Retries, 10))
urlMap.Set(constant.GROUP_KEY, refconfig.group) urlMap.Set(constant.GROUP_KEY, refconfig.Group)
urlMap.Set(constant.VERSION_KEY, refconfig.version) urlMap.Set(constant.VERSION_KEY, refconfig.Version)
//getty invoke async or sync //getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async)) urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async))
...@@ -98,9 +97,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { ...@@ -98,9 +97,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.OWNER_KEY, consumerConfig.ApplicationConfig.Owner) urlMap.Set(constant.OWNER_KEY, consumerConfig.ApplicationConfig.Owner)
urlMap.Set(constant.ENVIRONMENT_KEY, consumerConfig.ApplicationConfig.Environment) urlMap.Set(constant.ENVIRONMENT_KEY, consumerConfig.ApplicationConfig.Environment)
for _, v := range refconfig.methods { for _, v := range refconfig.Methods {
urlMap.Set("methods."+v.name+"."+constant.LOADBALANCE_KEY, v.loadbalance) urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set("methods."+v.name+"."+constant.RETRIES_KEY, strconv.FormatInt(v.retries, 10)) urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, strconv.FormatInt(v.Retries, 10))
} }
return urlMap return urlMap
......
...@@ -22,10 +22,10 @@ func loadRegistries(registriesIds []ConfigRegistry, registries []RegistryConfig, ...@@ -22,10 +22,10 @@ func loadRegistries(registriesIds []ConfigRegistry, registries []RegistryConfig,
var urls []*config.URL var urls []*config.URL
for _, registry := range registriesIds { for _, registry := range registriesIds {
for _, registryConf := range registries { for _, registryConf := range registries {
if registry.string == registryConf.Id { if string(registry) == registryConf.Id {
url, err := config.NewURL(context.TODO(), registryConf.Address, config.WithParams(registryConf.getUrlMap(roleType))) url, err := config.NewURL(context.TODO(), registryConf.Address, config.WithParams(registryConf.getUrlMap(roleType)))
if err != nil { if err != nil {
log.Error("The registry id:%s url is invalid ,and will skip the registry", registryConf.Id) log.Error("The registry id:%s url is invalid ,and will skip the registry, error: %#v", registryConf.Id, err)
} else { } else {
urls = append(urls, &url) urls = append(urls, &url)
} }
......
...@@ -20,21 +20,21 @@ import ( ...@@ -20,21 +20,21 @@ import (
type ServiceConfig struct { type ServiceConfig struct {
context context.Context context context.Context
protocol string //multi protocol support, split by ',' Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"`//multi protocol support, split by ','
interfaceName string `required:"true" yaml:"interface" json:"interface,omitempty"` InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty"`
registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"` Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"`
cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty"` Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty"`
loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty"` Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty"`
group string `yaml:"group" json:"group,omitempty"` Group string `yaml:"group" json:"group,omitempty"`
version string `yaml:"version" json:"version,omitempty"` Version string `yaml:"version" json:"version,omitempty"`
methods []struct { Methods []struct {
name string `yaml:"name" json:"name,omitempty"` Name string `yaml:"name" json:"name,omitempty"`
retries int64 `yaml:"retries" json:"retries,omitempty"` Retries int64 `yaml:"retries" json:"retries,omitempty"`
loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"` Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"`
weight int64 `yaml:"weight" json:"weight,omitempty"` Weight int64 `yaml:"weight" json:"weight,omitempty"`
} `yaml:"methods" json:"methods,omitempty"` } `yaml:"methods" json:"methods,omitempty"`
warmup string `yaml:"warmup" json:"warmup,omitempty"` Warmup string `yaml:"warmup" json:"warmup,omitempty"`
retries int64 `yaml:"retries" json:"retries,omitempty"` Retries int64 `yaml:"retries" json:"retries,omitempty"`
unexported *atomic.Bool unexported *atomic.Bool
exported *atomic.Bool exported *atomic.Bool
rpcService config.RPCService rpcService config.RPCService
...@@ -53,35 +53,35 @@ func (srvconfig *ServiceConfig) Export() error { ...@@ -53,35 +53,35 @@ func (srvconfig *ServiceConfig) Export() error {
//TODO: config center start here //TODO: config center start here
//TODO:delay export //TODO:delay export
if srvconfig.unexported.Load() { if srvconfig.unexported != nil && srvconfig.unexported.Load() {
err := jerrors.Errorf("The service %v has already unexported! ", srvconfig.interfaceName) err := jerrors.Errorf("The service %v has already unexported! ", srvconfig.InterfaceName)
log.Error(err.Error()) log.Error(err.Error())
return err return err
} }
if srvconfig.exported.Load() { if srvconfig.unexported != nil && srvconfig.exported.Load() {
log.Warn("The service %v has already exported! ", srvconfig.interfaceName) log.Warn("The service %v has already exported! ", srvconfig.InterfaceName)
return nil return nil
} }
regUrls := loadRegistries(srvconfig.registries, providerConfig.Registries, config.PROVIDER) regUrls := loadRegistries(srvconfig.Registries, providerConfig.Registries, config.PROVIDER)
urlMap := srvconfig.getUrlMap() urlMap := srvconfig.getUrlMap()
for _, proto := range loadProtocol(srvconfig.protocol, providerConfig.Protocols) { for _, proto := range loadProtocol(srvconfig.Protocol, providerConfig.Protocols) {
//registry the service reflect //registry the service reflect
_, err := config.ServiceMap.Register(proto.name, srvconfig.rpcService) _, err := config.ServiceMap.Register(proto.Name, srvconfig.rpcService)
if err != nil { if err != nil {
err := jerrors.Errorf("The service %v export the protocol %v error! Error message is %v .", srvconfig.interfaceName, proto.name, err.Error()) err := jerrors.Errorf("The service %v export the protocol %v error! Error message is %v .", srvconfig.InterfaceName, proto.Name, err.Error())
log.Error(err.Error()) log.Error(err.Error())
return err return err
} }
contextPath := proto.contextPath contextPath := proto.ContextPath
if contextPath == "" { if contextPath == "" {
contextPath = providerConfig.Path contextPath = providerConfig.Path
} }
url := config.NewURLWithOptions(srvconfig.interfaceName, url := config.NewURLWithOptions(srvconfig.InterfaceName,
config.WithProtocol(proto.name), config.WithProtocol(proto.Name),
config.WithIp(proto.ip), config.WithIp(proto.Ip),
config.WithPort(proto.port), config.WithPort(proto.Port),
config.WithPath(contextPath), config.WithPath(contextPath),
config.WithParams(urlMap)) config.WithParams(urlMap))
...@@ -104,12 +104,12 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { ...@@ -104,12 +104,12 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap := url.Values{} urlMap := url.Values{}
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10)) urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, srvconfig.cluster) urlMap.Set(constant.CLUSTER_KEY, srvconfig.Cluster)
urlMap.Set(constant.LOADBALANCE_KEY, srvconfig.loadbalance) urlMap.Set(constant.LOADBALANCE_KEY, srvconfig.Loadbalance)
urlMap.Set(constant.WARMUP_KEY, srvconfig.warmup) urlMap.Set(constant.WARMUP_KEY, srvconfig.Warmup)
urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(srvconfig.retries, 10)) urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(srvconfig.Retries, 10))
urlMap.Set(constant.GROUP_KEY, srvconfig.group) urlMap.Set(constant.GROUP_KEY, srvconfig.Group)
urlMap.Set(constant.VERSION_KEY, srvconfig.version) urlMap.Set(constant.VERSION_KEY, srvconfig.Version)
//application info //application info
urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name) urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name)
urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization) urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization)
...@@ -119,10 +119,10 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { ...@@ -119,10 +119,10 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.OWNER_KEY, providerConfig.ApplicationConfig.Owner) urlMap.Set(constant.OWNER_KEY, providerConfig.ApplicationConfig.Owner)
urlMap.Set(constant.ENVIRONMENT_KEY, providerConfig.ApplicationConfig.Environment) urlMap.Set(constant.ENVIRONMENT_KEY, providerConfig.ApplicationConfig.Environment)
for _, v := range srvconfig.methods { for _, v := range srvconfig.Methods {
urlMap.Set("methods."+v.name+"."+constant.LOADBALANCE_KEY, v.loadbalance) urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set("methods."+v.name+"."+constant.RETRIES_KEY, strconv.FormatInt(v.retries, 10)) urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, strconv.FormatInt(v.Retries, 10))
urlMap.Set("methods."+v.name+"."+constant.WEIGHT_KEY, strconv.FormatInt(v.weight, 10)) urlMap.Set("methods."+v.Name+"."+constant.WEIGHT_KEY, strconv.FormatInt(v.Weight, 10))
} }
return urlMap return urlMap
......
...@@ -132,6 +132,7 @@ func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error) ...@@ -132,6 +132,7 @@ func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error)
return s, jerrors.Errorf("url.QueryUnescape(%s), error{%v}", urlString, err) return s, jerrors.Errorf("url.QueryUnescape(%s), error{%v}", urlString, err)
} }
rawUrlString = "//"+rawUrlString
serviceUrl, err = url.Parse(rawUrlString) serviceUrl, err = url.Parse(rawUrlString)
if err != nil { if err != nil {
return s, jerrors.Errorf("url.Parse(url string{%s}), error{%v}", rawUrlString, err) return s, jerrors.Errorf("url.Parse(url string{%s}), error{%v}", rawUrlString, err)
......
package examples
import (
"fmt"
"io/ioutil"
"os"
"path"
"time"
)
import (
"github.com/AlexStocks/goext/log"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
"gopkg.in/yaml.v2"
)
import (
"github.com/dubbo/dubbo-go/plugins"
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/registry/zookeeper"
)
const (
APP_CONF_FILE = "APP_CONF_FILE"
APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE"
)
type (
// Client holds supported types by the multiconfig package
ClientConfig struct {
// pprof
Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"`
Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"`
// client
Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty"`
ConnectTimeout time.Duration
Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"` // 500ms, 1m
RequestTimeout time.Duration
// codec & selector & transport & registry
Selector string `default:"cache" yaml:"selector" json:"selector,omitempty"`
Selector_TTL string `default:"10m" yaml:"selector_ttl" json:"selector_ttl,omitempty"`
//client load balance algorithm
ClientLoadBalance string `default:"round_robin" yaml:"client_load_balance" json:"client_load_balance,omitempty"`
Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"`
// application
Application_Config registry.ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"`
// 一个客户端只允许使用一个service的其中一个group和其中一个version
ServiceConfigType string `default:"default" yaml:"service_config_type" json:"service_config_type,omitempty"`
ServiceConfigList []registry.ReferenceConfig `yaml:"-"`
ServiceConfigMapList []map[string]string `yaml:"service_list" json:"service_list,omitempty"`
}
)
func InitClientConfig() *ClientConfig {
var (
clientConfig *ClientConfig
confFile string
)
// configure
confFile = os.Getenv(APP_CONF_FILE)
if confFile == "" {
panic(fmt.Sprintf("application configure file name is nil"))
return nil // I know it is of no usage. Just Err Protection.
}
if path.Ext(confFile) != ".yml" {
panic(fmt.Sprintf("application configure file name{%v} suffix must be .yml", confFile))
return nil
}
clientConfig = new(ClientConfig)
confFileStream, err := ioutil.ReadFile(confFile)
if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(file:%s) = error:%s", confFile, jerrors.ErrorStack(err)))
return nil
}
err = yaml.Unmarshal(confFileStream, clientConfig)
if err != nil {
panic(fmt.Sprintf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err)))
return nil
}
//动态加载service config
//设置默认ProviderServiceConfig类
plugins.SetDefaultServiceConfig(clientConfig.ServiceConfigType)
for _, service := range clientConfig.ServiceConfigMapList {
svc := plugins.DefaultServiceConfig()()
svc.SetProtocol(service["protocol"])
svc.SetService(service["service"])
clientConfig.ServiceConfigList = append(clientConfig.ServiceConfigList, svc)
}
//动态加载service config end
if clientConfig.ZkRegistryConfig.Timeout, err = time.ParseDuration(clientConfig.ZkRegistryConfig.TimeoutStr); err != nil {
panic(fmt.Sprintf("time.ParseDuration(Registry_Config.Timeout:%#v) = error:%s", clientConfig.ZkRegistryConfig.TimeoutStr, err))
return nil
}
gxlog.CInfo("config{%#v}\n", clientConfig)
// log
confFile = os.Getenv(APP_LOG_CONF_FILE)
if confFile == "" {
panic(fmt.Sprintf("log configure file name is nil"))
return nil
}
if path.Ext(confFile) != ".xml" {
panic(fmt.Sprintf("log configure file name{%v} suffix must be .xml", confFile))
return nil
}
log.LoadConfiguration(confFile)
return clientConfig
}
package main
import (
"fmt"
"io/ioutil"
"os"
"path"
"time"
)
import (
"github.com/AlexStocks/goext/log"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
"gopkg.in/yaml.v2"
)
import (
"github.com/dubbo/dubbo-go/plugins"
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/registry/zookeeper"
"github.com/dubbo/dubbo-go/server"
)
const (
APP_CONF_FILE string = "APP_CONF_FILE"
APP_LOG_CONF_FILE string = "APP_LOG_CONF_FILE"
)
var (
conf *ServerConfig
)
type (
ServerConfig struct {
// pprof
Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"`
Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"`
// transport & registry
Transport string `default:"http" yaml:"transport" json:"transport,omitempty"`
NetTimeout string `default:"100ms" yaml:"net_timeout" json:"net_timeout,omitempty"` // in ms
netTimeout time.Duration
// application
Application_Config registry.ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
// Registry_Address string `default:"192.168.35.3:2181"`
Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"`
ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"`
ServiceConfigType string `default:"default" yaml:"service_config_type" json:"service_config_type,omitempty"`
ServiceConfigList []registry.ProviderServiceConfig `yaml:"-"`
ServiceConfigMapList []map[string]string `yaml:"service_list" json:"service_list,omitempty"`
ServerConfigList []server.ServerConfig `yaml:"server_list" json:"server_list,omitempty"`
}
)
func initServerConf() *ServerConfig {
var (
err error
confFile string
)
confFile = os.Getenv(APP_CONF_FILE)
if confFile == "" {
panic(fmt.Sprintf("application configure file name is nil"))
return nil
}
if path.Ext(confFile) != ".yml" {
panic(fmt.Sprintf("application configure file name{%v} suffix must be .yml", confFile))
return nil
}
conf = &ServerConfig{}
confFileStream, err := ioutil.ReadFile(confFile)
if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(file:%s) = error:%s", confFile, jerrors.ErrorStack(err)))
return nil
}
err = yaml.Unmarshal(confFileStream, conf)
//动态加载service config
//设置默认ProviderServiceConfig类
plugins.SetDefaultProviderServiceConfig(conf.ServiceConfigType)
for _, service := range conf.ServiceConfigMapList {
svc := plugins.DefaultProviderServiceConfig()()
svc.SetProtocol(service["protocol"])
svc.SetService(service["service"])
conf.ServiceConfigList = append(conf.ServiceConfigList, svc)
}
//动态加载service config end
if err != nil {
panic(fmt.Sprintf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err)))
return nil
}
if conf.netTimeout, err = time.ParseDuration(conf.NetTimeout); err != nil {
panic(fmt.Sprintf("time.ParseDuration(NetTimeout:%#v) = error:%s", conf.NetTimeout, err))
return nil
}
if conf.ZkRegistryConfig.Timeout, err = time.ParseDuration(conf.ZkRegistryConfig.TimeoutStr); err != nil {
panic(fmt.Sprintf("time.ParseDuration(Registry_Config.Timeout:%#v) = error:%s",
conf.ZkRegistryConfig.TimeoutStr, err))
return nil
}
gxlog.CInfo("config{%#v}\n", conf)
return conf
}
func configInit() error {
var (
confFile string
)
initServerConf()
confFile = os.Getenv(APP_LOG_CONF_FILE)
if confFile == "" {
panic(fmt.Sprintf("log configure file name is nil"))
return nil
}
if path.Ext(confFile) != ".xml" {
panic(fmt.Sprintf("log configure file name{%v} suffix must be .xml", confFile))
return nil
}
log.LoadConfiguration(confFile)
return nil
}
...@@ -14,86 +14,46 @@ import ( ...@@ -14,86 +14,46 @@ import (
"github.com/AlexStocks/goext/net" "github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/time" "github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
) )
import ( import (
"github.com/dubbo/dubbo-go/jsonrpc" "github.com/dubbo/dubbo-go/config/support"
"github.com/dubbo/dubbo-go/plugins"
registry2 "github.com/dubbo/dubbo-go/registry" _ "github.com/dubbo/dubbo-go/protocol/dubbo"
"github.com/dubbo/dubbo-go/registry/zookeeper" _ "github.com/dubbo/dubbo-go/protocol/jsonrpc"
_ "github.com/dubbo/dubbo-go/registry/protocol"
_ "github.com/dubbo/dubbo-go/filter/imp"
_ "github.com/dubbo/dubbo-go/cluster/loadbalance"
_ "github.com/dubbo/dubbo-go/cluster/support"
_ "github.com/dubbo/dubbo-go/registry/zookeeper"
) )
var ( var (
survivalTimeout = int(3e9) survivalTimeout = int(3e9)
servo *jsonrpc.Server
) )
// they are necessary:
// export CONF_CONSUMER_FILE_PATH="xxx"
// export CONF_PROVIDER_FILE_PATH="xxx"
// export APP_LOG_CONF_FILE="xxx"
func main() { func main() {
var (
err error
)
err = configInit() _, proMap := support.Load()
if err != nil { if proMap == nil {
log.Error("configInit() = error{%#v}", err) panic("proMap is nil")
return
} }
initProfiling() initProfiling()
servo = initServer() //todo
err = servo.Handle(&UserProvider{})
if err != nil {
panic(err)
return
}
servo.Start()
initSignal() initSignal()
} }
func initServer() *jsonrpc.Server {
var (
srv *jsonrpc.Server
)
if conf == nil {
panic(fmt.Sprintf("conf is nil"))
return nil
}
// registry
registry, err := plugins.PluggableRegistries[conf.Registry](
registry2.WithDubboType(registry2.PROVIDER),
registry2.WithApplicationConf(conf.Application_Config),
zookeeper.WithRegistryConf(conf.ZkRegistryConfig),
)
if err != nil || registry == nil {
panic(fmt.Sprintf("fail to init registry.Registy, err:%s", jerrors.ErrorStack(err)))
return nil
}
// provider
srv = jsonrpc.NewServer(
jsonrpc.Registry(registry),
jsonrpc.ConfList(conf.ServerConfigList),
jsonrpc.ServiceConfList(conf.ServiceConfigList),
)
return srv
}
func uninitServer() {
if servo != nil {
servo.Stop()
}
log.Close()
}
func initProfiling() { func initProfiling() {
if !conf.Pprof_Enabled { if !support.GetProviderConfig().Pprof_Enabled {
return return
} }
const ( const (
...@@ -109,7 +69,7 @@ func initProfiling() { ...@@ -109,7 +69,7 @@ func initProfiling() {
if err != nil { if err != nil {
panic("cat not get local ip!") panic("cat not get local ip!")
} }
addr = ip + ":" + strconv.Itoa(conf.Pprof_Port) addr = ip + ":" + strconv.Itoa(support.GetProviderConfig().Pprof_Port)
log.Info("App Profiling startup on address{%v}", addr+PprofPath) log.Info("App Profiling startup on address{%v}", addr+PprofPath)
go func() { go func() {
...@@ -134,7 +94,7 @@ func initSignal() { ...@@ -134,7 +94,7 @@ func initSignal() {
}) })
// 要么fastFailTimeout时间内执行完毕下面的逻辑然后程序退出,要么执行上面的超时函数程序强行退出 // 要么fastFailTimeout时间内执行完毕下面的逻辑然后程序退出,要么执行上面的超时函数程序强行退出
uninitServer() // todo
fmt.Println("provider app exit now...") fmt.Println("provider app exit now...")
return return
} }
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
// "encoding/json" // "encoding/json"
"context" "context"
"fmt" "fmt"
"github.com/dubbo/dubbo-go/config/support"
"time" "time"
) )
...@@ -14,6 +15,10 @@ import ( ...@@ -14,6 +15,10 @@ import (
type Gender int type Gender int
func init() {
support.SetProService(new(UserProvider))
}
const ( const (
MAN = iota MAN = iota
WOMAN WOMAN
...@@ -134,14 +139,14 @@ func (u *UserProvider) GetUser(ctx context.Context, req *string, rsp *User) erro ...@@ -134,14 +139,14 @@ func (u *UserProvider) GetUser(ctx context.Context, req *string, rsp *User) erro
} }
*/ */
func (u *UserProvider) GetUser(ctx context.Context, req []string, rsp *User) error { func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User) error {
var ( var (
err error err error
user *User user *User
) )
gxlog.CInfo("req:%#v", req) gxlog.CInfo("req:%#v", req)
user, err = u.getUser(req[0]) user, err = u.getUser(req[0].(string))
if err == nil { if err == nil {
*rsp = *user *rsp = *user
gxlog.CInfo("rsp:%#v", rsp) gxlog.CInfo("rsp:%#v", rsp)
......
package main
var (
Version string = "0.3.1"
)
session_number: 700
fail_fast_timeout: "5s"
session_timeout: "20s"
getty_session_param:
compress_encoding: false
tcp_no_delay: true
tcp_keep_alive: true
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
max_msg_len: 1024
session_name: "server"
...@@ -8,6 +8,7 @@ pprof_port : 20080 ...@@ -8,6 +8,7 @@ pprof_port : 20080
transport : "http" transport : "http"
net_timeout : "3s" net_timeout : "3s"
path: ""
# application config # application config
application_config: application_config:
organization : "ikurento.com" organization : "ikurento.com"
...@@ -17,22 +18,42 @@ application_config: ...@@ -17,22 +18,42 @@ application_config:
owner : "ZX" owner : "ZX"
environment : "dev" environment : "dev"
registry: "zookeeper" registries :
- id: "hangzhouzk"
zk_registry_config: type: "zookeeper"
timeout : "3s" timeout : "3s"
address: address: "127.0.0.1:2181"
- "127.0.0.1:2181" username: ""
service_config_type: "default" password: ""
service_list:
- - id: "shanghaizk"
protocol : "jsonrpc" type: "zookeeper"
# 相当于dubbo.xml中的interface timeout : "3s"
service : "com.ikurento.user.UserProvider" address: "127.0.0.1:2182"
username: ""
server_list: password: ""
-
ip : "127.0.0.1"
port : 20000 services:
# 本server能够提供所有支持同样的Protocol的servicelist的服务 - registries:
protocol : "jsonrpc" - "hangzhouzk"
- "shanghaizk"
protocol : "dubbo,jsonrpc"
# 相当于dubbo.xml中的interface
interface : "com.ikurento.user.UserProvider"
loadbalance: "random"
warmup: "100"
cluster: "failover"
methods:
- name: "GetUser"
retries: 1
loadbalance: "random"
protocols:
- name: "dubbo"
ip : "127.0.0.1"
port : 20000
- name: "jsonrpc"
ip: "127.0.0.1"
port: 20001
session_number: 700
fail_fast_timeout: "5s"
session_timeout: "20s"
getty_session_param:
compress_encoding: false
tcp_no_delay: true
tcp_keep_alive: true
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
max_msg_len: 1024
session_name: "server"
...@@ -8,6 +8,7 @@ pprof_port : 20080 ...@@ -8,6 +8,7 @@ pprof_port : 20080
transport : "http" transport : "http"
net_timeout : "3s" net_timeout : "3s"
path: ""
# application config # application config
application_config: application_config:
organization : "ikurento.com" organization : "ikurento.com"
...@@ -15,24 +16,44 @@ application_config: ...@@ -15,24 +16,44 @@ application_config:
module : "dubbogo user-info server" module : "dubbogo user-info server"
version : "0.0.1" version : "0.0.1"
owner : "ZX" owner : "ZX"
environment : "product" environment : "pro"
registry: "zookeeper" registries :
- id: "hangzhouzk"
zk_registry_config: type: "zookeeper"
timeout : "3s" timeout : "3s"
address: address: "127.0.0.1:2181"
- "127.0.0.1:2181" username: ""
service_config_type: "default" password: ""
service_list:
- - id: "shanghaizk"
protocol : "jsonrpc" type: "zookeeper"
# 相当于dubbo.xml中的interface timeout : "3s"
service : "com.ikurento.user.UserProvider" address: "127.0.0.1:2182"
username: ""
server_list: password: ""
-
ip : "127.0.0.1"
port : 20000 services:
# 本server能够提供所有支持同样的Protocol的servicelist的服务 - registries:
protocol : "jsonrpc" - "hangzhouzk"
- "shanghaizk"
protocol : "dubbo,jsonrpc"
# 相当于dubbo.xml中的interface
interface : "com.ikurento.user.UserProvider"
loadbalance: "random"
warmup: "100"
cluster: "failover"
methods:
- name: "GetUser"
retries: 1
loadbalance: "random"
protocols:
- name: "dubbo"
ip : "127.0.0.1"
port : 20000
- name: "jsonrpc"
ip: "127.0.0.1"
port: 20001
session_number: 700
fail_fast_timeout: "5s"
session_timeout: "20s"
getty_session_param:
compress_encoding: false
tcp_no_delay: true
tcp_keep_alive: true
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
max_msg_len: 1024
session_name: "server"
...@@ -8,31 +8,52 @@ pprof_port : 20080 ...@@ -8,31 +8,52 @@ pprof_port : 20080
transport : "http" transport : "http"
net_timeout : "3s" net_timeout : "3s"
path: ""
# application config # application config
application_config: application_config:
organization : "ikurento.com" organization : "ikurento.com"
name : "BDTService" name : "BDTService"
module : "dubbogo user-info server" module : "dubbogo user-info server"
version : "0.0.1" version : "0.0.1"
owner : "ZX" owner : "ZX"
environment : "test" environment : "test"
registry: "zookeeper" registries :
- id: "hangzhouzk"
zk_registry_config: type: "zookeeper"
timeout : "3s" timeout : "3s"
address: address: "127.0.0.1:2181"
- "127.0.0.1:2181" username: ""
service_config_type: "default" password: ""
service_list:
- - id: "shanghaizk"
protocol : "jsonrpc" type: "zookeeper"
# 相当于dubbo.xml中的interface timeout : "3s"
service : "com.ikurento.user.UserProvider" address: "127.0.0.1:2182"
username: ""
server_list: password: ""
-
ip : "127.0.0.1"
port : 20000 services:
# 本server能够提供所有支持同样的Protocol的servicelist的服务 - registries:
protocol : "jsonrpc" - "hangzhouzk"
- "shanghaizk"
protocol : "dubbo,jsonrpc"
# 相当于dubbo.xml中的interface
interface : "com.ikurento.user.UserProvider"
loadbalance: "random"
warmup: "100"
cluster: "failover"
methods:
- name: "GetUser"
retries: 1
loadbalance: "random"
protocols:
- name: "dubbo"
ip : "127.0.0.1"
port : 20000
- name: "jsonrpc"
ip: "127.0.0.1"
port: 20001
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"strconv"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
...@@ -98,10 +97,6 @@ func (c *HTTPClient) Call(ctx context.Context, service config.URL, req *Request, ...@@ -98,10 +97,6 @@ func (c *HTTPClient) Call(ctx context.Context, service config.URL, req *Request,
httpHeader.Set("Accept", "application/json") httpHeader.Set("Accept", "application/json")
reqTimeout := c.options.HTTPTimeout reqTimeout := c.options.HTTPTimeout
timeout, err := strconv.ParseInt(service.GetParam(constant.TIMEOUT_KEY,""), 10, 64)
if err == nil && time.Duration(timeout)< reqTimeout {
reqTimeout = time.Duration(timeout)
}
if reqTimeout <= 0 { if reqTimeout <= 0 {
reqTimeout = 1e8 reqTimeout = 1e8
} }
......
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"context" "context"
"github.com/dubbo/dubbo-go/common/constant"
"io" "io"
"io/ioutil" "io/ioutil"
"net" "net"
...@@ -22,6 +21,7 @@ import ( ...@@ -22,6 +21,7 @@ import (
) )
import ( import (
"github.com/dubbo/dubbo-go/common/constant"
"github.com/dubbo/dubbo-go/config" "github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol" "github.com/dubbo/dubbo-go/protocol"
) )
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment