Newer
Older
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
gxstrings "github.com/dubbogo/gost/strings"

AlexStocks
committed
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/proxy"
"github.com/apache/dubbo-go/protocol"
// ReferenceConfig is the configuration of service consumer
context context.Context
pxy *proxy.Proxy
id string
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
ProvideBy string `yaml:"provide_by" json:"provide_by,omitempty" property:"provide_by"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
ForceTag bool `yaml:"force.tag" json:"force.tag,omitempty" property:"force.tag"`

vito.he
committed
}

vito.he
committed
func (c *ReferenceConfig) Prefix() string {
return constant.ReferenceConfigPrefix + c.InterfaceName + "."
// NewReferenceConfig The only way to get a new ReferenceConfig
func NewReferenceConfig(id string, ctx context.Context) *ReferenceConfig {
return &ReferenceConfig{id: id, context: ctx}

AlexStocks
committed
// UnmarshalYAML unmarshals the ReferenceConfig by @unmarshal function
func (c *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
type rf ReferenceConfig
raw := rf{} // Put your defaults here
if err := unmarshal(&raw); err != nil {
return err
}
*c = ReferenceConfig(raw)
if err := defaults.Set(c); err != nil {

AlexStocks
committed
func (c *ReferenceConfig) Refer(_ interface{}) {
cfgURL := common.NewURLWithOptions(
common.WithPath(c.InterfaceName),
common.WithProtocol(c.Protocol),
common.WithParams(c.getUrlMap()),
common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
if c.ForceTag {
cfgURL.AddParam(constant.ForceUseTag, "true")
}
// 1. user specified URL, could be peer-to-peer address, or register center's address.
urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*")
for _, urlStr := range urlStrings {
if err != nil {
panic(fmt.Sprintf("user specified URL %v refer error, error message is %v ", urlStr, err.Error()))
}
if serviceUrl.Protocol == constant.REGISTRY_PROTOCOL {
} else {
if serviceUrl.Path == "" {
serviceUrl.Path = "/" + c.InterfaceName
c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)
c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(c.urls[0])
invokers := make([]protocol.Invoker, 0, len(c.urls))
invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(u))
if u.Protocol == constant.REGISTRY_PROTOCOL {
regUrl = u
}
}
// TODO(decouple from directory, config should not depend on directory module)
// for multi-subscription scenario, use 'zone-aware' policy by default
// not a registry url, must be direct invoke.
if len(invokers) > 0 {
u := invokers[0].GetUrl()
if nil != &u {
hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ZONEAWARE_CLUSTER_NAME)
cluster := extension.GetCluster(hitClu)
// If 'zone-aware' policy select, the invoker wrap sequence would be:
// ZoneAwareClusterInvoker(StaticDirectory) ->
// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)
// @v is service provider implemented RPCService
func (c *ReferenceConfig) Implement(v common.RPCService) {
c.pxy.Implement(v)
// GetRPCService gets RPCService from proxy
func (c *ReferenceConfig) GetRPCService() common.RPCService {
return c.pxy.Get()
func (c *ReferenceConfig) getUrlMap() url.Values {
//first set user params
urlMap.Set(k, v)
}
urlMap.Set(constant.INTERFACE_KEY, c.InterfaceName)
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, c.Cluster)
urlMap.Set(constant.LOADBALANCE_KEY, c.Loadbalance)
urlMap.Set(constant.RETRIES_KEY, c.Retries)
urlMap.Set(constant.GROUP_KEY, c.Group)
urlMap.Set(constant.VERSION_KEY, c.Version)
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(c.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
urlMap.Set(constant.RELEASE_KEY, "dubbo-golang-"+constant.Version)
urlMap.Set(constant.SIDE_KEY, (common.RoleType(common.CONSUMER)).Role())
if len(c.RequestTimeout) != 0 {
urlMap.Set(constant.TIMEOUT_KEY, c.RequestTimeout)
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(c.Async))
urlMap.Set(constant.STICKY_KEY, strconv.FormatBool(c.Sticky))
//application info
urlMap.Set(constant.APPLICATION_KEY, consumerConfig.ApplicationConfig.Name)
urlMap.Set(constant.ORGANIZATION_KEY, consumerConfig.ApplicationConfig.Organization)
urlMap.Set(constant.NAME_KEY, consumerConfig.ApplicationConfig.Name)
urlMap.Set(constant.MODULE_KEY, consumerConfig.ApplicationConfig.Module)
urlMap.Set(constant.APP_VERSION_KEY, consumerConfig.ApplicationConfig.Version)
urlMap.Set(constant.OWNER_KEY, consumerConfig.ApplicationConfig.Owner)
urlMap.Set(constant.ENVIRONMENT_KEY, consumerConfig.ApplicationConfig.Environment)

Ming Deng
committed
defaultReferenceFilter = constant.GENERIC_REFERENCE_FILTERS + "," + defaultReferenceFilter
urlMap.Set(constant.REFERENCE_FILTER_KEY, mergeValue(consumerConfig.Filter, c.Filter, defaultReferenceFilter))
urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.LoadBalance)
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries)
urlMap.Set("methods."+v.Name+"."+constant.STICKY_KEY, strconv.FormatBool(v.Sticky))
if len(v.RequestTimeout) != 0 {
urlMap.Set("methods."+v.Name+"."+constant.TIMEOUT_KEY, v.RequestTimeout)
}
func (c *ReferenceConfig) GenericLoad(id string) {
genericService := NewGenericService(c.id)
c.id = id
c.Refer(genericService)
c.Implement(genericService)