Skip to content
Snippets Groups Projects
Commit 2a4c745d authored by xianlezheng's avatar xianlezheng Committed by GitHub
Browse files

Merge pull request #1 from apache/develop

Update forks merge
parents 2c9c9e1f 9b0877c9
No related branches found
No related tags found
No related merge requests found
Showing
with 557 additions and 172 deletions
......@@ -16,7 +16,7 @@ Apache License, Version 2.0
## Release note ##
[v1.4.0-rc1 - Mar 12, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0-rc1)
[v1.4.0 - Mar 17, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
[v1.3.0 - Mar 1, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.3.0)
......
......@@ -15,7 +15,7 @@ Apache License, Version 2.0
## 发布日志 ##
[v1.4.0-rc1 - 2020年3月12](https://github.com/apache/dubbo-go/releases/tag/v1.4.0-rc1)
[v1.4.0 - 2020年3月17](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
[v1.3.0 - 2020年3月1日](https://github.com/apache/dubbo-go/releases/tag/v1.3.0)
......
......@@ -131,6 +131,7 @@ const (
ProviderConfigPrefix = "dubbo.provider."
ConsumerConfigPrefix = "dubbo.consumer."
ShutdownConfigPrefix = "dubbo.shutdown."
MetadataReportPrefix = "dubbo.metadata-report."
RouterConfigPrefix = "dubbo.router."
)
......@@ -179,6 +180,9 @@ const (
// ForceUseTag is the tag in attachment
ForceUseTag = "dubbo.force.tag"
Tagkey = "dubbo.tag"
// Attachment key in context in invoker
AttachmentKey = "attachment"
)
const (
......@@ -209,9 +213,23 @@ const (
// consumer
CONSUMER = "consumer"
// key of access key id
ACCESS_KEY_ID_KEY = "accessKeyId"
ACCESS_KEY_ID_KEY = ".accessKeyId"
// key of secret access key
SECRET_ACCESS_KEY_KEY = "secretAccessKey"
SECRET_ACCESS_KEY_KEY = ".secretAccessKey"
)
// metadata report
const (
METACONFIG_REMOTE = "remote"
METACONFIG_LOCAL = "local"
KEY_SEPARATOR = ":"
DEFAULT_PATH_TAG = "metadata"
KEY_REVISON_PREFIX = "revision"
PATH_SEPARATOR = "/"
// metadata service
METADATA_SERVICE_NAME = "org.apache.dubbo.metadata.MetadataService"
)
// HealthCheck Router
......@@ -235,3 +253,9 @@ const (
// The default time window of circuit-tripped in millisecond if not specfied
MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000
)
// service discovery
const (
NACOS_GROUP = "nacos.group"
)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/metadata"
)
var (
metaDataReportFactories = make(map[string]func() metadata.MetadataReportFactory, 8)
)
// SetMetadataReportFactory ...
func SetMetadataReportFactory(name string, v func() metadata.MetadataReportFactory) {
metaDataReportFactories[name] = v
}
// GetMetadataReportFactory ...
func GetMetadataReportFactory(name string) metadata.MetadataReportFactory {
if metaDataReportFactories[name] == nil {
panic("metadata report for " + name + " is not existing, make sure you have import the package.")
}
return metaDataReportFactories[name]()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/registry"
)
type registryDirectory func(url *common.URL, registry registry.Registry) (cluster.Directory, error)
var defaultRegistry registryDirectory
// SetDefaultRegistryDirectory ...
func SetDefaultRegistryDirectory(v registryDirectory) {
defaultRegistry = v
}
// GetDefaultRegistryDirectory ...
func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (cluster.Directory, error) {
if defaultRegistry == nil {
panic("registry directory is not existing, make sure you have import the package.")
}
return defaultRegistry(config, registry)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/registry"
)
var (
discoveryCreatorMap = make(map[string]func(url *common.URL) (registry.ServiceDiscovery, error), 4)
)
// SetServiceDiscovery will store the creator and name
func SetServiceDiscovery(name string, creator func(url *common.URL) (registry.ServiceDiscovery, error)) {
discoveryCreatorMap[name] = creator
}
// GetServiceDiscovery will return the registry.ServiceDiscovery
// if not found, or initialize instance failed, it will return error.
func GetServiceDiscovery(name string, url *common.URL) (registry.ServiceDiscovery, error) {
creator, ok := discoveryCreatorMap[name]
if !ok {
return nil, perrors.New("Could not find the service discovery with name: " + name)
}
return creator(url)
}
......@@ -25,6 +25,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
......@@ -44,7 +45,7 @@ var (
typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type()
)
// NewProxy ...
// NewProxy create service proxy.
func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[string]string) *Proxy {
return &Proxy{
invoke: invoke,
......@@ -59,7 +60,6 @@ func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[str
// type XxxProvider struct {
// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error
// }
func (p *Proxy) Implement(v common.RPCService) {
// check parameters, incoming interface must be a elem's pointer.
......@@ -141,7 +141,7 @@ func (p *Proxy) Implement(v common.RPCService) {
}
// add user setAttachment
atm := invCtx.Value("attachment")
atm := invCtx.Value(constant.AttachmentKey)
if m, ok := atm.(map[string]string); ok {
for k, value := range m {
inv.SetAttachments(k, value)
......@@ -149,6 +149,9 @@ func (p *Proxy) Implement(v common.RPCService) {
}
result := p.invoke.Invoke(invCtx, inv)
if len(result.Attachments()) > 0 {
invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())
}
err = result.Error()
logger.Debugf("[makeDubboCallProxy] result: %v, err: %v", result.Result(), err)
......@@ -202,12 +205,12 @@ func (p *Proxy) Implement(v common.RPCService) {
}
// Get ...
// Get get rpc service instance.
func (p *Proxy) Get() common.RPCService {
return p.rpc
}
// GetCallback ...
// GetCallback get callback.
func (p *Proxy) GetCallback() interface{} {
return p.callBack
}
......@@ -22,7 +22,7 @@ import (
"github.com/apache/dubbo-go/protocol"
)
// ProxyFactory ...
// ProxyFactory interface.
type ProxyFactory interface {
GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy
GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy
......
......@@ -113,6 +113,7 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
in := []reflect.Value{svc.Rcvr()}
if method.CtxType() != nil {
ctx = context.WithValue(ctx, constant.AttachmentKey, invocation.Attachments())
in = append(in, method.SuiteContext(ctx))
}
......
......@@ -59,7 +59,6 @@ type AsyncCallback func(response CallbackResponse)
// return map[string][string]{}
// }
const (
// METHOD_MAPPER ...
METHOD_MAPPER = "MethodMapper"
)
......@@ -68,7 +67,7 @@ var (
// because Typeof takes an empty interface value. This is annoying.
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// ServiceMap ...
// ServiceMap store description of service.
// todo: lowerecas?
ServiceMap = &serviceMap{
serviceMap: make(map[string]map[string]*Service),
......@@ -80,7 +79,7 @@ var (
// info of method
//////////////////////////
// MethodType ...
// MethodType is description of service method.
type MethodType struct {
method reflect.Method
ctxType reflect.Type // request context
......@@ -88,27 +87,27 @@ type MethodType struct {
replyType reflect.Type // return value, otherwise it is nil
}
// Method ...
// Method get @m.method.
func (m *MethodType) Method() reflect.Method {
return m.method
}
// CtxType ...
// CtxType get @m.ctxType.
func (m *MethodType) CtxType() reflect.Type {
return m.ctxType
}
// ArgsType ...
// ArgsType get @m.argsType.
func (m *MethodType) ArgsType() []reflect.Type {
return m.argsType
}
// ReplyType ...
// ReplyType get @m.replyType.
func (m *MethodType) ReplyType() reflect.Type {
return m.replyType
}
// SuiteContext ...
// SuiteContext tranfer @ctx to reflect.Value type or get it from @m.ctxType.
func (m *MethodType) SuiteContext(ctx context.Context) reflect.Value {
if contextv := reflect.ValueOf(ctx); contextv.IsValid() {
return contextv
......@@ -120,7 +119,7 @@ func (m *MethodType) SuiteContext(ctx context.Context) reflect.Value {
// info of service interface
//////////////////////////
// Service ...
// Service is description of service
type Service struct {
name string
rcvr reflect.Value
......@@ -128,17 +127,17 @@ type Service struct {
methods map[string]*MethodType
}
// Method ...
// Method get @s.methods.
func (s *Service) Method() map[string]*MethodType {
return s.methods
}
// RcvrType ...
// RcvrType get @s.rcvrType.
func (s *Service) RcvrType() reflect.Type {
return s.rcvrType
}
// Rcvr ...
// Rcvr get @s.rcvr.
func (s *Service) Rcvr() reflect.Value {
return s.rcvr
}
......@@ -274,7 +273,9 @@ func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceId string) erro
}
}
delete(svcs, serviceId)
delete(sm.serviceMap, protocol)
if len(sm.serviceMap) == 0 {
delete(sm.serviceMap, protocol)
}
return nil
}
......
......@@ -195,7 +195,6 @@ func NewURLWithOptions(opts ...option) *URL {
// NewURL will create a new url
// the urlString should not be empty
func NewURL(urlString string, opts ...option) (URL, error) {
var (
err error
rawUrlString string
......@@ -249,7 +248,7 @@ func NewURL(urlString string, opts ...option) (URL, error) {
return s, nil
}
// URLEqual ...
// URLEqual judge @url and @c is equal or not.
func (c URL) URLEqual(url URL) bool {
c.Ip = ""
c.Port = ""
......@@ -265,17 +264,19 @@ func (c URL) URLEqual(url URL) bool {
} else if urlGroup == constant.ANY_VALUE {
urlKey = strings.Replace(urlKey, "group=*", "group="+cGroup, 1)
}
// 1. protocol, username, password, ip, port, service name, group, version should be equal
if cKey != urlKey {
return false
}
// 2. if url contains enabled key, should be true, or *
if url.GetParam(constant.ENABLED_KEY, "true") != "true" && url.GetParam(constant.ENABLED_KEY, "") != constant.ANY_VALUE {
return false
}
//TODO :may need add interface key any value condition
if !isMatchCategory(url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), c.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY)) {
return false
}
return true
return isMatchCategory(url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), c.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY))
}
func isMatchCategory(category1 string, category2 string) bool {
......@@ -313,10 +314,9 @@ func (c URL) Key() string {
"%s://%s:%s@%s:%s/?interface=%s&group=%s&version=%s",
c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Service(), c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, ""))
return buildString
//return c.ServiceKey()
}
// ServiceKey ...
// ServiceKey get a unique key of a service.
func (c URL) ServiceKey() string {
intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/"))
if intf == "" {
......@@ -409,12 +409,12 @@ func (c *URL) RangeParams(f func(key, value string) bool) {
// GetParam ...
func (c URL) GetParam(s string, d string) string {
var r string
c.paramsLock.RLock()
if r = c.params.Get(s); len(r) == 0 {
defer c.paramsLock.RUnlock()
r := c.params.Get(s)
if len(r) == 0 {
r = d
}
c.paramsLock.RUnlock()
return r
}
......@@ -454,10 +454,8 @@ func (c URL) GetRawParam(key string) string {
// GetParamBool ...
func (c URL) GetParamBool(s string, d bool) bool {
var r bool
var err error
if r, err = strconv.ParseBool(c.GetParam(s, "")); err != nil {
r, err := strconv.ParseBool(c.GetParam(s, ""))
if err != nil {
return d
}
return r
......@@ -465,10 +463,8 @@ func (c URL) GetParamBool(s string, d bool) bool {
// GetParamInt ...
func (c URL) GetParamInt(s string, d int64) int64 {
var r int
var err error
if r, err = strconv.Atoi(c.GetParam(s, "")); r == 0 || err != nil {
r, err := strconv.Atoi(c.GetParam(s, ""))
if r == 0 || err != nil {
return d
}
return int64(r)
......@@ -476,11 +472,8 @@ func (c URL) GetParamInt(s string, d int64) int64 {
// GetMethodParamInt ...
func (c URL) GetMethodParamInt(method string, key string, d int64) int64 {
var r int
var err error
c.paramsLock.RLock()
defer c.paramsLock.RUnlock()
if r, err = strconv.Atoi(c.GetParam("methods."+method+"."+key, "")); r == 0 || err != nil {
r, err := strconv.Atoi(c.GetParam("methods."+method+"."+key, ""))
if r == 0 || err != nil {
return d
}
return int64(r)
......@@ -492,14 +485,13 @@ func (c URL) GetMethodParamInt64(method string, key string, d int64) int64 {
if r == math.MinInt64 {
return c.GetParamInt(key, d)
}
return r
}
// GetMethodParam ...
func (c URL) GetMethodParam(method string, key string, d string) string {
var r string
if r = c.GetParam("methods."+method+"."+key, ""); r == "" {
r := c.GetParam("methods."+method+"."+key, "")
if r == "" {
r = d
}
return r
......@@ -530,7 +522,6 @@ func (c *URL) SetParams(m url.Values) {
// ToMap transfer URL to Map
func (c URL) ToMap() map[string]string {
paramsMap := make(map[string]string)
c.RangeParams(func(key, value string) bool {
......@@ -615,8 +606,30 @@ func (c *URL) Clone() *URL {
return newUrl
}
// Copy url based on the reserved parameters' keys.
func (c *URL) CloneWithParams(reserveParams []string) *URL {
params := url.Values{}
for _, reserveParam := range reserveParams {
v := c.GetParam(reserveParam, "")
if len(v) != 0 {
params.Set(reserveParam, v)
}
}
return NewURLWithOptions(
WithProtocol(c.Protocol),
WithUsername(c.Username),
WithPassword(c.Password),
WithIp(c.Ip),
WithPort(c.Port),
WithPath(c.Path),
WithMethods(c.Methods),
WithParams(params),
)
}
func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []func(method string) {
var methodConfigMergeFcn = []func(method string){}
methodConfigMergeFcn := make([]func(method string), 0, len(paramKeys))
for _, paramKey := range paramKeys {
if v := referenceUrl.GetParam(paramKey, ""); len(v) > 0 {
mergedUrl.SetParam(paramKey, v)
......
......@@ -33,6 +33,7 @@ type ApplicationConfig struct {
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Owner string `yaml:"owner" json:"owner,omitempty" property:"owner"`
Environment string `yaml:"environment" json:"environment,omitempty" property:"environment"`
MetadataType string `default:"local" yaml:"metadataType" json:"metadataType,omitempty" property:"metadataType"` //field for metadata report
}
// Prefix ...
......
......@@ -81,128 +81,142 @@ func checkApplicationName(config *ApplicationConfig) {
}
}
// Load Dubbo Init
func Load() {
// init router
if confRouterFile != "" {
if errPro := RouterInit(confRouterFile); errPro != nil {
log.Printf("[routerConfig init] %#v", errPro)
}
}
// reference config
func loadConsumerConfig() {
if consumerConfig == nil {
logger.Warnf("consumerConfig is nil!")
} else {
// init other consumer config
conConfigType := consumerConfig.ConfigType
for key, value := range extension.GetDefaultConfigReader() {
if conConfigType == nil {
if v, ok := conConfigType[key]; ok {
value = v
}
}
if err := extension.GetConfigReaders(value).ReadConsumerConfig(consumerConfig.fileStream); err != nil {
logger.Errorf("ReadConsumerConfig error: %#v for %s", perrors.WithStack(err), value)
return
}
// init other consumer config
conConfigType := consumerConfig.ConfigType
for key, value := range extension.GetDefaultConfigReader() {
if conConfigType == nil {
if v, ok := conConfigType[key]; ok {
value = v
}
}
if err := extension.GetConfigReaders(value).ReadConsumerConfig(consumerConfig.fileStream); err != nil {
logger.Errorf("ReadConsumerConfig error: %#v for %s", perrors.WithStack(err), value)
}
}
metricConfig = consumerConfig.MetricConfig
applicationConfig = consumerConfig.ApplicationConfig
metricConfig = consumerConfig.MetricConfig
applicationConfig = consumerConfig.ApplicationConfig
checkApplicationName(consumerConfig.ApplicationConfig)
if err := configCenterRefreshConsumer(); err != nil {
logger.Errorf("[consumer config center refresh] %#v", err)
checkApplicationName(consumerConfig.ApplicationConfig)
if err := configCenterRefreshConsumer(); err != nil {
logger.Errorf("[consumer config center refresh] %#v", err)
}
checkRegistries(consumerConfig.Registries, consumerConfig.Registry)
for key, ref := range consumerConfig.References {
if ref.Generic {
genericService := NewGenericService(key)
SetConsumerService(genericService)
}
checkRegistries(consumerConfig.Registries, consumerConfig.Registry)
for key, ref := range consumerConfig.References {
if ref.Generic {
genericService := NewGenericService(key)
SetConsumerService(genericService)
}
rpcService := GetConsumerService(key)
if rpcService == nil {
logger.Warnf("%s does not exist!", key)
continue
}
ref.id = key
ref.Refer(rpcService)
ref.Implement(rpcService)
rpcService := GetConsumerService(key)
if rpcService == nil {
logger.Warnf("%s does not exist!", key)
continue
}
ref.id = key
ref.Refer(rpcService)
ref.Implement(rpcService)
}
//wait for invoker is available, if wait over default 3s, then panic
var count int
checkok := true
for {
for _, refconfig := range consumerConfig.References {
if (refconfig.Check != nil && *refconfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check == nil) { //default to true
if refconfig.invoker != nil &&
!refconfig.invoker.IsAvailable() {
checkok = false
count++
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
panic(errMsg)
}
time.Sleep(time.Second * 1)
break
}
if refconfig.invoker == nil {
logger.Warnf("The interface %s invoker not exist , may you should check your interface config.", refconfig.InterfaceName)
//wait for invoker is available, if wait over default 3s, then panic
var count int
checkok := true
for {
for _, refconfig := range consumerConfig.References {
if (refconfig.Check != nil && *refconfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check == nil) { //default to true
if refconfig.invoker != nil &&
!refconfig.invoker.IsAvailable() {
checkok = false
count++
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
panic(errMsg)
}
time.Sleep(time.Second * 1)
break
}
if refconfig.invoker == nil {
logger.Warnf("The interface %s invoker not exist , may you should check your interface config.", refconfig.InterfaceName)
}
}
if checkok {
break
}
checkok = true
}
if checkok {
break
}
checkok = true
}
}
// service config
func loadProviderConfig() {
if providerConfig == nil {
logger.Warnf("providerConfig is nil!")
} else {
// init other provider config
proConfigType := providerConfig.ConfigType
for key, value := range extension.GetDefaultConfigReader() {
if proConfigType != nil {
if v, ok := proConfigType[key]; ok {
value = v
}
}
if err := extension.GetConfigReaders(value).ReadProviderConfig(providerConfig.fileStream); err != nil {
logger.Errorf("ReadProviderConfig error: %#v for %s", perrors.WithStack(err), value)
return
}
// init other provider config
proConfigType := providerConfig.ConfigType
for key, value := range extension.GetDefaultConfigReader() {
if proConfigType != nil {
if v, ok := proConfigType[key]; ok {
value = v
}
}
if err := extension.GetConfigReaders(value).ReadProviderConfig(providerConfig.fileStream); err != nil {
logger.Errorf("ReadProviderConfig error: %#v for %s", perrors.WithStack(err), value)
}
}
// so, you should know that the consumer's config will be override
metricConfig = providerConfig.MetricConfig
applicationConfig = providerConfig.ApplicationConfig
// so, you should know that the consumer's config will be override
metricConfig = providerConfig.MetricConfig
applicationConfig = providerConfig.ApplicationConfig
checkApplicationName(providerConfig.ApplicationConfig)
if err := configCenterRefreshProvider(); err != nil {
logger.Errorf("[provider config center refresh] %#v", err)
checkApplicationName(providerConfig.ApplicationConfig)
if err := configCenterRefreshProvider(); err != nil {
logger.Errorf("[provider config center refresh] %#v", err)
}
checkRegistries(providerConfig.Registries, providerConfig.Registry)
for key, svs := range providerConfig.Services {
rpcService := GetProviderService(key)
if rpcService == nil {
logger.Warnf("%s does not exist!", key)
continue
}
checkRegistries(providerConfig.Registries, providerConfig.Registry)
for key, svs := range providerConfig.Services {
rpcService := GetProviderService(key)
if rpcService == nil {
logger.Warnf("%s does not exist!", key)
continue
}
svs.id = key
svs.Implement(rpcService)
if err := svs.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! err: %#v", key, err))
}
svs.id = key
svs.Implement(rpcService)
if err := svs.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! err: %#v", key, err))
}
}
}
func initRouter() {
if confRouterFile != "" {
if err := RouterInit(confRouterFile); err != nil {
log.Printf("[routerConfig init] %#v", err)
}
}
}
// Load Dubbo Init
func Load() {
// init router
initRouter()
// reference config
loadConsumerConfig()
// service config
loadProviderConfig()
// init the shutdown callback
GracefulShutdownInit()
}
......
......@@ -44,6 +44,7 @@ type ConsumerConfig struct {
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
// application
ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"`
// client
Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty" property:"connect_timeout"`
ConnectTimeout time.Duration
......@@ -117,6 +118,7 @@ func ConsumerInit(confConFile string) error {
return perrors.WithMessagef(err, "time.ParseDuration(Connect_Timeout{%#v})", consumerConfig.Connect_Timeout)
}
}
logger.Debugf("consumer config{%#v}\n", consumerConfig)
return nil
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package instance
import (
"sync"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/metadata"
)
var (
instance metadata.MetadataReport
once sync.Once
)
// GetMetadataReportInstance ...
func GetMetadataReportInstance(url *common.URL) metadata.MetadataReport {
once.Do(func() {
instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url)
})
return instance
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"net/url"
)
import (
"github.com/creasty/defaults"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/config/instance"
)
// MethodConfig ...
type MetadataReportConfig struct {
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"`
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second
Group string `yaml:"group" json:"group,omitempty" property:"group"`
}
// Prefix ...
func (c *MetadataReportConfig) Prefix() string {
return constant.MetadataReportPrefix
}
// UnmarshalYAML ...
func (c *MetadataReportConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err := defaults.Set(c); err != nil {
return perrors.WithStack(err)
}
type plain MetadataReportConfig
if err := unmarshal((*plain)(c)); err != nil {
return perrors.WithStack(err)
}
return nil
}
// ToUrl ...
func (c *MetadataReportConfig) ToUrl() (*common.URL, error) {
urlMap := make(url.Values)
if c.Params != nil {
for k, v := range c.Params {
urlMap.Set(k, v)
}
}
url, err := common.NewURL(c.Address,
common.WithParams(urlMap),
common.WithUsername(c.Username),
common.WithPassword(c.Password),
common.WithLocation(c.Address),
common.WithProtocol(c.Protocol),
)
if err != nil || len(url.Protocol) == 0 {
return nil, perrors.New("Invalid MetadataReportConfig.")
}
url.SetParam("metadata", url.Protocol)
return &url, nil
}
func (c *MetadataReportConfig) IsValid() bool {
return len(c.Protocol) != 0
}
// StartMetadataReport: The entry of metadata report start
func startMetadataReport(metadataType string, metadataReportConfig *MetadataReportConfig) error {
if metadataReportConfig == nil || metadataReportConfig.IsValid() {
return nil
}
if metadataType == constant.METACONFIG_REMOTE {
return perrors.New("No MetadataConfig found, you must specify the remote Metadata Center address when 'metadata=remote' is enabled.")
} else if metadataType == constant.METACONFIG_REMOTE && len(metadataReportConfig.Address) == 0 {
return perrors.New("MetadataConfig address can not be empty.")
}
if url, err := metadataReportConfig.ToUrl(); err == nil {
instance.GetMetadataReportInstance(url)
} else {
return perrors.New("MetadataConfig is invalid!")
}
return nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import "testing"
import (
"github.com/stretchr/testify/assert"
)
func TestMetadataReportConfig_ToUrl(t *testing.T) {
metadataReportConfig := MetadataReportConfig{
Protocol: "mock",
Address: "127.0.0.1:2181",
Username: "test",
Password: "test",
TimeoutStr: "3s",
Params: map[string]string{
"k": "v",
},
}
url, error := metadataReportConfig.ToUrl()
assert.NoError(t, error)
assert.Equal(t, "mock", url.Protocol)
assert.Equal(t, "127.0.0.1:2181", url.Location)
assert.Equal(t, "127.0.0.1", url.Ip)
assert.Equal(t, "2181", url.Port)
assert.Equal(t, "test", url.Username)
assert.Equal(t, "test", url.Password)
assert.Equal(t, "v", url.GetParam("k", ""))
assert.Equal(t, "mock", url.GetParam("metadata", ""))
}
......@@ -41,6 +41,8 @@ type ProviderConfig struct {
BaseConfig `yaml:",inline"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"`
// metadata-report
MetadataReportConfig *MetadataReportConfig `yaml:"metadata_report" json:"metadata_report,omitempty" property:"metadata_report"`
ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"`
Registry *RegistryConfig `yaml:"registry" json:"registry,omitempty" property:"registry"`
......@@ -95,7 +97,10 @@ func ProviderInit(confProFile string) error {
n.InterfaceId = k
}
}
//start the metadata report if config set
if err := startMetadataReport(providerConfig.ApplicationConfig.MetadataType, providerConfig.MetadataReportConfig); err != nil {
return perrors.WithMessagef(err, "Provider starts metadata report error, and the error is {%#v}", err)
}
logger.Debugf("provider config{%#v}\n", providerConfig)
return nil
......
......@@ -188,6 +188,7 @@ func (c *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.VERSION_KEY, c.Version)
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(c.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
urlMap.Set(constant.RELEASE_KEY, "dubbo-golang-"+constant.Version)
urlMap.Set(constant.SIDE_KEY, (common.RoleType(common.CONSUMER)).Role())
......
......@@ -36,14 +36,15 @@ import (
// RegistryConfig ...
type RegistryConfig struct {
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
//I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig
// I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig
TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second
Group string `yaml:"group" json:"group,omitempty" property:"group"`
//for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
// for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
}
// UnmarshalYAML ...
......@@ -70,9 +71,11 @@ func loadRegistries(targetRegistries string, registries map[string]*RegistryConf
for k, registryConf := range registries {
target := false
// if user not config targetRegistries,default load all
// Notice:in func "func Split(s, sep string) []string" comment : if s does not contain sep and sep is not empty, SplitAfter returns a slice of length 1 whose only element is s.
// So we have to add the condition when targetRegistries string is not set (it will be "" when not set)
// if user not config targetRegistries, default load all
// Notice: in func "func Split(s, sep string) []string" comment:
// if s does not contain sep and sep is not empty, SplitAfter returns
// a slice of length 1 whose only element is s. So we have to add the
// condition when targetRegistries string is not set (it will be "" when not set)
if len(trSlice) == 0 || (len(trSlice) == 1 && trSlice[0] == "") {
target = true
} else {
......@@ -86,29 +89,24 @@ func loadRegistries(targetRegistries string, registries map[string]*RegistryConf
}
if target {
var (
url common.URL
err error
)
addresses := strings.Split(registryConf.Address, ",")
address := addresses[0]
address = traslateRegistryConf(address, registryConf)
url, err = common.NewURL(constant.REGISTRY_PROTOCOL+"://"+address,
address = translateRegistryConf(address, registryConf)
url, err := common.NewURL(constant.REGISTRY_PROTOCOL+"://"+address,
common.WithParams(registryConf.getUrlMap(roleType)),
common.WithParamsValue("simplified", strconv.FormatBool(registryConf.Simplified)),
common.WithUsername(registryConf.Username),
common.WithPassword(registryConf.Password),
common.WithLocation(registryConf.Address),
)
if err != nil {
logger.Errorf("The registry id:%s url is invalid , error: %#v", k, err)
logger.Errorf("The registry id: %s url is invalid, error: %#v", k, err)
panic(err)
} else {
urls = append(urls, &url)
}
}
}
return urls
......@@ -123,15 +121,14 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values {
for k, v := range c.Params {
urlMap.Set(k, v)
}
return urlMap
}
func traslateRegistryConf(address string, registryConf *RegistryConfig) string {
func translateRegistryConf(address string, registryConf *RegistryConfig) string {
if strings.Contains(address, "://") {
translatedUrl, err := url.Parse(address)
if err != nil {
logger.Errorf("The registry url is invalid , error: %#v", err)
logger.Errorf("The registry url is invalid, error: %#v", err)
panic(err)
}
address = translatedUrl.Host
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment